From af33039317ddc4a0e38a02e2255c2bf453115fd2 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Fri, 29 Jul 2016 19:31:06 -0400 Subject: [PATCH] Fix worst memory leaks in tqueue.c. TupleQueueReaderNext() leaks like a sieve if it has to do any tuple disassembly/reconstruction. While we could try to clean up its allocations piecemeal, it seems like a better idea just to insist that it should be run in a short-lived memory context, so that any transient space goes away automatically. I chose to have nodeGather.c switch into its existing per-tuple context before the call, rather than inventing a separate context inside tqueue.c. This is sufficient to stop all leakage in the simple case I exhibited earlier today (see link below), but it does not deal with leaks induced in more complex cases by tqueue.c's insistence on using TopMemoryContext for data that it's not actually trying hard to keep track of. That issue is intertwined with another major source of inefficiency, namely failure to cache lookup results across calls, so it seems best to deal with it separately. In passing, improve some comments, and modify gather_readnext's method for deciding when it's visited all the readers so that it's more obviously correct. (I'm not actually convinced that the previous code *is* correct in the case of a reader deletion; it certainly seems fragile.) Discussion: <32763.1469821037@sss.pgh.pa.us> --- src/backend/executor/nodeGather.c | 45 ++++++++++++++++++------------- src/backend/executor/tqueue.c | 22 +++++++++------ src/include/executor/tqueue.h | 8 +++--- 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 313b234454..93a566ba62 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -214,8 +214,11 @@ ExecGather(GatherState *node) /* * Reset per-tuple memory context to free any expression evaluation * storage allocated in the previous tuple cycle. Note we can't do this - * until we're done projecting. + * until we're done projecting. This will also clear any previous tuple + * returned by a TupleQueueReader; to make sure we don't leave a dangling + * pointer around, clear the working slot first. */ + ExecClearTuple(node->funnel_slot); econtext = node->ps.ps_ExprContext; ResetExprContext(econtext); @@ -274,13 +277,19 @@ gather_getnext(GatherState *gatherstate) PlanState *outerPlan = outerPlanState(gatherstate); TupleTableSlot *outerTupleSlot; TupleTableSlot *fslot = gatherstate->funnel_slot; + MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory; HeapTuple tup; while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) { if (gatherstate->reader != NULL) { + MemoryContext oldContext; + + /* Run TupleQueueReaders in per-tuple context */ + oldContext = MemoryContextSwitchTo(tupleContext); tup = gather_readnext(gatherstate); + MemoryContextSwitchTo(oldContext); if (HeapTupleIsValid(tup)) { @@ -288,8 +297,7 @@ gather_getnext(GatherState *gatherstate) fslot, /* slot in which to store the tuple */ InvalidBuffer, /* buffer associated with this * tuple */ - true); /* pfree this pointer if not from heap */ - + false); /* slot should not pfree tuple */ return fslot; } } @@ -314,7 +322,7 @@ gather_getnext(GatherState *gatherstate) static HeapTuple gather_readnext(GatherState *gatherstate) { - int waitpos = gatherstate->nextreader; + int nvisited = 0; for (;;) { @@ -335,6 +343,7 @@ gather_readnext(GatherState *gatherstate) */ if (readerdone) { + Assert(!tup); DestroyTupleQueueReader(reader); --gatherstate->nreaders; if (gatherstate->nreaders == 0) @@ -342,17 +351,12 @@ gather_readnext(GatherState *gatherstate) ExecShutdownGatherWorkers(gatherstate); return NULL; } - else - { - memmove(&gatherstate->reader[gatherstate->nextreader], - &gatherstate->reader[gatherstate->nextreader + 1], - sizeof(TupleQueueReader *) - * (gatherstate->nreaders - gatherstate->nextreader)); - if (gatherstate->nextreader >= gatherstate->nreaders) - gatherstate->nextreader = 0; - if (gatherstate->nextreader < waitpos) - --waitpos; - } + memmove(&gatherstate->reader[gatherstate->nextreader], + &gatherstate->reader[gatherstate->nextreader + 1], + sizeof(TupleQueueReader *) + * (gatherstate->nreaders - gatherstate->nextreader)); + if (gatherstate->nextreader >= gatherstate->nreaders) + gatherstate->nextreader = 0; continue; } @@ -367,11 +371,13 @@ gather_readnext(GatherState *gatherstate) * every tuple, but it turns out to be much more efficient to keep * reading from the same queue until that would require blocking. */ - gatherstate->nextreader = - (gatherstate->nextreader + 1) % gatherstate->nreaders; + gatherstate->nextreader++; + if (gatherstate->nextreader >= gatherstate->nreaders) + gatherstate->nextreader = 0; - /* Have we visited every TupleQueueReader? */ - if (gatherstate->nextreader == waitpos) + /* Have we visited every (surviving) TupleQueueReader? */ + nvisited++; + if (nvisited >= gatherstate->nreaders) { /* * If (still) running plan locally, return NULL so caller can @@ -384,6 +390,7 @@ gather_readnext(GatherState *gatherstate) WaitLatch(MyLatch, WL_LATCH_SET, 0); CHECK_FOR_INTERRUPTS(); ResetLatch(MyLatch); + nvisited = 0; } } } diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index e81c333e4c..64555599ce 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -524,13 +524,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader) /* * Fetch a tuple from a tuple queue reader. * + * The return value is NULL if there are no remaining tuples or if + * nowait = true and no tuple is ready to return. *done, if not NULL, + * is set to true when there are no remaining tuples and otherwise to false. + * + * The returned tuple, if any, is allocated in CurrentMemoryContext. + * That should be a short-lived (tuple-lifespan) context, because we are + * pretty cavalier about leaking memory in that context if we have to do + * tuple remapping. + * * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still * accumulate bytes from a partially-read message, so it's useful to call * this with nowait = true even if nothing is returned. - * - * The return value is NULL if there are no remaining queues or if - * nowait = true and no tuple is ready to return. *done, if not NULL, - * is set to true when queue is detached and otherwise to false. */ HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) @@ -565,10 +570,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) * OK, we got a message. Process it. * * One-byte messages are mode switch messages, so that we can switch - * between "control" and "data" mode. When in "data" mode, each - * message (unless exactly one byte) is a tuple. When in "control" - * mode, each message provides a transient-typmod-to-tupledesc mapping - * so we can interpret future tuples. + * between "control" and "data" mode. Otherwise, when in "data" mode, + * each message is a tuple. When in "control" mode, each message + * provides a transient-typmod-to-tupledesc mapping to let us + * interpret future tuples. Both of those cases certainly require + * more than one byte, so no confusion is possible. */ if (nbytes == 1) { diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h index 4f23c00feb..3a0aba162d 100644 --- a/src/include/executor/tqueue.h +++ b/src/include/executor/tqueue.h @@ -17,15 +17,17 @@ #include "storage/shm_mq.h" #include "tcop/dest.h" +/* Opaque struct, only known inside tqueue.c. */ +typedef struct TupleQueueReader TupleQueueReader; + /* Use this to send tuples to a shm_mq. */ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle); /* Use these to receive tuples from a shm_mq. */ -typedef struct TupleQueueReader TupleQueueReader; extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc); -extern void DestroyTupleQueueReader(TupleQueueReader *funnel); -extern HeapTuple TupleQueueReaderNext(TupleQueueReader *, +extern void DestroyTupleQueueReader(TupleQueueReader *reader); +extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done); #endif /* TQUEUE_H */ -- 2.40.0