]> granicus.if.org Git - postgresql/commitdiff
Stop the executor if no more tuples can be sent from worker to leader.
authorRobert Haas <rhaas@postgresql.org>
Mon, 6 Jun 2016 18:52:58 +0000 (14:52 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 6 Jun 2016 18:52:58 +0000 (14:52 -0400)
If a Gather node has read as many tuples as it needs (for example, due
to Limit) it may detach the queue connecting it to the worker before
reading all of the worker's tuples.  Rather than let the worker
continue to generate and send all of the results, have it stop after
sending the next tuple.

More could be done here to stop the worker even quicker, but this is
about as well as we can hope to do for 9.6.

This is in response to a problem report from Andreas Seltenreich.
Commit 44339b892a04e94bbb472235882dc6f7023bdc65 should be actually be
sufficient to fix that example even without this change, but it seems
better to do this, too, since we might otherwise waste quite a large
amount of effort in one or more workers.

Discussion: CAA4eK1KOKGqmz9bGu+Z42qhRwMbm4R5rfnqsLCNqFs9j14jzEA@mail.gmail.com

Amit Kapila

14 files changed:
src/backend/access/common/printtup.c
src/backend/commands/copy.c
src/backend/commands/createas.c
src/backend/commands/matview.c
src/backend/executor/execMain.c
src/backend/executor/execTuples.c
src/backend/executor/functions.c
src/backend/executor/spi.c
src/backend/executor/tqueue.c
src/backend/executor/tstoreReceiver.c
src/backend/tcop/dest.c
src/backend/tcop/pquery.c
src/include/access/printtup.h
src/include/tcop/dest.h

index 1939ff5155ba28c7d002d34428d4a5f87301b066..d9664aa6c6b918cbf3a322f37ad77784f9bd64bd 100644 (file)
@@ -26,9 +26,9 @@
 
 static void printtup_startup(DestReceiver *self, int operation,
                                 TupleDesc typeinfo);
-static void printtup(TupleTableSlot *slot, DestReceiver *self);
-static void printtup_20(TupleTableSlot *slot, DestReceiver *self);
-static void printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup_20(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
 static void printtup_shutdown(DestReceiver *self);
 static void printtup_destroy(DestReceiver *self);
 
@@ -299,7 +299,7 @@ printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs)
  *             printtup --- print a tuple in protocol 3.0
  * ----------------
  */
-static void
+static bool
 printtup(TupleTableSlot *slot, DestReceiver *self)
 {
        TupleDesc       typeinfo = slot->tts_tupleDescriptor;
@@ -376,13 +376,15 @@ printtup(TupleTableSlot *slot, DestReceiver *self)
        /* Return to caller's context, and flush row's temporary memory */
        MemoryContextSwitchTo(oldcontext);
        MemoryContextReset(myState->tmpcontext);
+
+       return true;
 }
 
 /* ----------------
  *             printtup_20 --- print a tuple in protocol 2.0
  * ----------------
  */
-static void
+static bool
 printtup_20(TupleTableSlot *slot, DestReceiver *self)
 {
        TupleDesc       typeinfo = slot->tts_tupleDescriptor;
@@ -452,6 +454,8 @@ printtup_20(TupleTableSlot *slot, DestReceiver *self)
        /* Return to caller's context, and flush row's temporary memory */
        MemoryContextSwitchTo(oldcontext);
        MemoryContextReset(myState->tmpcontext);
+
+       return true;
 }
 
 /* ----------------
@@ -528,7 +532,7 @@ debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
  *             debugtup - print one tuple for an interactive backend
  * ----------------
  */
-void
+bool
 debugtup(TupleTableSlot *slot, DestReceiver *self)
 {
        TupleDesc       typeinfo = slot->tts_tupleDescriptor;
@@ -553,6 +557,8 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
                printatt((unsigned) i + 1, typeinfo->attrs[i], value);
        }
        printf("\t----\n");
+
+       return true;
 }
 
 /* ----------------
@@ -564,7 +570,7 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
  * This is largely same as printtup_20, except we use binary formatting.
  * ----------------
  */
-static void
+static bool
 printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
 {
        TupleDesc       typeinfo = slot->tts_tupleDescriptor;
@@ -636,4 +642,6 @@ printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
        /* Return to caller's context, and flush row's temporary memory */
        MemoryContextSwitchTo(oldcontext);
        MemoryContextReset(myState->tmpcontext);
+
+       return true;
 }
index 3201476c9e8dcfc9ad8cf399ef08d5eebaf26331..28dcd340017a1fa53f3db76dd170f9014f738ce2 100644 (file)
@@ -4454,7 +4454,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * copy_dest_receive --- receive one tuple
  */
-static void
+static bool
 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 {
        DR_copy    *myState = (DR_copy *) self;
@@ -4466,6 +4466,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
        /* And send the data */
        CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
        myState->processed++;
+
+       return true;
 }
 
 /*
index cb7a145ee5db8be8cdcc7a9572fc352541c3028d..5a853c48a8ba4ab00c6318b3cf910158b3422bfc 100644 (file)
@@ -62,7 +62,7 @@ typedef struct
 static ObjectAddress CreateAsReladdr = {InvalidOid, InvalidOid, 0};
 
 static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self);
 static void intorel_shutdown(DestReceiver *self);
 static void intorel_destroy(DestReceiver *self);
 
@@ -482,7 +482,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * intorel_receive --- receive one tuple
  */
-static void
+static bool
 intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
        DR_intorel *myState = (DR_intorel *) self;
@@ -507,6 +507,8 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
                                myState->bistate);
 
        /* We know this is a newly created relation, so there are no indexes */
+
+       return true;
 }
 
 /*
index f00aab39e7bd297e89a2cc1a23ceb93bdd9500c6..62e61a26749ce53f2ce697eb3761327b2b53b427 100644 (file)
@@ -56,7 +56,7 @@ typedef struct
 static int     matview_maintenance_depth = 0;
 
 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
 static void transientrel_shutdown(DestReceiver *self);
 static void transientrel_destroy(DestReceiver *self);
 static void refresh_matview_datafill(DestReceiver *dest, Query *query,
@@ -467,7 +467,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * transientrel_receive --- receive one tuple
  */
-static void
+static bool
 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
        DR_transientrel *myState = (DR_transientrel *) self;
@@ -486,6 +486,8 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
                                myState->bistate);
 
        /* We know this is a newly created relation, so there are no indexes */
+
+       return true;
 }
 
 /*
index ac0230411c3dc3cfc778cfbb1463bff42cefbd98..b5ced388d208046ac70b2be8b4b7fafe6da9cafe 100644 (file)
@@ -1593,7 +1593,15 @@ ExecutePlan(EState *estate,
                 * practice, this is probably always the case at this point.)
                 */
                if (sendTuples)
-                       (*dest->receiveSlot) (slot, dest);
+               {
+                       /*
+                        * If we are not able to send the tuple, we assume the destination
+                        * has closed and no more tuples can be sent. If that's the case,
+                        * end the loop.
+                        */
+                       if (!((*dest->receiveSlot) (slot, dest)))
+                               break;
+               }
 
                /*
                 * Count tuples processed, if this is a SELECT.  (For other operation
index 2b81f60a519a5a9099e1db62c8261039682b0dbe..533050dc8593284b7d05ef8942420318c86d67be 100644 (file)
@@ -1266,7 +1266,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
        ExecStoreVirtualTuple(slot);
 
        /* send the tuple to the receiver */
-       (*tstate->dest->receiveSlot) (slot, tstate->dest);
+       (void) (*tstate->dest->receiveSlot) (slot, tstate->dest);
 
        /* clean up */
        ExecClearTuple(slot);
index 6e14c9d29677d12d4f72271ff1d03e018f50609e..cd93c045dcba4e22da24ac4266d70646cc54d370 100644 (file)
@@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot,
 static void sql_exec_error_callback(void *arg);
 static void ShutdownSQLFunction(Datum arg);
 static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
 static void sqlfunction_shutdown(DestReceiver *self);
 static void sqlfunction_destroy(DestReceiver *self);
 
@@ -1904,7 +1904,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * sqlfunction_receive --- receive one tuple
  */
-static void
+static bool
 sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
 {
        DR_sqlfunction *myState = (DR_sqlfunction *) self;
@@ -1914,6 +1914,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
 
        /* Store the filtered tuple into the tuplestore */
        tuplestore_puttupleslot(myState->tstore, slot);
+
+       return true;
 }
 
 /*
index 23cb6f407dddfe4955f2d4b6b0cc052852da057f..7ccabdb44b2f5e2bbf5d4949c9f91eee13c4ff55 100644 (file)
@@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
  *             store tuple retrieved by Executor into SPITupleTable
  *             of current SPI procedure
  */
-void
+bool
 spi_printtup(TupleTableSlot *slot, DestReceiver *self)
 {
        SPITupleTable *tuptable;
@@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self)
        (tuptable->free)--;
 
        MemoryContextSwitchTo(oldcxt);
+
+       return true;
 }
 
 /*
index 383b5352cbad826e27ee928955a4281914262e9c..8abb1f16e455631ee18fc1069358069d308d9af8 100644 (file)
@@ -115,12 +115,13 @@ static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
  * type over a range type over a range type over an array type over a record,
  * or something like that.
  */
-static void
+static bool
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
        TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
        TupleDesc       tupledesc = slot->tts_tupleDescriptor;
        HeapTuple       tuple;
+       shm_mq_result result;
 
        /*
         * Test to see whether the tupledesc has changed; if so, set up for the
@@ -195,7 +196,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
        }
 
        /* Send the tuple itself. */
-       shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+       result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+
+       if (result == SHM_MQ_DETACHED)
+               return false;
+       else if (result != SHM_MQ_SUCCESS)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("unable to send tuples")));
+
+       return true;
 }
 
 /*
index 516440ad32e090dafcf67e10627c0b927a5d65f5..8f1e1b3f50c92b8380146d2b4d306d8893a00183 100644 (file)
@@ -37,8 +37,8 @@ typedef struct
 } TStoreState;
 
 
-static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
-static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
 
 
 /*
@@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
  * Receive a tuple from the executor and store it in the tuplestore.
  * This is for the easy case where we don't have to detoast.
  */
-static void
+static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
 {
        TStoreState *myState = (TStoreState *) self;
 
        tuplestore_puttupleslot(myState->tstore, slot);
+
+       return true;
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
  * This is for the case where we have to detoast any toasted values.
  */
-static void
+static bool
 tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 {
        TStoreState *myState = (TStoreState *) self;
@@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
        /* And release any temporary detoasted values */
        for (i = 0; i < nfree; i++)
                pfree(DatumGetPointer(myState->tofree[i]));
+
+       return true;
 }
 
 /*
index 2c7dc6e526799bf4b920620c28ec850cf0b457a3..de45cbc4fb8e8867464b39b25600166a230a9964 100644 (file)
  *             dummy DestReceiver functions
  * ----------------
  */
-static void
+static bool
 donothingReceive(TupleTableSlot *slot, DestReceiver *self)
 {
+       return true;
 }
 
 static void
index fcdc4c347c7919ebb4ef7f89c2f17c6f09f4a7e8..3f6cb12b4e53bf3017895082827b89f0bc805054 100644 (file)
@@ -1109,7 +1109,13 @@ RunFromStore(Portal portal, ScanDirection direction, uint64 count,
                        if (!ok)
                                break;
 
-                       (*dest->receiveSlot) (slot, dest);
+                       /*
+                        * If we are not able to send the tuple, we assume the destination
+                        * has closed and no more tuples can be sent. If that's the case,
+                        * end the loop.
+                        */
+                       if (!((*dest->receiveSlot) (slot, dest)))
+                               break;
 
                        ExecClearTuple(slot);
 
index 64dde01cd148984c2d71f57579431e5a1e306ee1..608c5642872a3f22d452574e36cd220243f0e9dc 100644 (file)
@@ -25,11 +25,11 @@ extern void SendRowDescriptionMessage(TupleDesc typeinfo, List *targetlist,
 
 extern void debugStartup(DestReceiver *self, int operation,
                         TupleDesc typeinfo);
-extern void debugtup(TupleTableSlot *slot, DestReceiver *self);
+extern bool debugtup(TupleTableSlot *slot, DestReceiver *self);
 
 /* XXX these are really in executor/spi.c */
 extern void spi_dest_startup(DestReceiver *self, int operation,
                                 TupleDesc typeinfo);
-extern void spi_printtup(TupleTableSlot *slot, DestReceiver *self);
+extern bool spi_printtup(TupleTableSlot *slot, DestReceiver *self);
 
 #endif   /* PRINTTUP_H */
index 4e42d61c37ab4e0243b80626d1f61607985797c5..dd80433f74fb932a1a6cd34f9e3a337d474a04e1 100644 (file)
@@ -104,7 +104,9 @@ typedef enum
  *             pointers that the executor must call.
  *
  * Note: the receiveSlot routine must be passed a slot containing a TupleDesc
- * identical to the one given to the rStartup routine.
+ * identical to the one given to the rStartup routine.  It returns bool where
+ * a "true" value means "continue processing" and a "false" value means
+ * "stop early, just as if we'd reached the end of the scan".
  * ----------------
  */
 typedef struct _DestReceiver DestReceiver;
@@ -112,7 +114,7 @@ typedef struct _DestReceiver DestReceiver;
 struct _DestReceiver
 {
        /* Called for each tuple to be output: */
-       void            (*receiveSlot) (TupleTableSlot *slot,
+       bool            (*receiveSlot) (TupleTableSlot *slot,
                                                                                        DestReceiver *self);
        /* Per-executor-run initialization and shutdown: */
        void            (*rStartup) (DestReceiver *self,