From 9915de6c1cb2c9b87f5f504c97832cdf3a809753 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Tue, 25 Jul 2017 13:26:49 -0400 Subject: [PATCH] Fix race conditions in replication slot operations MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit It is relatively easy to get a replication slot to look as still active while one process is in the process of getting rid of it; when some other process tries to "acquire" the slot, it would fail with an error message of "replication slot XYZ is active for PID N". The error message in itself is fine, except that when the intention is to drop the slot, it is unhelpful: the useful behavior would be to wait until the slot is no longer acquired, so that the drop can proceed. To implement this, we use a condition variable so that slot acquisition can be told to wait on that condition variable if the slot is already acquired, and we make any change in active_pid broadcast a signal on the condition variable. Thus, as soon as the slot is released, the drop will proceed properly. Reported by: Tom Lane Discussion: https://postgr.es/m/11904.1499039688@sss.pgh.pa.us Authors: Petr Jelínek, Álvaro Herrera --- .../replication/logical/logicalfuncs.c | 2 +- src/backend/replication/slot.c | 122 +++++++++++++----- src/backend/replication/slotfuncs.c | 34 +++-- src/backend/replication/walsender.c | 6 +- src/include/replication/slot.h | 10 +- 5 files changed, 118 insertions(+), 56 deletions(-) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 363ca82cb0..a3ba2b1266 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); - ReplicationSlotAcquire(NameStr(*name)); + ReplicationSlotAcquire(NameStr(*name), true); PG_TRY(); { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index dc7de20e11..08c0b1b285 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void) /* everything else is zeroed by the memset above */ SpinLockInit(&slot->mutex); LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS); + ConditionVariableInit(&slot->active_cv); } } } @@ -313,25 +314,35 @@ ReplicationSlotCreate(const char *name, bool db_specific, LWLockRelease(ReplicationSlotControlLock); /* - * Now that the slot has been marked as in_use and in_active, it's safe to + * Now that the slot has been marked as in_use and active, it's safe to * let somebody else try to allocate a slot. */ LWLockRelease(ReplicationSlotAllocationLock); + + /* Let everybody know we've modified this slot */ + ConditionVariableBroadcast(&slot->active_cv); } /* * Find a previously created slot and mark it as used by this backend. */ void -ReplicationSlotAcquire(const char *name) +ReplicationSlotAcquire(const char *name, bool nowait) { - ReplicationSlot *slot = NULL; + ReplicationSlot *slot; + int active_pid; int i; - int active_pid = 0; /* Keep compiler quiet */ +retry: Assert(MyReplicationSlot == NULL); - /* Search for the named slot and mark it active if we find it. */ + /* + * Search for the named slot and mark it active if we find it. If the + * slot is already active, we exit the loop with active_pid set to the PID + * of the backend that owns it. + */ + active_pid = 0; + slot = NULL; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -339,35 +350,66 @@ ReplicationSlotAcquire(const char *name) if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) { + /* + * This is the slot we want. We don't know yet if it's active, + * so get ready to sleep on it in case it is. (We may end up not + * sleeping, but we don't want to do this while holding the + * spinlock.) + */ + ConditionVariablePrepareToSleep(&s->active_cv); + SpinLockAcquire(&s->mutex); + active_pid = s->active_pid; if (active_pid == 0) active_pid = s->active_pid = MyProcPid; + SpinLockRelease(&s->mutex); slot = s; + break; } } LWLockRelease(ReplicationSlotControlLock); - /* If we did not find the slot or it was already active, error out. */ + /* If we did not find the slot, error out. */ if (slot == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); + + /* + * If we found the slot but it's already active in another backend, we + * either error out or retry after a short wait, as caller specified. + */ if (active_pid != MyProcPid) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("replication slot \"%s\" is active for PID %d", - name, active_pid))); + { + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication slot \"%s\" is active for PID %d", + name, active_pid))); + + /* Wait here until we get signaled, and then restart */ + ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK); + ConditionVariableCancelSleep(); + goto retry; + } + else + ConditionVariableCancelSleep(); /* no sleep needed after all */ + + /* Let everybody know we've modified this slot */ + ConditionVariableBroadcast(&slot->active_cv); /* We made this slot active, so it's ours now. */ MyReplicationSlot = slot; } /* - * Release a replication slot, this or another backend can ReAcquire it - * later. Resources this slot requires will be preserved. + * Release the replication slot that this backend considers to own. + * + * This or another backend can re-acquire the slot later. + * Resources this slot requires will be preserved. */ void ReplicationSlotRelease(void) @@ -385,17 +427,6 @@ ReplicationSlotRelease(void) */ ReplicationSlotDropAcquired(); } - else if (slot->data.persistency == RS_PERSISTENT) - { - /* - * Mark persistent slot inactive. We're not freeing it, just - * disconnecting. - */ - SpinLockAcquire(&slot->mutex); - slot->active_pid = 0; - SpinLockRelease(&slot->mutex); - } - /* * If slot needed to temporarily restrain both data and catalog xmin to @@ -412,6 +443,18 @@ ReplicationSlotRelease(void) ReplicationSlotsComputeRequiredXmin(false); } + if (slot->data.persistency == RS_PERSISTENT) + { + /* + * Mark persistent slot inactive. We're not freeing it, just + * disconnecting, but wake up others that may be waiting for it. + */ + SpinLockAcquire(&slot->mutex); + slot->active_pid = 0; + SpinLockRelease(&slot->mutex); + ConditionVariableBroadcast(&slot->active_cv); + } + MyReplicationSlot = NULL; /* might not have been set when we've been a plain slot */ @@ -430,32 +473,43 @@ ReplicationSlotCleanup(void) Assert(MyReplicationSlot == NULL); - /* - * No need for locking as we are only interested in slots active in - * current process and those are not touched by other processes. - */ +restart: + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); if (s->active_pid == MyProcPid) { - Assert(s->in_use && s->data.persistency == RS_TEMPORARY); + Assert(s->data.persistency == RS_TEMPORARY); + SpinLockRelease(&s->mutex); + LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */ ReplicationSlotDropPtr(s); + + ConditionVariableBroadcast(&s->active_cv); + goto restart; } + else + SpinLockRelease(&s->mutex); } + + LWLockRelease(ReplicationSlotControlLock); } /* * Permanently drop replication slot identified by the passed in name. */ void -ReplicationSlotDrop(const char *name) +ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name); + ReplicationSlotAcquire(name, nowait); ReplicationSlotDropAcquired(); } @@ -527,6 +581,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) slot->active_pid = 0; SpinLockRelease(&slot->mutex); + /* wake up anyone waiting on this slot */ + ConditionVariableBroadcast(&slot->active_cv); + ereport(fail_softly ? WARNING : ERROR, (errcode_for_file_access(), errmsg("could not rename file \"%s\" to \"%s\": %m", @@ -535,15 +592,18 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) /* * The slot is definitely gone. Lock out concurrent scans of the array - * long enough to kill it. It's OK to clear the active flag here without + * long enough to kill it. It's OK to clear the active PID here without * grabbing the mutex because nobody else can be scanning the array here, * and nobody can be attached to this slot and thus access it without * scanning the array. + * + * Also wake up processes waiting for it. */ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); slot->active_pid = 0; slot->in_use = false; LWLockRelease(ReplicationSlotControlLock); + ConditionVariableBroadcast(&slot->active_cv); /* * Slot is dead and doesn't prevent resource removal anymore, recompute diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6dc808874d..d4cbd83bde 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - ReplicationSlotDrop(NameStr(*name)); + ReplicationSlotDrop(NameStr(*name), false); PG_RETURN_VOID(); } @@ -221,6 +221,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (slotno = 0; slotno < max_replication_slots; slotno++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; @@ -238,25 +239,21 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) NameData plugin; int i; - SpinLockAcquire(&slot->mutex); if (!slot->in_use) - { - SpinLockRelease(&slot->mutex); continue; - } - else - { - xmin = slot->data.xmin; - catalog_xmin = slot->data.catalog_xmin; - database = slot->data.database; - restart_lsn = slot->data.restart_lsn; - confirmed_flush_lsn = slot->data.confirmed_flush; - namecpy(&slot_name, &slot->data.name); - namecpy(&plugin, &slot->data.plugin); - - active_pid = slot->active_pid; - persistency = slot->data.persistency; - } + + SpinLockAcquire(&slot->mutex); + + xmin = slot->data.xmin; + catalog_xmin = slot->data.catalog_xmin; + database = slot->data.database; + restart_lsn = slot->data.restart_lsn; + confirmed_flush_lsn = slot->data.confirmed_flush; + namecpy(&slot_name, &slot->data.name); + namecpy(&plugin, &slot->data.plugin); + active_pid = slot->active_pid; + persistency = slot->data.persistency; + SpinLockRelease(&slot->mutex); memset(nulls, 0, sizeof(nulls)); @@ -309,6 +306,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); } + LWLockRelease(ReplicationSlotControlLock); tuplestore_donestoring(tupstore); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 002143b26a..9a2babef1e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname); + ReplicationSlotAcquire(cmd->slotname, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname); + ReplicationSlotDrop(cmd->slotname, false); EndCommand("DROP_REPLICATION_SLOT", DestRemote); } @@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname); + ReplicationSlotAcquire(cmd->slotname, true); /* * Force a disconnect, so that the decoding code doesn't need to care diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a283f4e2b8..0bf2611fe9 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -12,6 +12,7 @@ #include "fmgr.h" #include "access/xlog.h" #include "access/xlogreader.h" +#include "storage/condition_variable.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" @@ -19,7 +20,7 @@ /* * Behaviour of replication slots, upon release or crash. * - * Slots marked as PERSISTENT are crashsafe and will not be dropped when + * Slots marked as PERSISTENT are crash-safe and will not be dropped when * released. Slots marked as EPHEMERAL will be dropped when released or after * restarts. * @@ -117,6 +118,9 @@ typedef struct ReplicationSlot /* is somebody performing io on this slot? */ LWLock io_in_progress_lock; + /* Condition variable signalled when active_pid changes */ + ConditionVariable active_cv; + /* all the remaining data is only used for logical slots */ /* @@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void); extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency p); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name); +extern void ReplicationSlotDrop(const char *name, bool nowait); -extern void ReplicationSlotAcquire(const char *name); +extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); -- 2.40.0