]> granicus.if.org Git - postgresql/blobdiff - src/backend/storage/lmgr/lock.c
Restructure LOCKTAG as per discussions of a couple months ago.
[postgresql] / src / backend / storage / lmgr / lock.c
index d0c5a055f9e0d24a144eb547cdd69c1392bc954b..576fc205299eebe48047cab9f3dcdf5169ab35fe 100644 (file)
@@ -3,12 +3,12 @@
  * lock.c
  *       POSTGRES low-level lock mechanism
  *
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.120 2003/02/18 02:13:24 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.150 2005/04/29 22:28:24 tgl Exp $
  *
  * NOTES
  *       Outside modules can create a lock table and acquire/release
  *     Interface:
  *
  *     LockAcquire(), LockRelease(), LockMethodTableInit(),
- *     LockMethodTableRename(), LockReleaseAll,
+ *     LockMethodTableRename(), LockReleaseAll(),
  *     LockCheckConflicts(), GrantLock()
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include <unistd.h>
 #include <signal.h>
+#include <unistd.h>
 
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
+#include "utils/resowner.h"
 
 
 /* This configuration variable is used to set the lock table size */
@@ -46,12 +47,24 @@ int                 max_locks_per_xact; /* set by guc.c */
 #define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends))
 
 
-static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
-                  LOCK *lock, PROCLOCK *proclock);
-static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
-                                int *myHolding);
+/*
+ * map from lock method id to the lock table data structures
+ */
+static LockMethod LockMethods[MAX_LOCK_METHODS];
+static HTAB *LockMethodLockHash[MAX_LOCK_METHODS];
+static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS];
+static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS];
 
-static char *lock_mode_names[] =
+/* exported so lmgr.c can initialize it */
+int                    NumLockMethods;
+
+
+/* private state for GrantAwaitedLock */
+static LOCALLOCK *awaitedLock;
+static ResourceOwner awaitedOwner;
+
+
+static const char *const lock_mode_names[] =
 {
        "INVALID",
        "AccessShareLock",
@@ -84,7 +97,7 @@ static char *lock_mode_names[] =
  * --------
  */
 
-int                    Trace_lock_oidmin = BootstrapObjectIdData;
+int                    Trace_lock_oidmin = FirstNormalObjectId;
 bool           Trace_locks = false;
 bool           Trace_userlocks = false;
 int                    Trace_lock_table = 0;
@@ -95,10 +108,11 @@ inline static bool
 LOCK_DEBUG_ENABLED(const LOCK *lock)
 {
        return
-               (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
-          || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
-                && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
-               || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
+               (((Trace_locks && LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD)
+                 || (Trace_userlocks && LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD))
+                && ((Oid) lock->tag.locktag_field2 >= (Oid) Trace_lock_oidmin))
+               || (Trace_lock_table
+                       && (lock->tag.locktag_field2 == Trace_lock_table));
 }
 
 
@@ -107,12 +121,14 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
 {
        if (LOCK_DEBUG_ENABLED(lock))
                elog(LOG,
-                        "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
+                        "%s: lock(%lx) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
                         "req(%d,%d,%d,%d,%d,%d,%d)=%d "
                         "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
                         where, MAKE_OFFSET(lock),
-                        lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
-                        lock->tag.objId.blkno, lock->grantMask,
+                        lock->tag.locktag_field1, lock->tag.locktag_field2,
+                        lock->tag.locktag_field3, lock->tag.locktag_field4,
+                        lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
+                        lock->grantMask,
                         lock->requested[1], lock->requested[2], lock->requested[3],
                         lock->requested[4], lock->requested[5], lock->requested[6],
                         lock->requested[7], lock->nRequested,
@@ -126,20 +142,13 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
 inline static void
 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
 {
-       if (
-       (((PROCLOCK_LOCKMETHOD(*proclockP) == DEFAULT_LOCKMETHOD && Trace_locks)
-         || (PROCLOCK_LOCKMETHOD(*proclockP) == USER_LOCKMETHOD && Trace_userlocks))
-        && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
-               || (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table))
-               )
+       if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock)))
                elog(LOG,
-                        "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
+               "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)",
                         where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
                         PROCLOCK_LOCKMETHOD(*(proclockP)),
                         proclockP->tag.proc, proclockP->tag.xid,
-                  proclockP->holding[1], proclockP->holding[2], proclockP->holding[3],
-                  proclockP->holding[4], proclockP->holding[5], proclockP->holding[6],
-                        proclockP->holding[7], proclockP->nHolding);
+                        (int) proclockP->holdMask);
 }
 
 #else                                                  /* not LOCK_DEBUG */
@@ -149,22 +158,15 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
 #endif   /* not LOCK_DEBUG */
 
 
-/*
- * These are to simplify/speed up some bit arithmetic.
- *
- * XXX is a fetch from a static array really faster than a shift?
- * Wouldn't bet on it...
- */
-
-static LOCKMASK BITS_OFF[MAX_LOCKMODES];
-static LOCKMASK BITS_ON[MAX_LOCKMODES];
-
-/*
- * map from lockmethod to the lock table structure
- */
-static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
+static void RemoveLocalLock(LOCALLOCK *locallock);
+static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
+static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
+                  ResourceOwner owner);
+static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
+                                int *myHolding);
+static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
+                                               PROCLOCK *proclock, LockMethod lockMethodTable);
 
-static int     NumLockMethods;
 
 /*
  * InitLocks -- Init the lock module.  Create a private data
@@ -173,28 +175,20 @@ static int        NumLockMethods;
 void
 InitLocks(void)
 {
-       int                     i;
-       int                     bit;
-
-       bit = 1;
-       for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
-       {
-               BITS_ON[i] = bit;
-               BITS_OFF[i] = ~bit;
-       }
+       /* NOP */
 }
 
 
 /*
  * Fetch the lock method table associated with a given lock
  */
-LOCKMETHODTABLE *
+LockMethod
 GetLocksMethodTable(LOCK *lock)
 {
-       LOCKMETHOD      lockmethod = LOCK_LOCKMETHOD(*lock);
+       LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
 
-       Assert(lockmethod > 0 && lockmethod < NumLockMethods);
-       return LockMethodTable[lockmethod];
+       Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
+       return LockMethods[lockmethodid];
 }
 
 
@@ -205,16 +199,16 @@ GetLocksMethodTable(LOCK *lock)
  * Notes: just copying.  Should only be called once.
  */
 static void
-LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
-                          LOCKMASK *conflictsP,
+LockMethodInit(LockMethod lockMethodTable,
+                          const LOCKMASK *conflictsP,
                           int numModes)
 {
        int                     i;
 
        lockMethodTable->numLockModes = numModes;
-       numModes++;
-       for (i = 0; i < numModes; i++, conflictsP++)
-               lockMethodTable->conflictTab[i] = *conflictsP;
+       /* copies useless zero element as well as the N lockmodes */
+       for (i = 0; i <= numModes; i++)
+               lockMethodTable->conflictTab[i] = conflictsP[i];
 }
 
 /*
@@ -226,13 +220,14 @@ LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
  * by the postmaster are inherited by each backend, so they must be in
  * TopMemoryContext.
  */
-LOCKMETHOD
-LockMethodTableInit(char *tabName,
-                                       LOCKMASK *conflictsP,
+LOCKMETHODID
+LockMethodTableInit(const char *tabName,
+                                       const LOCKMASK *conflictsP,
                                        int numModes,
                                        int maxBackends)
 {
-       LOCKMETHODTABLE *lockMethodTable;
+       LockMethod      newLockMethod;
+       LOCKMETHODID lockmethodid;
        char       *shmemName;
        HASHCTL         info;
        int                     hash_flags;
@@ -241,15 +236,12 @@ LockMethodTableInit(char *tabName,
                                max_table_size;
 
        if (numModes >= MAX_LOCKMODES)
-       {
-               elog(WARNING, "LockMethodTableInit: too many lock types %d greater than %d",
-                        numModes, MAX_LOCKMODES);
-               return INVALID_LOCKMETHOD;
-       }
+               elog(ERROR, "too many lock types %d (limit is %d)",
+                        numModes, MAX_LOCKMODES - 1);
 
        /* Compute init/max size to request for lock hashtables */
        max_table_size = NLOCKENTS(maxBackends);
-       init_table_size = max_table_size / 10;
+       init_table_size = max_table_size / 2;
 
        /* Allocate a string for the shmem index table lookups. */
        /* This is just temp space in this routine, so palloc is OK. */
@@ -257,62 +249,52 @@ LockMethodTableInit(char *tabName,
 
        /* each lock table has a header in shared memory */
        sprintf(shmemName, "%s (lock method table)", tabName);
-       lockMethodTable = (LOCKMETHODTABLE *)
-               ShmemInitStruct(shmemName, sizeof(LOCKMETHODTABLE), &found);
-
-       if (!lockMethodTable)
-               elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
-
-       /*
-        * Lock the LWLock for the table (probably not necessary here)
-        */
-       LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
+       newLockMethod = (LockMethod)
+               ShmemInitStruct(shmemName, sizeof(LockMethodData), &found);
 
-       /*
-        * no zero-th table
-        */
-       NumLockMethods = 1;
+       if (!newLockMethod)
+               elog(FATAL, "could not initialize lock table \"%s\"", tabName);
 
        /*
         * we're first - initialize
         */
        if (!found)
        {
-               MemSet(lockMethodTable, 0, sizeof(LOCKMETHODTABLE));
-               lockMethodTable->masterLock = LockMgrLock;
-               lockMethodTable->lockmethod = NumLockMethods;
+               MemSet(newLockMethod, 0, sizeof(LockMethodData));
+               newLockMethod->masterLock = LockMgrLock;
+               LockMethodInit(newLockMethod, conflictsP, numModes);
        }
 
        /*
         * other modules refer to the lock table by a lockmethod ID
         */
-       LockMethodTable[NumLockMethods] = lockMethodTable;
-       NumLockMethods++;
-       Assert(NumLockMethods <= MAX_LOCK_METHODS);
+       Assert(NumLockMethods < MAX_LOCK_METHODS);
+       lockmethodid = NumLockMethods++;
+       LockMethods[lockmethodid] = newLockMethod;
 
        /*
         * allocate a hash table for LOCK structs.      This is used to store
         * per-locked-object information.
         */
+       MemSet(&info, 0, sizeof(info));
        info.keysize = sizeof(LOCKTAG);
        info.entrysize = sizeof(LOCK);
        info.hash = tag_hash;
        hash_flags = (HASH_ELEM | HASH_FUNCTION);
 
        sprintf(shmemName, "%s (lock hash)", tabName);
-       lockMethodTable->lockHash = ShmemInitHash(shmemName,
-                                                                                         init_table_size,
-                                                                                         max_table_size,
-                                                                                         &info,
-                                                                                         hash_flags);
+       LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName,
+                                                                                                        init_table_size,
+                                                                                                        max_table_size,
+                                                                                                        &info,
+                                                                                                        hash_flags);
 
-       if (!lockMethodTable->lockHash)
-               elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
-       Assert(lockMethodTable->lockHash->hash == tag_hash);
+       if (!LockMethodLockHash[lockmethodid])
+               elog(FATAL, "could not initialize lock table \"%s\"", tabName);
 
        /*
         * allocate a hash table for PROCLOCK structs.  This is used to store
-        * per-lock-proclock information.
+        * per-lock-holder information.
         */
        info.keysize = sizeof(PROCLOCKTAG);
        info.entrysize = sizeof(PROCLOCK);
@@ -320,54 +302,73 @@ LockMethodTableInit(char *tabName,
        hash_flags = (HASH_ELEM | HASH_FUNCTION);
 
        sprintf(shmemName, "%s (proclock hash)", tabName);
-       lockMethodTable->proclockHash = ShmemInitHash(shmemName,
-                                                                                               init_table_size,
-                                                                                               max_table_size,
-                                                                                               &info,
-                                                                                               hash_flags);
+       LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName,
+                                                                                                                init_table_size,
+                                                                                                                max_table_size,
+                                                                                                                &info,
+                                                                                                                hash_flags);
+
+       if (!LockMethodProcLockHash[lockmethodid])
+               elog(FATAL, "could not initialize lock table \"%s\"", tabName);
 
-       if (!lockMethodTable->proclockHash)
-               elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
+       /*
+        * allocate a non-shared hash table for LOCALLOCK structs.      This is
+        * used to store lock counts and resource owner information.
+        *
+        * The non-shared table could already exist in this process (this occurs
+        * when the postmaster is recreating shared memory after a backend
+        * crash). If so, delete and recreate it.  (We could simply leave it,
+        * since it ought to be empty in the postmaster, but for safety let's
+        * zap it.)
+        */
+       if (LockMethodLocalHash[lockmethodid])
+               hash_destroy(LockMethodLocalHash[lockmethodid]);
 
-       /* init data structures */
-       LockMethodInit(lockMethodTable, conflictsP, numModes);
+       info.keysize = sizeof(LOCALLOCKTAG);
+       info.entrysize = sizeof(LOCALLOCK);
+       info.hash = tag_hash;
+       hash_flags = (HASH_ELEM | HASH_FUNCTION);
 
-       LWLockRelease(LockMgrLock);
+       sprintf(shmemName, "%s (locallock hash)", tabName);
+       LockMethodLocalHash[lockmethodid] = hash_create(shmemName,
+                                                                                                       128,
+                                                                                                       &info,
+                                                                                                       hash_flags);
 
        pfree(shmemName);
 
-       return lockMethodTable->lockmethod;
+       return lockmethodid;
 }
 
 /*
  * LockMethodTableRename -- allocate another lockmethod ID to the same
  *             lock table.
  *
- * NOTES: Both the lock module and the lock chain (lchain.c)
- *             module use table id's to distinguish between different
- *             kinds of locks.  Short term and long term locks look
- *             the same to the lock table, but are handled differently
- *             by the lock chain manager.      This function allows the
- *             client to use different lockmethods when acquiring/releasing
- *             short term and long term locks, yet store them all in one hashtable.
+ * NOTES: This function makes it possible to have different lockmethodids,
+ *             and hence different locking semantics, while still storing all
+ *             the data in one shared-memory hashtable.
  */
 
-LOCKMETHOD
-LockMethodTableRename(LOCKMETHOD lockmethod)
+LOCKMETHODID
+LockMethodTableRename(LOCKMETHODID lockmethodid)
 {
-       LOCKMETHOD      newLockMethod;
+       LOCKMETHODID newLockMethodId;
 
        if (NumLockMethods >= MAX_LOCK_METHODS)
                return INVALID_LOCKMETHOD;
-       if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
+       if (LockMethods[lockmethodid] == INVALID_LOCKMETHOD)
                return INVALID_LOCKMETHOD;
 
        /* other modules refer to the lock table by a lockmethod ID */
-       newLockMethod = NumLockMethods;
+       newLockMethodId = NumLockMethods;
        NumLockMethods++;
 
-       LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
-       return newLockMethod;
+       LockMethods[newLockMethodId] = LockMethods[lockmethodid];
+       LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid];
+       LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid];
+       LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid];
+
+       return newLockMethodId;
 }
 
 /*
@@ -377,7 +378,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
  *             a FALSE return is to be expected if dontWait is TRUE;
  *             but if dontWait is FALSE, only a parameter error can cause
- *             a FALSE return.  (XXX probably we should just elog on parameter
+ *             a FALSE return.  (XXX probably we should just ereport on parameter
  *             errors, instead of conflating this with failure to acquire lock?)
  *
  * Side Effects: The lock is acquired and recorded in lock tables.
@@ -397,33 +398,16 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  *             the lock.  While the lock is active other clients can still
  *             read and write the tuple but they can be aware that it has
  *             been locked at the application level by someone.
- *             User locks use lock tags made of an uint16 and an uint32, for
- *             example 0 and a tuple oid, or any other arbitrary pair of
- *             numbers following a convention established by the application.
- *             In this sense tags don't refer to tuples or database entities.
+ *
  *             User locks and normal locks are completely orthogonal and
- *             they don't interfere with each other, so it is possible
- *             to acquire a normal lock on an user-locked tuple or user-lock
- *             a tuple for which a normal write lock already exists.
+ *             they don't interfere with each other.
+ *
  *             User locks are always non blocking, therefore they are never
  *             acquired if already held by another process.  They must be
  *             released explicitly by the application but they are released
  *             automatically when a backend terminates.
  *             They are indicated by a lockmethod 2 which is an alias for the
- *             normal lock table, and are distinguished from normal locks
- *             by the following differences:
- *
- *                                                                             normal lock             user lock
- *
- *             lockmethod                                              1                               2
- *             tag.dbId                                                database oid    database oid
- *             tag.relId                                               rel oid or 0    0
- *             tag.objId                                               block id                lock id2
- *                                                                             or xact id
- *             tag.offnum                                              0                               lock id1
- *             proclock.xid                                    xid or 0                0
- *             persistence                                             transaction             user or backend
- *                                                                             or backend
+ *             normal lock table.
  *
  *             The lockmode parameter can have the same values for normal locks
  *             although probably only WRITE_LOCK can have some practical use.
@@ -432,55 +416,129 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  */
 
 bool
-LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
                        TransactionId xid, LOCKMODE lockmode, bool dontWait)
 {
+       LOCALLOCKTAG localtag;
+       LOCALLOCK  *locallock;
+       LOCK       *lock;
        PROCLOCK   *proclock;
        PROCLOCKTAG proclocktag;
-       HTAB       *proclockTable;
        bool            found;
-       LOCK       *lock;
+       ResourceOwner owner;
        LWLockId        masterLock;
-       LOCKMETHODTABLE *lockMethodTable;
+       LockMethod      lockMethodTable;
        int                     status;
        int                     myHolding[MAX_LOCKMODES];
        int                     i;
 
 #ifdef LOCK_DEBUG
-       if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
-               elog(LOG, "LockAcquire: user lock [%u] %s",
-                        locktag->objId.blkno, lock_mode_names[lockmode]);
+       if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
+               elog(LOG, "LockAcquire: user lock [%u,%u] %s",
+                        locktag->locktag_field1, locktag->locktag_field2,
+                        lock_mode_names[lockmode]);
 #endif
 
-       /* ???????? This must be changed when short term locks will be used */
-       locktag->lockmethod = lockmethod;
+       /* ugly */
+       locktag->locktag_lockmethodid = lockmethodid;
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
        {
-               elog(WARNING, "LockAcquire: bad lock table %d", lockmethod);
+               elog(WARNING, "bad lock table id: %d", lockmethodid);
                return FALSE;
        }
 
+       /* Session locks and user locks are not transactional */
+       if (xid != InvalidTransactionId &&
+               lockmethodid == DEFAULT_LOCKMETHOD)
+               owner = CurrentResourceOwner;
+       else
+               owner = NULL;
+
+       /*
+        * Find or create a LOCALLOCK entry for this lock and lockmode
+        */
+       MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
+       localtag.lock = *locktag;
+       localtag.xid = xid;
+       localtag.mode = lockmode;
+
+       locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+                                                                                 (void *) &localtag,
+                                                                                 HASH_ENTER, &found);
+       if (!locallock)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory")));
+
+       /*
+        * if it's a new locallock object, initialize it
+        */
+       if (!found)
+       {
+               locallock->lock = NULL;
+               locallock->proclock = NULL;
+               locallock->nLocks = 0;
+               locallock->numLockOwners = 0;
+               locallock->maxLockOwners = 8;
+               locallock->lockOwners = NULL;
+               locallock->lockOwners = (LOCALLOCKOWNER *)
+                       MemoryContextAlloc(TopMemoryContext,
+                                         locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+       }
+       else
+       {
+               /* Make sure there will be room to remember the lock */
+               if (locallock->numLockOwners >= locallock->maxLockOwners)
+               {
+                       int                     newsize = locallock->maxLockOwners * 2;
+
+                       locallock->lockOwners = (LOCALLOCKOWNER *)
+                               repalloc(locallock->lockOwners,
+                                                newsize * sizeof(LOCALLOCKOWNER));
+                       locallock->maxLockOwners = newsize;
+               }
+       }
+
+       /*
+        * If we already hold the lock, we can just increase the count
+        * locally.
+        */
+       if (locallock->nLocks > 0)
+       {
+               GrantLockLocal(locallock, owner);
+               return TRUE;
+       }
+
+       /*
+        * Otherwise we've got to mess with the shared lock table.
+        */
        masterLock = lockMethodTable->masterLock;
 
        LWLockAcquire(masterLock, LW_EXCLUSIVE);
 
        /*
-        * Find or create a lock with this tag
+        * Find or create a lock with this tag.
+        *
+        * Note: if the locallock object already existed, it might have a pointer
+        * to the lock already ... but we probably should not assume that that
+        * pointer is valid, since a lock object with no locks can go away
+        * anytime.
         */
-       Assert(lockMethodTable->lockHash->hash == tag_hash);
-       lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+       lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
                                                                (void *) locktag,
                                                                HASH_ENTER, &found);
        if (!lock)
        {
                LWLockRelease(masterLock);
-               elog(ERROR, "LockAcquire: lock table %d is out of memory",
-                        lockmethod);
-               return FALSE;
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of shared memory"),
+               errhint("You may need to increase max_locks_per_transaction.")));
        }
+       locallock->lock = lock;
 
        /*
         * if it's a new lock object, initialize it
@@ -489,7 +547,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        {
                lock->grantMask = 0;
                lock->waitMask = 0;
-               SHMQueueInit(&(lock->lockHolders));
+               SHMQueueInit(&(lock->procLocks));
                ProcQueueInit(&(lock->waitProcs));
                lock->nRequested = 0;
                lock->nGranted = 0;
@@ -508,8 +566,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        /*
         * Create the hash key for the proclock table.
         */
-       MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
-                                                                                                * needed */
+       MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));           /* must clear padding */
        proclocktag.lock = MAKE_OFFSET(lock);
        proclocktag.proc = MAKE_OFFSET(MyProc);
        TransactionIdStore(xid, &proclocktag.xid);
@@ -517,34 +574,50 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        /*
         * Find or create a proclock entry with this tag
         */
-       proclockTable = lockMethodTable->proclockHash;
-       proclock = (PROCLOCK *) hash_search(proclockTable,
-                                                                         (void *) &proclocktag,
-                                                                         HASH_ENTER, &found);
+       proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                               (void *) &proclocktag,
+                                                                               HASH_ENTER, &found);
        if (!proclock)
        {
+               /* Ooops, not enough shmem for the proclock */
+               if (lock->nRequested == 0)
+               {
+                       /*
+                        * There are no other requestors of this lock, so garbage-collect
+                        * the lock object.  We *must* do this to avoid a permanent leak
+                        * of shared memory, because there won't be anything to cause
+                        * anyone to release the lock object later.
+                        */
+                       Assert(SHMQueueEmpty(&(lock->procLocks)));
+                       lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+                                                                               (void *) &(lock->tag),
+                                                                               HASH_REMOVE, NULL);
+               }
                LWLockRelease(masterLock);
-               elog(ERROR, "LockAcquire: proclock table out of memory");
-               return FALSE;
+               if (!lock)                              /* hash remove failed? */
+                       elog(WARNING, "lock table corrupted");
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of shared memory"),
+               errhint("You may need to increase max_locks_per_transaction.")));
        }
+       locallock->proclock = proclock;
 
        /*
         * If new, initialize the new entry
         */
        if (!found)
        {
-               proclock->nHolding = 0;
-               MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES);
+               proclock->holdMask = 0;
                /* Add proclock to appropriate lists */
-               SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink);
-               SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink);
+               SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
+               SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
                PROCLOCK_PRINT("LockAcquire: new", proclock);
        }
        else
        {
                PROCLOCK_PRINT("LockAcquire: found", proclock);
-               Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
-               Assert(proclock->nHolding <= lock->nGranted);
+               Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
 #ifdef CHECK_DEADLOCK_RISK
 
@@ -565,11 +638,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                 */
                for (i = lockMethodTable->numLockModes; i > 0; i--)
                {
-                       if (proclock->holding[i] > 0)
+                       if (proclock->holdMask & LOCKBIT_ON(i))
                        {
                                if (i >= (int) lockmode)
                                        break;          /* safe: we have a lock >= req level */
-                               elog(LOG, "Deadlock risk: raising lock level"
+                               elog(LOG, "deadlock risk: raising lock level"
                                         " from %s to %s on object %u/%u/%u",
                                         lock_mode_names[i], lock_mode_names[lockmode],
                                 lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
@@ -589,25 +662,14 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
        /*
-        * If I already hold one or more locks of the requested type, just
-        * grant myself another one without blocking.
-        */
-       if (proclock->holding[lockmode] > 0)
-       {
-               GrantLock(lock, proclock, lockmode);
-               PROCLOCK_PRINT("LockAcquire: owning", proclock);
-               LWLockRelease(masterLock);
-               return TRUE;
-       }
-
-       /*
-        * If this process (under any XID) is a proclock of the lock, also grant
+        * If this process (under any XID) is a holder of the lock, just grant
         * myself another one without blocking.
         */
        LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
        if (myHolding[lockmode] > 0)
        {
                GrantLock(lock, proclock, lockmode);
+               GrantLockLocal(locallock, owner);
                PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
                LWLockRelease(masterLock);
                return TRUE;
@@ -629,6 +691,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        {
                /* No conflict with held or previously requested locks */
                GrantLock(lock, proclock, lockmode);
+               GrantLockLocal(locallock, owner);
        }
        else
        {
@@ -636,29 +699,31 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 
                /*
                 * We can't acquire the lock immediately.  If caller specified no
-                * blocking, remove the proclock entry and return FALSE without
+                * blocking, remove useless table entries and return FALSE without
                 * waiting.
                 */
                if (dontWait)
                {
-                       if (proclock->nHolding == 0)
+                       if (proclock->holdMask == 0)
                        {
                                SHMQueueDelete(&proclock->lockLink);
                                SHMQueueDelete(&proclock->procLink);
-                               proclock = (PROCLOCK *) hash_search(proclockTable,
-                                                                                                 (void *) proclock,
-                                                                                                 HASH_REMOVE, NULL);
+                               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                                          (void *) &(proclock->tag),
+                                                                                                       HASH_REMOVE, NULL);
                                if (!proclock)
-                                       elog(WARNING, "LockAcquire: remove proclock, table corrupted");
+                                       elog(WARNING, "proclock table corrupted");
                        }
                        else
-                               PROCLOCK_PRINT("LockAcquire: NHOLDING", proclock);
+                               PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
                        lock->nRequested--;
                        lock->requested[lockmode]--;
                        LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
                        Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
                        Assert(lock->nGranted <= lock->nRequested);
                        LWLockRelease(masterLock);
+                       if (locallock->nLocks == 0)
+                               RemoveLocalLock(locallock);
                        return FALSE;
                }
 
@@ -666,15 +731,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                 * Construct bitmask of locks this process holds on this object.
                 */
                {
-                       int                     heldLocks = 0;
-                       int                     tmpMask;
+                       LOCKMASK        heldLocks = 0;
 
-                       for (i = 1, tmpMask = 2;
-                                i <= lockMethodTable->numLockModes;
-                                i++, tmpMask <<= 1)
+                       for (i = 1; i <= lockMethodTable->numLockModes; i++)
                        {
                                if (myHolding[i] > 0)
-                                       heldLocks |= tmpMask;
+                                       heldLocks |= LOCKBIT_ON(i);
                        }
                        MyProc->heldLocks = heldLocks;
                }
@@ -682,7 +744,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                /*
                 * Sleep till someone wakes me up.
                 */
-               status = WaitOnLock(lockmethod, lockmode, lock, proclock);
+               status = WaitOnLock(lockmethodid, locallock, owner);
 
                /*
                 * NOTE: do not do any material change of state between here and
@@ -695,7 +757,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                 * Check the proclock entry status, in case something in the ipc
                 * communication doesn't work correctly.
                 */
-               if (!((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0)))
+               if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
                {
                        PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
                        LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
@@ -712,6 +774,23 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        return status == STATUS_OK;
 }
 
+/*
+ * Subroutine to free a locallock entry
+ */
+static void
+RemoveLocalLock(LOCALLOCK *locallock)
+{
+       LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
+
+       pfree(locallock->lockOwners);
+       locallock->lockOwners = NULL;
+       locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+                                                                                 (void *) &(locallock->tag),
+                                                                                 HASH_REMOVE, NULL);
+       if (!locallock)
+               elog(WARNING, "locallock table corrupted");
+}
+
 /*
  * LockCheckConflicts -- test whether requested lock conflicts
  *             with those already granted
@@ -729,7 +808,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
  * known.  If NULL is passed then these values will be computed internally.
  */
 int
-LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+LockCheckConflicts(LockMethod lockMethodTable,
                                   LOCKMODE lockmode,
                                   LOCK *lock,
                                   PROCLOCK *proclock,
@@ -737,9 +816,8 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
                                   int *myHolding)              /* myHolding[] array or NULL */
 {
        int                     numLockModes = lockMethodTable->numLockModes;
-       int                     bitmask;
-       int                     i,
-                               tmpMask;
+       LOCKMASK        bitmask;
+       int                     i;
        int                     localHolding[MAX_LOCKMODES];
 
        /*
@@ -772,11 +850,10 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
 
        /* Compute mask of lock types held by other processes */
        bitmask = 0;
-       tmpMask = 2;
-       for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
+       for (i = 1; i <= numLockModes; i++)
        {
                if (lock->granted[i] != myHolding[i])
-                       bitmask |= tmpMask;
+                       bitmask |= LOCKBIT_ON(i);
        }
 
        /*
@@ -808,24 +885,29 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
 static void
 LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
 {
-       SHM_QUEUE  *procHolders = &(proc->procHolders);
+       SHM_QUEUE  *procLocks = &(proc->procLocks);
        PROCLOCK   *proclock;
-       int                     i;
 
        MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
 
-       proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
-                                                                          offsetof(PROCLOCK, procLink));
+       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+                                                                                offsetof(PROCLOCK, procLink));
 
        while (proclock)
        {
                if (lockOffset == proclock->tag.lock)
                {
+                       LOCKMASK        holdMask = proclock->holdMask;
+                       int                     i;
+
                        for (i = 1; i < MAX_LOCKMODES; i++)
-                               myHolding[i] += proclock->holding[i];
+                       {
+                               if (holdMask & LOCKBIT_ON(i))
+                                       myHolding[i]++;
+                       }
                }
 
-               proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+               proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
                                                                                   offsetof(PROCLOCK, procLink));
        }
 }
@@ -835,22 +917,126 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
  *             the lock request has been granted.
  *
  * NOTE: if proc was blocked, it also needs to be removed from the wait list
- * and have its waitLock/waitHolder fields cleared.  That's not done here.
+ * and have its waitLock/waitProcLock fields cleared.  That's not done here.
+ *
+ * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
+ * table entry; but since we may be awaking some other process, we can't do
+ * that here; it's done by GrantLockLocal, instead.
  */
 void
 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
 {
        lock->nGranted++;
        lock->granted[lockmode]++;
-       lock->grantMask |= BITS_ON[lockmode];
+       lock->grantMask |= LOCKBIT_ON(lockmode);
        if (lock->granted[lockmode] == lock->requested[lockmode])
-               lock->waitMask &= BITS_OFF[lockmode];
+               lock->waitMask &= LOCKBIT_OFF(lockmode);
+       proclock->holdMask |= LOCKBIT_ON(lockmode);
        LOCK_PRINT("GrantLock", lock, lockmode);
        Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
        Assert(lock->nGranted <= lock->nRequested);
-       proclock->holding[lockmode]++;
-       proclock->nHolding++;
-       Assert((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0));
+}
+
+/*
+ * UnGrantLock -- opposite of GrantLock. 
+ *
+ * Updates the lock and proclock data structures to show that the lock
+ * is no longer held nor requested by the current holder.
+ *
+ * Returns true if there were any waiters waiting on the lock that
+ * should now be woken up with ProcLockWakeup.
+ */
+static bool
+UnGrantLock(LOCK *lock, LOCKMODE lockmode,
+                       PROCLOCK *proclock, LockMethod lockMethodTable)
+{
+       bool wakeupNeeded = false;
+
+       Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
+       Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
+       Assert(lock->nGranted <= lock->nRequested);
+
+       /*
+        * fix the general lock stats
+        */
+       lock->nRequested--;
+       lock->requested[lockmode]--;
+       lock->nGranted--;
+       lock->granted[lockmode]--;
+
+       if (lock->granted[lockmode] == 0)
+       {
+               /* change the conflict mask.  No more of this lock type. */
+               lock->grantMask &= LOCKBIT_OFF(lockmode);
+       }
+
+       LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
+
+       /*
+        * We need only run ProcLockWakeup if the released lock conflicts with
+        * at least one of the lock types requested by waiter(s).  Otherwise
+        * whatever conflict made them wait must still exist.  NOTE: before
+        * MVCC, we could skip wakeup if lock->granted[lockmode] was still
+        * positive. But that's not true anymore, because the remaining
+        * granted locks might belong to some waiter, who could now be
+        * awakened because he doesn't conflict with his own locks.
+        */
+       if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
+               wakeupNeeded = true;
+
+       /*
+        * Now fix the per-proclock state.
+        */
+       proclock->holdMask &= LOCKBIT_OFF(lockmode);
+       PROCLOCK_PRINT("UnGrantLock: updated", proclock);
+
+       return wakeupNeeded;
+}
+
+/*
+ * GrantLockLocal -- update the locallock data structures to show
+ *             the lock request has been granted.
+ *
+ * We expect that LockAcquire made sure there is room to add a new
+ * ResourceOwner entry.
+ */
+static void
+GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
+{
+       LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+       int                     i;
+
+       Assert(locallock->numLockOwners < locallock->maxLockOwners);
+       /* Count the total */
+       locallock->nLocks++;
+       /* Count the per-owner lock */
+       for (i = 0; i < locallock->numLockOwners; i++)
+       {
+               if (lockOwners[i].owner == owner)
+               {
+                       lockOwners[i].nLocks++;
+                       return;
+               }
+       }
+       lockOwners[i].owner = owner;
+       lockOwners[i].nLocks = 1;
+       locallock->numLockOwners++;
+}
+
+/*
+ * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
+ *             WaitOnLock on.
+ *
+ * proc.c needs this for the case where we are booted off the lock by
+ * timeout, but discover that someone granted us the lock anyway.
+ *
+ * We could just export GrantLockLocal, but that would require including
+ * resowner.h in lock.h, which creates circularity.
+ */
+void
+GrantAwaitedLock(void)
+{
+       GrantLockLocal(awaitedLock, awaitedOwner);
 }
 
 /*
@@ -862,23 +1048,29 @@ GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
  * The locktable's masterLock must be held at entry.
  */
 static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
-                  LOCK *lock, PROCLOCK *proclock)
+WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
+                  ResourceOwner owner)
 {
-       LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
+       LockMethod      lockMethodTable = LockMethods[lockmethodid];
        char       *new_status,
                           *old_status;
+       size_t          len;
 
-       Assert(lockmethod < NumLockMethods);
+       Assert(lockmethodid < NumLockMethods);
 
-       LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
+       LOCK_PRINT("WaitOnLock: sleeping on lock",
+                          locallock->lock, locallock->tag.mode);
 
        old_status = pstrdup(get_ps_display());
-       new_status = (char *) palloc(strlen(old_status) + 10);
-       strcpy(new_status, old_status);
-       strcat(new_status, " waiting");
+       len = strlen(old_status);
+       new_status = (char *) palloc(len + 8 + 1);
+       memcpy(new_status, old_status, len);
+       strcpy(new_status + len, " waiting");
        set_ps_display(new_status);
 
+       awaitedLock = locallock;
+       awaitedOwner = owner;
+
        /*
         * NOTE: Think not to put any shared-state cleanup after the call to
         * ProcSleep, in either the normal or failure path.  The lock state
@@ -894,33 +1086,35 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
         */
 
        if (ProcSleep(lockMethodTable,
-                                 lockmode,
-                                 lock,
-                                 proclock) != STATUS_OK)
+                                 locallock->tag.mode,
+                                 locallock->lock,
+                                 locallock->proclock) != STATUS_OK)
        {
                /*
                 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
-                * now.  Removal of the proclock and lock objects, if no longer
-                * needed, will happen in xact cleanup (see above for motivation).
+                * now.
                 */
-               LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
+               awaitedLock = NULL;
+               LOCK_PRINT("WaitOnLock: aborting on lock",
+                                  locallock->lock, locallock->tag.mode);
                LWLockRelease(lockMethodTable->masterLock);
+
                /*
-                * Now that we aren't holding the LockMgrLock, print details about
-                * the detected deadlock.  We didn't want to do this before because
-                * sending elog messages to the client while holding the shared lock
-                * is bad for concurrency.
+                * Now that we aren't holding the LockMgrLock, we can give an
+                * error report including details about the detected deadlock.
                 */
                DeadLockReport();
-               elog(ERROR, "deadlock detected");
                /* not reached */
        }
 
+       awaitedLock = NULL;
+
        set_ps_display(old_status);
        pfree(old_status);
        pfree(new_status);
 
-       LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
+       LOCK_PRINT("WaitOnLock: wakeup on lock",
+                          locallock->lock, locallock->tag.mode);
        return STATUS_OK;
 }
 
@@ -930,22 +1124,21 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
  *
  * Locktable lock must be held by caller.
  *
- * NB: this does not remove the process' proclock object, nor the lock object,
- * even though their counts might now have gone to zero.  That will happen
- * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup. (Removal of a proc from its wait queue by
- * this routine can only happen if we are aborting the transaction.)
+ * NB: this does not clean up any locallock object that may exist for the lock.
  */
 void
 RemoveFromWaitQueue(PGPROC *proc)
 {
        LOCK       *waitLock = proc->waitLock;
+       PROCLOCK   *proclock = proc->waitProcLock;
        LOCKMODE        lockmode = proc->waitLockMode;
+       LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
 
        /* Make sure proc is waiting */
        Assert(proc->links.next != INVALID_OFFSET);
        Assert(waitLock);
        Assert(waitLock->waitProcs.size > 0);
+       Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
 
        /* Remove proc from lock's wait queue */
        SHMQueueDelete(&(proc->links));
@@ -959,18 +1152,42 @@ RemoveFromWaitQueue(PGPROC *proc)
        waitLock->requested[lockmode]--;
        /* don't forget to clear waitMask bit if appropriate */
        if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
-               waitLock->waitMask &= BITS_OFF[lockmode];
+               waitLock->waitMask &= LOCKBIT_OFF(lockmode);
 
        /* Clean up the proc's own state */
        proc->waitLock = NULL;
-       proc->waitHolder = NULL;
+       proc->waitProcLock = NULL;
+
+       /*
+        * Delete the proclock immediately if it represents no already-held locks.
+        * This must happen now because if the owner of the lock decides to release
+        * it, and the requested/granted counts then go to zero, LockRelease
+        * expects there to be no remaining proclocks.
+        */
+       if (proclock->holdMask == 0)
+       {
+               PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock);
+               SHMQueueDelete(&proclock->lockLink);
+               SHMQueueDelete(&proclock->procLink);
+               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                                       (void *) &(proclock->tag),
+                                                                                       HASH_REMOVE, NULL);
+               if (!proclock)
+                       elog(WARNING, "proclock table corrupted");
+       }
+
+       /*
+        * There should still be some requests for the lock ... else what were
+        * we waiting for?  Therefore no need to delete the lock object.
+        */
+       Assert(waitLock->nRequested > 0);
 
        /* See if any other waiters for the lock can be woken up now */
-       ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
+       ProcLockWakeup(LockMethods[lockmethodid], waitLock);
 }
 
 /*
- * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
+ * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
  *             release one 'lockmode' lock on it.
  *
  * Side Effects: find any waiting processes that are now wakable,
@@ -980,253 +1197,300 @@ RemoveFromWaitQueue(PGPROC *proc)
  *             come along and request the lock.)
  */
 bool
-LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
                        TransactionId xid, LOCKMODE lockmode)
 {
+       LOCALLOCKTAG localtag;
+       LOCALLOCK  *locallock;
        LOCK       *lock;
-       LWLockId        masterLock;
-       LOCKMETHODTABLE *lockMethodTable;
        PROCLOCK   *proclock;
-       PROCLOCKTAG proclocktag;
-       HTAB       *proclockTable;
-       bool            wakeupNeeded = false;
+       LWLockId        masterLock;
+       LockMethod      lockMethodTable;
+       bool            wakeupNeeded;
 
 #ifdef LOCK_DEBUG
-       if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
-               elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
+       if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
+               elog(LOG, "LockRelease: user lock [%u,%u] %s",
+                        locktag->locktag_field1, locktag->locktag_field2,
+                        lock_mode_names[lockmode]);
 #endif
 
-       /* ???????? This must be changed when short term locks will be used */
-       locktag->lockmethod = lockmethod;
+       /* ugly */
+       locktag->locktag_lockmethodid = lockmethodid;
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
        {
                elog(WARNING, "lockMethodTable is null in LockRelease");
                return FALSE;
        }
 
-       masterLock = lockMethodTable->masterLock;
-       LWLockAcquire(masterLock, LW_EXCLUSIVE);
-
        /*
-        * Find a lock with this tag
+        * Find the LOCALLOCK entry for this lock and lockmode
         */
-       Assert(lockMethodTable->lockHash->hash == tag_hash);
-       lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-                                                               (void *) locktag,
-                                                               HASH_FIND, NULL);
+       MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
+       localtag.lock = *locktag;
+       localtag.xid = xid;
+       localtag.mode = lockmode;
+
+       locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+                                                                                 (void *) &localtag,
+                                                                                 HASH_FIND, NULL);
 
        /*
         * let the caller print its own error message, too. Do not
-        * elog(ERROR).
+        * ereport(ERROR).
         */
-       if (!lock)
+       if (!locallock || locallock->nLocks <= 0)
        {
-               LWLockRelease(masterLock);
-               elog(WARNING, "LockRelease: no such lock");
+               elog(WARNING, "you don't own a lock of type %s",
+                        lock_mode_names[lockmode]);
                return FALSE;
        }
-       LOCK_PRINT("LockRelease: found", lock, lockmode);
 
        /*
-        * Find the proclock entry for this proclock.
+        * Decrease the count for the resource owner.
         */
-       MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
-                                                                                                * needed */
-       proclocktag.lock = MAKE_OFFSET(lock);
-       proclocktag.proc = MAKE_OFFSET(MyProc);
-       TransactionIdStore(xid, &proclocktag.xid);
-
-       proclockTable = lockMethodTable->proclockHash;
-       proclock = (PROCLOCK *) hash_search(proclockTable,
-                                                                         (void *) &proclocktag,
-                                                                         HASH_FIND_SAVE, NULL);
-       if (!proclock)
        {
-               LWLockRelease(masterLock);
-#ifdef USER_LOCKS
-               if (lockmethod == USER_LOCKMETHOD)
-                       elog(WARNING, "LockRelease: no lock with this tag");
+               LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+               ResourceOwner owner;
+               int                     i;
+
+               /* Session locks and user locks are not transactional */
+               if (xid != InvalidTransactionId &&
+                       lockmethodid == DEFAULT_LOCKMETHOD)
+                       owner = CurrentResourceOwner;
                else
-#endif
-                       elog(WARNING, "LockRelease: proclock table corrupted");
-               return FALSE;
+                       owner = NULL;
+
+               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               {
+                       if (lockOwners[i].owner == owner)
+                       {
+                               Assert(lockOwners[i].nLocks > 0);
+                               if (--lockOwners[i].nLocks == 0)
+                               {
+                                       /* compact out unused slot */
+                                       locallock->numLockOwners--;
+                                       if (i < locallock->numLockOwners)
+                                               lockOwners[i] = lockOwners[locallock->numLockOwners];
+                               }
+                               break;
+                       }
+               }
+               if (i < 0)
+               {
+                       /* don't release a lock belonging to another owner */
+                       elog(WARNING, "you don't own a lock of type %s",
+                                lock_mode_names[lockmode]);
+                       return FALSE;
+               }
        }
-       PROCLOCK_PRINT("LockRelease: found", proclock);
 
        /*
-        * Check that we are actually holding a lock of the type we want to
-        * release.
+        * Decrease the total local count.      If we're still holding the lock,
+        * we're done.
         */
-       if (!(proclock->holding[lockmode] > 0))
-       {
-               PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
-               Assert(proclock->holding[lockmode] >= 0);
-               LWLockRelease(masterLock);
-               elog(WARNING, "LockRelease: you don't own a lock of type %s",
-                        lock_mode_names[lockmode]);
-               return FALSE;
-       }
-       Assert(proclock->nHolding > 0);
-       Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
-       Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
-       Assert(lock->nGranted <= lock->nRequested);
+       locallock->nLocks--;
+
+       if (locallock->nLocks > 0)
+               return TRUE;
 
        /*
-        * fix the general lock stats
+        * Otherwise we've got to mess with the shared lock table.
         */
-       lock->nRequested--;
-       lock->requested[lockmode]--;
-       lock->nGranted--;
-       lock->granted[lockmode]--;
-
-       if (lock->granted[lockmode] == 0)
-       {
-               /* change the conflict mask.  No more of this lock type. */
-               lock->grantMask &= BITS_OFF[lockmode];
-       }
+       masterLock = lockMethodTable->masterLock;
 
-       LOCK_PRINT("LockRelease: updated", lock, lockmode);
-       Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
-       Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
-       Assert(lock->nGranted <= lock->nRequested);
+       LWLockAcquire(masterLock, LW_EXCLUSIVE);
 
        /*
-        * We need only run ProcLockWakeup if the released lock conflicts with
-        * at least one of the lock types requested by waiter(s).  Otherwise
-        * whatever conflict made them wait must still exist.  NOTE: before
-        * MVCC, we could skip wakeup if lock->granted[lockmode] was still
-        * positive. But that's not true anymore, because the remaining
-        * granted locks might belong to some waiter, who could now be
-        * awakened because he doesn't conflict with his own locks.
+        * We don't need to re-find the lock or proclock, since we kept their
+        * addresses in the locallock table, and they couldn't have been
+        * removed while we were holding a lock on them.
         */
-       if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
-               wakeupNeeded = true;
+       lock = locallock->lock;
+       LOCK_PRINT("LockRelease: found", lock, lockmode);
+       proclock = locallock->proclock;
+       PROCLOCK_PRINT("LockRelease: found", proclock);
 
-       if (lock->nRequested == 0)
+       /*
+        * Double-check that we are actually holding a lock of the type we
+        * want to release.
+        */
+       if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
        {
-               /*
-                * if there's no one waiting in the queue, we just released the
-                * last lock on this object. Delete it from the lock table.
-                */
-               Assert(lockMethodTable->lockHash->hash == tag_hash);
-               lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-                                                                       (void *) &(lock->tag),
-                                                                       HASH_REMOVE,
-                                                                       NULL);
-               if (!lock)
-               {
-                       LWLockRelease(masterLock);
-                       elog(WARNING, "LockRelease: remove lock, table corrupted");
-                       return FALSE;
-               }
-               wakeupNeeded = false;   /* should be false, but make sure */
+               PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
+               LWLockRelease(masterLock);
+               elog(WARNING, "you don't own a lock of type %s",
+                        lock_mode_names[lockmode]);
+               RemoveLocalLock(locallock);
+               return FALSE;
        }
 
-       /*
-        * Now fix the per-proclock lock stats.
-        */
-       proclock->holding[lockmode]--;
-       proclock->nHolding--;
-       PROCLOCK_PRINT("LockRelease: updated", proclock);
-       Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
+       wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
 
        /*
         * If this was my last hold on this lock, delete my entry in the
         * proclock table.
         */
-       if (proclock->nHolding == 0)
+       if (proclock->holdMask == 0)
        {
-               PROCLOCK_PRINT("LockRelease: deleting", proclock);
+               PROCLOCK_PRINT("LockRelease: deleting proclock", proclock);
                SHMQueueDelete(&proclock->lockLink);
                SHMQueueDelete(&proclock->procLink);
-               proclock = (PROCLOCK *) hash_search(proclockTable,
-                                                                                 (void *) &proclock,
-                                                                                 HASH_REMOVE_SAVED, NULL);
+               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                                       (void *) &(proclock->tag),
+                                                                                       HASH_REMOVE, NULL);
                if (!proclock)
                {
                        LWLockRelease(masterLock);
-                       elog(WARNING, "LockRelease: remove proclock, table corrupted");
+                       elog(WARNING, "proclock table corrupted");
+                       RemoveLocalLock(locallock);
                        return FALSE;
                }
        }
 
-       /*
-        * Wake up waiters if needed.
-        */
-       if (wakeupNeeded)
-               ProcLockWakeup(lockMethodTable, lock);
+       if (lock->nRequested == 0)
+       {
+               /*
+                * We've just released the last lock, so garbage-collect the lock
+                * object.
+                */
+               LOCK_PRINT("LockRelease: deleting lock", lock, lockmode);
+               Assert(SHMQueueEmpty(&(lock->procLocks)));
+               lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+                                                                       (void *) &(lock->tag),
+                                                                       HASH_REMOVE, NULL);
+               if (!lock)
+               {
+                       LWLockRelease(masterLock);
+                       elog(WARNING, "lock table corrupted");
+                       RemoveLocalLock(locallock);
+                       return FALSE;
+               }
+       }
+       else
+       {
+               /*
+                * Wake up waiters if needed.
+                */
+               if (wakeupNeeded)
+                       ProcLockWakeup(lockMethodTable, lock);
+       }
 
        LWLockRelease(masterLock);
+
+       RemoveLocalLock(locallock);
        return TRUE;
 }
 
 /*
- * LockReleaseAll -- Release all locks in a process's lock list.
+ * LockReleaseAll -- Release all locks of the specified lock method that
+ *             are held by the current process.
  *
- * Well, not really *all* locks.
+ * Well, not necessarily *all* locks.  The available behaviors are:
  *
- * If 'allxids' is TRUE, all locks of the specified lock method are
- * released, regardless of transaction affiliation.
+ * allxids == true: release all locks regardless of transaction
+ * affiliation.
  *
- * If 'allxids' is FALSE, all locks of the specified lock method and
- * specified XID are released.
+ * allxids == false: release all locks with Xid != 0
+ * (zero is the Xid used for "session" locks).
  */
 bool
-LockReleaseAll(LOCKMETHOD lockmethod, PGPROC *proc,
-                          bool allxids, TransactionId xid)
+LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
 {
-       SHM_QUEUE  *procHolders = &(proc->procHolders);
-       PROCLOCK   *proclock;
-       PROCLOCK   *nextHolder;
+       HASH_SEQ_STATUS status;
+       SHM_QUEUE  *procLocks = &(MyProc->procLocks);
        LWLockId        masterLock;
-       LOCKMETHODTABLE *lockMethodTable;
+       LockMethod      lockMethodTable;
        int                     i,
                                numLockModes;
+       LOCALLOCK  *locallock;
+       PROCLOCK   *proclock;
        LOCK       *lock;
 
 #ifdef LOCK_DEBUG
-       if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
-               elog(LOG, "LockReleaseAll: lockmethod=%d, pid=%d",
-                        lockmethod, proc->pid);
+       if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+               elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
 #endif
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
        {
-               elog(WARNING, "LockReleaseAll: bad lockmethod %d", lockmethod);
+               elog(WARNING, "bad lock method: %d", lockmethodid);
                return FALSE;
        }
 
        numLockModes = lockMethodTable->numLockModes;
        masterLock = lockMethodTable->masterLock;
 
+       /*
+        * First we run through the locallock table and get rid of unwanted
+        * entries, then we scan the process's proclocks and get rid of those.
+        * We do this separately because we may have multiple locallock
+        * entries pointing to the same proclock, and we daren't end up with
+        * any dangling pointers.
+        */
+       hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
+
+       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       {
+               if (locallock->proclock == NULL || locallock->lock == NULL)
+               {
+                       /*
+                        * We must've run out of shared memory while trying to set up
+                        * this lock.  Just forget the local entry.
+                        */
+                       Assert(locallock->nLocks == 0);
+                       RemoveLocalLock(locallock);
+                       continue;
+               }
+
+               /* Ignore items that are not of the lockmethod to be removed */
+               if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
+                       continue;
+
+               /*
+                * Ignore locks with Xid=0 unless we are asked to release all
+                * locks
+                */
+               if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
+                       && !allxids)
+                       continue;
+
+               RemoveLocalLock(locallock);
+       }
+
        LWLockAcquire(masterLock, LW_EXCLUSIVE);
 
-       proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
-                                                                          offsetof(PROCLOCK, procLink));
+       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+                                                                                offsetof(PROCLOCK, procLink));
 
        while (proclock)
        {
                bool            wakeupNeeded = false;
+               PROCLOCK   *nextHolder;
 
                /* Get link first, since we may unlink/delete this proclock */
-               nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+               nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
                                                                                   offsetof(PROCLOCK, procLink));
 
-               Assert(proclock->tag.proc == MAKE_OFFSET(proc));
+               Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
 
                lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
 
                /* Ignore items that are not of the lockmethod to be removed */
-               if (LOCK_LOCKMETHOD(*lock) != lockmethod)
+               if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
                        goto next_item;
 
-               /* If not allxids, ignore items that are of the wrong xid */
-               if (!allxids && !TransactionIdEquals(xid, proclock->tag.xid))
+               /*
+                * Ignore locks with Xid=0 unless we are asked to release all
+                * locks
+                */
+               if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
+                       && !allxids)
                        goto next_item;
 
                PROCLOCK_PRINT("LockReleaseAll", proclock);
@@ -1234,53 +1498,22 @@ LockReleaseAll(LOCKMETHOD lockmethod, PGPROC *proc,
                Assert(lock->nRequested >= 0);
                Assert(lock->nGranted >= 0);
                Assert(lock->nGranted <= lock->nRequested);
-               Assert(proclock->nHolding >= 0);
-               Assert(proclock->nHolding <= lock->nRequested);
+               Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
                /*
                 * fix the general lock stats
                 */
-               if (lock->nRequested != proclock->nHolding)
-               {
-                       for (i = 1; i <= numLockModes; i++)
-                       {
-                               Assert(proclock->holding[i] >= 0);
-                               if (proclock->holding[i] > 0)
-                               {
-                                       lock->requested[i] -= proclock->holding[i];
-                                       lock->granted[i] -= proclock->holding[i];
-                                       Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
-                                       if (lock->granted[i] == 0)
-                                               lock->grantMask &= BITS_OFF[i];
-
-                                       /*
-                                        * Read comments in LockRelease
-                                        */
-                                       if (!wakeupNeeded &&
-                                               lockMethodTable->conflictTab[i] & lock->waitMask)
-                                               wakeupNeeded = true;
-                               }
-                       }
-                       lock->nRequested -= proclock->nHolding;
-                       lock->nGranted -= proclock->nHolding;
-                       Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
-                       Assert(lock->nGranted <= lock->nRequested);
-               }
-               else
+               if (proclock->holdMask)
                {
-                       /*
-                        * This proclock accounts for all the requested locks on the
-                        * object, so we can be lazy and just zero things out.
-                        */
-                       lock->nRequested = 0;
-                       lock->nGranted = 0;
-                       /* Fix the lock status, just for next LOCK_PRINT message. */
                        for (i = 1; i <= numLockModes; i++)
                        {
-                               Assert(lock->requested[i] == lock->granted[i]);
-                               lock->requested[i] = lock->granted[i] = 0;
+                               if (proclock->holdMask & LOCKBIT_ON(i))
+                                       wakeupNeeded |= UnGrantLock(lock, i, proclock,
+                                                                                               lockMethodTable);
                        }
                }
+               Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
+               Assert(lock->nGranted <= lock->nRequested);
                LOCK_PRINT("LockReleaseAll: updated", lock, 0);
 
                PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
@@ -1294,14 +1527,14 @@ LockReleaseAll(LOCKMETHOD lockmethod, PGPROC *proc,
                /*
                 * remove the proclock entry from the hashtable
                 */
-               proclock = (PROCLOCK *) hash_search(lockMethodTable->proclockHash,
-                                                                                 (void *) proclock,
-                                                                                 HASH_REMOVE,
-                                                                                 NULL);
+               proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                                                                       (void *) &(proclock->tag),
+                                                                                       HASH_REMOVE,
+                                                                                       NULL);
                if (!proclock)
                {
                        LWLockRelease(masterLock);
-                       elog(WARNING, "LockReleaseAll: proclock table corrupted");
+                       elog(WARNING, "proclock table corrupted");
                        return FALSE;
                }
 
@@ -1312,14 +1545,14 @@ LockReleaseAll(LOCKMETHOD lockmethod, PGPROC *proc,
                         * lock object.
                         */
                        LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
-                       Assert(lockMethodTable->lockHash->hash == tag_hash);
-                       lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+                       Assert(SHMQueueEmpty(&(lock->procLocks)));
+                       lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
                                                                                (void *) &(lock->tag),
                                                                                HASH_REMOVE, NULL);
                        if (!lock)
                        {
                                LWLockRelease(masterLock);
-                               elog(WARNING, "LockReleaseAll: cannot remove lock from HTAB");
+                               elog(WARNING, "lock table corrupted");
                                return FALSE;
                        }
                }
@@ -1333,22 +1566,148 @@ next_item:
        LWLockRelease(masterLock);
 
 #ifdef LOCK_DEBUG
-       if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
-               elog(LOG, "LockReleaseAll: done");
+       if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+               elog(LOG, "LockReleaseAll done");
 #endif
 
        return TRUE;
 }
 
+/*
+ * LockReleaseCurrentOwner
+ *             Release all locks belonging to CurrentResourceOwner
+ *
+ * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner.
+ */
+void
+LockReleaseCurrentOwner(void)
+{
+       HASH_SEQ_STATUS status;
+       LOCALLOCK  *locallock;
+       LOCALLOCKOWNER *lockOwners;
+       int                     i;
+
+       hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
+
+       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       {
+               /* Ignore items that must be nontransactional */
+               if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
+                       continue;
+               if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
+                       continue;
+
+               /* Scan to see if there are any locks belonging to current owner */
+               lockOwners = locallock->lockOwners;
+               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               {
+                       if (lockOwners[i].owner == CurrentResourceOwner)
+                       {
+                               Assert(lockOwners[i].nLocks > 0);
+                               if (lockOwners[i].nLocks < locallock->nLocks)
+                               {
+                                       /*
+                                        * We will still hold this lock after forgetting this
+                                        * ResourceOwner.
+                                        */
+                                       locallock->nLocks -= lockOwners[i].nLocks;
+                                       /* compact out unused slot */
+                                       locallock->numLockOwners--;
+                                       if (i < locallock->numLockOwners)
+                                               lockOwners[i] = lockOwners[locallock->numLockOwners];
+                               }
+                               else
+                               {
+                                       Assert(lockOwners[i].nLocks == locallock->nLocks);
+                                       /* We want to call LockRelease just once */
+                                       lockOwners[i].nLocks = 1;
+                                       locallock->nLocks = 1;
+                                       if (!LockRelease(DEFAULT_LOCKMETHOD,
+                                                                        &locallock->tag.lock,
+                                                                        locallock->tag.xid,
+                                                                        locallock->tag.mode))
+                                               elog(WARNING, "LockReleaseCurrentOwner: failed??");
+                               }
+                               break;
+                       }
+               }
+       }
+}
+
+/*
+ * LockReassignCurrentOwner
+ *             Reassign all locks belonging to CurrentResourceOwner to belong
+ *             to its parent resource owner
+ */
+void
+LockReassignCurrentOwner(void)
+{
+       ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
+       HASH_SEQ_STATUS status;
+       LOCALLOCK  *locallock;
+       LOCALLOCKOWNER *lockOwners;
+
+       Assert(parent != NULL);
+
+       hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
+
+       while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+       {
+               int                     i;
+               int                     ic = -1;
+               int                     ip = -1;
+
+               /* Ignore items that must be nontransactional */
+               if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
+                       continue;
+               if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
+                       continue;
+
+               /*
+                * Scan to see if there are any locks belonging to current owner
+                * or its parent
+                */
+               lockOwners = locallock->lockOwners;
+               for (i = locallock->numLockOwners - 1; i >= 0; i--)
+               {
+                       if (lockOwners[i].owner == CurrentResourceOwner)
+                               ic = i;
+                       else if (lockOwners[i].owner == parent)
+                               ip = i;
+               }
+
+               if (ic < 0)
+                       continue;                       /* no current locks */
+
+               if (ip < 0)
+               {
+                       /* Parent has no slot, so just give it child's slot */
+                       lockOwners[ic].owner = parent;
+               }
+               else
+               {
+                       /* Merge child's count with parent's */
+                       lockOwners[ip].nLocks += lockOwners[ic].nLocks;
+                       /* compact out unused slot */
+                       locallock->numLockOwners--;
+                       if (ic < locallock->numLockOwners)
+                               lockOwners[ic] = lockOwners[locallock->numLockOwners];
+               }
+       }
+}
+
+
+/*
+ * Estimate shared-memory space used for lock tables
+ */
 int
 LockShmemSize(int maxBackends)
 {
        int                     size = 0;
        long            max_table_size = NLOCKENTS(maxBackends);
 
-       size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
-       size += maxBackends * MAXALIGN(sizeof(PGPROC));         /* each MyProc */
-       size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODTABLE));           /* each lockMethodTable */
+       /* lock method headers */
+       size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
 
        /* lockHash table */
        size += hash_estimate_size(max_table_size, sizeof(LOCK));
@@ -1357,6 +1716,9 @@ LockShmemSize(int maxBackends)
        size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
 
        /*
+        * Note we count only one pair of hash tables, since the userlocks
+        * table actually overlays the main one.
+        *
         * Since the lockHash entry count above is only an estimate, add 10%
         * safety margin.
         */
@@ -1392,13 +1754,10 @@ GetLockStatusData(void)
 
        LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
 
-       proclockTable = LockMethodTable[DEFAULT_LOCKMETHOD]->proclockHash;
+       proclockTable = LockMethodProcLockHash[DEFAULT_LOCKMETHOD];
 
        data->nelements = i = proclockTable->hctl->nentries;
 
-       if (i == 0)
-               i = 1;                                  /* avoid palloc(0) if empty table */
-
        data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
        data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
        data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
@@ -1437,7 +1796,7 @@ GetLockmodeName(LOCKMODE mode)
 
 #ifdef LOCK_DEBUG
 /*
- * Dump all locks in the proc->procHolders list.
+ * Dump all locks in the MyProc->procLocks list.
  *
  * Must have already acquired the masterLock.
  */
@@ -1445,28 +1804,28 @@ void
 DumpLocks(void)
 {
        PGPROC     *proc;
-       SHM_QUEUE  *procHolders;
+       SHM_QUEUE  *procLocks;
        PROCLOCK   *proclock;
        LOCK       *lock;
-       int                     lockmethod = DEFAULT_LOCKMETHOD;
-       LOCKMETHODTABLE *lockMethodTable;
+       int                     lockmethodid = DEFAULT_LOCKMETHOD;
+       LockMethod      lockMethodTable;
 
        proc = MyProc;
        if (proc == NULL)
                return;
 
-       procHolders = &proc->procHolders;
+       procLocks = &proc->procLocks;
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
                return;
 
        if (proc->waitLock)
                LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
 
-       proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
-                                                                          offsetof(PROCLOCK, procLink));
+       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+                                                                                offsetof(PROCLOCK, procLink));
 
        while (proclock)
        {
@@ -1477,7 +1836,7 @@ DumpLocks(void)
                PROCLOCK_PRINT("DumpLocks", proclock);
                LOCK_PRINT("DumpLocks", lock, 0);
 
-               proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+               proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
                                                                                   offsetof(PROCLOCK, procLink));
        }
 }
@@ -1491,8 +1850,8 @@ DumpAllLocks(void)
        PGPROC     *proc;
        PROCLOCK   *proclock;
        LOCK       *lock;
-       int                     lockmethod = DEFAULT_LOCKMETHOD;
-       LOCKMETHODTABLE *lockMethodTable;
+       int                     lockmethodid = DEFAULT_LOCKMETHOD;
+       LockMethod      lockMethodTable;
        HTAB       *proclockTable;
        HASH_SEQ_STATUS status;
 
@@ -1500,12 +1859,12 @@ DumpAllLocks(void)
        if (proc == NULL)
                return;
 
-       Assert(lockmethod < NumLockMethods);
-       lockMethodTable = LockMethodTable[lockmethod];
+       Assert(lockmethodid < NumLockMethods);
+       lockMethodTable = LockMethods[lockmethodid];
        if (!lockMethodTable)
                return;
 
-       proclockTable = lockMethodTable->proclockHash;
+       proclockTable = LockMethodProcLockHash[lockmethodid];
 
        if (proc->waitLock)
                LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);