LOCKTAG locktag;
/* Already processed? */
- if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ if (!TransactionIdIsValid(xid) ||
+ TransactionIdDidCommit(xid) ||
+ TransactionIdDidAbort(xid))
return;
elog(trace_recovery(DEBUG4),
}
/*
- * StandbyReleaseLocksMany
- * Release standby locks held by XIDs < removeXid
- *
- * If keepPreparedXacts is true, keep prepared transactions even if
- * they're older than removeXid
+ * Called at end of recovery and when we see a shutdown checkpoint.
*/
-static void
-StandbyReleaseLocksMany(TransactionId removeXid, bool keepPreparedXacts)
+void
+StandbyReleaseAllLocks(void)
+{
+ ListCell *cell,
+ *prev,
+ *next;
+ LOCKTAG locktag;
+
+ elog(trace_recovery(DEBUG2), "release all standby locks");
+
+ prev = NULL;
+ for (cell = list_head(RecoveryLockList); cell; cell = next)
+ {
+ xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+
+ next = lnext(cell);
+
+ elog(trace_recovery(DEBUG4),
+ "releasing recovery lock: xid %u db %u rel %u",
+ lock->xid, lock->dbOid, lock->relOid);
+ SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+ if (!LockRelease(&locktag, AccessExclusiveLock, true))
+ elog(LOG,
+ "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
+ lock->xid, lock->dbOid, lock->relOid);
+ RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
+ pfree(lock);
+ }
+}
+
+/*
+ * StandbyReleaseOldLocks
+ * Release standby locks held by XIDs that aren't running, as long
+ * as they're not prepared transactions.
+ */
+void
+StandbyReleaseOldLocks(int nxids, TransactionId *xids)
{
ListCell *cell,
*prev,
*next;
LOCKTAG locktag;
- /*
- * Release all matching locks.
- */
prev = NULL;
for (cell = list_head(RecoveryLockList); cell; cell = next)
{
xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+ bool remove = false;
next = lnext(cell);
- if (!TransactionIdIsValid(removeXid) || TransactionIdPrecedes(lock->xid, removeXid))
+ Assert(TransactionIdIsValid(lock->xid));
+
+ if (StandbyTransactionIdIsPrepared(lock->xid))
+ remove = false;
+ else
+ {
+ int i;
+ bool found = false;
+
+ for (i = 0; i < nxids; i++)
+ {
+ if (lock->xid == xids[i])
+ {
+ found = true;
+ break;
+ }
+ }
+
+ /*
+ * If its not a running transaction, remove it.
+ */
+ if (!found)
+ remove = true;
+ }
+
+ if (remove)
{
- if (keepPreparedXacts && StandbyTransactionIdIsPrepared(lock->xid))
- continue;
elog(trace_recovery(DEBUG4),
"releasing recovery lock: xid %u db %u rel %u",
lock->xid, lock->dbOid, lock->relOid);
}
}
-/*
- * Called at end of recovery and when we see a shutdown checkpoint.
- */
-void
-StandbyReleaseAllLocks(void)
-{
- elog(trace_recovery(DEBUG2), "release all standby locks");
- StandbyReleaseLocksMany(InvalidTransactionId, false);
-}
-
-/*
- * StandbyReleaseOldLocks
- * Release standby locks held by XIDs < removeXid, as long
- * as they're not prepared transactions.
- */
-void
-StandbyReleaseOldLocks(TransactionId removeXid)
-{
- StandbyReleaseLocksMany(removeXid, true);
-}
-
/*
* --------------------------------------------------------------------
* Recovery handling for Rmgr RM_STANDBY_ID
* Later, when we apply the running xact data we must be careful to ignore
* transactions already committed, since those commits raced ahead when
* making WAL entries.
+ *
+ * The loose timing also means that locks may be recorded that have a
+ * zero xid, since xids are removed from procs before locks are removed.
+ * So we must prune the lock list down to ensure we hold locks only for
+ * currently running xids, performed by StandbyReleaseOldLocks().
+ * Zero xids should no longer be possible, but we may be replaying WAL
+ * from a time when they were possible.
*/
void
LogStandbySnapshot(TransactionId *nextXid)