]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/copy.c
Fix reporting of violations in ExecConstraints, again.
[postgresql] / src / backend / commands / copy.c
index ba89b292d1e7eca9685b9e3c10d40a5e147826bd..73677be59e03dc2d79806b07125e176a7671ec4e 100644 (file)
@@ -60,7 +60,8 @@ typedef enum CopyDest
 {
        COPY_FILE,                                      /* to/from file (or a piped program) */
        COPY_OLD_FE,                            /* to/from frontend (2.0 protocol) */
-       COPY_NEW_FE                                     /* to/from frontend (3.0 protocol) */
+       COPY_NEW_FE,                            /* to/from frontend (3.0 protocol) */
+       COPY_CALLBACK                           /* to/from callback function */
 } CopyDest;
 
 /*
@@ -109,6 +110,7 @@ typedef struct CopyStateData
        List       *attnumlist;         /* integer list of attnums to copy */
        char       *filename;           /* filename, or NULL for STDIN/STDOUT */
        bool            is_program;             /* is 'filename' a program to popen? */
+       copy_data_source_cb     data_source_cb;         /* function for reading data*/
        bool            binary;                 /* binary format? */
        bool            oids;                   /* include OIDs? */
        bool            freeze;                 /* freeze rows on loading? */
@@ -299,7 +301,6 @@ static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
                         Datum *values, bool *nulls);
-static uint64 CopyFrom(CopyState cstate);
 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
                                        CommandId mycid, int hi_options,
                                        ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
@@ -529,6 +530,9 @@ CopySendEndOfRow(CopyState cstate)
                        /* Dump the accumulated row as one CopyData message */
                        (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
                        break;
+               case COPY_CALLBACK:
+                       Assert(false); /* Not yet supported. */
+                       break;
        }
 
        resetStringInfo(fe_msgbuf);
@@ -643,6 +647,9 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
                                bytesread += avail;
                        }
                        break;
+               case COPY_CALLBACK:
+                       bytesread = cstate->data_source_cb(databuf, minread, maxread);
+                       break;
        }
 
        return bytesread;
@@ -969,7 +976,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
                PreventCommandIfParallelMode("COPY FROM");
 
                cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
-                                                          stmt->attlist, stmt->options);
+                                                          NULL, stmt->attlist, stmt->options);
                cstate->range_table = range_table;
                *processed = CopyFrom(cstate);  /* copy from file to database */
                EndCopyFrom(cstate);
@@ -1463,8 +1470,9 @@ BeginCopy(ParseState *pstate,
                 * function and is executed repeatedly.  (See also the same hack in
                 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
                 */
-               rewritten = pg_analyze_and_rewrite((RawStmt *) copyObject(raw_query),
-                                                                                  pstate->p_sourcetext, NULL, 0);
+               rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
+                                                                                  pstate->p_sourcetext, NULL, 0,
+                                                                                  NULL);
 
                /* check that we got back something we can work with */
                if (rewritten == NIL)
@@ -1567,7 +1575,7 @@ BeginCopy(ParseState *pstate,
                cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
                                                                                        GetActiveSnapshot(),
                                                                                        InvalidSnapshot,
-                                                                                       dest, NULL, 0);
+                                                                                       dest, NULL, NULL, 0);
 
                /*
                 * Call ExecutorStart to prepare the plan for execution.
@@ -2067,7 +2075,7 @@ CopyTo(CopyState cstate)
        else
        {
                /* run the plan --- the dest receiver will send tuples */
-               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+               ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
                processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
        }
 
@@ -2286,7 +2294,7 @@ limit_printout_length(const char *str)
 /*
  * Copy FROM file to relation.
  */
-static uint64
+uint64
 CopyFrom(CopyState cstate)
 {
        HeapTuple       tuple;
@@ -2498,8 +2506,7 @@ CopyFrom(CopyState cstate)
 
        for (;;)
        {
-               TupleTableSlot *slot,
-                                  *oldslot;
+               TupleTableSlot *slot;
                bool            skip_tuple;
                Oid                     loaded_oid = InvalidOid;
 
@@ -2541,7 +2548,6 @@ CopyFrom(CopyState cstate)
                ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
                /* Determine the partition to heap_insert the tuple into */
-               oldslot = slot;
                if (cstate->partition_dispatch_info)
                {
                        int                     leaf_part_index;
@@ -2643,7 +2649,7 @@ CopyFrom(CopyState cstate)
                                /* Check the constraints of the tuple */
                                if (cstate->rel->rd_att->constr ||
                                        resultRelInfo->ri_PartitionCheck)
-                                       ExecConstraints(resultRelInfo, slot, oldslot, estate);
+                                       ExecConstraints(resultRelInfo, slot, estate);
 
                                if (useHeapMultiInsert)
                                {
@@ -2878,6 +2884,7 @@ BeginCopyFrom(ParseState *pstate,
                          Relation rel,
                          const char *filename,
                          bool is_program,
+                         copy_data_source_cb data_source_cb,
                          List *attnamelist,
                          List *options)
 {
@@ -2992,7 +2999,12 @@ BeginCopyFrom(ParseState *pstate,
        cstate->num_defaults = num_defaults;
        cstate->is_program = is_program;
 
-       if (pipe)
+       if (data_source_cb)
+       {
+               cstate->copy_dest = COPY_CALLBACK;
+               cstate->data_source_cb = data_source_cb;
+       }
+       else if (pipe)
        {
                Assert(!is_program);    /* the grammar does not allow this */
                if (whereToSendOutput == DestRemote)