]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/copy.c
Fix reporting of violations in ExecConstraints, again.
[postgresql] / src / backend / commands / copy.c
index 949844d979108433f8e92527ff16646017230a61..73677be59e03dc2d79806b07125e176a7671ec4e 100644 (file)
@@ -60,7 +60,8 @@ typedef enum CopyDest
 {
        COPY_FILE,                                      /* to/from file (or a piped program) */
        COPY_OLD_FE,                            /* to/from frontend (2.0 protocol) */
-       COPY_NEW_FE                                     /* to/from frontend (3.0 protocol) */
+       COPY_NEW_FE,                            /* to/from frontend (3.0 protocol) */
+       COPY_CALLBACK                           /* to/from callback function */
 } CopyDest;
 
 /*
@@ -109,6 +110,7 @@ typedef struct CopyStateData
        List       *attnumlist;         /* integer list of attnums to copy */
        char       *filename;           /* filename, or NULL for STDIN/STDOUT */
        bool            is_program;             /* is 'filename' a program to popen? */
+       copy_data_source_cb     data_source_cb;         /* function for reading data*/
        bool            binary;                 /* binary format? */
        bool            oids;                   /* include OIDs? */
        bool            freeze;                 /* freeze rows on loading? */
@@ -299,7 +301,6 @@ static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
                         Datum *values, bool *nulls);
-static uint64 CopyFrom(CopyState cstate);
 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
                                        CommandId mycid, int hi_options,
                                        ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
@@ -529,6 +530,9 @@ CopySendEndOfRow(CopyState cstate)
                        /* Dump the accumulated row as one CopyData message */
                        (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
                        break;
+               case COPY_CALLBACK:
+                       Assert(false); /* Not yet supported. */
+                       break;
        }
 
        resetStringInfo(fe_msgbuf);
@@ -643,6 +647,9 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
                                bytesread += avail;
                        }
                        break;
+               case COPY_CALLBACK:
+                       bytesread = cstate->data_source_cb(databuf, minread, maxread);
+                       break;
        }
 
        return bytesread;
@@ -923,7 +930,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
                         * relation which we have opened and locked.
                         */
                        from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
-                                                               RelationGetRelationName(rel), -1);
+                                                               pstrdup(RelationGetRelationName(rel)),
+                                                               -1);
 
                        /* Build query */
                        select = makeNode(SelectStmt);
@@ -968,7 +976,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
                PreventCommandIfParallelMode("COPY FROM");
 
                cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
-                                                          stmt->attlist, stmt->options);
+                                                          NULL, stmt->attlist, stmt->options);
                cstate->range_table = range_table;
                *processed = CopyFrom(cstate);  /* copy from file to database */
                EndCopyFrom(cstate);
@@ -1123,7 +1131,7 @@ ProcessCopyOptions(ParseState *pstate,
                        if (defel->arg && IsA(defel->arg, A_Star))
                                cstate->force_quote_all = true;
                        else if (defel->arg && IsA(defel->arg, List))
-                               cstate->force_quote = (List *) defel->arg;
+                               cstate->force_quote = castNode(List, defel->arg);
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -1462,8 +1470,9 @@ BeginCopy(ParseState *pstate,
                 * function and is executed repeatedly.  (See also the same hack in
                 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
                 */
-               rewritten = pg_analyze_and_rewrite((RawStmt *) copyObject(raw_query),
-                                                                                  pstate->p_sourcetext, NULL, 0);
+               rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
+                                                                                  pstate->p_sourcetext, NULL, 0,
+                                                                                  NULL);
 
                /* check that we got back something we can work with */
                if (rewritten == NIL)
@@ -1566,7 +1575,7 @@ BeginCopy(ParseState *pstate,
                cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
                                                                                        GetActiveSnapshot(),
                                                                                        InvalidSnapshot,
-                                                                                       dest, NULL, 0);
+                                                                                       dest, NULL, NULL, 0);
 
                /*
                 * Call ExecutorStart to prepare the plan for execution.
@@ -2066,7 +2075,7 @@ CopyTo(CopyState cstate)
        else
        {
                /* run the plan --- the dest receiver will send tuples */
-               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
                processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
        }
 
@@ -2285,7 +2294,7 @@ limit_printout_length(const char *str)
 /*
  * Copy FROM file to relation.
  */
-static uint64
+uint64
 CopyFrom(CopyState cstate)
 {
        HeapTuple       tuple;
@@ -2405,7 +2414,7 @@ CopyFrom(CopyState cstate)
         * earlier scan or command. This ensures that if this subtransaction
         * aborts then the frozen rows won't be visible after xact cleanup. Note
         * that the stronger test of exactly which subtransaction created it is
-        * crucial for correctness of this optimisation.
+        * crucial for correctness of this optimization.
         */
        if (cstate->freeze)
        {
@@ -2497,8 +2506,7 @@ CopyFrom(CopyState cstate)
 
        for (;;)
        {
-               TupleTableSlot *slot,
-                                  *oldslot;
+               TupleTableSlot *slot;
                bool            skip_tuple;
                Oid                     loaded_oid = InvalidOid;
 
@@ -2540,7 +2548,6 @@ CopyFrom(CopyState cstate)
                ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
                /* Determine the partition to heap_insert the tuple into */
-               oldslot = slot;
                if (cstate->partition_dispatch_info)
                {
                        int                     leaf_part_index;
@@ -2642,7 +2649,7 @@ CopyFrom(CopyState cstate)
                                /* Check the constraints of the tuple */
                                if (cstate->rel->rd_att->constr ||
                                        resultRelInfo->ri_PartitionCheck)
-                                       ExecConstraints(resultRelInfo, slot, oldslot, estate);
+                                       ExecConstraints(resultRelInfo, slot, estate);
 
                                if (useHeapMultiInsert)
                                {
@@ -2877,6 +2884,7 @@ BeginCopyFrom(ParseState *pstate,
                          Relation rel,
                          const char *filename,
                          bool is_program,
+                         copy_data_source_cb data_source_cb,
                          List *attnamelist,
                          List *options)
 {
@@ -2972,7 +2980,7 @@ BeginCopyFrom(ParseState *pstate,
                                 * the special case of when the default expression is the
                                 * nextval() of a sequence which in this specific case is
                                 * known to be safe for use with the multi-insert
-                                * optimisation. Hence we use this special case function
+                                * optimization. Hence we use this special case function
                                 * checker rather than the standard check for
                                 * contain_volatile_functions().
                                 */
@@ -2991,7 +2999,12 @@ BeginCopyFrom(ParseState *pstate,
        cstate->num_defaults = num_defaults;
        cstate->is_program = is_program;
 
-       if (pipe)
+       if (data_source_cb)
+       {
+               cstate->copy_dest = COPY_CALLBACK;
+               cstate->data_source_cb = data_source_cb;
+       }
+       else if (pipe)
        {
                Assert(!is_program);    /* the grammar does not allow this */
                if (whereToSendOutput == DestRemote)