]> granicus.if.org Git - postgresql/commitdiff
Fix missing call to table_finish_bulk_insert during COPY
authorDavid Rowley <drowley@postgresql.org>
Mon, 1 Jul 2019 13:23:26 +0000 (01:23 +1200)
committerDavid Rowley <drowley@postgresql.org>
Mon, 1 Jul 2019 13:23:26 +0000 (01:23 +1200)
86b85044e abstracted calls to heap functions in COPY FROM to support a
generic table AM.  However, when performing a copy into a partitioned
table, this commit neglected to call table_finish_bulk_insert for each
partition.  Before 86b85044e, when we always called the heap functions,
there was no need to call heapam_finish_bulk_insert for partitions since
it only did any work when performing a copy without WAL. For partitioned
tables, this was unsupported anyway, so there was no issue. With pluggable
storage, we can't make any assumptions about what the table AM might want
to do in its equivalent function, so we'd better ensure we always call
table_finish_bulk_insert each partition that's received a row.

For now, we make the table_finish_bulk_insert call whenever we evict a
CopyMultiInsertBuffer out of the CopyMultiInsertInfo.  This does mean
that it's possible that we call table_finish_bulk_insert multiple times
per partition, which is not a problem other than being an inefficiency.
Improving this requires a more invasive patch, so let's leave that for
another day.

In passing, move the table_finish_bulk_insert for the target of the COPY
command so that it's only called when we're actually performing bulk
inserts.  We don't need to call this when inserting 1 row at a time.

Reported-by: Robert Haas
Discussion: https://postgr.es/m/CA+TgmoYK=6BpxiJ0tN-p9wtH0BTAfbdxzHhwou0mdud4+BkYuQ@mail.gmail.com

src/backend/commands/copy.c

index f1161f0fee16d26142fd1abb323756722d00fc39..c4dfbdacdf34a1e9009959238daf6494a305cbdc 100644 (file)
@@ -2518,7 +2518,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
  * The buffer must be flushed before cleanup.
  */
 static inline void
-CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer)
+CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
+                                                        CopyMultiInsertBuffer *buffer)
 {
        int                     i;
 
@@ -2534,6 +2535,9 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer)
        for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
                ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
+       table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
+                                                        miinfo->ti_options);
+
        pfree(buffer);
 }
 
@@ -2585,7 +2589,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
                        buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
                }
 
-               CopyMultiInsertBufferCleanup(buffer);
+               CopyMultiInsertBufferCleanup(miinfo, buffer);
                miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
        }
 }
@@ -2599,7 +2603,7 @@ CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
        ListCell   *lc;
 
        foreach(lc, miinfo->multiInsertBuffers)
-               CopyMultiInsertBufferCleanup(lfirst(lc));
+               CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
 
        list_free(miinfo->multiInsertBuffers);
 }
@@ -3321,9 +3325,6 @@ CopyFrom(CopyState cstate)
        {
                if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
                        CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
-
-               /* Tear down the multi-insert buffer data */
-               CopyMultiInsertInfoCleanup(&multiInsertInfo);
        }
 
        /* Done, clean up */
@@ -3366,7 +3367,13 @@ CopyFrom(CopyState cstate)
 
        FreeExecutorState(estate);
 
-       table_finish_bulk_insert(cstate->rel, ti_options);
+       if (insertMethod != CIM_SINGLE)
+       {
+               table_finish_bulk_insert(cstate->rel, ti_options);
+
+               /* Tear down the multi-insert buffer data */
+               CopyMultiInsertInfoCleanup(&multiInsertInfo);
+       }
 
        return processed;
 }