]> granicus.if.org Git - postgresql/blobdiff - src/backend/storage/lmgr/lock.c
Restructure LOCKTAG as per discussions of a couple months ago.
[postgresql] / src / backend / storage / lmgr / lock.c
index 5ac8211dc4f5f56db4187ff0efbb18f10ba3348f..576fc205299eebe48047cab9f3dcdf5169ab35fe 100644 (file)
@@ -3,12 +3,12 @@
  * lock.c
  *       POSTGRES low-level lock mechanism
  *
- * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.83 2001/02/22 23:20:06 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.150 2005/04/29 22:28:24 tgl Exp $
  *
  * NOTES
  *       Outside modules can create a lock table and acquire/release
  *     Interface:
  *
  *     LockAcquire(), LockRelease(), LockMethodTableInit(),
- *     LockMethodTableRename(), LockReleaseAll,
+ *     LockMethodTableRename(), LockReleaseAll(),
  *     LockCheckConflicts(), GrantLock()
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include <sys/types.h>
-#include <unistd.h>
 #include <signal.h>
+#include <unistd.h>
 
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
+#include "utils/resowner.h"
+
+
+/* This configuration variable is used to set the lock table size */
+int                    max_locks_per_xact; /* set by guc.c */
 
-static int     WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
-                                          LOCK *lock, HOLDER *holder);
-static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
-                                                        int *myHolding);
+#define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends))
 
-static char *lock_mode_names[] =
+
+/*
+ * map from lock method id to the lock table data structures
+ */
+static LockMethod LockMethods[MAX_LOCK_METHODS];
+static HTAB *LockMethodLockHash[MAX_LOCK_METHODS];
+static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS];
+static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS];
+
+/* exported so lmgr.c can initialize it */
+int                    NumLockMethods;
+
+
+/* private state for GrantAwaitedLock */
+static LOCALLOCK *awaitedLock;
+static ResourceOwner awaitedOwner;
+
+
+static const char *const lock_mode_names[] =
 {
        "INVALID",
        "AccessShareLock",
        "RowShareLock",
        "RowExclusiveLock",
+       "ShareUpdateExclusiveLock",
        "ShareLock",
        "ShareRowExclusiveLock",
        "ExclusiveLock",
        "AccessExclusiveLock"
 };
 
-static char *DeadLockMessage = "Deadlock detected.\n\tSee the lock(l) manual page for a possible cause.";
-
 
 #ifdef LOCK_DEBUG
 
 /*------
  * The following configuration options are available for lock debugging:
  *
- *     TRACE_LOCKS      -- give a bunch of output what's going on in this file
- *     TRACE_USERLOCKS  -- same but for user locks
- *     TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
- *                         (use to avoid output on system tables)
- *     TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
- *     DEBUG_DEADLOCKS  -- currently dumps locks at untimely occasions ;)
+ *        TRACE_LOCKS          -- give a bunch of output what's going on in this file
+ *        TRACE_USERLOCKS      -- same but for user locks
+ *        TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
+ *                                                (use to avoid output on system tables)
+ *        TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
+ *        DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
  *
- * Furthermore, but in storage/ipc/spin.c:
- *     TRACE_SPINLOCKS  -- trace spinlocks (pretty useless)
+ * Furthermore, but in storage/lmgr/lwlock.c:
+ *        TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
  *
  * Define LOCK_DEBUG at compile time to get all these enabled.
  * --------
  */
 
-int  Trace_lock_oidmin  = BootstrapObjectIdData;
-bool Trace_locks        = false;
-bool Trace_userlocks    = false;
-int  Trace_lock_table   = 0;
-bool Debug_deadlocks    = false;
+int                    Trace_lock_oidmin = FirstNormalObjectId;
+bool           Trace_locks = false;
+bool           Trace_userlocks = false;
+int                    Trace_lock_table = 0;
+bool           Debug_deadlocks = false;
 
 
 inline static bool
-LOCK_DEBUG_ENABLED(const LOCK * lock)
+LOCK_DEBUG_ENABLED(const LOCK *lock)
 {
        return
-               (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
-                 || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
-                && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
-               || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
+               (((Trace_locks && LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD)
+                 || (Trace_userlocks && LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD))
+                && ((Oid) lock->tag.locktag_field2 >= (Oid) Trace_lock_oidmin))
+               || (Trace_lock_table
+                       && (lock->tag.locktag_field2 == Trace_lock_table));
 }
 
 
 inline static void
-LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type)
+LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
 {
        if (LOCK_DEBUG_ENABLED(lock))
-               elog(DEBUG,
-                        "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
+               elog(LOG,
+                        "%s: lock(%lx) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
                         "req(%d,%d,%d,%d,%d,%d,%d)=%d "
                         "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
                         where, MAKE_OFFSET(lock),
-                        lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
-                        lock->tag.objId.blkno, lock->grantMask,
+                        lock->tag.locktag_field1, lock->tag.locktag_field2,
+                        lock->tag.locktag_field3, lock->tag.locktag_field4,
+                        lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
+                        lock->grantMask,
                         lock->requested[1], lock->requested[2], lock->requested[3],
                         lock->requested[4], lock->requested[5], lock->requested[6],
                         lock->requested[7], lock->nRequested,
@@ -119,109 +140,55 @@ LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type)
 
 
 inline static void
-HOLDER_PRINT(const char * where, const HOLDER * holderP)
+PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
 {
-       if (
-               (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
-                 || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
-                && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
-               || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
-               )
-               elog(DEBUG,
-                        "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
-                        where, MAKE_OFFSET(holderP), holderP->tag.lock,
-                        HOLDER_LOCKMETHOD(*(holderP)),
-                        holderP->tag.proc, holderP->tag.xid,
-                        holderP->holding[1], holderP->holding[2], holderP->holding[3],
-                        holderP->holding[4], holderP->holding[5], holderP->holding[6],
-                        holderP->holding[7], holderP->nHolding);
+       if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock)))
+               elog(LOG,
+               "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)",
+                        where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
+                        PROCLOCK_LOCKMETHOD(*(proclockP)),
+                        proclockP->tag.proc, proclockP->tag.xid,
+                        (int) proclockP->holdMask);
 }
 
-#else  /* not LOCK_DEBUG */
+#else                                                  /* not LOCK_DEBUG */
 
 #define LOCK_PRINT(where, lock, type)
-#define HOLDER_PRINT(where, holderP)
+#define PROCLOCK_PRINT(where, proclockP)
+#endif   /* not LOCK_DEBUG */
 
-#endif /* not LOCK_DEBUG */
 
+static void RemoveLocalLock(LOCALLOCK *locallock);
+static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
+static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
+                  ResourceOwner owner);
+static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
+                                int *myHolding);
+static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
+                                               PROCLOCK *proclock, LockMethod lockMethodTable);
 
 
-SPINLOCK       LockMgrLock;            /* in Shmem or created in
-                                                                * CreateSpinlocks() */
-
-/*
- * These are to simplify/speed up some bit arithmetic.
- *
- * XXX is a fetch from a static array really faster than a shift?
- * Wouldn't bet on it...
- */
-
-static LOCKMASK BITS_OFF[MAX_LOCKMODES];
-static LOCKMASK BITS_ON[MAX_LOCKMODES];
-
-/*
- * Disable flag
- *
- */
-static bool LockingIsDisabled;
-
-/*
- * map from lockmethod to the lock table structure
- *
- */
-static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
-
-static int     NumLockMethods;
-
 /*
  * InitLocks -- Init the lock module.  Create a private data
  *             structure for constructing conflict masks.
- *
  */
 void
 InitLocks(void)
 {
-       int                     i;
-       int                     bit;
-
-       bit = 1;
-       for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
-       {
-               BITS_ON[i] = bit;
-               BITS_OFF[i] = ~bit;
-       }
-}
-
-/*
- * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
- *
- */
-void
-LockDisable(bool status)
-{
-       LockingIsDisabled = status;
+       /* NOP */
 }
 
-/*
- * Boolean function to determine current locking status
- *
- */
-bool
-LockingDisabled(void)
-{
-       return LockingIsDisabled;
-}
 
 /*
  * Fetch the lock method table associated with a given lock
  */
-LOCKMETHODTABLE *
+LockMethod
 GetLocksMethodTable(LOCK *lock)
 {
-       LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock);
+       LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
 
-       Assert(lockmethod > 0 && lockmethod < NumLockMethods);
-       return LockMethodTable[lockmethod];
+       Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
+       return LockMethods[lockmethodid];
 }
 
 
@@ -232,45 +199,35 @@ GetLocksMethodTable(LOCK *lock)
  * Notes: just copying.  Should only be called once.
  */
 static void
-LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
-                          LOCKMASK *conflictsP,
-                          int *prioP,
+LockMethodInit(LockMethod lockMethodTable,
+                          const LOCKMASK *conflictsP,
                           int numModes)
 {
        int                     i;
 
-       lockMethodTable->ctl->numLockModes = numModes;
-       numModes++;
-       for (i = 0; i < numModes; i++, prioP++, conflictsP++)
-       {
-               lockMethodTable->ctl->conflictTab[i] = *conflictsP;
-               lockMethodTable->ctl->prio[i] = *prioP;
-       }
+       lockMethodTable->numLockModes = numModes;
+       /* copies useless zero element as well as the N lockmodes */
+       for (i = 0; i <= numModes; i++)
+               lockMethodTable->conflictTab[i] = conflictsP[i];
 }
 
 /*
  * LockMethodTableInit -- initialize a lock table structure
  *
- * Notes:
- *             (a) a lock table has four separate entries in the shmem index
- *             table.  This is because every shared hash table and spinlock
- *             has its name stored in the shmem index at its creation.  It
- *             is wasteful, in this case, but not much space is involved.
- *
  * NOTE: data structures allocated here are allocated permanently, using
- * TopMemoryContext and shared memory.  We don't ever release them anyway,
+ * TopMemoryContext and shared memory. We don't ever release them anyway,
  * and in normal multi-backend operation the lock table structures set up
  * by the postmaster are inherited by each backend, so they must be in
  * TopMemoryContext.
  */
-LOCKMETHOD
-LockMethodTableInit(char *tabName,
-                                       LOCKMASK *conflictsP,
-                                       int *prioP,
+LOCKMETHODID
+LockMethodTableInit(const char *tabName,
+                                       const LOCKMASK *conflictsP,
                                        int numModes,
                                        int maxBackends)
 {
-       LOCKMETHODTABLE *lockMethodTable;
+       LockMethod      newLockMethod;
+       LOCKMETHODID lockmethodid;
        char       *shmemName;
        HASHCTL         info;
        int                     hash_flags;
@@ -278,159 +235,156 @@ LockMethodTableInit(char *tabName,
        long            init_table_size,
                                max_table_size;
 
-       if (numModes > MAX_LOCKMODES)
-       {
-               elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
-                        numModes, MAX_LOCKMODES);
-               return INVALID_LOCKMETHOD;
-       }
+       if (numModes >= MAX_LOCKMODES)
+               elog(ERROR, "too many lock types %d (limit is %d)",
+                        numModes, MAX_LOCKMODES - 1);
 
        /* Compute init/max size to request for lock hashtables */
        max_table_size = NLOCKENTS(maxBackends);
-       init_table_size = max_table_size / 10;
+       init_table_size = max_table_size / 2;
 
        /* Allocate a string for the shmem index table lookups. */
        /* This is just temp space in this routine, so palloc is OK. */
        shmemName = (char *) palloc(strlen(tabName) + 32);
 
-       /* each lock table has a non-shared, permanent header */
-       lockMethodTable = (LOCKMETHODTABLE *)
-               MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODTABLE));
+       /* each lock table has a header in shared memory */
+       sprintf(shmemName, "%s (lock method table)", tabName);
+       newLockMethod = (LockMethod)
+               ShmemInitStruct(shmemName, sizeof(LockMethodData), &found);
 
-       /*
-        * find/acquire the spinlock for the table
-        *
-        */
-       SpinAcquire(LockMgrLock);
-
-       /*
-        * allocate a control structure from shared memory or attach to it
-        * if it already exists.
-        *
-        */
-       sprintf(shmemName, "%s (ctl)", tabName);
-       lockMethodTable->ctl = (LOCKMETHODCTL *)
-               ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);
-
-       if (!lockMethodTable->ctl)
-               elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
-
-       /*
-        * no zero-th table
-        *
-        */
-       NumLockMethods = 1;
+       if (!newLockMethod)
+               elog(FATAL, "could not initialize lock table \"%s\"", tabName);
 
        /*
         * we're first - initialize
-        *
         */
        if (!found)
        {
-               MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
-               lockMethodTable->ctl->masterLock = LockMgrLock;
-               lockMethodTable->ctl->lockmethod = NumLockMethods;
+               MemSet(newLockMethod, 0, sizeof(LockMethodData));
+               newLockMethod->masterLock = LockMgrLock;
+               LockMethodInit(newLockMethod, conflictsP, numModes);
        }
 
        /*
         * other modules refer to the lock table by a lockmethod ID
-        *
         */
-       LockMethodTable[NumLockMethods] = lockMethodTable;
-       NumLockMethods++;
-       Assert(NumLockMethods <= MAX_LOCK_METHODS);
+       Assert(NumLockMethods < MAX_LOCK_METHODS);
+       lockmethodid = NumLockMethods++;
+       LockMethods[lockmethodid] = newLockMethod;
 
        /*
-        * allocate a hash table for LOCK structs.  This is used
-        * to store per-locked-object information.
-        *
+        * allocate a hash table for LOCK structs.      This is used to store
+        * per-locked-object information.
         */
-       info.keysize = SHMEM_LOCKTAB_KEYSIZE;
-       info.datasize = SHMEM_LOCKTAB_DATASIZE;
+       MemSet(&info, 0, sizeof(info));
+       info.keysize = sizeof(LOCKTAG);
+       info.entrysize = sizeof(LOCK);
        info.hash = tag_hash;
        hash_flags = (HASH_ELEM | HASH_FUNCTION);
 
        sprintf(shmemName, "%s (lock hash)", tabName);
-       lockMethodTable->lockHash = ShmemInitHash(shmemName,
-                                                                                         init_table_size,
-                                                                                         max_table_size,
-                                                                                         &info,
-                                                                                         hash_flags);
+       LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName,
+                                                                                                        init_table_size,
+                                                                                                        max_table_size,
+                                                                                                        &info,
+                                                                                                        hash_flags);
 
-       if (!lockMethodTable->lockHash)
-               elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
-       Assert(lockMethodTable->lockHash->hash == tag_hash);
+       if (!LockMethodLockHash[lockmethodid])
+               elog(FATAL, "could not initialize lock table \"%s\"", tabName);
 
        /*
-        * allocate a hash table for HOLDER structs.  This is used
-        * to store per-lock-holder information.
-        *
+        * allocate a hash table for PROCLOCK structs.  This is used to store
+        * per-lock-holder information.
         */
-       info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
-       info.datasize = SHMEM_HOLDERTAB_DATASIZE;
+       info.keysize = sizeof(PROCLOCKTAG);
+       info.entrysize = sizeof(PROCLOCK);
        info.hash = tag_hash;
        hash_flags = (HASH_ELEM | HASH_FUNCTION);
 
-       sprintf(shmemName, "%s (holder hash)", tabName);
-       lockMethodTable->holderHash = ShmemInitHash(shmemName,
-                                                                                               init_table_size,
-                                                                                               max_table_size,
-                                                                                               &info,
-                                                                                               hash_flags);
+       sprintf(shmemName, "%s (proclock hash)", tabName);
+       LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName,
+                                                                                                                init_table_size,
+                                                                                                                max_table_size,
+                                                                                                                &info,
+                                                                                                                hash_flags);
 
-       if (!lockMethodTable->holderHash)
-               elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
+       if (!LockMethodProcLockHash[lockmethodid])
+               elog(FATAL, "could not initialize lock table \"%s\"", tabName);
 
-       /* init ctl data structures */
-       LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
+       /*
+        * allocate a non-shared hash table for LOCALLOCK structs.      This is
+        * used to store lock counts and resource owner information.
+        *
+        * The non-shared table could already exist in this process (this occurs
+        * when the postmaster is recreating shared memory after a backend
+        * crash). If so, delete and recreate it.  (We could simply leave it,
+        * since it ought to be empty in the postmaster, but for safety let's
+        * zap it.)
+        */
+       if (LockMethodLocalHash[lockmethodid])
+               hash_destroy(LockMethodLocalHash[lockmethodid]);
 
-       SpinRelease(LockMgrLock);
+       info.keysize = sizeof(LOCALLOCKTAG);
+       info.entrysize = sizeof(LOCALLOCK);
+       info.hash = tag_hash;
+       hash_flags = (HASH_ELEM | HASH_FUNCTION);
+
+       sprintf(shmemName, "%s (locallock hash)", tabName);
+       LockMethodLocalHash[lockmethodid] = hash_create(shmemName,
+                                                                                                       128,
+                                                                                                       &info,
+                                                                                                       hash_flags);
 
        pfree(shmemName);
 
-       return lockMethodTable->ctl->lockmethod;
+       return lockmethodid;
 }
 
 /*
  * LockMethodTableRename -- allocate another lockmethod ID to the same
  *             lock table.
  *
- * NOTES: Both the lock module and the lock chain (lchain.c)
- *             module use table id's to distinguish between different
- *             kinds of locks.  Short term and long term locks look
- *             the same to the lock table, but are handled differently
- *             by the lock chain manager.      This function allows the
- *             client to use different lockmethods when acquiring/releasing
- *             short term and long term locks, yet store them all in one hashtable.
+ * NOTES: This function makes it possible to have different lockmethodids,
+ *             and hence different locking semantics, while still storing all
+ *             the data in one shared-memory hashtable.
  */
 
-LOCKMETHOD
-LockMethodTableRename(LOCKMETHOD lockmethod)
+LOCKMETHODID
+LockMethodTableRename(LOCKMETHODID lockmethodid)
 {
-       LOCKMETHOD      newLockMethod;
+       LOCKMETHODID newLockMethodId;
 
        if (NumLockMethods >= MAX_LOCK_METHODS)
                return INVALID_LOCKMETHOD;
-       if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
+       if (LockMethods[lockmethodid] == INVALID_LOCKMETHOD)
                return INVALID_LOCKMETHOD;
 
        /* other modules refer to the lock table by a lockmethod ID */
-       newLockMethod = NumLockMethods;
+       newLockMethodId = NumLockMethods;
        NumLockMethods++;
 
-       LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
-       return newLockMethod;
+       LockMethods[newLockMethodId] = LockMethods[lockmethodid];
+       LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid];
+       LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid];
+       LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid];
+
+       return newLockMethodId;
 }
 
 /*
  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
  *             set lock if/when no conflicts.
  *
- * Returns: TRUE if parameters are correct, FALSE otherwise.
+ * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
+ *             a FALSE return is to be expected if dontWait is TRUE;
+ *             but if dontWait is FALSE, only a parameter error can cause
+ *             a FALSE return.  (XXX probably we should just ereport on parameter
+ *             errors, instead of conflating this with failure to acquire lock?)
+ *
+ * Side Effects: The lock is acquired and recorded in lock tables.
  *
- * Side Effects: The lock is always acquired.  No way to abort
- *             a lock acquisition other than aborting the transaction.
- *             Lock is recorded in the lkchain.
+ * NOTE: if we wait for the lock, there is no way to abort the wait
+ * short of aborting the transaction.
  *
  *
  * Note on User Locks:
@@ -444,33 +398,16 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  *             the lock.  While the lock is active other clients can still
  *             read and write the tuple but they can be aware that it has
  *             been locked at the application level by someone.
- *             User locks use lock tags made of an uint16 and an uint32, for
- *             example 0 and a tuple oid, or any other arbitrary pair of
- *             numbers following a convention established by the application.
- *             In this sense tags don't refer to tuples or database entities.
+ *
  *             User locks and normal locks are completely orthogonal and
- *             they don't interfere with each other, so it is possible
- *             to acquire a normal lock on an user-locked tuple or user-lock
- *             a tuple for which a normal write lock already exists.
+ *             they don't interfere with each other.
+ *
  *             User locks are always non blocking, therefore they are never
  *             acquired if already held by another process.  They must be
  *             released explicitly by the application but they are released
  *             automatically when a backend terminates.
  *             They are indicated by a lockmethod 2 which is an alias for the
- *             normal lock table, and are distinguished from normal locks
- *             by the following differences:
- *
- *                                                                             normal lock             user lock
- *
- *             lockmethod                                              1                               2
- *             tag.dbId                                                database oid    database oid
- *             tag.relId                                               rel oid or 0    0
- *             tag.objId                                               block id                lock id2
- *                                                                             or xact id
- *             tag.offnum                                              0                               lock id1
- *             holder.xid                                              xid or 0                0
- *             persistence                                             transaction             user or backend
- *                                                                             or backend
+ *             normal lock table.
  *
  *             The lockmode parameter can have the same values for normal locks
  *             although probably only WRITE_LOCK can have some practical use.
@@ -479,66 +416,138 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  */
 
 bool
-LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
-                       TransactionId xid, LOCKMODE lockmode)
+LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
+                       TransactionId xid, LOCKMODE lockmode, bool dontWait)
 {
-       HOLDER     *holder;
-       HOLDERTAG       holdertag;
-       HTAB       *holderTable;
-       bool            found;
+       LOCALLOCKTAG localtag;
+       LOCALLOCK  *locallock;
        LOCK       *lock;
-       SPINLOCK        masterLock;
-       LOCKMETHODTABLE *lockMethodTable;
+       PROCLOCK   *proclock;
+       PROCLOCKTAG proclocktag;
+       bool            found;
+       ResourceOwner owner;
+       LWLockId        masterLock;
+       LockMethod      lockMethodTable;
        int                     status;
        int                     myHolding[MAX_LOCKMODES];
        int                     i;
 
 #ifdef LOCK_DEBUG
-       if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
-               elog(DEBUG, "LockAcquire: user lock [%u] %s",
-                        locktag->objId.blkno, lock_mode_names[lockmode]);
+       if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
+               elog(LOG, "LockAcquire: user lock [%u,%u] %s",
+                        locktag->locktag_field1, locktag->locktag_field2,
+                        lock_mode_names[lockmode]);
 #endif
 
-       /* ???????? This must be changed when short term locks will be used */
-       locktag->lockmethod = lockmethod;
+       /* ugly */
+       locktag->locktag_lockmethodid = lockmethodid;
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
        {
-               elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
+               elog(WARNING, "bad lock table id: %d", lockmethodid);
                return FALSE;
        }
 
-       if (LockingIsDisabled)
+       /* Session locks and user locks are not transactional */
+       if (xid != InvalidTransactionId &&
+               lockmethodid == DEFAULT_LOCKMETHOD)
+               owner = CurrentResourceOwner;
+       else
+               owner = NULL;
+
+       /*
+        * Find or create a LOCALLOCK entry for this lock and lockmode
+        */
+       MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
+       localtag.lock = *locktag;
+       localtag.xid = xid;
+       localtag.mode = lockmode;
+
+       locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+                                                                                 (void *) &localtag,
+                                                                                 HASH_ENTER, &found);
+       if (!locallock)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory")));
+
+       /*
+        * if it's a new locallock object, initialize it
+        */
+       if (!found)
+       {
+               locallock->lock = NULL;
+               locallock->proclock = NULL;
+               locallock->nLocks = 0;
+               locallock->numLockOwners = 0;
+               locallock->maxLockOwners = 8;
+               locallock->lockOwners = NULL;
+               locallock->lockOwners = (LOCALLOCKOWNER *)
+                       MemoryContextAlloc(TopMemoryContext,
+                                         locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+       }
+       else
+       {
+               /* Make sure there will be room to remember the lock */
+               if (locallock->numLockOwners >= locallock->maxLockOwners)
+               {
+                       int                     newsize = locallock->maxLockOwners * 2;
+
+                       locallock->lockOwners = (LOCALLOCKOWNER *)
+                               repalloc(locallock->lockOwners,
+                                                newsize * sizeof(LOCALLOCKOWNER));
+                       locallock->maxLockOwners = newsize;
+               }
+       }
+
+       /*
+        * If we already hold the lock, we can just increase the count
+        * locally.
+        */
+       if (locallock->nLocks > 0)
+       {
+               GrantLockLocal(locallock, owner);
                return TRUE;
+       }
 
-       masterLock = lockMethodTable->ctl->masterLock;
+       /*
+        * Otherwise we've got to mess with the shared lock table.
+        */
+       masterLock = lockMethodTable->masterLock;
 
-       SpinAcquire(masterLock);
+       LWLockAcquire(masterLock, LW_EXCLUSIVE);
 
        /*
-        * Find or create a lock with this tag
+        * Find or create a lock with this tag.
+        *
+        * Note: if the locallock object already existed, it might have a pointer
+        * to the lock already ... but we probably should not assume that that
+        * pointer is valid, since a lock object with no locks can go away
+        * anytime.
         */
-       Assert(lockMethodTable->lockHash->hash == tag_hash);
-       lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+       lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+                                                               (void *) locktag,
                                                                HASH_ENTER, &found);
        if (!lock)
        {
-               SpinRelease(masterLock);
-               elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
-               return FALSE;
+               LWLockRelease(masterLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of shared memory"),
+               errhint("You may need to increase max_locks_per_transaction.")));
        }
+       locallock->lock = lock;
 
        /*
         * if it's a new lock object, initialize it
-        *
         */
        if (!found)
        {
                lock->grantMask = 0;
                lock->waitMask = 0;
-               SHMQueueInit(&(lock->lockHolders));
+               SHMQueueInit(&(lock->procLocks));
                ProcQueueInit(&(lock->waitProcs));
                lock->nRequested = 0;
                lock->nGranted = 0;
@@ -555,46 +564,63 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        }
 
        /*
-        * Create the hash key for the holder table.
-        *
+        * Create the hash key for the proclock table.
         */
-       MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
-       holdertag.lock = MAKE_OFFSET(lock);
-       holdertag.proc = MAKE_OFFSET(MyProc);
-       TransactionIdStore(xid, &holdertag.xid);
+       MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));           /* must clear padding */
+       proclocktag.lock = MAKE_OFFSET(lock);
+       proclocktag.proc = MAKE_OFFSET(MyProc);
+       TransactionIdStore(xid, &proclocktag.xid);
 
        /*
-        * Find or create a holder entry with this tag
+        * Find or create a proclock entry with this tag
         */
-       holderTable = lockMethodTable->holderHash;
-       holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
-                                                                       HASH_ENTER, &found);
-       if (!holder)
+       proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                               (void *) &proclocktag,
+                                                                               HASH_ENTER, &found);
+       if (!proclock)
        {
-               SpinRelease(masterLock);
-               elog(FATAL, "LockAcquire: holder table corrupted");
-               return FALSE;
+               /* Ooops, not enough shmem for the proclock */
+               if (lock->nRequested == 0)
+               {
+                       /*
+                        * There are no other requestors of this lock, so garbage-collect
+                        * the lock object.  We *must* do this to avoid a permanent leak
+                        * of shared memory, because there won't be anything to cause
+                        * anyone to release the lock object later.
+                        */
+                       Assert(SHMQueueEmpty(&(lock->procLocks)));
+                       lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+                                                                               (void *) &(lock->tag),
+                                                                               HASH_REMOVE, NULL);
+               }
+               LWLockRelease(masterLock);
+               if (!lock)                              /* hash remove failed? */
+                       elog(WARNING, "lock table corrupted");
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of shared memory"),
+               errhint("You may need to increase max_locks_per_transaction.")));
        }
+       locallock->proclock = proclock;
 
        /*
         * If new, initialize the new entry
         */
        if (!found)
        {
-               holder->nHolding = 0;
-               MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
-               /* Add holder to appropriate lists */
-               SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
-               SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
-               HOLDER_PRINT("LockAcquire: new", holder);
+               proclock->holdMask = 0;
+               /* Add proclock to appropriate lists */
+               SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
+               SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
+               PROCLOCK_PRINT("LockAcquire: new", proclock);
        }
        else
        {
-               HOLDER_PRINT("LockAcquire: found", holder);
-               Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
-               Assert(holder->nHolding <= lock->nGranted);
+               PROCLOCK_PRINT("LockAcquire: found", proclock);
+               Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
 #ifdef CHECK_DEADLOCK_RISK
+
                /*
                 * Issue warning if we already hold a lower-level lock on this
                 * object and do not hold a lock of the requested level or higher.
@@ -602,130 +628,115 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                 * a deadlock if another backend were following the same code path
                 * at about the same time).
                 *
-                * This is not enabled by default, because it may generate log entries
-                * about user-level coding practices that are in fact safe in context.
-                * It can be enabled to help find system-level problems.
+                * This is not enabled by default, because it may generate log
+                * entries about user-level coding practices that are in fact safe
+                * in context. It can be enabled to help find system-level
+                * problems.
                 *
-                * XXX Doing numeric comparison on the lockmodes is a hack;
-                * it'd be better to use a table.  For now, though, this works.
+                * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
+                * better to use a table.  For now, though, this works.
                 */
-               for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
+               for (i = lockMethodTable->numLockModes; i > 0; i--)
                {
-                       if (holder->holding[i] > 0)
+                       if (proclock->holdMask & LOCKBIT_ON(i))
                        {
                                if (i >= (int) lockmode)
                                        break;          /* safe: we have a lock >= req level */
-                               elog(DEBUG, "Deadlock risk: raising lock level"
+                               elog(LOG, "deadlock risk: raising lock level"
                                         " from %s to %s on object %u/%u/%u",
                                         lock_mode_names[i], lock_mode_names[lockmode],
-                                        lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
+                                lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
                                break;
                        }
                }
-#endif /* CHECK_DEADLOCK_RISK */
+#endif   /* CHECK_DEADLOCK_RISK */
        }
 
        /*
         * lock->nRequested and lock->requested[] count the total number of
-        * requests, whether granted or waiting, so increment those immediately.
-        * The other counts don't increment till we get the lock.
-        *
+        * requests, whether granted or waiting, so increment those
+        * immediately. The other counts don't increment till we get the lock.
         */
        lock->nRequested++;
        lock->requested[lockmode]++;
        Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
        /*
-        * If I already hold one or more locks of the requested type,
-        * just grant myself another one without blocking.
-        *
+        * If this process (under any XID) is a holder of the lock, just grant
+        * myself another one without blocking.
         */
-       if (holder->holding[lockmode] > 0)
-       {
-               GrantLock(lock, holder, lockmode);
-               HOLDER_PRINT("LockAcquire: owning", holder);
-               SpinRelease(masterLock);
-               return TRUE;
-       }
-
-       /*
-        * If this process (under any XID) is a holder of the lock,
-        * also grant myself another one without blocking.
-        *
-        */
-       LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
+       LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
        if (myHolding[lockmode] > 0)
        {
-               GrantLock(lock, holder, lockmode);
-               HOLDER_PRINT("LockAcquire: my other XID owning", holder);
-               SpinRelease(masterLock);
+               GrantLock(lock, proclock, lockmode);
+               GrantLockLocal(locallock, owner);
+               PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
+               LWLockRelease(masterLock);
                return TRUE;
        }
 
        /*
-        * If lock requested conflicts with locks requested by waiters,
-        * must join wait queue.  Otherwise, check for conflict with
-        * already-held locks.  (That's last because most complex check.)
-        *
+        * If lock requested conflicts with locks requested by waiters, must
+        * join wait queue.  Otherwise, check for conflict with already-held
+        * locks.  (That's last because most complex check.)
         */
-       if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
+       if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
                status = STATUS_FOUND;
        else
                status = LockCheckConflicts(lockMethodTable, lockmode,
-                                                                       lock, holder,
+                                                                       lock, proclock,
                                                                        MyProc, myHolding);
 
        if (status == STATUS_OK)
        {
                /* No conflict with held or previously requested locks */
-               GrantLock(lock, holder, lockmode);
+               GrantLock(lock, proclock, lockmode);
+               GrantLockLocal(locallock, owner);
        }
        else
        {
                Assert(status == STATUS_FOUND);
-#ifdef USER_LOCKS
 
                /*
-                * User locks are non blocking. If we can't acquire a lock we must
-                * remove the holder entry and return FALSE without waiting.
+                * We can't acquire the lock immediately.  If caller specified no
+                * blocking, remove useless table entries and return FALSE without
+                * waiting.
                 */
-               if (lockmethod == USER_LOCKMETHOD)
+               if (dontWait)
                {
-                       if (holder->nHolding == 0)
+                       if (proclock->holdMask == 0)
                        {
-                               SHMQueueDelete(&holder->lockLink);
-                               SHMQueueDelete(&holder->procLink);
-                               holder = (HOLDER *) hash_search(holderTable,
-                                                                                               (Pointer) holder,
-                                                                                               HASH_REMOVE, &found);
-                               if (!holder || !found)
-                                       elog(NOTICE, "LockAcquire: remove holder, table corrupted");
+                               SHMQueueDelete(&proclock->lockLink);
+                               SHMQueueDelete(&proclock->procLink);
+                               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                                          (void *) &(proclock->tag),
+                                                                                                       HASH_REMOVE, NULL);
+                               if (!proclock)
+                                       elog(WARNING, "proclock table corrupted");
                        }
                        else
-                               HOLDER_PRINT("LockAcquire: NHOLDING", holder);
+                               PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
                        lock->nRequested--;
                        lock->requested[lockmode]--;
-                       LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
+                       LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
                        Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
                        Assert(lock->nGranted <= lock->nRequested);
-                       SpinRelease(masterLock);
+                       LWLockRelease(masterLock);
+                       if (locallock->nLocks == 0)
+                               RemoveLocalLock(locallock);
                        return FALSE;
                }
-#endif /* USER_LOCKS */
 
                /*
                 * Construct bitmask of locks this process holds on this object.
                 */
                {
-                       int                     heldLocks = 0;
-                       int                     tmpMask;
+                       LOCKMASK        heldLocks = 0;
 
-                       for (i = 1, tmpMask = 2;
-                                i <= lockMethodTable->ctl->numLockModes;
-                                i++, tmpMask <<= 1)
+                       for (i = 1; i <= lockMethodTable->numLockModes; i++)
                        {
                                if (myHolding[i] > 0)
-                                       heldLocks |= tmpMask;
+                                       heldLocks |= LOCKBIT_ON(i);
                        }
                        MyProc->heldLocks = heldLocks;
                }
@@ -733,35 +744,53 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                /*
                 * Sleep till someone wakes me up.
                 */
-               status = WaitOnLock(lockmethod, lockmode, lock, holder);
+               status = WaitOnLock(lockmethodid, locallock, owner);
 
                /*
                 * NOTE: do not do any material change of state between here and
-                * return.  All required changes in locktable state must have been
-                * done when the lock was granted to us --- see notes in WaitOnLock.
+                * return.      All required changes in locktable state must have been
+                * done when the lock was granted to us --- see notes in
+                * WaitOnLock.
                 */
 
                /*
-                * Check the holder entry status, in case something in the ipc
+                * Check the proclock entry status, in case something in the ipc
                 * communication doesn't work correctly.
                 */
-               if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
+               if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
                {
-                       HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
+                       PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
                        LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
                        /* Should we retry ? */
-                       SpinRelease(masterLock);
+                       LWLockRelease(masterLock);
                        return FALSE;
                }
-               HOLDER_PRINT("LockAcquire: granted", holder);
+               PROCLOCK_PRINT("LockAcquire: granted", proclock);
                LOCK_PRINT("LockAcquire: granted", lock, lockmode);
        }
 
-       SpinRelease(masterLock);
+       LWLockRelease(masterLock);
 
        return status == STATUS_OK;
 }
 
+/*
+ * Subroutine to free a locallock entry
+ */
+static void
+RemoveLocalLock(LOCALLOCK *locallock)
+{
+       LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
+
+       pfree(locallock->lockOwners);
+       locallock->lockOwners = NULL;
+       locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+                                                                                 (void *) &(locallock->tag),
+                                                                                 HASH_REMOVE, NULL);
+       if (!locallock)
+               elog(WARNING, "locallock table corrupted");
+}
+
 /*
  * LockCheckConflicts -- test whether requested lock conflicts
  *             with those already granted
@@ -777,77 +806,69 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
  *
  * The caller can optionally pass the process's total holding counts, if
  * known.  If NULL is passed then these values will be computed internally.
- *
  */
 int
-LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+LockCheckConflicts(LockMethod lockMethodTable,
                                   LOCKMODE lockmode,
                                   LOCK *lock,
-                                  HOLDER *holder,
-                                  PROC *proc,
+                                  PROCLOCK *proclock,
+                                  PGPROC *proc,
                                   int *myHolding)              /* myHolding[] array or NULL */
 {
-       LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
-       int                     numLockModes = lockctl->numLockModes;
-       int                     bitmask;
-       int                     i,
-                               tmpMask;
+       int                     numLockModes = lockMethodTable->numLockModes;
+       LOCKMASK        bitmask;
+       int                     i;
        int                     localHolding[MAX_LOCKMODES];
 
        /*
-        * first check for global conflicts: If no locks conflict
-        * with my request, then I get the lock.
+        * first check for global conflicts: If no locks conflict with my
+        * request, then I get the lock.
         *
         * Checking for conflict: lock->grantMask represents the types of
-        * currently held locks.  conflictTable[lockmode] has a bit
-        * set for each type of lock that conflicts with request.       Bitwise
-        * compare tells if there is a conflict.
-        *
+        * currently held locks.  conflictTable[lockmode] has a bit set for
+        * each type of lock that conflicts with request.       Bitwise compare
+        * tells if there is a conflict.
         */
-       if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
+       if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
        {
-               HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
+               PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
                return STATUS_OK;
        }
 
        /*
-        * Rats.  Something conflicts. But it could still be my own
-        * lock.  We have to construct a conflict mask
-        * that does not reflect our own locks.  Locks held by the current
-        * process under another XID also count as "our own locks".
-        *
+        * Rats.  Something conflicts. But it could still be my own lock.  We
+        * have to construct a conflict mask that does not reflect our own
+        * locks.  Locks held by the current process under another XID also
+        * count as "our own locks".
         */
        if (myHolding == NULL)
        {
                /* Caller didn't do calculation of total holding for me */
-               LockCountMyLocks(holder->tag.lock, proc, localHolding);
+               LockCountMyLocks(proclock->tag.lock, proc, localHolding);
                myHolding = localHolding;
        }
 
        /* Compute mask of lock types held by other processes */
        bitmask = 0;
-       tmpMask = 2;
-       for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
+       for (i = 1; i <= numLockModes; i++)
        {
                if (lock->granted[i] != myHolding[i])
-                       bitmask |= tmpMask;
+                       bitmask |= LOCKBIT_ON(i);
        }
 
        /*
-        * now check again for conflicts.  'bitmask' describes the types
-        * of locks held by other processes.  If one of these
-        * conflicts with the kind of lock that I want, there is a
-        * conflict and I have to sleep.
-        *
+        * now check again for conflicts.  'bitmask' describes the types of
+        * locks held by other processes.  If one of these conflicts with the
+        * kind of lock that I want, there is a conflict and I have to sleep.
         */
-       if (!(lockctl->conflictTab[lockmode] & bitmask))
+       if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
        {
                /* no conflict. OK to get the lock */
-               HOLDER_PRINT("LockCheckConflicts: resolved", holder);
+               PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
                return STATUS_OK;
        }
 
-       HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
+       PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
        return STATUS_FOUND;
 }
 
@@ -862,53 +883,160 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
  * be a net slowdown.
  */
 static void
-LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
+LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
 {
-       SHM_QUEUE  *procHolders = &(proc->procHolders);
-       HOLDER     *holder;
-       int                     i;
+       SHM_QUEUE  *procLocks = &(proc->procLocks);
+       PROCLOCK   *proclock;
 
        MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
 
-       holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
-                                                                        offsetof(HOLDER, procLink));
+       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+                                                                                offsetof(PROCLOCK, procLink));
 
-       while (holder)
+       while (proclock)
        {
-               if (lockOffset == holder->tag.lock)
+               if (lockOffset == proclock->tag.lock)
                {
+                       LOCKMASK        holdMask = proclock->holdMask;
+                       int                     i;
+
                        for (i = 1; i < MAX_LOCKMODES; i++)
                        {
-                               myHolding[i] += holder->holding[i];
+                               if (holdMask & LOCKBIT_ON(i))
+                                       myHolding[i]++;
                        }
                }
 
-               holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
-                                                                                offsetof(HOLDER, procLink));
+               proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
+                                                                                  offsetof(PROCLOCK, procLink));
        }
 }
 
 /*
- * GrantLock -- update the lock and holder data structures to show
+ * GrantLock -- update the lock and proclock data structures to show
  *             the lock request has been granted.
  *
  * NOTE: if proc was blocked, it also needs to be removed from the wait list
- * and have its waitLock/waitHolder fields cleared.  That's not done here.
+ * and have its waitLock/waitProcLock fields cleared.  That's not done here.
+ *
+ * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
+ * table entry; but since we may be awaking some other process, we can't do
+ * that here; it's done by GrantLockLocal, instead.
  */
 void
-GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
+GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
 {
        lock->nGranted++;
        lock->granted[lockmode]++;
-       lock->grantMask |= BITS_ON[lockmode];
+       lock->grantMask |= LOCKBIT_ON(lockmode);
        if (lock->granted[lockmode] == lock->requested[lockmode])
-               lock->waitMask &= BITS_OFF[lockmode];
+               lock->waitMask &= LOCKBIT_OFF(lockmode);
+       proclock->holdMask |= LOCKBIT_ON(lockmode);
        LOCK_PRINT("GrantLock", lock, lockmode);
        Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
        Assert(lock->nGranted <= lock->nRequested);
-       holder->holding[lockmode]++;
-       holder->nHolding++;
-       Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
+}
+
+/*
+ * UnGrantLock -- opposite of GrantLock. 
+ *
+ * Updates the lock and proclock data structures to show that the lock
+ * is no longer held nor requested by the current holder.
+ *
+ * Returns true if there were any waiters waiting on the lock that
+ * should now be woken up with ProcLockWakeup.
+ */
+static bool
+UnGrantLock(LOCK *lock, LOCKMODE lockmode,
+                       PROCLOCK *proclock, LockMethod lockMethodTable)
+{
+       bool wakeupNeeded = false;
+
+       Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
+       Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
+       Assert(lock->nGranted <= lock->nRequested);
+
+       /*
+        * fix the general lock stats
+        */
+       lock->nRequested--;
+       lock->requested[lockmode]--;
+       lock->nGranted--;
+       lock->granted[lockmode]--;
+
+       if (lock->granted[lockmode] == 0)
+       {
+               /* change the conflict mask.  No more of this lock type. */
+               lock->grantMask &= LOCKBIT_OFF(lockmode);
+       }
+
+       LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
+
+       /*
+        * We need only run ProcLockWakeup if the released lock conflicts with
+        * at least one of the lock types requested by waiter(s).  Otherwise
+        * whatever conflict made them wait must still exist.  NOTE: before
+        * MVCC, we could skip wakeup if lock->granted[lockmode] was still
+        * positive. But that's not true anymore, because the remaining
+        * granted locks might belong to some waiter, who could now be
+        * awakened because he doesn't conflict with his own locks.
+        */
+       if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
+               wakeupNeeded = true;
+
+       /*
+        * Now fix the per-proclock state.
+        */
+       proclock->holdMask &= LOCKBIT_OFF(lockmode);
+       PROCLOCK_PRINT("UnGrantLock: updated", proclock);
+
+       return wakeupNeeded;
+}
+
+/*
+ * GrantLockLocal -- update the locallock data structures to show
+ *             the lock request has been granted.
+ *
+ * We expect that LockAcquire made sure there is room to add a new
+ * ResourceOwner entry.
+ */
+static void
+GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
+{
+       LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+       int                     i;
+
+       Assert(locallock->numLockOwners < locallock->maxLockOwners);
+       /* Count the total */
+       locallock->nLocks++;
+       /* Count the per-owner lock */
+       for (i = 0; i < locallock->numLockOwners; i++)
+       {
+               if (lockOwners[i].owner == owner)
+               {
+                       lockOwners[i].nLocks++;
+                       return;
+               }
+       }
+       lockOwners[i].owner = owner;
+       lockOwners[i].nLocks = 1;
+       locallock->numLockOwners++;
+}
+
+/*
+ * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
+ *             WaitOnLock on.
+ *
+ * proc.c needs this for the case where we are booted off the lock by
+ * timeout, but discover that someone granted us the lock anyway.
+ *
+ * We could just export GrantLockLocal, but that would require including
+ * resowner.h in lock.h, which creates circularity.
+ */
+void
+GrantAwaitedLock(void)
+{
+       GrantLockLocal(awaitedLock, awaitedOwner);
 }
 
 /*
@@ -917,86 +1045,100 @@ GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
  * Caller must have set MyProc->heldLocks to reflect locks already held
  * on the lockable object by this process (under all XIDs).
  *
- * The locktable spinlock must be held at entry.
+ * The locktable's masterLock must be held at entry.
  */
 static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
-                  LOCK *lock, HOLDER *holder)
+WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
+                  ResourceOwner owner)
 {
-       LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
+       LockMethod      lockMethodTable = LockMethods[lockmethodid];
        char       *new_status,
                           *old_status;
+       size_t          len;
 
-       Assert(lockmethod < NumLockMethods);
+       Assert(lockmethodid < NumLockMethods);
 
-       LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
+       LOCK_PRINT("WaitOnLock: sleeping on lock",
+                          locallock->lock, locallock->tag.mode);
 
        old_status = pstrdup(get_ps_display());
-       new_status = (char *) palloc(strlen(old_status) + 10);
-       strcpy(new_status, old_status);
-       strcat(new_status, " waiting");
+       len = strlen(old_status);
+       new_status = (char *) palloc(len + 8 + 1);
+       memcpy(new_status, old_status, len);
+       strcpy(new_status + len, " waiting");
        set_ps_display(new_status);
 
+       awaitedLock = locallock;
+       awaitedOwner = owner;
+
        /*
-        * NOTE: Think not to put any lock state cleanup after the call to
+        * NOTE: Think not to put any shared-state cleanup after the call to
         * ProcSleep, in either the normal or failure path.  The lock state
-        * must be fully set by the lock grantor, or by HandleDeadLock if we
+        * must be fully set by the lock grantor, or by CheckDeadLock if we
         * give up waiting for the lock.  This is necessary because of the
         * possibility that a cancel/die interrupt will interrupt ProcSleep
         * after someone else grants us the lock, but before we've noticed it.
         * Hence, after granting, the locktable state must fully reflect the
         * fact that we own the lock; we can't do additional work on return.
-        *
+        * Contrariwise, if we fail, any cleanup must happen in xact abort
+        * processing, not here, to ensure it will also happen in the
+        * cancel/die case.
         */
 
        if (ProcSleep(lockMethodTable,
-                                 lockmode,
-                                 lock,
-                                 holder) != STATUS_OK)
+                                 locallock->tag.mode,
+                                 locallock->lock,
+                                 locallock->proclock) != STATUS_OK)
        {
                /*
-                * We failed as a result of a deadlock, see HandleDeadLock().
-                * Quit now.  Removal of the holder and lock objects, if no longer
-                * needed, will happen in xact cleanup (see above for motivation).
-                *
+                * We failed as a result of a deadlock, see CheckDeadLock(). Quit
+                * now.
                 */
-               LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
-               SpinRelease(lockMethodTable->ctl->masterLock);
-               elog(ERROR, DeadLockMessage);
+               awaitedLock = NULL;
+               LOCK_PRINT("WaitOnLock: aborting on lock",
+                                  locallock->lock, locallock->tag.mode);
+               LWLockRelease(lockMethodTable->masterLock);
+
+               /*
+                * Now that we aren't holding the LockMgrLock, we can give an
+                * error report including details about the detected deadlock.
+                */
+               DeadLockReport();
                /* not reached */
        }
 
+       awaitedLock = NULL;
+
        set_ps_display(old_status);
        pfree(old_status);
        pfree(new_status);
 
-       LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
+       LOCK_PRINT("WaitOnLock: wakeup on lock",
+                          locallock->lock, locallock->tag.mode);
        return STATUS_OK;
 }
 
-/*--------------------
+/*
  * Remove a proc from the wait-queue it is on
  * (caller must know it is on one).
  *
  * Locktable lock must be held by caller.
  *
- * NB: this does not remove the process' holder object, nor the lock object,
- * even though their counts might now have gone to zero.  That will happen
- * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup.  (Removal of a proc from its wait queue by
- * this routine can only happen if we are aborting the transaction.)
- *--------------------
+ * NB: this does not clean up any locallock object that may exist for the lock.
  */
 void
-RemoveFromWaitQueue(PROC *proc)
+RemoveFromWaitQueue(PGPROC *proc)
 {
-       LOCK   *waitLock = proc->waitLock;
-       LOCKMODE lockmode = proc->waitLockMode;
+       LOCK       *waitLock = proc->waitLock;
+       PROCLOCK   *proclock = proc->waitProcLock;
+       LOCKMODE        lockmode = proc->waitLockMode;
+       LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
 
        /* Make sure proc is waiting */
        Assert(proc->links.next != INVALID_OFFSET);
        Assert(waitLock);
        Assert(waitLock->waitProcs.size > 0);
+       Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
 
        /* Remove proc from lock's wait queue */
        SHMQueueDelete(&(proc->links));
@@ -1010,18 +1152,42 @@ RemoveFromWaitQueue(PROC *proc)
        waitLock->requested[lockmode]--;
        /* don't forget to clear waitMask bit if appropriate */
        if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
-               waitLock->waitMask &= BITS_OFF[lockmode];
+               waitLock->waitMask &= LOCKBIT_OFF(lockmode);
 
        /* Clean up the proc's own state */
        proc->waitLock = NULL;
-       proc->waitHolder = NULL;
+       proc->waitProcLock = NULL;
+
+       /*
+        * Delete the proclock immediately if it represents no already-held locks.
+        * This must happen now because if the owner of the lock decides to release
+        * it, and the requested/granted counts then go to zero, LockRelease
+        * expects there to be no remaining proclocks.
+        */
+       if (proclock->holdMask == 0)
+       {
+               PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock);
+               SHMQueueDelete(&proclock->lockLink);
+               SHMQueueDelete(&proclock->procLink);
+               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                                       (void *) &(proclock->tag),
+                                                                                       HASH_REMOVE, NULL);
+               if (!proclock)
+                       elog(WARNING, "proclock table corrupted");
+       }
+
+       /*
+        * There should still be some requests for the lock ... else what were
+        * we waiting for?  Therefore no need to delete the lock object.
+        */
+       Assert(waitLock->nRequested > 0);
 
        /* See if any other waiters for the lock can be woken up now */
-       ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
+       ProcLockWakeup(LockMethods[lockmethodid], waitLock);
 }
 
 /*
- * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
+ * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
  *             release one 'lockmode' lock on it.
  *
  * Side Effects: find any waiting processes that are now wakable,
@@ -1031,340 +1197,344 @@ RemoveFromWaitQueue(PROC *proc)
  *             come along and request the lock.)
  */
 bool
-LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
                        TransactionId xid, LOCKMODE lockmode)
 {
+       LOCALLOCKTAG localtag;
+       LOCALLOCK  *locallock;
        LOCK       *lock;
-       SPINLOCK        masterLock;
-       bool            found;
-       LOCKMETHODTABLE *lockMethodTable;
-       HOLDER     *holder;
-       HOLDERTAG       holdertag;
-       HTAB       *holderTable;
-       bool            wakeupNeeded = false;
+       PROCLOCK   *proclock;
+       LWLockId        masterLock;
+       LockMethod      lockMethodTable;
+       bool            wakeupNeeded;
 
 #ifdef LOCK_DEBUG
-       if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
-               elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
+       if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
+               elog(LOG, "LockRelease: user lock [%u,%u] %s",
+                        locktag->locktag_field1, locktag->locktag_field2,
+                        lock_mode_names[lockmode]);
 #endif
 
-       /* ???????? This must be changed when short term locks will be used */
-       locktag->lockmethod = lockmethod;
+       /* ugly */
+       locktag->locktag_lockmethodid = lockmethodid;
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
        {
-               elog(NOTICE, "lockMethodTable is null in LockRelease");
+               elog(WARNING, "lockMethodTable is null in LockRelease");
                return FALSE;
        }
 
-       if (LockingIsDisabled)
-               return TRUE;
-
-       masterLock = lockMethodTable->ctl->masterLock;
-       SpinAcquire(masterLock);
-
        /*
-        * Find a lock with this tag
+        * Find the LOCALLOCK entry for this lock and lockmode
         */
-       Assert(lockMethodTable->lockHash->hash == tag_hash);
-       lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
-                                                               HASH_FIND, &found);
+       MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
+       localtag.lock = *locktag;
+       localtag.xid = xid;
+       localtag.mode = lockmode;
+
+       locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+                                                                                 (void *) &localtag,
+                                                                                 HASH_FIND, NULL);
 
        /*
         * let the caller print its own error message, too. Do not
-        * elog(ERROR).
+        * ereport(ERROR).
         */
-       if (!lock)
+       if (!locallock || locallock->nLocks <= 0)
        {
-               SpinRelease(masterLock);
-               elog(NOTICE, "LockRelease: locktable corrupted");
-               return FALSE;
-       }
-
-       if (!found)
-       {
-               SpinRelease(masterLock);
-               elog(NOTICE, "LockRelease: no such lock");
+               elog(WARNING, "you don't own a lock of type %s",
+                        lock_mode_names[lockmode]);
                return FALSE;
        }
-       LOCK_PRINT("LockRelease: found", lock, lockmode);
 
        /*
-        * Find the holder entry for this holder.
+        * Decrease the count for the resource owner.
         */
-       MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
-       holdertag.lock = MAKE_OFFSET(lock);
-       holdertag.proc = MAKE_OFFSET(MyProc);
-       TransactionIdStore(xid, &holdertag.xid);
-
-       holderTable = lockMethodTable->holderHash;
-       holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
-                                                                       HASH_FIND_SAVE, &found);
-       if (!holder || !found)
        {
-               SpinRelease(masterLock);
-#ifdef USER_LOCKS
-               if (!found && lockmethod == USER_LOCKMETHOD)
-                       elog(NOTICE, "LockRelease: no lock with this tag");
+               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+               ResourceOwner owner;
+               int                     i;
+
+               /* Session locks and user locks are not transactional */
+               if (xid != InvalidTransactionId &&
+                       lockmethodid == DEFAULT_LOCKMETHOD)
+                       owner = CurrentResourceOwner;
                else
-#endif
-                       elog(NOTICE, "LockRelease: holder table corrupted");
-               return FALSE;
+                       owner = NULL;
+
+               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               {
+                       if (lockOwners[i].owner == owner)
+                       {
+                               Assert(lockOwners[i].nLocks > 0);
+                               if (--lockOwners[i].nLocks == 0)
+                               {
+                                       /* compact out unused slot */
+                                       locallock->numLockOwners--;
+                                       if (i < locallock->numLockOwners)
+                                               lockOwners[i] = lockOwners[locallock->numLockOwners];
+                               }
+                               break;
+                       }
+               }
+               if (i < 0)
+               {
+                       /* don't release a lock belonging to another owner */
+                       elog(WARNING, "you don't own a lock of type %s",
+                                lock_mode_names[lockmode]);
+                       return FALSE;
+               }
        }
-       HOLDER_PRINT("LockRelease: found", holder);
 
        /*
-        * Check that we are actually holding a lock of the type we want to
-        * release.
+        * Decrease the total local count.      If we're still holding the lock,
+        * we're done.
         */
-       if (!(holder->holding[lockmode] > 0))
-       {
-               HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
-               Assert(holder->holding[lockmode] >= 0);
-               SpinRelease(masterLock);
-               elog(NOTICE, "LockRelease: you don't own a lock of type %s",
-                        lock_mode_names[lockmode]);
-               return FALSE;
-       }
-       Assert(holder->nHolding > 0);
-       Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
-       Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
-       Assert(lock->nGranted <= lock->nRequested);
+       locallock->nLocks--;
+
+       if (locallock->nLocks > 0)
+               return TRUE;
 
        /*
-        * fix the general lock stats
+        * Otherwise we've got to mess with the shared lock table.
         */
-       lock->nRequested--;
-       lock->requested[lockmode]--;
-       lock->nGranted--;
-       lock->granted[lockmode]--;
+       masterLock = lockMethodTable->masterLock;
 
-       if (lock->granted[lockmode] == 0)
+       LWLockAcquire(masterLock, LW_EXCLUSIVE);
+
+       /*
+        * We don't need to re-find the lock or proclock, since we kept their
+        * addresses in the locallock table, and they couldn't have been
+        * removed while we were holding a lock on them.
+        */
+       lock = locallock->lock;
+       LOCK_PRINT("LockRelease: found", lock, lockmode);
+       proclock = locallock->proclock;
+       PROCLOCK_PRINT("LockRelease: found", proclock);
+
+       /*
+        * Double-check that we are actually holding a lock of the type we
+        * want to release.
+        */
+       if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
        {
-               /* change the conflict mask.  No more of this lock type. */
-               lock->grantMask &= BITS_OFF[lockmode];
+               PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
+               LWLockRelease(masterLock);
+               elog(WARNING, "you don't own a lock of type %s",
+                        lock_mode_names[lockmode]);
+               RemoveLocalLock(locallock);
+               return FALSE;
        }
 
-       LOCK_PRINT("LockRelease: updated", lock, lockmode);
-       Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
-       Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
-       Assert(lock->nGranted <= lock->nRequested);
+       wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
 
        /*
-        * We need only run ProcLockWakeup if the released lock conflicts with
-        * at least one of the lock types requested by waiter(s).  Otherwise
-        * whatever conflict made them wait must still exist.  NOTE: before MVCC,
-        * we could skip wakeup if lock->granted[lockmode] was still positive.
-        * But that's not true anymore, because the remaining granted locks might
-        * belong to some waiter, who could now be awakened because he doesn't
-        * conflict with his own locks.
-        *
+        * If this was my last hold on this lock, delete my entry in the
+        * proclock table.
         */
-       if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
-               wakeupNeeded = true;
-
-       if (lock->nRequested == 0)
+       if (proclock->holdMask == 0)
        {
-               /*
-                * if there's no one waiting in the queue,
-                * we just released the last lock on this object.
-                * Delete it from the lock table.
-                *
-                */
-               Assert(lockMethodTable->lockHash->hash == tag_hash);
-               lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-                                                                       (Pointer) &(lock->tag),
-                                                                       HASH_REMOVE,
-                                                                       &found);
-               if (!lock || !found)
+               PROCLOCK_PRINT("LockRelease: deleting proclock", proclock);
+               SHMQueueDelete(&proclock->lockLink);
+               SHMQueueDelete(&proclock->procLink);
+               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                                       (void *) &(proclock->tag),
+                                                                                       HASH_REMOVE, NULL);
+               if (!proclock)
                {
-                       SpinRelease(masterLock);
-                       elog(NOTICE, "LockRelease: remove lock, table corrupted");
+                       LWLockRelease(masterLock);
+                       elog(WARNING, "proclock table corrupted");
+                       RemoveLocalLock(locallock);
                        return FALSE;
                }
-               wakeupNeeded = false;   /* should be false, but make sure */
        }
 
-       /*
-        * Now fix the per-holder lock stats.
-        */
-       holder->holding[lockmode]--;
-       holder->nHolding--;
-       HOLDER_PRINT("LockRelease: updated", holder);
-       Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
-
-       /*
-        * If this was my last hold on this lock, delete my entry in the holder
-        * table.
-        */
-       if (holder->nHolding == 0)
+       if (lock->nRequested == 0)
        {
-               HOLDER_PRINT("LockRelease: deleting", holder);
-               SHMQueueDelete(&holder->lockLink);
-               SHMQueueDelete(&holder->procLink);
-               holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
-                                                                               HASH_REMOVE_SAVED, &found);
-               if (!holder || !found)
+               /*
+                * We've just released the last lock, so garbage-collect the lock
+                * object.
+                */
+               LOCK_PRINT("LockRelease: deleting lock", lock, lockmode);
+               Assert(SHMQueueEmpty(&(lock->procLocks)));
+               lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+                                                                       (void *) &(lock->tag),
+                                                                       HASH_REMOVE, NULL);
+               if (!lock)
                {
-                       SpinRelease(masterLock);
-                       elog(NOTICE, "LockRelease: remove holder, table corrupted");
+                       LWLockRelease(masterLock);
+                       elog(WARNING, "lock table corrupted");
+                       RemoveLocalLock(locallock);
                        return FALSE;
                }
        }
+       else
+       {
+               /*
+                * Wake up waiters if needed.
+                */
+               if (wakeupNeeded)
+                       ProcLockWakeup(lockMethodTable, lock);
+       }
 
-       /*
-        * Wake up waiters if needed.
-        */
-       if (wakeupNeeded)
-               ProcLockWakeup(lockMethodTable, lock);
+       LWLockRelease(masterLock);
 
-       SpinRelease(masterLock);
+       RemoveLocalLock(locallock);
        return TRUE;
 }
 
 /*
- * LockReleaseAll -- Release all locks in a process's lock list.
+ * LockReleaseAll -- Release all locks of the specified lock method that
+ *             are held by the current process.
  *
- * Well, not really *all* locks.
+ * Well, not necessarily *all* locks.  The available behaviors are:
  *
- * If 'allxids' is TRUE, all locks of the specified lock method are
- * released, regardless of transaction affiliation.
+ * allxids == true: release all locks regardless of transaction
+ * affiliation.
  *
- * If 'allxids' is FALSE, all locks of the specified lock method and
- * specified XID are released.
+ * allxids == false: release all locks with Xid != 0
+ * (zero is the Xid used for "session" locks).
  */
 bool
-LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
-                          bool allxids, TransactionId xid)
+LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
 {
-       SHM_QUEUE  *procHolders = &(proc->procHolders);
-       HOLDER     *holder;
-       HOLDER     *nextHolder;
-       SPINLOCK        masterLock;
-       LOCKMETHODTABLE *lockMethodTable;
+       HASH_SEQ_STATUS status;
+       SHM_QUEUE  *procLocks = &(MyProc->procLocks);
+       LWLockId        masterLock;
+       LockMethod      lockMethodTable;
        int                     i,
                                numLockModes;
+       LOCALLOCK  *locallock;
+       PROCLOCK   *proclock;
        LOCK       *lock;
-       bool            found;
 
 #ifdef LOCK_DEBUG
-       if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
-               elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
-                        lockmethod, proc->pid);
+       if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+               elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
 #endif
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
        {
-               elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
+               elog(WARNING, "bad lock method: %d", lockmethodid);
                return FALSE;
        }
 
-       numLockModes = lockMethodTable->ctl->numLockModes;
-       masterLock = lockMethodTable->ctl->masterLock;
+       numLockModes = lockMethodTable->numLockModes;
+       masterLock = lockMethodTable->masterLock;
+
+       /*
+        * First we run through the locallock table and get rid of unwanted
+        * entries, then we scan the process's proclocks and get rid of those.
+        * We do this separately because we may have multiple locallock
+        * entries pointing to the same proclock, and we daren't end up with
+        * any dangling pointers.
+        */
+       hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
 
-       SpinAcquire(masterLock);
+       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       {
+               if (locallock->proclock == NULL || locallock->lock == NULL)
+               {
+                       /*
+                        * We must've run out of shared memory while trying to set up
+                        * this lock.  Just forget the local entry.
+                        */
+                       Assert(locallock->nLocks == 0);
+                       RemoveLocalLock(locallock);
+                       continue;
+               }
 
-       holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
-                                                                        offsetof(HOLDER, procLink));
+               /* Ignore items that are not of the lockmethod to be removed */
+               if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
+                       continue;
 
-       while (holder)
+               /*
+                * Ignore locks with Xid=0 unless we are asked to release all
+                * locks
+                */
+               if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
+                       && !allxids)
+                       continue;
+
+               RemoveLocalLock(locallock);
+       }
+
+       LWLockAcquire(masterLock, LW_EXCLUSIVE);
+
+       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+                                                                                offsetof(PROCLOCK, procLink));
+
+       while (proclock)
        {
                bool            wakeupNeeded = false;
+               PROCLOCK   *nextHolder;
 
-               /* Get link first, since we may unlink/delete this holder */
-               nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
-                                                                                        offsetof(HOLDER, procLink));
+               /* Get link first, since we may unlink/delete this proclock */
+               nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
+                                                                                  offsetof(PROCLOCK, procLink));
 
-               Assert(holder->tag.proc == MAKE_OFFSET(proc));
+               Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
 
-               lock = (LOCK *) MAKE_PTR(holder->tag.lock);
+               lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
 
                /* Ignore items that are not of the lockmethod to be removed */
-               if (LOCK_LOCKMETHOD(*lock) != lockmethod)
+               if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
                        goto next_item;
 
-               /* If not allxids, ignore items that are of the wrong xid */
-               if (!allxids && xid != holder->tag.xid)
+               /*
+                * Ignore locks with Xid=0 unless we are asked to release all
+                * locks
+                */
+               if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
+                       && !allxids)
                        goto next_item;
 
-               HOLDER_PRINT("LockReleaseAll", holder);
+               PROCLOCK_PRINT("LockReleaseAll", proclock);
                LOCK_PRINT("LockReleaseAll", lock, 0);
                Assert(lock->nRequested >= 0);
                Assert(lock->nGranted >= 0);
                Assert(lock->nGranted <= lock->nRequested);
-               Assert(holder->nHolding >= 0);
-               Assert(holder->nHolding <= lock->nRequested);
+               Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
                /*
                 * fix the general lock stats
-                *
                 */
-               if (lock->nRequested != holder->nHolding)
-               {
-                       for (i = 1; i <= numLockModes; i++)
-                       {
-                               Assert(holder->holding[i] >= 0);
-                               if (holder->holding[i] > 0)
-                               {
-                                       lock->requested[i] -= holder->holding[i];
-                                       lock->granted[i] -= holder->holding[i];
-                                       Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
-                                       if (lock->granted[i] == 0)
-                                               lock->grantMask &= BITS_OFF[i];
-                                       /*
-                                        * Read comments in LockRelease
-                                        */
-                                       if (!wakeupNeeded &&
-                                               lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
-                                               wakeupNeeded = true;
-                               }
-                       }
-                       lock->nRequested -= holder->nHolding;
-                       lock->nGranted -= holder->nHolding;
-                       Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
-                       Assert(lock->nGranted <= lock->nRequested);
-               }
-               else
+               if (proclock->holdMask)
                {
-                       /*
-                        * This holder accounts for all the requested locks on the object,
-                        * so we can be lazy and just zero things out.
-                        *
-                        */
-                       lock->nRequested = 0;
-                       lock->nGranted = 0;
-                       /* Fix the lock status, just for next LOCK_PRINT message. */
                        for (i = 1; i <= numLockModes; i++)
                        {
-                               Assert(lock->requested[i] == lock->granted[i]);
-                               lock->requested[i] = lock->granted[i] = 0;
+                               if (proclock->holdMask & LOCKBIT_ON(i))
+                                       wakeupNeeded |= UnGrantLock(lock, i, proclock,
+                                                                                               lockMethodTable);
                        }
                }
+               Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
+               Assert(lock->nGranted <= lock->nRequested);
                LOCK_PRINT("LockReleaseAll: updated", lock, 0);
 
-               HOLDER_PRINT("LockReleaseAll: deleting", holder);
+               PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
 
                /*
-                * Remove the holder entry from the linked lists
+                * Remove the proclock entry from the linked lists
                 */
-               SHMQueueDelete(&holder->lockLink);
-               SHMQueueDelete(&holder->procLink);
+               SHMQueueDelete(&proclock->lockLink);
+               SHMQueueDelete(&proclock->procLink);
 
                /*
-                * remove the holder entry from the hashtable
+                * remove the proclock entry from the hashtable
                 */
-               holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
-                                                                               (Pointer) holder,
-                                                                               HASH_REMOVE,
-                                                                               &found);
-               if (!holder || !found)
+               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                                       (void *) &(proclock->tag),
+                                                                                       HASH_REMOVE,
+                                                                                       NULL);
+               if (!proclock)
                {
-                       SpinRelease(masterLock);
-                       elog(NOTICE, "LockReleaseAll: holder table corrupted");
+                       LWLockRelease(masterLock);
+                       elog(WARNING, "proclock table corrupted");
                        return FALSE;
                }
 
@@ -1373,17 +1543,16 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                        /*
                         * We've just released the last lock, so garbage-collect the
                         * lock object.
-                        *
                         */
                        LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
-                       Assert(lockMethodTable->lockHash->hash == tag_hash);
-                       lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-                                                                               (Pointer) &(lock->tag),
-                                                                               HASH_REMOVE, &found);
-                       if (!lock || !found)
+                       Assert(SHMQueueEmpty(&(lock->procLocks)));
+                       lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+                                                                               (void *) &(lock->tag),
+                                                                               HASH_REMOVE, NULL);
+                       if (!lock)
                        {
-                               SpinRelease(masterLock);
-                               elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
+                               LWLockRelease(masterLock);
+                               elog(WARNING, "lock table corrupted");
                                return FALSE;
                        }
                }
@@ -1391,40 +1560,165 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                        ProcLockWakeup(lockMethodTable, lock);
 
 next_item:
-               holder = nextHolder;
+               proclock = nextHolder;
        }
 
-       SpinRelease(masterLock);
+       LWLockRelease(masterLock);
 
 #ifdef LOCK_DEBUG
-       if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
-               elog(DEBUG, "LockReleaseAll: done");
+       if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+               elog(LOG, "LockReleaseAll done");
 #endif
 
        return TRUE;
 }
 
+/*
+ * LockReleaseCurrentOwner
+ *             Release all locks belonging to CurrentResourceOwner
+ *
+ * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner.
+ */
+void
+LockReleaseCurrentOwner(void)
+{
+       HASH_SEQ_STATUS status;
+       LOCALLOCK  *locallock;
+       LOCALLOCKOWNER *lockOwners;
+       int                     i;
+
+       hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
+
+       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       {
+               /* Ignore items that must be nontransactional */
+               if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
+                       continue;
+               if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
+                       continue;
+
+               /* Scan to see if there are any locks belonging to current owner */
+               lockOwners = locallock->lockOwners;
+               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               {
+                       if (lockOwners[i].owner == CurrentResourceOwner)
+                       {
+                               Assert(lockOwners[i].nLocks > 0);
+                               if (lockOwners[i].nLocks < locallock->nLocks)
+                               {
+                                       /*
+                                        * We will still hold this lock after forgetting this
+                                        * ResourceOwner.
+                                        */
+                                       locallock->nLocks -= lockOwners[i].nLocks;
+                                       /* compact out unused slot */
+                                       locallock->numLockOwners--;
+                                       if (i < locallock->numLockOwners)
+                                               lockOwners[i] = lockOwners[locallock->numLockOwners];
+                               }
+                               else
+                               {
+                                       Assert(lockOwners[i].nLocks == locallock->nLocks);
+                                       /* We want to call LockRelease just once */
+                                       lockOwners[i].nLocks = 1;
+                                       locallock->nLocks = 1;
+                                       if (!LockRelease(DEFAULT_LOCKMETHOD,
+                                                                        &locallock->tag.lock,
+                                                                        locallock->tag.xid,
+                                                                        locallock->tag.mode))
+                                               elog(WARNING, "LockReleaseCurrentOwner: failed??");
+                               }
+                               break;
+                       }
+               }
+       }
+}
+
+/*
+ * LockReassignCurrentOwner
+ *             Reassign all locks belonging to CurrentResourceOwner to belong
+ *             to its parent resource owner
+ */
+void
+LockReassignCurrentOwner(void)
+{
+       ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
+       HASH_SEQ_STATUS status;
+       LOCALLOCK  *locallock;
+       LOCALLOCKOWNER *lockOwners;
+
+       Assert(parent != NULL);
+
+       hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
+
+       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       {
+               int                     i;
+               int                     ic = -1;
+               int                     ip = -1;
+
+               /* Ignore items that must be nontransactional */
+               if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
+                       continue;
+               if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
+                       continue;
+
+               /*
+                * Scan to see if there are any locks belonging to current owner
+                * or its parent
+                */
+               lockOwners = locallock->lockOwners;
+               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               {
+                       if (lockOwners[i].owner == CurrentResourceOwner)
+                               ic = i;
+                       else if (lockOwners[i].owner == parent)
+                               ip = i;
+               }
+
+               if (ic < 0)
+                       continue;                       /* no current locks */
+
+               if (ip < 0)
+               {
+                       /* Parent has no slot, so just give it child's slot */
+                       lockOwners[ic].owner = parent;
+               }
+               else
+               {
+                       /* Merge child's count with parent's */
+                       lockOwners[ip].nLocks += lockOwners[ic].nLocks;
+                       /* compact out unused slot */
+                       locallock->numLockOwners--;
+                       if (ic < locallock->numLockOwners)
+                               lockOwners[ic] = lockOwners[locallock->numLockOwners];
+               }
+       }
+}
+
+
+/*
+ * Estimate shared-memory space used for lock tables
+ */
 int
 LockShmemSize(int maxBackends)
 {
        int                     size = 0;
+       long            max_table_size = NLOCKENTS(maxBackends);
 
-       size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
-       size += MAXALIGN(maxBackends * sizeof(PROC));           /* each MyProc */
-       size += MAXALIGN(maxBackends * sizeof(LOCKMETHODCTL));          /* each
-                                                                                                                                * lockMethodTable->ctl */
+       /* lock method headers */
+       size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
 
        /* lockHash table */
-       size += hash_estimate_size(NLOCKENTS(maxBackends),
-                                                          SHMEM_LOCKTAB_KEYSIZE,
-                                                          SHMEM_LOCKTAB_DATASIZE);
+       size += hash_estimate_size(max_table_size, sizeof(LOCK));
 
-       /* holderHash table */
-       size += hash_estimate_size(NLOCKENTS(maxBackends),
-                                                          SHMEM_HOLDERTAB_KEYSIZE,
-                                                          SHMEM_HOLDERTAB_DATASIZE);
+       /* proclockHash table */
+       size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
 
        /*
+        * Note we count only one pair of hash tables, since the userlocks
+        * table actually overlays the main one.
+        *
         * Since the lockHash entry count above is only an estimate, add 10%
         * safety margin.
         */
@@ -1433,54 +1727,117 @@ LockShmemSize(int maxBackends)
        return size;
 }
 
+/*
+ * GetLockStatusData - Return a summary of the lock manager's internal
+ * status, for use in a user-level reporting function.
+ *
+ * The return data consists of an array of PROCLOCK objects, with the
+ * associated PGPROC and LOCK objects for each.  Note that multiple
+ * copies of the same PGPROC and/or LOCK objects are likely to appear.
+ * It is the caller's responsibility to match up duplicates if wanted.
+ *
+ * The design goal is to hold the LockMgrLock for as short a time as possible;
+ * thus, this function simply makes a copy of the necessary data and releases
+ * the lock, allowing the caller to contemplate and format the data for as
+ * long as it pleases.
+ */
+LockData *
+GetLockStatusData(void)
+{
+       LockData   *data;
+       HTAB       *proclockTable;
+       PROCLOCK   *proclock;
+       HASH_SEQ_STATUS seqstat;
+       int                     i;
+
+       data = (LockData *) palloc(sizeof(LockData));
+
+       LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
+
+       proclockTable = LockMethodProcLockHash[DEFAULT_LOCKMETHOD];
+
+       data->nelements = i = proclockTable->hctl->nentries;
+
+       data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
+       data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
+       data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
+       data->locks = (LOCK *) palloc(sizeof(LOCK) * i);
+
+       hash_seq_init(&seqstat, proclockTable);
+
+       i = 0;
+       while ((proclock = hash_seq_search(&seqstat)))
+       {
+               PGPROC     *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
+               LOCK       *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+
+               data->proclockaddrs[i] = MAKE_OFFSET(proclock);
+               memcpy(&(data->proclocks[i]), proclock, sizeof(PROCLOCK));
+               memcpy(&(data->procs[i]), proc, sizeof(PGPROC));
+               memcpy(&(data->locks[i]), lock, sizeof(LOCK));
+
+               i++;
+       }
+
+       LWLockRelease(LockMgrLock);
+
+       Assert(i == data->nelements);
+
+       return data;
+}
+
+/* Provide the textual name of any lock mode */
+const char *
+GetLockmodeName(LOCKMODE mode)
+{
+       Assert(mode <= MAX_LOCKMODES);
+       return lock_mode_names[mode];
+}
 
 #ifdef LOCK_DEBUG
 /*
- * Dump all locks in the proc->procHolders list.
+ * Dump all locks in the MyProc->procLocks list.
  *
  * Must have already acquired the masterLock.
  */
 void
 DumpLocks(void)
 {
-       SHMEM_OFFSET location;
-       PROC       *proc;
-       SHM_QUEUE  *procHolders;
-       HOLDER     *holder;
+       PGPROC     *proc;
+       SHM_QUEUE  *procLocks;
+       PROCLOCK   *proclock;
        LOCK       *lock;
-       int                     lockmethod = DEFAULT_LOCKMETHOD;
-       LOCKMETHODTABLE *lockMethodTable;
+       int                     lockmethodid = DEFAULT_LOCKMETHOD;
+       LockMethod      lockMethodTable;
 
-       ShmemPIDLookup(MyProcPid, &location);
-       if (location == INVALID_OFFSET)
+       proc = MyProc;
+       if (proc == NULL)
                return;
-       proc = (PROC *) MAKE_PTR(location);
-       if (proc != MyProc)
-               return;
-       procHolders = &proc->procHolders;
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       procLocks = &proc->procLocks;
+
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
                return;
 
        if (proc->waitLock)
                LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
 
-       holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
-                                                                        offsetof(HOLDER, procLink));
+       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+                                                                                offsetof(PROCLOCK, procLink));
 
-       while (holder)
+       while (proclock)
        {
-               Assert(holder->tag.proc == MAKE_OFFSET(proc));
+               Assert(proclock->tag.proc == MAKE_OFFSET(proc));
 
-               lock = (LOCK *) MAKE_PTR(holder->tag.lock);
+               lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
 
-               HOLDER_PRINT("DumpLocks", holder);
+               PROCLOCK_PRINT("DumpLocks", proclock);
                LOCK_PRINT("DumpLocks", lock, 0);
 
-               holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
-                                                                                offsetof(HOLDER, procLink));
+               proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
+                                                                                  offsetof(PROCLOCK, procLink));
        }
 }
 
@@ -1490,48 +1847,41 @@ DumpLocks(void)
 void
 DumpAllLocks(void)
 {
-       SHMEM_OFFSET location;
-       PROC       *proc;
-       HOLDER     *holder = NULL;
+       PGPROC     *proc;
+       PROCLOCK   *proclock;
        LOCK       *lock;
-       int                     pid;
-       int                     lockmethod = DEFAULT_LOCKMETHOD;
-       LOCKMETHODTABLE *lockMethodTable;
-       HTAB       *holderTable;
+       int                     lockmethodid = DEFAULT_LOCKMETHOD;
+       LockMethod      lockMethodTable;
+       HTAB       *proclockTable;
        HASH_SEQ_STATUS status;
 
-       pid = getpid();
-       ShmemPIDLookup(pid, &location);
-       if (location == INVALID_OFFSET)
-               return;
-       proc = (PROC *) MAKE_PTR(location);
-       if (proc != MyProc)
+       proc = MyProc;
+       if (proc == NULL)
                return;
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
                return;
 
-       holderTable = lockMethodTable->holderHash;
+       proclockTable = LockMethodProcLockHash[lockmethodid];
 
        if (proc->waitLock)
                LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
 
-       hash_seq_init(&status, holderTable);
-       while ((holder = (HOLDER *) hash_seq_search(&status)) &&
-                  (holder != (HOLDER *) TRUE))
+       hash_seq_init(&status, proclockTable);
+       while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
        {
-               HOLDER_PRINT("DumpAllLocks", holder);
+               PROCLOCK_PRINT("DumpAllLocks", proclock);
 
-               if (holder->tag.lock)
+               if (proclock->tag.lock)
                {
-                       lock = (LOCK *) MAKE_PTR(holder->tag.lock);
+                       lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
                        LOCK_PRINT("DumpAllLocks", lock, 0);
                }
                else
-                       elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
+                       elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
        }
 }
 
-#endif /* LOCK_DEBUG */
+#endif   /* LOCK_DEBUG */