From b4fa938e9f484a3cf0614aac4686cd26c650d27c Mon Sep 17 00:00:00 2001
From: Tom Lane
Date: Thu, 31 Aug 2017 15:10:24 -0400
Subject: [PATCH] Clean up shm_mq cleanup.

The logic around shm_mq_detach was a few bricks shy of a load, because
(contrary to the comments for shm_mq_attach) all it did was update the
shared shm_mq state. That left us leaking a bit of process-local
memory, but much worse, the on_dsm_detach callback for shm_mq_detach
was still armed. That means that whenever we ultimately detach from
the DSM segment, we'd run shm_mq_detach again for already-detached,
possibly long-dead queues. This accidentally fails to fail today,
because we only ever re-use a shm_mq's memory for another shm_mq, and
multiple detach attempts on the last such shm_mq are fairly harmless.
But it's gonna bite us someday, so let's clean it up.

To do that, change shm_mq_detach's API so it takes a shm_mq_handle
not the underlying shm_mq. This makes the callers simpler in most
cases anyway. Also fix a few places in parallel.c that were just
pfree'ing the handle structs rather than doing proper cleanup.

Back-patch to v10 because of the risk that the revenant shm_mq_detach
callbacks would cause a live bug sometime. Since this is an API
change, it's too late to do it in 9.6. (We could make a variant
patch that preserves API, but I'm not excited enough to do that.)

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us --- src/backend/access/transam/parallel.c | 6 ++-- src/backend/executor/tqueue.c | 9 ++++-- src/backend/libpq/pqmq.c | 10 ++----- src/backend/storage/ipc/shm_mq.c | 43 ++++++++++++++++++++++----- src/include/storage/shm_mq.h | 4 +-- 5 files changed, 51 insertions(+), 21 deletions(-) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 17b10383e4..ce1b907deb 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -480,7 +480,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) */ any_registrations_failed = true; pcxt->worker[i].bgwhandle = NULL; - pfree(pcxt->worker[i].error_mqh); + shm_mq_detach(pcxt->worker[i].error_mqh); pcxt->worker[i].error_mqh = NULL; } } @@ -612,7 +612,7 @@ DestroyParallelContext(ParallelContext *pcxt) { TerminateBackgroundWorker(pcxt->worker[i].bgwhandle); - pfree(pcxt->worker[i].error_mqh); + shm_mq_detach(pcxt->worker[i].error_mqh); pcxt->worker[i].error_mqh = NULL; } } @@ -861,7 +861,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) case 'X': /* Terminate, indicating clean exit */ { - pfree(pcxt->worker[i].error_mqh); + shm_mq_detach(pcxt->worker[i].error_mqh); pcxt->worker[i].error_mqh = NULL; break; } diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index a4cfe9685a..4339203085 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -578,7 +578,9 @@ tqueueShutdownReceiver(DestReceiver *self) { TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; - shm_mq_detach(shm_mq_get_queue(tqueue->queue)); + if (tqueue->queue != NULL) + shm_mq_detach(tqueue->queue); + tqueue->queue = NULL; } /* @@ -589,6 +591,9 @@ tqueueDestroyReceiver(DestReceiver *self) { TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + /* We probably already detached from queue, but let's be sure */ + if (tqueue->queue != NULL) + 
shm_mq_detach(tqueue->queue); if (tqueue->tmpcontext != NULL) MemoryContextDelete(tqueue->tmpcontext); if (tqueue->recordhtab != NULL) @@ -650,7 +655,7 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc) void DestroyTupleQueueReader(TupleQueueReader *reader) { - shm_mq_detach(shm_mq_get_queue(reader->queue)); + shm_mq_detach(reader->queue); if (reader->typmodmap != NULL) hash_destroy(reader->typmodmap); /* Is it worth trying to free substructure of the remap tree? */ diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 8fbc03819d..e1a24b62c8 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -21,7 +21,6 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" -static shm_mq *pq_mq; static shm_mq_handle *pq_mq_handle; static bool pq_mq_busy = false; static pid_t pq_mq_parallel_master_pid = 0; @@ -56,7 +55,6 @@ void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh) { PqCommMethods = &PqCommMqMethods; - pq_mq = shm_mq_get_queue(mqh); pq_mq_handle = mqh; whereToSendOutput = DestRemote; FrontendProtocol = PG_PROTOCOL_LATEST; @@ -70,7 +68,6 @@ pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh) static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg) { - pq_mq = NULL; pq_mq_handle = NULL; whereToSendOutput = DestNone; } @@ -135,9 +132,8 @@ mq_putmessage(char msgtype, const char *s, size_t len) */ if (pq_mq_busy) { - if (pq_mq != NULL) - shm_mq_detach(pq_mq); - pq_mq = NULL; + if (pq_mq_handle != NULL) + shm_mq_detach(pq_mq_handle); pq_mq_handle = NULL; return EOF; } @@ -148,7 +144,7 @@ mq_putmessage(char msgtype, const char *s, size_t len) * be generated late in the shutdown sequence, after all DSMs have already * been detached. 
*/ - if (pq_mq == NULL) + if (pq_mq_handle == NULL) return 0; pq_mq_busy = true; diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index f45a67cc27..770559a03e 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -83,7 +83,9 @@ struct shm_mq * This structure is a backend-private handle for access to a queue. * * mqh_queue is a pointer to the queue we've attached, and mqh_segment is - * a pointer to the dynamic shared memory segment that contains it. + * an optional pointer to the dynamic shared memory segment that contains it. + * (If mqh_segment is provided, we register an on_dsm_detach callback to + * make sure we detach from the queue before detaching from DSM.) * * If this queue is intended to connect the current process with a background * worker that started it, the user can pass a pointer to the worker handle @@ -139,6 +141,7 @@ struct shm_mq_handle MemoryContext mqh_context; }; +static void shm_mq_detach_internal(shm_mq *mq); static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, const void *data, bool nowait, Size *bytes_written); static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, @@ -288,14 +291,15 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc); mqh->mqh_queue = mq; mqh->mqh_segment = seg; - mqh->mqh_buffer = NULL; mqh->mqh_handle = handle; + mqh->mqh_buffer = NULL; mqh->mqh_buflen = 0; mqh->mqh_consume_pending = 0; - mqh->mqh_context = CurrentMemoryContext; mqh->mqh_partial_bytes = 0; + mqh->mqh_expected_bytes = 0; mqh->mqh_length_word_complete = false; mqh->mqh_counterparty_attached = false; + mqh->mqh_context = CurrentMemoryContext; if (seg != NULL) on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq)); @@ -765,7 +769,28 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh) } /* - * Detach a shared message queue. 
+ * Detach from a shared message queue, and destroy the shm_mq_handle. + */ +void +shm_mq_detach(shm_mq_handle *mqh) +{ + /* Notify counterparty that we're outta here. */ + shm_mq_detach_internal(mqh->mqh_queue); + + /* Cancel on_dsm_detach callback, if any. */ + if (mqh->mqh_segment) + cancel_on_dsm_detach(mqh->mqh_segment, + shm_mq_detach_callback, + PointerGetDatum(mqh->mqh_queue)); + + /* Release local memory associated with handle. */ + if (mqh->mqh_buffer != NULL) + pfree(mqh->mqh_buffer); + pfree(mqh); +} + +/* + * Notify counterparty that we're detaching from shared message queue. * * The purpose of this function is to make sure that the process * with which we're communicating doesn't block forever waiting for us to @@ -773,9 +798,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh) * detaches, the receiver can read any messages remaining in the queue; * further reads will return SHM_MQ_DETACHED. If the receiver detaches, * further attempts to send messages will likewise return SHM_MQ_DETACHED. + * + * This is separated out from shm_mq_detach() because if the on_dsm_detach + * callback fires, we only want to do this much. We do not try to touch + * the local shm_mq_handle, as it may have been pfree'd already. */ -void -shm_mq_detach(shm_mq *mq) +static void +shm_mq_detach_internal(shm_mq *mq) { volatile shm_mq *vmq = mq; PGPROC *victim; @@ -1193,5 +1222,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg) { shm_mq *mq = (shm_mq *) DatumGetPointer(arg); - shm_mq_detach(mq); + shm_mq_detach_internal(mq); } diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 02a93e0222..7709efcc48 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -62,8 +62,8 @@ extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg, /* Associate worker handle with shm_mq. */ extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *); -/* Break connection. 
*/ -extern void shm_mq_detach(shm_mq *); +/* Break connection, release handle resources. */ +extern void shm_mq_detach(shm_mq_handle *mqh); /* Get the shm_mq from handle. */ extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh); -- 2.40.0