In WAL receiver and WAL sender, some accesses to their corresponding
shared memory control structs were done without holding any kind of
lock, which could lead to inconsistent and possibly insecure results.
In walsender, fix by clarifying the locking rules and following them
correctly, as documented in the new comment in walsender_private.h;
namely that some members can be read in walsender itself without a lock,
because the only writes occur in the same process. The rest of the
struct requires the spinlock to be held for all accesses, as usual.
In walreceiver, fix by always holding spinlock while accessing the
struct.
While there is potentially a problem in all branches, it is minor in
stable ones. It only became a real problem in pg10 because of quorum
commit in synchronous replication (commit 3901fd70cc7c), and a potential
security problem in walreceiver because a superuser() check was removed
by the introduction of default monitoring roles (commit 25fff40798fc).
Thus, no backpatch.
In passing, clean up some leftover braces which were used to create
unconditional blocks. Once upon a time these were used for
volatile-izing accesses to those shmem structs, which is no longer
required. Many other occurrences of this pattern remain.
Author: Michaël Paquier
Reported-by: Michaël Paquier
Reviewed-by: Masahiko Sawada, Kyotaro Horiguchi, Thomas Munro,
Robert Haas
Discussion: https://postgr.es/m/CAB7nPqTWYqtzD=LN_oDaf9r-hAjUEPAy0B9yRkhcsLdRN8fzrw@mail.gmail.com
for (i = 0; i < max_wal_senders; i++)
{
+ XLogRecPtr flush;
+ WalSndState state;
+ int pid;
+
walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
+ pid = walsnd->pid;
+ flush = walsnd->flush;
+ state = walsnd->state;
+ SpinLockRelease(&walsnd->mutex);
+
/* Must be active */
- if (walsnd->pid == 0)
+ if (pid == 0)
continue;
/* Must be streaming */
- if (walsnd->state != WALSNDSTATE_STREAMING)
+ if (state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
continue;
/* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(walsnd->flush))
+ if (XLogRecPtrIsInvalid(flush))
continue;
/*
*/
for (i = 0; i < max_wal_senders; i++)
{
+ XLogRecPtr flush;
+ WalSndState state;
+ int pid;
+
walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
+ pid = walsnd->pid;
+ flush = walsnd->flush;
+ state = walsnd->state;
+ SpinLockRelease(&walsnd->mutex);
+
/* Must be active */
- if (walsnd->pid == 0)
+ if (pid == 0)
continue;
/* Must be streaming */
- if (walsnd->state != WALSNDSTATE_STREAMING)
+ if (state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
continue;
/* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(walsnd->flush))
+ if (XLogRecPtrIsInvalid(flush))
continue;
/*
TupleDesc tupdesc;
Datum *values;
bool *nulls;
- WalRcvData *walrcv = WalRcv;
+ int pid;
+ bool ready_to_display;
WalRcvState state;
XLogRecPtr receive_start_lsn;
TimeLineID receive_start_tli;
char *slotname;
char *conninfo;
+ /* Take a lock to ensure value consistency */
+ SpinLockAcquire(&WalRcv->mutex);
+ pid = (int) WalRcv->pid;
+ ready_to_display = WalRcv->ready_to_display;
+ state = WalRcv->walRcvState;
+ receive_start_lsn = WalRcv->receiveStart;
+ receive_start_tli = WalRcv->receiveStartTLI;
+ received_lsn = WalRcv->receivedUpto;
+ received_tli = WalRcv->receivedTLI;
+ last_send_time = WalRcv->lastMsgSendTime;
+ last_receipt_time = WalRcv->lastMsgReceiptTime;
+ latest_end_lsn = WalRcv->latestWalEnd;
+ latest_end_time = WalRcv->latestWalEndTime;
+ slotname = pstrdup(WalRcv->slotname);
+ conninfo = pstrdup(WalRcv->conninfo);
+ SpinLockRelease(&WalRcv->mutex);
+
/*
* No WAL receiver (or not ready yet), just return a tuple with NULL
* values
*/
- if (walrcv->pid == 0 || !walrcv->ready_to_display)
+ if (pid == 0 || !ready_to_display)
PG_RETURN_NULL();
/* determine result type */
values = palloc0(sizeof(Datum) * tupdesc->natts);
nulls = palloc0(sizeof(bool) * tupdesc->natts);
- /* Take a lock to ensure value consistency */
- SpinLockAcquire(&walrcv->mutex);
- state = walrcv->walRcvState;
- receive_start_lsn = walrcv->receiveStart;
- receive_start_tli = walrcv->receiveStartTLI;
- received_lsn = walrcv->receivedUpto;
- received_tli = walrcv->receivedTLI;
- last_send_time = walrcv->lastMsgSendTime;
- last_receipt_time = walrcv->lastMsgReceiptTime;
- latest_end_lsn = walrcv->latestWalEnd;
- latest_end_time = walrcv->latestWalEndTime;
- slotname = pstrdup(walrcv->slotname);
- conninfo = pstrdup(walrcv->conninfo);
- SpinLockRelease(&walrcv->mutex);
-
/* Fetch values */
- values[0] = Int32GetDatum(walrcv->pid);
+ values[0] = Int32GetDatum(pid);
if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
{
}
/* Returns the record as Datum */
- PG_RETURN_DATUM(HeapTupleGetDatum(
- heap_form_tuple(tupdesc, values, nulls)));
+ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
- {
- WalSnd *walsnd = MyWalSnd;
-
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = sentPtr;
- SpinLockRelease(&walsnd->mutex);
- }
+ SpinLockAcquire(&MyWalSnd->mutex);
+ MyWalSnd->sentPtr = sentPtr;
+ SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
sentPtr = MyReplicationSlot->data.confirmed_flush;
/* Also update the sent position status in shared memory */
- {
- WalSnd *walsnd = MyWalSnd;
-
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
- SpinLockRelease(&walsnd->mutex);
- }
+ SpinLockAcquire(&MyWalSnd->mutex);
+ MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
+ SpinLockRelease(&MyWalSnd->mutex);
replication_active = true;
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
+ {
+ SpinLockRelease(&walsnd->mutex);
continue;
-
- SpinLockAcquire(&walsnd->mutex);
+ }
walsnd->needreload = true;
SpinLockRelease(&walsnd->mutex);
}
for (i = 0; i < max_wal_senders; i++)
{
- WalSndState state;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
continue;
}
- state = walsnd->state;
- SpinLockRelease(&walsnd->mutex);
-
- if (state != WALSNDSTATE_STOPPING)
+ if (walsnd->state != WALSNDSTATE_STOPPING)
{
all_stopped = false;
+ SpinLockRelease(&walsnd->mutex);
break;
}
+ SpinLockRelease(&walsnd->mutex);
}
/* safe to leave if confirmation is done for all WAL senders */
TimeOffset flushLag;
TimeOffset applyLag;
int priority;
+ int pid;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+ SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
+ {
+ SpinLockRelease(&walsnd->mutex);
continue;
-
- SpinLockAcquire(&walsnd->mutex);
+ }
+ pid = walsnd->pid;
sentPtr = walsnd->sentPtr;
state = walsnd->state;
write = walsnd->write;
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
- values[0] = Int32GetDatum(walsnd->pid);
+ values[0] = Int32GetDatum(pid);
if (!superuser())
{
* which always returns an invalid flush location, as an
* asynchronous standby.
*/
- priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
+ priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
if (writeLag < 0)
nulls[6] = true;
*/
char slotname[NAMEDATALEN];
+ /* set true once conninfo is ready to display (obfuscated pwds etc) */
+ bool ready_to_display;
+
slock_t mutex; /* locks shared variables shown above */
/*
*/
bool force_reply;
- /* set true once conninfo is ready to display (obfuscated pwds etc) */
- bool ready_to_display;
-
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
/*
* Each walsender has a WalSnd struct in shared memory.
+ *
+ * This struct is protected by 'mutex', with two exceptions: one is
+ * sync_standby_priority as noted below. The other exception is that some
+ * members are only written by the walsender process itself, and thus that
+ * process is free to read those members without holding spinlock. pid and
+ * needreload always require the spinlock to be held for all accesses.
*/
typedef struct WalSnd
{
- pid_t pid; /* this walsender's process id, or 0 */
+ pid_t pid; /* this walsender's PID, or 0 if not active */
+
WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
bool needreload; /* does currently-open file need to be