</row>
<row>
<entry><literal>LibPQWalReceiverConnect</></entry>
- <entry>Waiting in WAL receiver to establish connection to remote server.<entry>
+ <entry>Waiting in WAL receiver to establish connection to remote server.</entry>
</row>
<row>
<entry><literal>LibPQWalReceiverReceive</></entry>
- <entry>Waiting in WAL receiver to receive data from remote server.<entry>
+ <entry>Waiting in WAL receiver to receive data from remote server.</entry>
</row>
<row>
<entry><literal>SSLOpenServer</></entry>
<entry><literal>ProcArrayGroupUpdate</></entry>
<entry>Waiting for group leader to clear transaction id at transaction end.</entry>
</row>
+ <row>
+ <entry><literal>ReplicationOriginDrop</></entry>
+ <entry>Waiting for a replication origin to become inactive to be dropped.</entry>
+ </row>
<row>
<entry><literal>ReplicationSlotDrop</></entry>
<entry>Waiting for a replication slot to become inactive to be dropped.</entry>
#include "access/xact.h"
#include "catalog/indexing.h"
-
#include "nodes/execnodes.h"
#include "replication/origin.h"
#include "replication/logical.h"
-
+#include "pgstat.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "utils/builtins.h"
*/
int acquired_by;
+ /*
+ * Condition variable that's signalled when acquired_by changes.
+ */
+ ConditionVariable origin_cv;
+
/*
* Lock protecting remote_lsn and local_lsn.
*/
* Needs to be called in a transaction.
*/
void
-replorigin_drop(RepOriginId roident)
+replorigin_drop(RepOriginId roident, bool nowait)
{
- HeapTuple tuple = NULL;
+ HeapTuple tuple;
Relation rel;
int i;
rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
+restart:
+ tuple = NULL;
/* cleanup the slot state info */
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
{
if (state->acquired_by != 0)
{
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("could not drop replication origin with OID %d, in use by PID %d",
- state->roident,
- state->acquired_by)));
+ ConditionVariable *cv;
+
+ if (nowait)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("could not drop replication origin with OID %d, in use by PID %d",
+ state->roident,
+ state->acquired_by)));
+ cv = &state->origin_cv;
+
+ LWLockRelease(ReplicationOriginLock);
+ ConditionVariablePrepareToSleep(cv);
+ ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
+ ConditionVariableCancelSleep();
+ goto restart;
}
/* first WAL log */
CommandCounterIncrement();
- /* now release lock again, */
+ /* now release lock again */
heap_close(rel, ExclusiveLock);
}
MemSet(replication_states, 0, ReplicationOriginShmemSize());
for (i = 0; i < max_replication_slots; i++)
+ {
LWLockInitialize(&replication_states[i].lock,
replication_states_ctl->tranche_id);
+ ConditionVariableInit(&replication_states[i].origin_cv);
+ }
}
LWLockRegisterTranche(replication_states_ctl->tranche_id,
static void
ReplicationOriginExitCleanup(int code, Datum arg)
{
+ ConditionVariable *cv = NULL;
+
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
if (session_replication_state != NULL &&
session_replication_state->acquired_by == MyProcPid)
{
+ cv = &session_replication_state->origin_cv;
+
session_replication_state->acquired_by = 0;
session_replication_state = NULL;
}
LWLockRelease(ReplicationOriginLock);
+
+ if (cv)
+ ConditionVariableBroadcast(cv);
}
/*
session_replication_state->acquired_by = MyProcPid;
LWLockRelease(ReplicationOriginLock);
+
+ /* probably this one is pointless */
+ ConditionVariableBroadcast(&session_replication_state->origin_cv);
}
/*
void
replorigin_session_reset(void)
{
+ ConditionVariable *cv;
+
Assert(max_replication_slots != 0);
if (session_replication_state == NULL)
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
session_replication_state->acquired_by = 0;
+ cv = &session_replication_state->origin_cv;
session_replication_state = NULL;
LWLockRelease(ReplicationOriginLock);
+
+ ConditionVariableBroadcast(cv);
}
/*
roident = replorigin_by_name(name, false);
Assert(OidIsValid(roident));
- replorigin_drop(roident);
+ replorigin_drop(roident, false);
pfree(name);