Assert(IsTransactionState());
+ /*
+ * To interlock against concurrent drops, we hold ExclusiveLock on
+ * pg_replication_origin throughout this funcion.
+ */
rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
+ /*
+ * First, clean up the slot state info, if there is any matching slot.
+ */
restart:
tuple = NULL;
- /* cleanup the slot state info */
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationState *state = &replication_states[i];
- /* found our slot */
if (state->roident == roident)
{
+ /* found our slot, is it busy? */
if (state->acquired_by != 0)
{
ConditionVariable *cv;
errmsg("could not drop replication origin with OID %d, in use by PID %d",
state->roident,
state->acquired_by)));
+
+ /*
+ * We must wait and then retry. Since we don't know which CV
+ * to wait on until here, we can't readily use
+ * ConditionVariablePrepareToSleep (calling it here would be
+ * wrong, since we could miss the signal if we did so); just
+ * use ConditionVariableSleep directly.
+ */
cv = &state->origin_cv;
LWLockRelease(ReplicationOriginLock);
- ConditionVariablePrepareToSleep(cv);
+
ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
- ConditionVariableCancelSleep();
goto restart;
}
- /* first WAL log */
+ /* first make a WAL log entry */
{
xl_replorigin_drop xlrec;
XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
}
- /* then reset the in-memory entry */
+ /* then clear the in-memory slot */
state->roident = InvalidRepOriginId;
state->remote_lsn = InvalidXLogRecPtr;
state->local_lsn = InvalidXLogRecPtr;
}
}
LWLockRelease(ReplicationOriginLock);
+ ConditionVariableCancelSleep();
+ /*
+ * Now, we can delete the catalog entry.
+ */
tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for replication origin with oid %u",