]> granicus.if.org Git - postgresql/commitdiff
Fix replication origin-related race conditions
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 8 Aug 2017 20:07:46 +0000 (16:07 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 8 Aug 2017 20:07:46 +0000 (16:07 -0400)
Similar to what was fixed in commit 9915de6c1cb2 for replication slots,
but this time it's related to replication origins: DROP SUBSCRIPTION
attempts to drop the replication origin, but that fails if the
replication worker process hasn't yet marked it unused.  This causes
failures in the buildfarm:
ERROR:  could not drop replication origin with OID 1, in use by PID 34069

Like the aforementioned commit, fix by having the process running DROP
SUBSCRIPTION sleep until the worker marks the the replication origin
struct as free.  This uses a condition variable on each replication
origin shmem state struct, so that the session trying to drop can sleep
and expect to be awakened by the process keeping the origin open.

Also fix a SGML markup in the previous commit.

Discussion: https://postgr.es/m/20170808001433.rozlseaf4m2wkw3n@alvherre.pgsql

doc/src/sgml/monitoring.sgml
src/backend/commands/subscriptioncmds.c
src/backend/postmaster/pgstat.c
src/backend/replication/logical/origin.c
src/include/pgstat.h
src/include/replication/origin.h

index eb20c9c543b9a96fd165d64d791673586791e4fd..12d562826697cc39f21d82287872fb097ec1f4a7 100644 (file)
@@ -1222,11 +1222,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
         </row>
         <row>
          <entry><literal>LibPQWalReceiverConnect</></entry>
-         <entry>Waiting in WAL receiver to establish connection to remote server.<entry>
+         <entry>Waiting in WAL receiver to establish connection to remote server.</entry>
         </row>
         <row>
          <entry><literal>LibPQWalReceiverReceive</></entry>
-         <entry>Waiting in WAL receiver to receive data from remote server.<entry>
+         <entry>Waiting in WAL receiver to receive data from remote server.</entry>
         </row>
         <row>
          <entry><literal>SSLOpenServer</></entry>
@@ -1302,6 +1302,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>ProcArrayGroupUpdate</></entry>
          <entry>Waiting for group leader to clear transaction id at transaction end.</entry>
         </row>
+        <row>
+         <entry><literal>ReplicationOriginDrop</></entry>
+         <entry>Waiting for a replication origin to become inactive to be dropped.</entry>
+        </row>
         <row>
          <entry><literal>ReplicationSlotDrop</></entry>
          <entry>Waiting for a replication slot to become inactive to be dropped.</entry>
index 3593712791208a36289b2da097b742202c8fd614..ae40f7164d87a585a60b3b646d6af0955c6fdc0d 100644 (file)
@@ -939,7 +939,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
        snprintf(originname, sizeof(originname), "pg_%u", subid);
        originid = replorigin_by_name(originname, true);
        if (originid != InvalidRepOriginId)
-               replorigin_drop(originid);
+               replorigin_drop(originid, false);
 
        /*
         * If there is no slot associated with the subscription, we can finish
index 3f5fb796a5eecef91d6b6865a0e30433c556c190..1f75e2e97d054ea82ce29a3afbe971a30197531a 100644 (file)
@@ -3609,6 +3609,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
                case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
                        event_name = "ProcArrayGroupUpdate";
                        break;
+               case WAIT_EVENT_REPLICATION_ORIGIN_DROP:
+                       event_name = "ReplicationOriginDrop";
+                       break;
                case WAIT_EVENT_REPLICATION_SLOT_DROP:
                        event_name = "ReplicationSlotDrop";
                        break;
index 1c665312a485935e6c36bae4fe3cbe545e460c74..9e1b19bb3545cb74eb1a84751bf6f8458fcd67b0 100644 (file)
 #include "access/xact.h"
 
 #include "catalog/indexing.h"
-
 #include "nodes/execnodes.h"
 
 #include "replication/origin.h"
 #include "replication/logical.h"
-
+#include "pgstat.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/copydir.h"
 
 #include "utils/builtins.h"
@@ -124,6 +124,11 @@ typedef struct ReplicationState
         */
        int                     acquired_by;
 
+       /*
+        * Condition variable that's signalled when acquired_by changes.
+        */
+       ConditionVariable origin_cv;
+
        /*
         * Lock protecting remote_lsn and local_lsn.
         */
@@ -324,9 +329,9 @@ replorigin_create(char *roname)
  * Needs to be called in a transaction.
  */
 void
-replorigin_drop(RepOriginId roident)
+replorigin_drop(RepOriginId roident, bool nowait)
 {
-       HeapTuple       tuple = NULL;
+       HeapTuple       tuple;
        Relation        rel;
        int                     i;
 
@@ -334,6 +339,8 @@ replorigin_drop(RepOriginId roident)
 
        rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
 
+restart:
+       tuple = NULL;
        /* cleanup the slot state info */
        LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
@@ -346,11 +353,21 @@ replorigin_drop(RepOriginId roident)
                {
                        if (state->acquired_by != 0)
                        {
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_OBJECT_IN_USE),
-                                                errmsg("could not drop replication origin with OID %d, in use by PID %d",
-                                                               state->roident,
-                                                               state->acquired_by)));
+                               ConditionVariable  *cv;
+
+                               if (nowait)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_OBJECT_IN_USE),
+                                                        errmsg("could not drop replication origin with OID %d, in use by PID %d",
+                                                                       state->roident,
+                                                                       state->acquired_by)));
+                               cv = &state->origin_cv;
+
+                               LWLockRelease(ReplicationOriginLock);
+                               ConditionVariablePrepareToSleep(cv);
+                               ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
+                               ConditionVariableCancelSleep();
+                               goto restart;
                        }
 
                        /* first WAL log */
@@ -382,7 +399,7 @@ replorigin_drop(RepOriginId roident)
 
        CommandCounterIncrement();
 
-       /* now release lock again,      */
+       /* now release lock again */
        heap_close(rel, ExclusiveLock);
 }
 
@@ -476,8 +493,11 @@ ReplicationOriginShmemInit(void)
                MemSet(replication_states, 0, ReplicationOriginShmemSize());
 
                for (i = 0; i < max_replication_slots; i++)
+               {
                        LWLockInitialize(&replication_states[i].lock,
                                                         replication_states_ctl->tranche_id);
+                       ConditionVariableInit(&replication_states[i].origin_cv);
+               }
        }
 
        LWLockRegisterTranche(replication_states_ctl->tranche_id,
@@ -957,16 +977,23 @@ replorigin_get_progress(RepOriginId node, bool flush)
 static void
 ReplicationOriginExitCleanup(int code, Datum arg)
 {
+       ConditionVariable   *cv = NULL;
+
        LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
        if (session_replication_state != NULL &&
                session_replication_state->acquired_by == MyProcPid)
        {
+               cv = &session_replication_state->origin_cv;
+
                session_replication_state->acquired_by = 0;
                session_replication_state = NULL;
        }
 
        LWLockRelease(ReplicationOriginLock);
+
+       if (cv)
+               ConditionVariableBroadcast(cv);
 }
 
 /*
@@ -1056,6 +1083,9 @@ replorigin_session_setup(RepOriginId node)
        session_replication_state->acquired_by = MyProcPid;
 
        LWLockRelease(ReplicationOriginLock);
+
+       /* probably this one is pointless */
+       ConditionVariableBroadcast(&session_replication_state->origin_cv);
 }
 
 /*
@@ -1067,6 +1097,8 @@ replorigin_session_setup(RepOriginId node)
 void
 replorigin_session_reset(void)
 {
+       ConditionVariable  *cv;
+
        Assert(max_replication_slots != 0);
 
        if (session_replication_state == NULL)
@@ -1077,9 +1109,12 @@ replorigin_session_reset(void)
        LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
        session_replication_state->acquired_by = 0;
+       cv = &session_replication_state->origin_cv;
        session_replication_state = NULL;
 
        LWLockRelease(ReplicationOriginLock);
+
+       ConditionVariableBroadcast(cv);
 }
 
 /*
@@ -1170,7 +1205,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS)
        roident = replorigin_by_name(name, false);
        Assert(OidIsValid(roident));
 
-       replorigin_drop(roident);
+       replorigin_drop(roident, false);
 
        pfree(name);
 
index 43ea55e9eb60f377e831226788c8df3d88ef5e50..cb05d9b81e518264eed89db3a11aa661e8ce0c48 100644 (file)
@@ -812,6 +812,7 @@ typedef enum
        WAIT_EVENT_PARALLEL_FINISH,
        WAIT_EVENT_PARALLEL_BITMAP_SCAN,
        WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
+       WAIT_EVENT_REPLICATION_ORIGIN_DROP,
        WAIT_EVENT_REPLICATION_SLOT_DROP,
        WAIT_EVENT_SAFE_SNAPSHOT,
        WAIT_EVENT_SYNC_REP
index ca56c01469f811a557627ed84b19906e814afc88..a9595c3c3da3bf47ca050c48d7f7f809b1774de0 100644 (file)
@@ -41,7 +41,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
 /* API for querying & manipulating replication origins */
 extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
 extern RepOriginId replorigin_create(char *name);
-extern void replorigin_drop(RepOriginId roident);
+extern void replorigin_drop(RepOriginId roident, bool nowait);
 extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
                                  char **roname);