]> granicus.if.org Git - postgresql/commitdiff
Move RecoveryLockList into a hash table.
authorThomas Munro <tmunro@postgresql.org>
Tue, 26 Jun 2018 05:17:27 +0000 (17:17 +1200)
committerThomas Munro <tmunro@postgresql.org>
Tue, 26 Jun 2018 05:17:27 +0000 (17:17 +1200)
Standbys frequently need to release all locks held by a given xid.
Instead of searching one big list linearly, let's create one list
per xid and put them in a hash table, so we can find what we need
in O(1) time.

Earlier analysis and a prototype were done by David Rowley, though
this isn't his patch.

Back-patch all the way.

Author: Thomas Munro
Diagnosed-by: David Rowley, Andres Freund
Reviewed-by: Andres Freund, Tom Lane, Robert Haas
Discussion: https://postgr.es/m/CAEepm%3D1mL0KiQ2KJ4yuPpLGX94a4Ns_W6TL4EGRouxWibu56pA%40mail.gmail.com
Discussion: https://postgr.es/m/CAKJS1f9vJ841HY%3DwonnLVbfkTWGYWdPN72VMxnArcGCjF3SywA%40mail.gmail.com

src/backend/storage/ipc/standby.c

index d491ece60a6c98ef333adbe293f2b379c5adc6ad..54933a59904aa3049dac3ef6b5ac29ddc0bafbfa 100644 (file)
@@ -29,6 +29,8 @@
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/standby.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -38,7 +40,7 @@ int                   vacuum_defer_cleanup_age;
 int                    max_standby_archive_delay = 30 * 1000;
 int                    max_standby_streaming_delay = 30 * 1000;
 
-static List *RecoveryLockList;
+static HTAB *RecoveryLockLists;
 
 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
                                                                           ProcSignalReason reason);
@@ -46,6 +48,14 @@ static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
 
+/*
+ * Keep track of all the locks owned by a given transaction.
+ */
+typedef struct RecoveryLockListsEntry
+{
+       TransactionId   xid;
+       List               *locks;
+} RecoveryLockListsEntry;
 
 /*
  * InitRecoveryTransactionEnvironment
@@ -63,6 +73,19 @@ void
 InitRecoveryTransactionEnvironment(void)
 {
        VirtualTransactionId vxid;
+       HASHCTL                 hash_ctl;
+
+       /*
+        * Initialize the hash table for tracking the list of locks held by each
+        * transaction.
+        */
+       memset(&hash_ctl, 0, sizeof(hash_ctl));
+       hash_ctl.keysize = sizeof(TransactionId);
+       hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
+       RecoveryLockLists = hash_create("RecoveryLockLists",
+                                                                       64,
+                                                                       &hash_ctl,
+                                                                       HASH_ELEM | HASH_BLOBS);
 
        /*
         * Initialize shared invalidation management for Startup process, being
@@ -107,6 +130,10 @@ ShutdownRecoveryTransactionEnvironment(void)
        /* Release all locks the tracked transactions were holding */
        StandbyReleaseAllLocks();
 
+       /* Destroy the hash table of locks. */
+       hash_destroy(RecoveryLockLists);
+       RecoveryLockLists = NULL;
+
        /* Cleanup our VirtualTransaction */
        VirtualXactLockTableCleanup();
 }
@@ -586,8 +613,8 @@ StandbyLockTimeoutHandler(void)
  * We only keep track of AccessExclusiveLocks, which are only ever held by
  * one transaction on one relation.
  *
- * We keep a single dynamically expandible list of locks in local memory,
- * RecoveryLockList, so we can keep track of the various entries made by
+ * We keep a hash table of lists of locks in local memory keyed by xid,
+ * RecoveryLockLists, so we can keep track of the various entries made by
  * the Startup process's virtual xid in the shared lock table.
  *
  * List elements use type xl_standby_lock, since the WAL record type exactly
@@ -601,8 +628,10 @@ StandbyLockTimeoutHandler(void)
 void
 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 {
+       RecoveryLockListsEntry *entry;
        xl_standby_lock *newlock;
        LOCKTAG         locktag;
+       bool            found;
 
        /* Already processed? */
        if (!TransactionIdIsValid(xid) ||
@@ -616,11 +645,19 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
        /* dbOid is InvalidOid when we are locking a shared relation. */
        Assert(OidIsValid(relOid));
 
+       /* Create a new list for this xid, if we don't have one already. */
+       entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
+       if (!found)
+       {
+               entry->xid = xid;
+               entry->locks = NIL;
+       }
+
        newlock = palloc(sizeof(xl_standby_lock));
        newlock->xid = xid;
        newlock->dbOid = dbOid;
        newlock->relOid = relOid;
-       RecoveryLockList = lappend(RecoveryLockList, newlock);
+       entry->locks = lappend(entry->locks, newlock);
 
        SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
 
@@ -628,46 +665,48 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 }
 
 static void
-StandbyReleaseLocks(TransactionId xid)
+StandbyReleaseLockList(List *locks)
 {
-       ListCell   *cell,
-                          *prev,
-                          *next;
-
-       /*
-        * Release all matching locks and remove them from list
-        */
-       prev = NULL;
-       for (cell = list_head(RecoveryLockList); cell; cell = next)
+       while (locks)
        {
-               xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+               xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
+               LOCKTAG         locktag;
+               elog(trace_recovery(DEBUG4),
+                        "releasing recovery lock: xid %u db %u rel %u",
+                        lock->xid, lock->dbOid, lock->relOid);
+               SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+               if (!LockRelease(&locktag, AccessExclusiveLock, true))
+               {
+                       elog(LOG,
+                                "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
+                                lock->xid, lock->dbOid, lock->relOid);
+                       Assert(false);
+               }
+               pfree(lock);
+               locks = list_delete_first(locks);
+       }
+}
 
-               next = lnext(cell);
+static void
+StandbyReleaseLocks(TransactionId xid)
+{
+       RecoveryLockListsEntry *entry;
 
-               if (!TransactionIdIsValid(xid) || lock->xid == xid)
+       if (TransactionIdIsValid(xid))
+       {
+               if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
                {
-                       LOCKTAG         locktag;
-
-                       elog(trace_recovery(DEBUG4),
-                                "releasing recovery lock: xid %u db %u rel %u",
-                                lock->xid, lock->dbOid, lock->relOid);
-                       SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-                       if (!LockRelease(&locktag, AccessExclusiveLock, true))
-                               elog(LOG,
-                                        "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-                                        lock->xid, lock->dbOid, lock->relOid);
-
-                       RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-                       pfree(lock);
+                       StandbyReleaseLockList(entry->locks);
+                       hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
                }
-               else
-                       prev = cell;
        }
+       else
+               StandbyReleaseAllLocks();
 }
 
 /*
  * Release locks for a transaction tree, starting at xid down, from
- * RecoveryLockList.
+ * RecoveryLockLists.
  *
  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
  * to remove any AccessExclusiveLocks requested by a transaction.
@@ -689,30 +728,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
 void
 StandbyReleaseAllLocks(void)
 {
-       ListCell   *cell,
-                          *prev,
-                          *next;
-       LOCKTAG         locktag;
+       HASH_SEQ_STATUS status;
+       RecoveryLockListsEntry *entry;
 
        elog(trace_recovery(DEBUG2), "release all standby locks");
 
-       prev = NULL;
-       for (cell = list_head(RecoveryLockList); cell; cell = next)
+       hash_seq_init(&status, RecoveryLockLists);
+       while ((entry = hash_seq_search(&status)))
        {
-               xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
-
-               next = lnext(cell);
-
-               elog(trace_recovery(DEBUG4),
-                        "releasing recovery lock: xid %u db %u rel %u",
-                        lock->xid, lock->dbOid, lock->relOid);
-               SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-               if (!LockRelease(&locktag, AccessExclusiveLock, true))
-                       elog(LOG,
-                                "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-                                lock->xid, lock->dbOid, lock->relOid);
-               RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-               pfree(lock);
+               StandbyReleaseLockList(entry->locks);
+               hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
        }
 }
 
@@ -724,22 +749,17 @@ StandbyReleaseAllLocks(void)
 void
 StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 {
-       ListCell   *cell,
-                          *prev,
-                          *next;
-       LOCKTAG         locktag;
+       HASH_SEQ_STATUS status;
+       RecoveryLockListsEntry *entry;
 
-       prev = NULL;
-       for (cell = list_head(RecoveryLockList); cell; cell = next)
+       hash_seq_init(&status, RecoveryLockLists);
+       while ((entry = hash_seq_search(&status)))
        {
-               xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
                bool            remove = false;
 
-               next = lnext(cell);
+               Assert(TransactionIdIsValid(entry->xid));
 
-               Assert(TransactionIdIsValid(lock->xid));
-
-               if (StandbyTransactionIdIsPrepared(lock->xid))
+               if (StandbyTransactionIdIsPrepared(entry->xid))
                        remove = false;
                else
                {
@@ -748,7 +768,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 
                        for (i = 0; i < nxids; i++)
                        {
-                               if (lock->xid == xids[i])
+                               if (entry->xid == xids[i])
                                {
                                        found = true;
                                        break;
@@ -764,19 +784,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 
                if (remove)
                {
-                       elog(trace_recovery(DEBUG4),
-                                "releasing recovery lock: xid %u db %u rel %u",
-                                lock->xid, lock->dbOid, lock->relOid);
-                       SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-                       if (!LockRelease(&locktag, AccessExclusiveLock, true))
-                               elog(LOG,
-                                        "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-                                        lock->xid, lock->dbOid, lock->relOid);
-                       RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-                       pfree(lock);
+                       StandbyReleaseLockList(entry->locks);
+                       hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
                }
-               else
-                       prev = cell;
        }
 }