]> granicus.if.org Git - postgresql/commitdiff
Clean up latch related code.
authorAndres Freund <andres@anarazel.de>
Tue, 6 Jun 2017 23:13:00 +0000 (16:13 -0700)
committerAndres Freund <andres@anarazel.de>
Tue, 6 Jun 2017 23:13:00 +0000 (16:13 -0700)
The larger part of this patch replaces usages of MyProc->procLatch
with MyLatch.  The latter works even early during backend startup,
where MyProc->procLatch doesn't yet.  While the affected code
shouldn't run in cases where it's not initialized, it might get copied
into places where it might.  Using MyLatch is simpler and a bit faster
to boot, so there's little point to stick with the previous coding.

While doing so I noticed some weaknesses around newly introduced uses
of latches that could lead to missed events, and an omitted
CHECK_FOR_INTERRUPTS() call in worker_spi.

As all the actual bugs are in v10 code, there doesn't seem to be
sufficient reason to backpatch this.

Author: Andres Freund
Discussion:
    https://postgr.es/m/20170606195321.sjmenrfgl2nu6j63@alap3.anarazel.de
    https://postgr.es/m/20170606210405.sim3yl6vpudhmufo@alap3.anarazel.de
Backpatch: -

src/backend/access/transam/parallel.c
src/backend/libpq/pqmq.c
src/backend/postmaster/bgworker.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/backend/storage/lmgr/condition_variable.c
src/test/modules/worker_spi/worker_spi.c

index cb22174270677ce2339bfcfef15d013ffbacf3ce..16a0bee61037b719c2be28f3d750ddf89a9b392a 100644 (file)
@@ -527,9 +527,9 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
                if (!anyone_alive)
                        break;
 
-               WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1,
+               WaitLatch(MyLatch, WL_LATCH_SET, -1,
                                  WAIT_EVENT_PARALLEL_FINISH);
-               ResetLatch(&MyProc->procLatch);
+               ResetLatch(MyLatch);
        }
 
        if (pcxt->toc != NULL)
index 96939327c38a2cee4f4373baaf6c6054f59d5bfd..8fbc03819d96702be8727ddd51e4370c18874a66 100644 (file)
@@ -172,9 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len)
                if (result != SHM_MQ_WOULD_BLOCK)
                        break;
 
-               WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0,
+               WaitLatch(MyLatch, WL_LATCH_SET, 0,
                                  WAIT_EVENT_MQ_PUT_MESSAGE);
-               ResetLatch(&MyProc->procLatch);
+               ResetLatch(MyLatch);
                CHECK_FOR_INTERRUPTS();
        }
 
index c3454276bfa406dcc2a47d9737bccd1c643e625e..712d700481db1e69f4a55f39837d6224a5bef0fe 100644 (file)
@@ -1144,7 +1144,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
                if (status == BGWH_STOPPED)
                        break;
 
-               rc = WaitLatch(&MyProc->procLatch,
+               rc = WaitLatch(MyLatch,
                                           WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
                                           WAIT_EVENT_BGWORKER_SHUTDOWN);
 
@@ -1154,7 +1154,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
                        break;
                }
 
-               ResetLatch(&MyProc->procLatch);
+               ResetLatch(MyLatch);
        }
 
        return status;
index 726d1b5bd81f812649ab7836fc2329eb5993532a..89c34b822521b266eb6757587c32d876e7f30ba3 100644 (file)
@@ -176,7 +176,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
                                   ? WL_SOCKET_READABLE
                                   : WL_SOCKET_WRITEABLE);
 
-               rc = WaitLatchOrSocket(&MyProc->procLatch,
+               rc = WaitLatchOrSocket(MyLatch,
                                                           WL_POSTMASTER_DEATH |
                                                           WL_LATCH_SET | io_flag,
                                                           PQsocket(conn->streamConn),
@@ -190,7 +190,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
                /* Interrupted? */
                if (rc & WL_LATCH_SET)
                {
-                       ResetLatch(&MyProc->procLatch);
+                       ResetLatch(MyLatch);
                        CHECK_FOR_INTERRUPTS();
                }
 
@@ -574,21 +574,22 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
                         * the signal arrives in the middle of establishment of
                         * replication connection.
                         */
-                       ResetLatch(&MyProc->procLatch);
-                       rc = WaitLatchOrSocket(&MyProc->procLatch,
+                       rc = WaitLatchOrSocket(MyLatch,
                                                                   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
                                                                   WL_LATCH_SET,
                                                                   PQsocket(streamConn),
                                                                   0,
                                                                   WAIT_EVENT_LIBPQWALRECEIVER);
+
+                       /* Emergency bailout? */
                        if (rc & WL_POSTMASTER_DEATH)
                                exit(1);
 
-                       /* interrupted */
+                       /* Interrupted? */
                        if (rc & WL_LATCH_SET)
                        {
+                               ResetLatch(MyLatch);
                                CHECK_FOR_INTERRUPTS();
-                               continue;
                        }
                        if (PQconsumeInput(streamConn) == 0)
                                return NULL;    /* trouble */
index 5aaf24bfe4fa0550acf7a080dedd780b7747a30f..5a3274b2c2392509bba5ec4c7f29a866bdc00498 100644 (file)
@@ -208,10 +208,15 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                           1000L, WAIT_EVENT_BGWORKER_STARTUP);
 
+               /* emergency bailout if postmaster has died */
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
-               ResetLatch(MyLatch);
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
        }
 
        return;
@@ -440,10 +445,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
                LWLockRelease(LogicalRepWorkerLock);
 
-               CHECK_FOR_INTERRUPTS();
-
                /* Wait for signal. */
-               rc = WaitLatch(&MyProc->procLatch,
+               rc = WaitLatch(MyLatch,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                           1000L, WAIT_EVENT_BGWORKER_STARTUP);
 
@@ -451,7 +454,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
-               ResetLatch(&MyProc->procLatch);
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
 
                /* Check worker status. */
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@@ -492,7 +499,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
                CHECK_FOR_INTERRUPTS();
 
                /* Wait for more work. */
-               rc = WaitLatch(&MyProc->procLatch,
+               rc = WaitLatch(MyLatch,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                           1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
 
@@ -500,7 +507,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
-               ResetLatch(&MyProc->procLatch);
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
        }
 }
 
@@ -876,7 +887,7 @@ ApplyLauncherMain(Datum main_arg)
                }
 
                /* Wait for more work. */
-               rc = WaitLatch(&MyProc->procLatch,
+               rc = WaitLatch(MyLatch,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                           wait_time,
                                           WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
@@ -885,13 +896,17 @@ ApplyLauncherMain(Datum main_arg)
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
+
                if (got_SIGHUP)
                {
                        got_SIGHUP = false;
                        ProcessConfigFile(PGC_SIGHUP);
                }
-
-               ResetLatch(&MyProc->procLatch);
        }
 
        LogicalRepCtx->launcher_pid = 0;
index ed66602935ffdacd2b099430166fc8376edd799c..6e55d2d606950a1738bd17c180c29fff188d6406 100644 (file)
@@ -191,7 +191,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
                if (!worker)
                        return false;
 
-               rc = WaitLatch(&MyProc->procLatch,
+               rc = WaitLatch(MyLatch,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                           1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
@@ -199,7 +199,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
-               ResetLatch(&MyProc->procLatch);
+               ResetLatch(MyLatch);
        }
 
        return false;
@@ -236,7 +236,7 @@ wait_for_worker_state_change(char expected_state)
                if (MyLogicalRepWorker->relstate == expected_state)
                        return true;
 
-               rc = WaitLatch(&MyProc->procLatch,
+               rc = WaitLatch(MyLatch,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                           1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
@@ -244,7 +244,7 @@ wait_for_worker_state_change(char expected_state)
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
-               ResetLatch(&MyProc->procLatch);
+               ResetLatch(MyLatch);
        }
 
        return false;
@@ -604,7 +604,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
                /*
                 * Wait for more data or latch.
                 */
-               rc = WaitLatchOrSocket(&MyProc->procLatch,
+               rc = WaitLatchOrSocket(MyLatch,
                                                           WL_SOCKET_READABLE | WL_LATCH_SET |
                                                           WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                                           fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
@@ -613,7 +613,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
-               ResetLatch(&MyProc->procLatch);
+               ResetLatch(MyLatch);
        }
 
        return bytesread;
index 51a64487cd9ac934a6a2306a09667a200aee1d5f..999d627c87205629c3c7e3ac784dd7abe22a5f5d 100644 (file)
@@ -1146,7 +1146,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                /*
                 * Wait for more data or latch.
                 */
-               rc = WaitLatchOrSocket(&MyProc->procLatch,
+               rc = WaitLatchOrSocket(MyLatch,
                                                           WL_SOCKET_READABLE | WL_LATCH_SET |
                                                           WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                                           fd, NAPTIME_PER_CYCLE,
@@ -1156,6 +1156,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
+
                if (got_SIGHUP)
                {
                        got_SIGHUP = false;
@@ -1209,8 +1215,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                        send_feedback(last_received, requestReply, requestReply);
                }
-
-               ResetLatch(&MyProc->procLatch);
        }
 }
 
index 5afb21121b6d08c442e00eb494a19bb571ca710c..b4b7d28dd5dbbff1265e152dea43cfd7603e8bcd 100644 (file)
@@ -68,14 +68,14 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
        {
                cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
                AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
-                                                 &MyProc->procLatch, NULL);
+                                                 MyLatch, NULL);
        }
 
        /*
         * Reset my latch before adding myself to the queue and before entering
         * the caller's predicate loop.
         */
-       ResetLatch(&MyProc->procLatch);
+       ResetLatch(MyLatch);
 
        /* Add myself to the wait queue. */
        SpinLockAcquire(&cv->mutex);
@@ -135,7 +135,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
                WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info);
 
                /* Reset latch before testing whether we can return. */
-               ResetLatch(&MyProc->procLatch);
+               ResetLatch(MyLatch);
 
                /*
                 * If this process has been taken out of the wait list, then we know
index 9abfc714a997d7af26db101062152221033a0aa1..553baf0045413d783768d70a66c9f83abb38cdbb 100644 (file)
@@ -235,6 +235,8 @@ worker_spi_main(Datum main_arg)
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
+               CHECK_FOR_INTERRUPTS();
+
                /*
                 * In case of a SIGHUP, just reload the configuration.
                 */