From 572d6ee6d41b43b8871f42c7dbbef795523b2dbf Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Fri, 30 Jun 2017 18:06:33 -0400 Subject: [PATCH] Fix locking in WAL receiver/sender shmem state structs MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit In WAL receiver and WAL server, some accesses to their corresponding shared memory control structs were done without holding any kind of lock, which could lead to inconsistent and possibly insecure results. In walsender, fix by clarifying the locking rules and following them correctly, as documented in the new comment in walsender_private.h; namely that some members can be read in walsender itself without a lock, because the only writes occur in the same process. The rest of the struct requires spinlock for accesses, as usual. In walreceiver, fix by always holding spinlock while accessing the struct. While there is potentially a problem in all branches, it is minor in stable ones. This only became a real problem in pg10 because of quorum commit in synchronous replication (commit 3901fd70cc7c), and a potential security problem in walreceiver because a superuser() check was removed by default monitoring roles (commit 25fff40798fc). Thus, no backpatch. In passing, clean up some leftover braces which were used to create unconditional blocks. Once upon a time these were used for volatile-izing accesses to those shmem structs, which is no longer required. Many other occurrences of this pattern remain. Author: Michaël Paquier Reported-by: Michaël Paquier Reviewed-by: Masahiko Sawada, Kyotaro Horiguchi, Thomas Munro, Robert Haas Discussion: https://postgr.es/m/CAB7nPqTWYqtzD=LN_oDaf9r-hAjUEPAy0B9yRkhcsLdRN8fzrw@mail.gmail.com --- src/backend/replication/syncrep.c | 32 +++++++++++--- src/backend/replication/walreceiver.c | 42 ++++++++++--------- src/backend/replication/walsender.c | 46 ++++++++++----------- src/include/replication/walreceiver.h | 6 +-- src/include/replication/walsender_private.h | 9 +++- 5 files changed, 80 insertions(+), 55 deletions(-) diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 5fd47689dd..6a28becdad 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -711,14 +711,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -726,7 +736,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* @@ -780,14 +790,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) */ for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -796,7 +816,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 8a249e22b9..ea9d21a46b 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1379,7 +1379,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) TupleDesc tupdesc; Datum *values; bool *nulls; - WalRcvData *walrcv = WalRcv; + int pid; + bool ready_to_display; WalRcvState state; XLogRecPtr receive_start_lsn; TimeLineID receive_start_tli; @@ -1392,11 +1393,28 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) char *slotname; char *conninfo; + /* Take a lock to ensure value consistency */ + SpinLockAcquire(&WalRcv->mutex); + pid = (int) WalRcv->pid; + ready_to_display = WalRcv->ready_to_display; + state = WalRcv->walRcvState; + receive_start_lsn = WalRcv->receiveStart; + receive_start_tli = WalRcv->receiveStartTLI; + received_lsn = WalRcv->receivedUpto; + received_tli = WalRcv->receivedTLI; + last_send_time = WalRcv->lastMsgSendTime; + last_receipt_time = WalRcv->lastMsgReceiptTime; + latest_end_lsn = WalRcv->latestWalEnd; + latest_end_time = WalRcv->latestWalEndTime; + slotname = pstrdup(WalRcv->slotname); + conninfo = pstrdup(WalRcv->conninfo); + SpinLockRelease(&WalRcv->mutex); + /* * No WAL receiver (or not ready yet), just return a tuple with NULL * values */ - if (walrcv->pid == 0 || !walrcv->ready_to_display) + if (pid == 0 || !ready_to_display) PG_RETURN_NULL(); /* determine result type */ @@ -1406,23 +1424,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) values = palloc0(sizeof(Datum) * tupdesc->natts); nulls = palloc0(sizeof(bool) * tupdesc->natts); - /* Take a lock to ensure value consistency */ - SpinLockAcquire(&walrcv->mutex); - state = walrcv->walRcvState; - receive_start_lsn = walrcv->receiveStart; - receive_start_tli = walrcv->receiveStartTLI; - received_lsn = walrcv->receivedUpto; - received_tli = walrcv->receivedTLI; - last_send_time = walrcv->lastMsgSendTime; - last_receipt_time = walrcv->lastMsgReceiptTime; - latest_end_lsn = walrcv->latestWalEnd; - latest_end_time = walrcv->latestWalEndTime; - slotname = pstrdup(walrcv->slotname); - conninfo = pstrdup(walrcv->conninfo); - SpinLockRelease(&walrcv->mutex); - /* Fetch values */ - values[0] = Int32GetDatum(walrcv->pid); + values[0] = Int32GetDatum(pid); if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS)) { @@ -1473,6 +1476,5 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) } /* Returns the record as Datum */ - PG_RETURN_DATUM(HeapTupleGetDatum( - heap_form_tuple(tupdesc, values, nulls))); + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index da0553e016..002143b26a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -668,13 +668,9 @@ StartReplication(StartReplicationCmd *cmd) sentPtr = cmd->startpoint; /* Initialize shared memory status, too */ - { - WalSnd *walsnd = MyWalSnd; - - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); - } + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->sentPtr = sentPtr; + SpinLockRelease(&MyWalSnd->mutex); SyncRepInitConfig(); @@ -1093,13 +1089,9 @@ StartLogicalReplication(StartReplicationCmd *cmd) sentPtr = MyReplicationSlot->data.confirmed_flush; /* Also update the sent position status in shared memory */ - { - WalSnd *walsnd = MyWalSnd; - - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = MyReplicationSlot->data.restart_lsn; - SpinLockRelease(&walsnd->mutex); - } + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn; + SpinLockRelease(&MyWalSnd->mutex); replication_active = true; @@ -2892,10 +2884,12 @@ WalSndRqstFileReload(void) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); continue; - - SpinLockAcquire(&walsnd->mutex); + } walsnd->needreload = true; SpinLockRelease(&walsnd->mutex); } @@ -3071,7 +3065,6 @@ WalSndWaitStopping(void) for (i = 0; i < max_wal_senders; i++) { - WalSndState state; WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockAcquire(&walsnd->mutex); @@ -3082,14 +3075,13 @@ WalSndWaitStopping(void) continue; } - state = walsnd->state; - SpinLockRelease(&walsnd->mutex); - - if (state != WALSNDSTATE_STOPPING) + if (walsnd->state != WALSNDSTATE_STOPPING) { all_stopped = false; + SpinLockRelease(&walsnd->mutex); break; } + SpinLockRelease(&walsnd->mutex); } /* safe to leave if confirmation is done for all WAL senders */ @@ -3210,14 +3202,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) TimeOffset flushLag; TimeOffset applyLag; int priority; + int pid; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); continue; - - SpinLockAcquire(&walsnd->mutex); + } + pid = walsnd->pid; sentPtr = walsnd->sentPtr; state = walsnd->state; write = walsnd->write; @@ -3230,7 +3226,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); - values[0] = Int32GetDatum(walsnd->pid); + values[0] = Int32GetDatum(pid); if (!superuser()) { @@ -3265,7 +3261,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * which always returns an invalid flush location, as an * asynchronous standby. */ - priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority; + priority = XLogRecPtrIsInvalid(flush) ? 0 : priority; if (writeLag < 0) nulls[6] = true; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index c8652dbd48..9a8b2e207e 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -114,6 +114,9 @@ typedef struct */ char slotname[NAMEDATALEN]; + /* set true once conninfo is ready to display (obfuscated pwds etc) */ + bool ready_to_display; + slock_t mutex; /* locks shared variables shown above */ /* @@ -122,9 +125,6 @@ typedef struct */ bool force_reply; - /* set true once conninfo is ready to display (obfuscated pwds etc) */ - bool ready_to_display; - /* * Latch used by startup process to wake up walreceiver after telling it * where to start streaming (after setting receiveStart and diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 0aa80d5c3e..17c68cba23 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -30,10 +30,17 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. + * + * This struct is protected by 'mutex', with two exceptions: one is + * sync_standby_priority as noted below. The other exception is that some + * members are only written by the walsender process itself, and thus that + * process is free to read those members without holding spinlock. pid and + * needreload always require the spinlock to be held for all accesses. */ typedef struct WalSnd { - pid_t pid; /* this walsender's process id, or 0 */ + pid_t pid; /* this walsender's PID, or 0 if not active */ + WalSndState state; /* this walsender's state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ bool needreload; /* does currently-open file need to be -- 2.40.0