static void printtup_startup(DestReceiver *self, int operation,
TupleDesc typeinfo);
-static void printtup(TupleTableSlot *slot, DestReceiver *self);
-static void printtup_20(TupleTableSlot *slot, DestReceiver *self);
-static void printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup_20(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
static void printtup_shutdown(DestReceiver *self);
static void printtup_destroy(DestReceiver *self);
* printtup --- print a tuple in protocol 3.0
* ----------------
*/
-static void
+static bool
printtup(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
/* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext);
+
+ return true;
}
/* ----------------
* printtup_20 --- print a tuple in protocol 2.0
* ----------------
*/
-static void
+static bool
printtup_20(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
/* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext);
+
+ return true;
}
/* ----------------
* debugtup - print one tuple for an interactive backend
* ----------------
*/
-void
+bool
debugtup(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
printatt((unsigned) i + 1, typeinfo->attrs[i], value);
}
printf("\t----\n");
+
+ return true;
}
/* ----------------
* This is largely same as printtup_20, except we use binary formatting.
* ----------------
*/
-static void
+static bool
printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
{
TupleDesc typeinfo = slot->tts_tupleDescriptor;
/* Return to caller's context, and flush row's temporary memory */
MemoryContextSwitchTo(oldcontext);
MemoryContextReset(myState->tmpcontext);
+
+ return true;
}
/*
* copy_dest_receive --- receive one tuple
*/
-static void
+static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_copy *myState = (DR_copy *) self;
/* And send the data */
CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
myState->processed++;
+
+ return true;
}
/*
static ObjectAddress CreateAsReladdr = {InvalidOid, InvalidOid, 0};
static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self);
static void intorel_shutdown(DestReceiver *self);
static void intorel_destroy(DestReceiver *self);
/*
* intorel_receive --- receive one tuple
*/
-static void
+static bool
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
+
+ return true;
}
/*
static int matview_maintenance_depth = 0;
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);
static void refresh_matview_datafill(DestReceiver *dest, Query *query,
/*
* transientrel_receive --- receive one tuple
*/
-static void
+static bool
transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
+
+ return true;
}
/*
* practice, this is probably always the case at this point.)
*/
if (sendTuples)
- (*dest->receiveSlot) (slot, dest);
+ {
+ /*
+ * If we are not able to send the tuple, we assume the destination
+ * has closed and no more tuples can be sent. If that's the case,
+ * end the loop.
+ */
+ if (!((*dest->receiveSlot) (slot, dest)))
+ break;
+ }
/*
* Count tuples processed, if this is a SELECT. (For other operation
ExecStoreVirtualTuple(slot);
/* send the tuple to the receiver */
- (*tstate->dest->receiveSlot) (slot, tstate->dest);
+ (void) (*tstate->dest->receiveSlot) (slot, tstate->dest);
/* clean up */
ExecClearTuple(slot);
static void sql_exec_error_callback(void *arg);
static void ShutdownSQLFunction(Datum arg);
static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
static void sqlfunction_shutdown(DestReceiver *self);
static void sqlfunction_destroy(DestReceiver *self);
/*
* sqlfunction_receive --- receive one tuple
*/
-static void
+static bool
sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_sqlfunction *myState = (DR_sqlfunction *) self;
/* Store the filtered tuple into the tuplestore */
tuplestore_puttupleslot(myState->tstore, slot);
+
+ return true;
}
/*
* store tuple retrieved by Executor into SPITupleTable
* of current SPI procedure
*/
-void
+bool
spi_printtup(TupleTableSlot *slot, DestReceiver *self)
{
SPITupleTable *tuptable;
(tuptable->free)--;
MemoryContextSwitchTo(oldcxt);
+
+ return true;
}
/*
* type over a range type over a range type over an array type over a record,
* or something like that.
*/
-static void
+static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
TupleDesc tupledesc = slot->tts_tupleDescriptor;
HeapTuple tuple;
+ shm_mq_result result;
/*
* Test to see whether the tupledesc has changed; if so, set up for the
}
/* Send the tuple itself. */
- shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+ result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+
+ if (result == SHM_MQ_DETACHED)
+ return false;
+ else if (result != SHM_MQ_SUCCESS)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("unable to send tuples")));
+
+ return true;
}
/*
} TStoreState;
-static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
-static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
/*
* Receive a tuple from the executor and store it in the tuplestore.
* This is for the easy case where we don't have to detoast.
*/
-static void
+static bool
tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
{
TStoreState *myState = (TStoreState *) self;
tuplestore_puttupleslot(myState->tstore, slot);
+
+ return true;
}
/*
* Receive a tuple from the executor and store it in the tuplestore.
* This is for the case where we have to detoast any toasted values.
*/
-static void
+static bool
tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
{
TStoreState *myState = (TStoreState *) self;
/* And release any temporary detoasted values */
for (i = 0; i < nfree; i++)
pfree(DatumGetPointer(myState->tofree[i]));
+
+ return true;
}
/*
* dummy DestReceiver functions
* ----------------
*/
-static void
+static bool
donothingReceive(TupleTableSlot *slot, DestReceiver *self)
{
+ return true;
}
static void
if (!ok)
break;
- (*dest->receiveSlot) (slot, dest);
+ /*
+ * If we are not able to send the tuple, we assume the destination
+ * has closed and no more tuples can be sent. If that's the case,
+ * end the loop.
+ */
+ if (!((*dest->receiveSlot) (slot, dest)))
+ break;
ExecClearTuple(slot);
extern void debugStartup(DestReceiver *self, int operation,
TupleDesc typeinfo);
-extern void debugtup(TupleTableSlot *slot, DestReceiver *self);
+extern bool debugtup(TupleTableSlot *slot, DestReceiver *self);
/* XXX these are really in executor/spi.c */
extern void spi_dest_startup(DestReceiver *self, int operation,
TupleDesc typeinfo);
-extern void spi_printtup(TupleTableSlot *slot, DestReceiver *self);
+extern bool spi_printtup(TupleTableSlot *slot, DestReceiver *self);
#endif /* PRINTTUP_H */
* pointers that the executor must call.
*
* Note: the receiveSlot routine must be passed a slot containing a TupleDesc
- * identical to the one given to the rStartup routine.
+ * identical to the one given to the rStartup routine. It returns bool where
+ * a "true" value means "continue processing" and a "false" value means
+ * "stop early, just as if we'd reached the end of the scan".
* ----------------
*/
typedef struct _DestReceiver DestReceiver;
struct _DestReceiver
{
/* Called for each tuple to be output: */
- void (*receiveSlot) (TupleTableSlot *slot,
+ bool (*receiveSlot) (TupleTableSlot *slot,
DestReceiver *self);
/* Per-executor-run initialization and shutdown: */
void (*rStartup) (DestReceiver *self,