]> granicus.if.org Git - postgresql/commitdiff
Clean up shm_mq cleanup.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 31 Aug 2017 19:10:24 +0000 (15:10 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 31 Aug 2017 19:10:24 +0000 (15:10 -0400)
The logic around shm_mq_detach was a few bricks shy of a load, because
(contrary to the comments for shm_mq_attach) all it did was update the
shared shm_mq state.  That left us leaking a bit of process-local
memory, but much worse, the on_dsm_detach callback for shm_mq_detach
was still armed.  That means that whenever we ultimately detach from
the DSM segment, we'd run shm_mq_detach again for already-detached,
possibly long-dead queues.  This accidentally fails to fail today,
because we only ever re-use a shm_mq's memory for another shm_mq, and
multiple detach attempts on the last such shm_mq are fairly harmless.
But it's gonna bite us someday, so let's clean it up.

To do that, change shm_mq_detach's API so it takes a shm_mq_handle
not the underlying shm_mq.  This makes the callers simpler in most
cases anyway.  Also fix a few places in parallel.c that were just
pfree'ing the handle structs rather than doing proper cleanup.

Back-patch to v10 because of the risk that the revenant shm_mq_detach
callbacks would cause a live bug sometime.  Since this is an API
change, it's too late to do it in 9.6.  (We could make a variant
patch that preserves API, but I'm not excited enough to do that.)

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us

src/backend/access/transam/parallel.c
src/backend/executor/tqueue.c
src/backend/libpq/pqmq.c
src/backend/storage/ipc/shm_mq.c
src/include/storage/shm_mq.h

index 17b10383e448ffc8da04cbed37f942cde2c2bfe7..ce1b907debd0f7eb4eb732911da58307acd6a808 100644 (file)
@@ -480,7 +480,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
                         */
                        any_registrations_failed = true;
                        pcxt->worker[i].bgwhandle = NULL;
-                       pfree(pcxt->worker[i].error_mqh);
+                       shm_mq_detach(pcxt->worker[i].error_mqh);
                        pcxt->worker[i].error_mqh = NULL;
                }
        }
@@ -612,7 +612,7 @@ DestroyParallelContext(ParallelContext *pcxt)
                        {
                                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
 
-                               pfree(pcxt->worker[i].error_mqh);
+                               shm_mq_detach(pcxt->worker[i].error_mqh);
                                pcxt->worker[i].error_mqh = NULL;
                        }
                }
@@ -861,7 +861,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
                case 'X':                               /* Terminate, indicating clean exit */
                        {
-                               pfree(pcxt->worker[i].error_mqh);
+                               shm_mq_detach(pcxt->worker[i].error_mqh);
                                pcxt->worker[i].error_mqh = NULL;
                                break;
                        }
index 4c4fcf530d722b3e96bcbc6cf38e7a9210c4251b..ee4bec0385e680178c3469ddd04165eee109db6e 100644 (file)
@@ -578,7 +578,9 @@ tqueueShutdownReceiver(DestReceiver *self)
 {
        TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
-       shm_mq_detach(shm_mq_get_queue(tqueue->queue));
+       if (tqueue->queue != NULL)
+               shm_mq_detach(tqueue->queue);
+       tqueue->queue = NULL;
 }
 
 /*
@@ -589,6 +591,9 @@ tqueueDestroyReceiver(DestReceiver *self)
 {
        TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
+       /* We probably already detached from queue, but let's be sure */
+       if (tqueue->queue != NULL)
+               shm_mq_detach(tqueue->queue);
        if (tqueue->tmpcontext != NULL)
                MemoryContextDelete(tqueue->tmpcontext);
        if (tqueue->recordhtab != NULL)
@@ -650,7 +655,7 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-       shm_mq_detach(shm_mq_get_queue(reader->queue));
+       shm_mq_detach(reader->queue);
        if (reader->typmodmap != NULL)
                hash_destroy(reader->typmodmap);
        /* Is it worth trying to free substructure of the remap tree? */
index 8fbc03819d96702be8727ddd51e4370c18874a66..e1a24b62c8fb10a76848831201adac24051ec621 100644 (file)
@@ -21,7 +21,6 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 
-static shm_mq *pq_mq;
 static shm_mq_handle *pq_mq_handle;
 static bool pq_mq_busy = false;
 static pid_t pq_mq_parallel_master_pid = 0;
@@ -56,7 +55,6 @@ void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
        PqCommMethods = &PqCommMqMethods;
-       pq_mq = shm_mq_get_queue(mqh);
        pq_mq_handle = mqh;
        whereToSendOutput = DestRemote;
        FrontendProtocol = PG_PROTOCOL_LATEST;
@@ -70,7 +68,6 @@ pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 static void
 pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
 {
-       pq_mq = NULL;
        pq_mq_handle = NULL;
        whereToSendOutput = DestNone;
 }
@@ -135,9 +132,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
         */
        if (pq_mq_busy)
        {
-               if (pq_mq != NULL)
-                       shm_mq_detach(pq_mq);
-               pq_mq = NULL;
+               if (pq_mq_handle != NULL)
+                       shm_mq_detach(pq_mq_handle);
                pq_mq_handle = NULL;
                return EOF;
        }
@@ -148,7 +144,7 @@ mq_putmessage(char msgtype, const char *s, size_t len)
         * be generated late in the shutdown sequence, after all DSMs have already
         * been detached.
         */
-       if (pq_mq == NULL)
+       if (pq_mq_handle == NULL)
                return 0;
 
        pq_mq_busy = true;
index f45a67cc278e402b13b36ea0db327e29ef2a5bd0..770559a03e3c4538e26c8e4f9d01ad2079432a31 100644 (file)
@@ -83,7 +83,9 @@ struct shm_mq
  * This structure is a backend-private handle for access to a queue.
  *
  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
- * a pointer to the dynamic shared memory segment that contains it.
+ * an optional pointer to the dynamic shared memory segment that contains it.
+ * (If mqh_segment is provided, we register an on_dsm_detach callback to
+ * make sure we detach from the queue before detaching from DSM.)
  *
  * If this queue is intended to connect the current process with a background
  * worker that started it, the user can pass a pointer to the worker handle
@@ -139,6 +141,7 @@ struct shm_mq_handle
        MemoryContext mqh_context;
 };
 
+static void shm_mq_detach_internal(shm_mq *mq);
 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
                                  const void *data, bool nowait, Size *bytes_written);
 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
@@ -288,14 +291,15 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
        Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
        mqh->mqh_queue = mq;
        mqh->mqh_segment = seg;
-       mqh->mqh_buffer = NULL;
        mqh->mqh_handle = handle;
+       mqh->mqh_buffer = NULL;
        mqh->mqh_buflen = 0;
        mqh->mqh_consume_pending = 0;
-       mqh->mqh_context = CurrentMemoryContext;
        mqh->mqh_partial_bytes = 0;
+       mqh->mqh_expected_bytes = 0;
        mqh->mqh_length_word_complete = false;
        mqh->mqh_counterparty_attached = false;
+       mqh->mqh_context = CurrentMemoryContext;
 
        if (seg != NULL)
                on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
@@ -765,7 +769,28 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 }
 
 /*
- * Detach a shared message queue.
+ * Detach from a shared message queue, and destroy the shm_mq_handle.
+ */
+void
+shm_mq_detach(shm_mq_handle *mqh)
+{
+       /* Notify counterparty that we're outta here. */
+       shm_mq_detach_internal(mqh->mqh_queue);
+
+       /* Cancel on_dsm_detach callback, if any. */
+       if (mqh->mqh_segment)
+               cancel_on_dsm_detach(mqh->mqh_segment,
+                                                        shm_mq_detach_callback,
+                                                        PointerGetDatum(mqh->mqh_queue));
+
+       /* Release local memory associated with handle. */
+       if (mqh->mqh_buffer != NULL)
+               pfree(mqh->mqh_buffer);
+       pfree(mqh);
+}
+
+/*
+ * Notify counterparty that we're detaching from shared message queue.
  *
  * The purpose of this function is to make sure that the process
  * with which we're communicating doesn't block forever waiting for us to
@@ -773,9 +798,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
  * detaches, the receiver can read any messages remaining in the queue;
  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ *
+ * This is separated out from shm_mq_detach() because if the on_dsm_detach
+ * callback fires, we only want to do this much.  We do not try to touch
+ * the local shm_mq_handle, as it may have been pfree'd already.
  */
-void
-shm_mq_detach(shm_mq *mq)
+static void
+shm_mq_detach_internal(shm_mq *mq)
 {
        volatile shm_mq *vmq = mq;
        PGPROC     *victim;
@@ -1193,5 +1222,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
 {
        shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
 
-       shm_mq_detach(mq);
+       shm_mq_detach_internal(mq);
 }
index 02a93e02222415b00539ec7e6a5b47246f03ec6c..7709efcc483fe72775f79fc93f15198d5fdadedf 100644 (file)
@@ -62,8 +62,8 @@ extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
 /* Associate worker handle with shm_mq. */
 extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
 
-/* Break connection. */
-extern void shm_mq_detach(shm_mq *);
+/* Break connection, release handle resources. */
+extern void shm_mq_detach(shm_mq_handle *mqh);
 
 /* Get the shm_mq from handle. */
 extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);