From b2c95a3798ff39fc24d71b6655ddfe0e4cb3f378 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Tue, 8 Aug 2017 16:07:46 -0400 Subject: [PATCH] Fix replication origin-related race conditions Similar to what was fixed in commit 9915de6c1cb2 for replication slots, but this time it's related to replication origins: DROP SUBSCRIPTION attempts to drop the replication origin, but that fails if the replication worker process hasn't yet marked it unused. This causes failures in the buildfarm: ERROR: could not drop replication origin with OID 1, in use by PID 34069 Like the aforementioned commit, fix by having the process running DROP SUBSCRIPTION sleep until the worker marks the replication origin struct as free. This uses a condition variable on each replication origin shmem state struct, so that the session trying to drop can sleep and expect to be awakened by the process keeping the origin open. Also fix SGML markup in the previous commit. Discussion: https://postgr.es/m/20170808001433.rozlseaf4m2wkw3n@alvherre.pgsql --- doc/src/sgml/monitoring.sgml | 8 +++- src/backend/commands/subscriptioncmds.c | 2 +- src/backend/postmaster/pgstat.c | 3 ++ src/backend/replication/logical/origin.c | 57 +++++++++++++++++++----- src/include/pgstat.h | 1 + src/include/replication/origin.h | 2 +- 6 files changed, 58 insertions(+), 15 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index eb20c9c543..12d5628266 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1222,11 +1222,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser LibPQWalReceiverConnect - Waiting in WAL receiver to establish connection to remote server. + Waiting in WAL receiver to establish connection to remote server. LibPQWalReceiverReceive - Waiting in WAL receiver to receive data from remote server. + Waiting in WAL receiver to receive data from remote server. 
SSLOpenServer @@ -1302,6 +1302,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ProcArrayGroupUpdate Waiting for group leader to clear transaction id at transaction end. + + ReplicationOriginDrop + Waiting for a replication origin to become inactive to be dropped. + ReplicationSlotDrop Waiting for a replication slot to become inactive to be dropped. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3593712791..ae40f7164d 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -939,7 +939,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); if (originid != InvalidRepOriginId) - replorigin_drop(originid); + replorigin_drop(originid, false); /* * If there is no slot associated with the subscription, we can finish diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 3f5fb796a5..1f75e2e97d 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3609,6 +3609,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_PROCARRAY_GROUP_UPDATE: event_name = "ProcArrayGroupUpdate"; break; + case WAIT_EVENT_REPLICATION_ORIGIN_DROP: + event_name = "ReplicationOriginDrop"; + break; case WAIT_EVENT_REPLICATION_SLOT_DROP: event_name = "ReplicationSlotDrop"; break; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 1c665312a4..9e1b19bb35 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -79,15 +79,15 @@ #include "access/xact.h" #include "catalog/indexing.h" - #include "nodes/execnodes.h" #include "replication/origin.h" #include "replication/logical.h" - +#include "pgstat.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include 
"storage/condition_variable.h" #include "storage/copydir.h" #include "utils/builtins.h" @@ -124,6 +124,11 @@ typedef struct ReplicationState */ int acquired_by; + /* + * Condition variable that's signalled when acquired_by changes. + */ + ConditionVariable origin_cv; + /* * Lock protecting remote_lsn and local_lsn. */ @@ -324,9 +329,9 @@ replorigin_create(char *roname) * Needs to be called in a transaction. */ void -replorigin_drop(RepOriginId roident) +replorigin_drop(RepOriginId roident, bool nowait) { - HeapTuple tuple = NULL; + HeapTuple tuple; Relation rel; int i; @@ -334,6 +339,8 @@ replorigin_drop(RepOriginId roident) rel = heap_open(ReplicationOriginRelationId, ExclusiveLock); +restart: + tuple = NULL; /* cleanup the slot state info */ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); @@ -346,11 +353,21 @@ replorigin_drop(RepOriginId roident) { if (state->acquired_by != 0) { - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("could not drop replication origin with OID %d, in use by PID %d", - state->roident, - state->acquired_by))); + ConditionVariable *cv; + + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("could not drop replication origin with OID %d, in use by PID %d", + state->roident, + state->acquired_by))); + cv = &state->origin_cv; + + LWLockRelease(ReplicationOriginLock); + ConditionVariablePrepareToSleep(cv); + ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP); + ConditionVariableCancelSleep(); + goto restart; } /* first WAL log */ @@ -382,7 +399,7 @@ replorigin_drop(RepOriginId roident) CommandCounterIncrement(); - /* now release lock again, */ + /* now release lock again */ heap_close(rel, ExclusiveLock); } @@ -476,8 +493,11 @@ ReplicationOriginShmemInit(void) MemSet(replication_states, 0, ReplicationOriginShmemSize()); for (i = 0; i < max_replication_slots; i++) + { LWLockInitialize(&replication_states[i].lock, replication_states_ctl->tranche_id); + 
ConditionVariableInit(&replication_states[i].origin_cv); + } } LWLockRegisterTranche(replication_states_ctl->tranche_id, @@ -957,16 +977,23 @@ replorigin_get_progress(RepOriginId node, bool flush) static void ReplicationOriginExitCleanup(int code, Datum arg) { + ConditionVariable *cv = NULL; + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); if (session_replication_state != NULL && session_replication_state->acquired_by == MyProcPid) { + cv = &session_replication_state->origin_cv; + session_replication_state->acquired_by = 0; session_replication_state = NULL; } LWLockRelease(ReplicationOriginLock); + + if (cv) + ConditionVariableBroadcast(cv); } /* @@ -1056,6 +1083,9 @@ replorigin_session_setup(RepOriginId node) session_replication_state->acquired_by = MyProcPid; LWLockRelease(ReplicationOriginLock); + + /* probably this one is pointless */ + ConditionVariableBroadcast(&session_replication_state->origin_cv); } /* @@ -1067,6 +1097,8 @@ replorigin_session_setup(RepOriginId node) void replorigin_session_reset(void) { + ConditionVariable *cv; + Assert(max_replication_slots != 0); if (session_replication_state == NULL) @@ -1077,9 +1109,12 @@ replorigin_session_reset(void) LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); session_replication_state->acquired_by = 0; + cv = &session_replication_state->origin_cv; session_replication_state = NULL; LWLockRelease(ReplicationOriginLock); + + ConditionVariableBroadcast(cv); } /* @@ -1170,7 +1205,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS) roident = replorigin_by_name(name, false); Assert(OidIsValid(roident)); - replorigin_drop(roident); + replorigin_drop(roident, false); pfree(name); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 43ea55e9eb..cb05d9b81e 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -812,6 +812,7 @@ typedef enum WAIT_EVENT_PARALLEL_FINISH, WAIT_EVENT_PARALLEL_BITMAP_SCAN, WAIT_EVENT_PROCARRAY_GROUP_UPDATE, + WAIT_EVENT_REPLICATION_ORIGIN_DROP, 
WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index ca56c01469..a9595c3c3d 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -41,7 +41,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; /* API for querying & manipulating replication origins */ extern RepOriginId replorigin_by_name(char *name, bool missing_ok); extern RepOriginId replorigin_create(char *name); -extern void replorigin_drop(RepOriginId roident); +extern void replorigin_drop(RepOriginId roident, bool nowait); extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname); -- 2.40.0