]> granicus.if.org Git - postgresql/commitdiff
Fix locking in WAL receiver/sender shmem state structs
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 30 Jun 2017 22:06:33 +0000 (18:06 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 30 Jun 2017 22:06:33 +0000 (18:06 -0400)
In WAL receiver and WAL server, some accesses to their corresponding
shared memory control structs were done without holding any kind of
lock, which could lead to inconsistent and possibly insecure results.

In walsender, fix by clarifying the locking rules and following them
correctly, as documented in the new comment in walsender_private.h;
namely that some members can be read in walsender itself without a lock,
because the only writes occur in the same process.  The rest of the
struct requires spinlock for accesses, as usual.

In walreceiver, fix by always holding spinlock while accessing the
struct.

While there is potentially a problem in all branches, it is minor in
stable ones.  This only became a real problem in pg10 because of quorum
commit in synchronous replication (commit 3901fd70cc7c), and a potential
security problem in walreceiver because a superuser() check was removed
by default monitoring roles (commit 25fff40798fc).  Thus, no backpatch.

In passing, clean up some leftover braces which were used to create
unconditional blocks.  Once upon a time these were used for
volatile-izing accesses to those shmem structs, which is no longer
required.  Many other occurrences of this pattern remain.

Author: Michaël Paquier
Reported-by: Michaël Paquier
Reviewed-by: Masahiko Sawada, Kyotaro Horiguchi, Thomas Munro,
Robert Haas
Discussion: https://postgr.es/m/CAB7nPqTWYqtzD=LN_oDaf9r-hAjUEPAy0B9yRkhcsLdRN8fzrw@mail.gmail.com

src/backend/replication/syncrep.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/replication/walreceiver.h
src/include/replication/walsender_private.h

index 5fd47689dd2a62f79af1129942926ab9ef2b02a8..6a28becdad57c2019b799e200325924e771d9ca2 100644 (file)
@@ -711,14 +711,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
 
        for (i = 0; i < max_wal_senders; i++)
        {
+               XLogRecPtr      flush;
+               WalSndState     state;
+               int                     pid;
+
                walsnd = &WalSndCtl->walsnds[i];
 
+               SpinLockAcquire(&walsnd->mutex);
+               pid = walsnd->pid;
+               flush = walsnd->flush;
+               state = walsnd->state;
+               SpinLockRelease(&walsnd->mutex);
+
                /* Must be active */
-               if (walsnd->pid == 0)
+               if (pid == 0)
                        continue;
 
                /* Must be streaming */
-               if (walsnd->state != WALSNDSTATE_STREAMING)
+               if (state != WALSNDSTATE_STREAMING)
                        continue;
 
                /* Must be synchronous */
@@ -726,7 +736,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
                        continue;
 
                /* Must have a valid flush position */
-               if (XLogRecPtrIsInvalid(walsnd->flush))
+               if (XLogRecPtrIsInvalid(flush))
                        continue;
 
                /*
@@ -780,14 +790,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
         */
        for (i = 0; i < max_wal_senders; i++)
        {
+               XLogRecPtr      flush;
+               WalSndState     state;
+               int                     pid;
+
                walsnd = &WalSndCtl->walsnds[i];
 
+               SpinLockAcquire(&walsnd->mutex);
+               pid = walsnd->pid;
+               flush = walsnd->flush;
+               state = walsnd->state;
+               SpinLockRelease(&walsnd->mutex);
+
                /* Must be active */
-               if (walsnd->pid == 0)
+               if (pid == 0)
                        continue;
 
                /* Must be streaming */
-               if (walsnd->state != WALSNDSTATE_STREAMING)
+               if (state != WALSNDSTATE_STREAMING)
                        continue;
 
                /* Must be synchronous */
@@ -796,7 +816,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
                        continue;
 
                /* Must have a valid flush position */
-               if (XLogRecPtrIsInvalid(walsnd->flush))
+               if (XLogRecPtrIsInvalid(flush))
                        continue;
 
                /*
index 8a249e22b9f113810cb94911af6f43104f7695d2..ea9d21a46b396cb6dd4da7443d6df73a23fa6eaa 100644 (file)
@@ -1379,7 +1379,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
        TupleDesc       tupdesc;
        Datum      *values;
        bool       *nulls;
-       WalRcvData *walrcv = WalRcv;
+       int                     pid;
+       bool            ready_to_display;
        WalRcvState state;
        XLogRecPtr      receive_start_lsn;
        TimeLineID      receive_start_tli;
@@ -1392,11 +1393,28 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
        char       *slotname;
        char       *conninfo;
 
+       /* Take a lock to ensure value consistency */
+       SpinLockAcquire(&WalRcv->mutex);
+       pid = (int) WalRcv->pid;
+       ready_to_display = WalRcv->ready_to_display;
+       state = WalRcv->walRcvState;
+       receive_start_lsn = WalRcv->receiveStart;
+       receive_start_tli = WalRcv->receiveStartTLI;
+       received_lsn = WalRcv->receivedUpto;
+       received_tli = WalRcv->receivedTLI;
+       last_send_time = WalRcv->lastMsgSendTime;
+       last_receipt_time = WalRcv->lastMsgReceiptTime;
+       latest_end_lsn = WalRcv->latestWalEnd;
+       latest_end_time = WalRcv->latestWalEndTime;
+       slotname = pstrdup(WalRcv->slotname);
+       conninfo = pstrdup(WalRcv->conninfo);
+       SpinLockRelease(&WalRcv->mutex);
+
        /*
         * No WAL receiver (or not ready yet), just return a tuple with NULL
         * values
         */
-       if (walrcv->pid == 0 || !walrcv->ready_to_display)
+       if (pid == 0 || !ready_to_display)
                PG_RETURN_NULL();
 
        /* determine result type */
@@ -1406,23 +1424,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
        values = palloc0(sizeof(Datum) * tupdesc->natts);
        nulls = palloc0(sizeof(bool) * tupdesc->natts);
 
-       /* Take a lock to ensure value consistency */
-       SpinLockAcquire(&walrcv->mutex);
-       state = walrcv->walRcvState;
-       receive_start_lsn = walrcv->receiveStart;
-       receive_start_tli = walrcv->receiveStartTLI;
-       received_lsn = walrcv->receivedUpto;
-       received_tli = walrcv->receivedTLI;
-       last_send_time = walrcv->lastMsgSendTime;
-       last_receipt_time = walrcv->lastMsgReceiptTime;
-       latest_end_lsn = walrcv->latestWalEnd;
-       latest_end_time = walrcv->latestWalEndTime;
-       slotname = pstrdup(walrcv->slotname);
-       conninfo = pstrdup(walrcv->conninfo);
-       SpinLockRelease(&walrcv->mutex);
-
        /* Fetch values */
-       values[0] = Int32GetDatum(walrcv->pid);
+       values[0] = Int32GetDatum(pid);
 
        if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
        {
@@ -1473,6 +1476,5 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
        }
 
        /* Returns the record as Datum */
-       PG_RETURN_DATUM(HeapTupleGetDatum(
-                                                                         heap_form_tuple(tupdesc, values, nulls)));
+       PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
index da0553e016a8db5e154f75800f7997b76e2cfe7d..002143b26a23aab881cb8dbf1b47052112bc1220 100644 (file)
@@ -668,13 +668,9 @@ StartReplication(StartReplicationCmd *cmd)
                sentPtr = cmd->startpoint;
 
                /* Initialize shared memory status, too */
-               {
-                       WalSnd     *walsnd = MyWalSnd;
-
-                       SpinLockAcquire(&walsnd->mutex);
-                       walsnd->sentPtr = sentPtr;
-                       SpinLockRelease(&walsnd->mutex);
-               }
+               SpinLockAcquire(&MyWalSnd->mutex);
+               MyWalSnd->sentPtr = sentPtr;
+               SpinLockRelease(&MyWalSnd->mutex);
 
                SyncRepInitConfig();
 
@@ -1093,13 +1089,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
        sentPtr = MyReplicationSlot->data.confirmed_flush;
 
        /* Also update the sent position status in shared memory */
-       {
-               WalSnd     *walsnd = MyWalSnd;
-
-               SpinLockAcquire(&walsnd->mutex);
-               walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
-               SpinLockRelease(&walsnd->mutex);
-       }
+       SpinLockAcquire(&MyWalSnd->mutex);
+       MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
+       SpinLockRelease(&MyWalSnd->mutex);
 
        replication_active = true;
 
@@ -2892,10 +2884,12 @@ WalSndRqstFileReload(void)
        {
                WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
+               SpinLockAcquire(&walsnd->mutex);
                if (walsnd->pid == 0)
+               {
+                       SpinLockRelease(&walsnd->mutex);
                        continue;
-
-               SpinLockAcquire(&walsnd->mutex);
+               }
                walsnd->needreload = true;
                SpinLockRelease(&walsnd->mutex);
        }
@@ -3071,7 +3065,6 @@ WalSndWaitStopping(void)
 
                for (i = 0; i < max_wal_senders; i++)
                {
-                       WalSndState state;
                        WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
                        SpinLockAcquire(&walsnd->mutex);
@@ -3082,14 +3075,13 @@ WalSndWaitStopping(void)
                                continue;
                        }
 
-                       state = walsnd->state;
-                       SpinLockRelease(&walsnd->mutex);
-
-                       if (state != WALSNDSTATE_STOPPING)
+                       if (walsnd->state != WALSNDSTATE_STOPPING)
                        {
                                all_stopped = false;
+                               SpinLockRelease(&walsnd->mutex);
                                break;
                        }
+                       SpinLockRelease(&walsnd->mutex);
                }
 
                /* safe to leave if confirmation is done for all WAL senders */
@@ -3210,14 +3202,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                TimeOffset      flushLag;
                TimeOffset      applyLag;
                int                     priority;
+               int                     pid;
                WalSndState state;
                Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
                bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
+               SpinLockAcquire(&walsnd->mutex);
                if (walsnd->pid == 0)
+               {
+                       SpinLockRelease(&walsnd->mutex);
                        continue;
-
-               SpinLockAcquire(&walsnd->mutex);
+               }
+               pid = walsnd->pid;
                sentPtr = walsnd->sentPtr;
                state = walsnd->state;
                write = walsnd->write;
@@ -3230,7 +3226,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                SpinLockRelease(&walsnd->mutex);
 
                memset(nulls, 0, sizeof(nulls));
-               values[0] = Int32GetDatum(walsnd->pid);
+               values[0] = Int32GetDatum(pid);
 
                if (!superuser())
                {
@@ -3265,7 +3261,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                         * which always returns an invalid flush location, as an
                         * asynchronous standby.
                         */
-                       priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
+                       priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
 
                        if (writeLag < 0)
                                nulls[6] = true;
index c8652dbd489246f21c51920999e9319a9c902ed6..9a8b2e207ec3e2eb62cd909cb7eee3b2e90baae8 100644 (file)
@@ -114,6 +114,9 @@ typedef struct
         */
        char            slotname[NAMEDATALEN];
 
+       /* set true once conninfo is ready to display (obfuscated pwds etc) */
+       bool            ready_to_display;
+
        slock_t         mutex;                  /* locks shared variables shown above */
 
        /*
@@ -122,9 +125,6 @@ typedef struct
         */
        bool            force_reply;
 
-       /* set true once conninfo is ready to display (obfuscated pwds etc) */
-       bool            ready_to_display;
-
        /*
         * Latch used by startup process to wake up walreceiver after telling it
         * where to start streaming (after setting receiveStart and
index 0aa80d5c3e28d189755a019a33f3fdf02808f895..17c68cba235f5a5cd9a220923f361df045ec36fe 100644 (file)
@@ -30,10 +30,17 @@ typedef enum WalSndState
 
 /*
  * Each walsender has a WalSnd struct in shared memory.
+ *
+ * This struct is protected by 'mutex', with two exceptions: one is
+ * sync_standby_priority as noted below.  The other exception is that some
+ * members are only written by the walsender process itself, and thus that
+ * process is free to read those members without holding spinlock.  pid and
+ * needreload always require the spinlock to be held for all accesses.
  */
 typedef struct WalSnd
 {
-       pid_t           pid;                    /* this walsender's process id, or 0 */
+       pid_t           pid;                    /* this walsender's PID, or 0 if not active */
+
        WalSndState state;                      /* this walsender's state */
        XLogRecPtr      sentPtr;                /* WAL has been sent up to this point */
        bool            needreload;             /* does currently-open file need to be