]> granicus.if.org Git - postgresql/commitdiff
From: Massimo Dal Zotto <dz@cs.unitn.it>
authorMarc G. Fournier <scrappy@hub.org>
Tue, 25 Aug 1998 21:20:32 +0000 (21:20 +0000)
committerMarc G. Fournier <scrappy@hub.org>
Tue, 25 Aug 1998 21:20:32 +0000 (21:20 +0000)
lock.patch

        I have rewritten lock.c cleaning up the code and adding better
        assert checking I have also added some fields to the lock and
        xid tags for better support of user locks. There is also a new
        function which returns an array of pids owning a lock.
        I'm using this code from over six months and it works fine.

src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/multi.c
src/backend/storage/lmgr/proc.c
src/include/storage/lock.h
src/include/storage/proc.h

index 4b83c9c0d16ac3a0dc65f9d15843f49d43aa56ba..b35008a3bb8afc84d393eddbd4d8e3a07c7ade8e 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.32 1998/06/30 02:33:31 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.33 1998/08/25 21:20:26 scrappy Exp $
  *
  * NOTES
  *       Outside modules can create a lock table and acquire/release
  *
  *     Interface:
  *
- *     LockAcquire(), LockRelease(), LockMethodTableInit().
- *
- *     LockReplace() is called only within this module and by the
- *             lkchain module.  It releases a lock without looking
- *             the lock up in the lock table.
+ *     LockAcquire(), LockRelease(), LockMethodTableInit(),
+ *     LockMethodTableRename(), LockReleaseAll, LockOwners()
+ *     LockResolveConflicts(), GrantLock()
  *
  *     NOTE: This module is used to define new lock tables.  The
  *             multi-level lock table (multi.c) used by the heap
@@ -35,6 +33,7 @@
 #include <string.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <signal.h>
 
 #include "postgres.h"
 #include "miscadmin.h"
 #include "utils/palloc.h"
 #include "access/xact.h"
 #include "access/transam.h"
+#include "utils/trace.h"
+#include "utils/ps_status.h"
 
-static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
-
-/*#define LOCK_MGR_DEBUG*/
-
-#ifndef LOCK_MGR_DEBUG
+static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
+                                         TransactionId xid);
 
-#define LOCK_PRINT(where,tag,type)
-#define LOCK_DUMP(where,lock,type)
-#define LOCK_DUMP_AUX(where,lock,type)
+/*
+ * lockDebugRelation can be used to trace unconditionally a single relation,
+ * for example pg_listener, if you suspect there are locking problems.
+ *
+ * lockDebugOidMin is is used to avoid tracing postgres relations, which
+ * would produce a lot of output. Unfortunately most system relations are
+ * created after bootstrap and have oid greater than BootstrapObjectIdData.
+ * If you are using tprintf you should specify a value greater than the max
+ * oid of system relations, which can be found with the following query:
+ *
+ *   select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_';
+ *
+ * To get a useful lock trace you can use the following pg_options:
+ *
+ *   -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500"
+ */
+#define LOCKDEBUG(lockmethod)  (pg_options[TRACE_SHORTLOCKS+lockmethod])
+#define lockDebugRelation              (pg_options[TRACE_LOCKRELATION])
+#define lockDebugOidMin                        (pg_options[TRACE_LOCKOIDMIN])
+#define lockReadPriority               (pg_options[OPT_LOCKREADPRIORITY])
+
+#ifdef LOCK_MGR_DEBUG
+#define LOCK_PRINT(where,lock,type) \
+    if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
+            && (lock->tag.relId >= lockDebugOidMin)) \
+           || (lock->tag.relId == lockDebugRelation)) \
+           LOCK_PRINT_AUX(where,lock,type)
+
+#define LOCK_PRINT_AUX(where,lock,type) \
+       TPRINTF(TRACE_ALL, \
+                "%s: lock(%x) tbl(%d) rel(%d) db(%d) tid(%d,%d) mask(%x) " \
+                "hold(%d,%d,%d,%d,%d)=%d " \
+                "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
+                where, \
+                MAKE_OFFSET(lock), \
+                lock->tag.lockmethod, \
+                lock->tag.relId, \
+                lock->tag.dbId, \
+                ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+ \
+                 lock->tag.tupleId.ip_blkid.bi_lo), \
+                lock->tag.tupleId.ip_posid, \
+                lock->mask, \
+                lock->holders[1], \
+                lock->holders[2], \
+                lock->holders[3], \
+                lock->holders[4], \
+                lock->holders[5], \
+                lock->nHolding, \
+                lock->activeHolders[1], \
+                lock->activeHolders[2], \
+                lock->activeHolders[3], \
+                lock->activeHolders[4], \
+                lock->activeHolders[5], \
+                lock->nActive, \
+                lock->waitProcs.size, \
+                lock_types[type])
+
+#define XID_PRINT(where,xidentP) \
+       if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1) \
+            && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
+                        >= lockDebugOidMin)) \
+           || (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
+                       == lockDebugRelation)) \
+           XID_PRINT_AUX(where,xidentP)
+
+#define XID_PRINT_AUX(where,xidentP) \
+       TPRINTF(TRACE_ALL, \
+                "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
+                "hold(%d,%d,%d,%d,%d)=%d", \
+                where, \
+                MAKE_OFFSET(xidentP), \
+                xidentP->tag.lock, \
+                XIDENT_LOCKMETHOD(*(xidentP)), \
+                xidentP->tag.pid, \
+                xidentP->tag.xid, \
+                xidentP->holders[1], \
+                xidentP->holders[2], \
+                xidentP->holders[3], \
+                xidentP->holders[4], \
+                xidentP->holders[5], \
+                xidentP->nHolding)
+
+#define LOCK_TPRINTF(lock, args...) \
+    if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
+            && (lock->tag.relId >= lockDebugOidMin)) \
+           || (lock->tag.relId == lockDebugRelation)) \
+           TPRINTF(TRACE_ALL, args)
+
+#else  /* !LOCK_MGR_DEBUG */
+#define LOCK_PRINT(where,lock,type)
+#define LOCK_PRINT_AUX(where,lock,type)
 #define XID_PRINT(where,xidentP)
+#define XID_PRINT_AUX(where,xidentP)
+#define LOCK_TPRINTF(lock, args...)
+#endif /* !LOCK_MGR_DEBUG */
 
-#else                                                  /* LOCK_MGR_DEBUG */
-
-int                    lockDebug = 0;
-unsigned int lock_debug_oid_min = BootstrapObjectIdData;
 static char *lock_types[] = {
-       "NONE",
+       "",
        "WRITE",
        "READ",
        "WRITE INTENT",
@@ -75,59 +159,6 @@ static char *lock_types[] = {
        "EXTEND"
 };
 
-#define LOCK_PRINT(where,tag,type)\
-       if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
-               elog(DEBUG, \
-                        "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
-                        MyProcPid,\
-                        tag->relId, tag->dbId, \
-                        ((tag->tupleId.ip_blkid.bi_hi<<16)+\
-                         tag->tupleId.ip_blkid.bi_lo),\
-                        tag->tupleId.ip_posid, \
-                        lock_types[type])
-
-#define LOCK_DUMP(where,lock,type)\
-       if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
-               LOCK_DUMP_AUX(where,lock,type)
-
-#define LOCK_DUMP_AUX(where,lock,type)\
-               elog(DEBUG, \
-                        "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
-                        "holders (%d,%d,%d,%d,%d) type (%s)",where, \
-                        MyProcPid,\
-                        lock->tag.relId, lock->tag.dbId, \
-                        ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
-                         lock->tag.tupleId.ip_blkid.bi_lo),\
-                        lock->tag.tupleId.ip_posid, \
-                        lock->nHolding,\
-                        lock->holders[1],\
-                        lock->holders[2],\
-                        lock->holders[3],\
-                        lock->holders[4],\
-                        lock->holders[5],\
-                        lock_types[type])
-
-#define XID_PRINT(where,xidentP)\
-       if ((lockDebug >= 2) && \
-               (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
-                >= lock_debug_oid_min)) \
-               elog(DEBUG,\
-                        "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
-                        "holders (%d,%d,%d,%d,%d)",\
-                        where,\
-                        MyProcPid,\
-                        xidentP->tag.xid,\
-                        xidentP->tag.pid,\
-                        xidentP->tag.lock,\
-                        xidentP->nHolding,\
-                        xidentP->holders[1],\
-                        xidentP->holders[2],\
-                        xidentP->holders[3],\
-                        xidentP->holders[4],\
-                        xidentP->holders[5])
-
-#endif                                                 /* LOCK_MGR_DEBUG */
-
 SPINLOCK       LockMgrLock;            /* in Shmem or created in
                                                                 * CreateSpinlocks() */
 
@@ -171,6 +202,16 @@ InitLocks()
                BITS_ON[i] = bit;
                BITS_OFF[i] = ~bit;
        }
+
+#ifdef LOCK_MGR_DEBUG
+       /*
+        * If lockDebugOidMin value has not been specified
+        * in pg_options set a default value.
+        */
+       if (!lockDebugOidMin) {
+               lockDebugOidMin = BootstrapObjectIdData;
+       }
+#endif
 }
 
 /* -------------------
@@ -234,7 +275,7 @@ LockMethodTableInit(char *tabName,
        {
                elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
                         numModes, MAX_LOCKMODES);
-               return (INVALID_TABLEID);
+               return (INVALID_LOCKMETHOD);
        }
 
        /* allocate a string for the shmem index table lookup */
@@ -242,7 +283,7 @@ LockMethodTableInit(char *tabName,
        if (!shmemName)
        {
                elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
-               return (INVALID_TABLEID);
+               return (INVALID_LOCKMETHOD);
        }
 
        /* each lock table has a non-shared header */
@@ -251,7 +292,7 @@ LockMethodTableInit(char *tabName,
        {
                elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
                pfree(shmemName);
-               return (INVALID_TABLEID);
+               return (INVALID_LOCKMETHOD);
        }
 
        /* ------------------------
@@ -354,7 +395,7 @@ LockMethodTableInit(char *tabName,
        if (status)
                return (lockMethodTable->ctl->lockmethod);
        else
-               return (INVALID_TABLEID);
+               return (INVALID_LOCKMETHOD);
 }
 
 /*
@@ -369,16 +410,16 @@ LockMethodTableInit(char *tabName,
  *             client to use different lockmethods when acquiring/releasing
  *             short term and long term locks.
  */
-#ifdef NOT_USED
+
 LOCKMETHOD
 LockMethodTableRename(LOCKMETHOD lockmethod)
 {
        LOCKMETHOD newLockMethod;
 
        if (NumLockMethods >= MAX_LOCK_METHODS)
-               return (INVALID_TABLEID);
-       if (LockMethodTable[lockmethod] == INVALID_TABLEID)
-               return (INVALID_TABLEID);
+               return (INVALID_LOCKMETHOD);
+       if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
+               return (INVALID_LOCKMETHOD);
 
        /* other modules refer to the lock table by a lockmethod */
        newLockMethod = NumLockMethods;
@@ -387,7 +428,6 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
        LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
        return (newLockMethod);
 }
-#endif
 
 /*
  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
@@ -398,8 +438,11 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  * Side Effects: The lock is always acquired.  No way to abort
  *             a lock acquisition other than aborting the transaction.
  *             Lock is recorded in the lkchain.
+ *
 #ifdef USER_LOCKS
+ *
  * Note on User Locks:
+ *
  *             User locks are handled totally on the application side as
  *             long term cooperative locks which extend beyond the normal
  *             transaction boundaries.  Their purpose is to indicate to an
@@ -421,24 +464,24 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  *             acquired if already held by another process.  They must be
  *             released explicitly by the application but they are released
  *             automatically when a backend terminates.
- *             They are indicated by a dummy lockmethod 0 which doesn't have
- *             any table allocated but uses the normal lock table, and are
- *             distinguished from normal locks for the following differences:
+ *             They are indicated by a lockmethod 2 which is an alias for the
+ *             normal lock table, and are distinguished from normal locks
+ *             for the following differences:
  *
  *                                                                             normal lock             user lock
  *
- *             lockmethod                                              1                               0
+ *             lockmethod                                              1                               2
  *             tag.relId                                               rel oid                 0
  *             tag.ItemPointerData.ip_blkid    block id                lock id2
  *             tag.ItemPointerData.ip_posid    tuple offset    lock id1
  *             xid.pid                                                 0                               backend pid
- *             xid.xid                                                 current xid             0
+ *             xid.xid                                                 xid or 0                0
  *             persistence                                             transaction             user or backend
  *
  *             The lockmode parameter can have the same values for normal locks
  *             although probably only WRITE_LOCK can have some practical use.
  *
- *                                                                                                             DZ - 4 Oct 1996
+ *                                                                                                             DZ - 22 Nov 1997
 #endif
  */
 
@@ -453,25 +496,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        SPINLOCK        masterLock;
        LOCKMETHODTABLE    *lockMethodTable;
        int                     status;
-       TransactionId myXid;
+       TransactionId xid;
 
 #ifdef USER_LOCKS
        int                     is_user_lock;
 
-       is_user_lock = (lockmethod == 0);
+       is_user_lock = (lockmethod == USER_LOCKMETHOD);
        if (is_user_lock)
        {
-               lockmethod = 1;
 #ifdef USER_LOCKS_DEBUG
-               elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
-                        locktag->tupleId.ip_posid,
-                        ((locktag->tupleId.ip_blkid.bi_hi << 16) +
-                         locktag->tupleId.ip_blkid.bi_lo),
-                        lockmode);
+               TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u,%u] %s",
+                               locktag->tupleId.ip_posid,
+                               ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+                                locktag->tupleId.ip_blkid.bi_lo),
+                               lock_types[lockmode]);
 #endif
        }
 #endif
 
+       /* ???????? This must be changed when short term locks will be used */
+       locktag->lockmethod = lockmethod;
+
        Assert(lockmethod < NumLockMethods);
        lockMethodTable = LockMethodTable[lockmethod];
        if (!lockMethodTable)
@@ -483,14 +528,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        if (LockingIsDisabled)
                return (TRUE);
 
-       LOCK_PRINT("Acquire", locktag, lockmode);
        masterLock = lockMethodTable->ctl->masterLock;
 
        SpinAcquire(masterLock);
 
+       /*
+        * Find or create a lock with this tag
+        */
        Assert(lockMethodTable->lockHash->hash == tag_hash);
-       lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_ENTER, &found);
-
+       lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+                                                               HASH_ENTER, &found);
        if (!lock)
        {
                SpinRelease(masterLock);
@@ -505,15 +552,19 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        if (!found)
        {
                lock->mask = 0;
-               ProcQueueInit(&(lock->waitProcs));
-               MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
-               MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
                lock->nHolding = 0;
                lock->nActive = 0;
-
+               MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
+               MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
+               ProcQueueInit(&(lock->waitProcs));
                Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
                                                         &(locktag->tupleId.ip_blkid)));
-
+               LOCK_PRINT("LockAcquire: new", lock, lockmode);
+       } else {
+               LOCK_PRINT("LockAcquire: found", lock, lockmode);
+               Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+               Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
+               Assert(lock->nActive <= lock->nHolding);
        }
 
        /* ------------------
@@ -522,7 +573,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
         * ------------------
         */
        xidTable = lockMethodTable->xidHash;
-       myXid = GetCurrentTransactionId();
 
        /* ------------------
         * Zero out all of the tag bytes (this clears the padding bytes for long
@@ -530,36 +580,48 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
         * ------------------
         */
        MemSet(&item, 0, XID_TAGSIZE);          /* must clear padding, needed */
-       TransactionIdStore(myXid, &item.tag.xid);
        item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-       item.tag.pid = MyPid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+       item.tag.lockmethod = lockmethod;
 #endif
-
 #ifdef USER_LOCKS
        if (is_user_lock)
        {
                item.tag.pid = MyProcPid;
-               item.tag.xid = myXid = 0;
-#ifdef USER_LOCKS_DEBUG
-               elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
-                        item.tag.lock, item.tag.pid, item.tag.xid);
-#endif
+               item.tag.xid = xid = 0;
+       } else {
+               xid = GetCurrentTransactionId();
+               TransactionIdStore(xid, &item.tag.xid);
        }
+#else
+       xid = GetCurrentTransactionId();
+       TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-       result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
+       /*
+        * Find or create an xid entry with this tag
+        */
+       result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+                                                                                 HASH_ENTER, &found);
        if (!result)
        {
                elog(NOTICE, "LockAcquire: xid table corrupted");
                return (STATUS_ERROR);
        }
+
+       /*
+        * If not found initialize the new entry
+        */
        if (!found)
        {
-               XID_PRINT("LockAcquire: queueing XidEnt", result);
-               ProcAddLock(&result->queue);
                result->nHolding = 0;
                MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
+               ProcAddLock(&result->queue);
+               XID_PRINT("LockAcquire: new", result);
+       } else {
+               XID_PRINT("LockAcquire: found", result);
+               Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
+               Assert(result->nHolding <= lock->nActive);
        }
 
        /* ----------------
@@ -570,6 +632,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
         */
        lock->nHolding++;
        lock->holders[lockmode]++;
+       Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
 
        /* --------------------
         * If I'm the only one holding a lock, then there
@@ -582,43 +645,59 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        {
                result->holders[lockmode]++;
                result->nHolding++;
+               XID_PRINT("LockAcquire: owning", result);
+               Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
                GrantLock(lock, lockmode);
                SpinRelease(masterLock);
                return (TRUE);
        }
 
-       Assert(result->nHolding <= lock->nActive);
-
-       status = LockResolveConflicts(lockmethod, lock, lockmode, myXid);
-
+       status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
        if (status == STATUS_OK)
                GrantLock(lock, lockmode);
        else if (status == STATUS_FOUND)
        {
 #ifdef USER_LOCKS
-
                /*
-                * User locks are non blocking. If we can't acquire a lock remove
-                * the xid entry and return FALSE without waiting.
+                * User locks are non blocking. If we can't acquire a lock we must
+                * remove the xid entry and return FALSE without waiting.
                 */
                if (is_user_lock)
                {
                        if (!result->nHolding)
                        {
                                SHMQueueDelete(&result->queue);
-                               hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
+                               result = (XIDLookupEnt *) hash_search(xidTable,
+                                                                                                         (Pointer) &result,
+                                                                                                         HASH_REMOVE, &found);
+                               if (!result || !found) {
+                                       elog(NOTICE, "LockAcquire: remove xid, table corrupted");
+                               }
+                       } else {
+                               XID_PRINT_AUX("LockAcquire: NHOLDING", result);
                        }
                        lock->nHolding--;
                        lock->holders[lockmode]--;
+                       LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
+                       Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+                       Assert(lock->nActive <= lock->nHolding);
                        SpinRelease(masterLock);
-#ifdef USER_LOCKS_DEBUG
-                       elog(NOTICE, "LockAcquire: user lock failed");
-#endif
                        return (FALSE);
                }
 #endif
-               status = WaitOnLock(lockmethod, lock, lockmode);
-               XID_PRINT("Someone granted me the lock", result);
+               status = WaitOnLock(lockmethod, lock, lockmode, xid);
+               /*
+                * Check the xid entry status, in case something in the
+                * ipc communication doesn't work correctly.
+                */
+               if (!((result->nHolding > 0) && (result->holders[lockmode] > 0))) {
+                       XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result);
+                       LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode);
+                       /* Should we retry ? */
+                       return (FALSE);
+               }
+               XID_PRINT("LockAcquire: granted", result);
+               LOCK_PRINT("LockAcquire: granted", lock, lockmode);
        }
 
        SpinRelease(masterLock);
@@ -646,7 +725,8 @@ int
 LockResolveConflicts(LOCKMETHOD lockmethod,
                                         LOCK *lock,
                                         LOCKMODE lockmode,
-                                        TransactionId xid)
+                                        TransactionId xid,
+                                        XIDLookupEnt *xidentP)         /* xident ptr or NULL */
 {
        XIDLookupEnt *result,
                                item;
@@ -657,44 +737,81 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
        int                     bitmask;
        int                     i,
                                tmpMask;
+#ifdef USER_LOCKS
+       int                     is_user_lock;
+#endif
 
        numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
        xidTable = LockMethodTable[lockmethod]->xidHash;
 
-       /* ---------------------
-        * read my own statistics from the xid table.  If there
-        * isn't an entry, then we'll just add one.
-        *
-        * Zero out the tag, this clears the padding bytes for long
-        * word alignment and ensures hashing consistency.
-        * ------------------
-        */
-       MemSet(&item, 0, XID_TAGSIZE);
-       TransactionIdStore(xid, &item.tag.xid);
-       item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-       item.tag.pid = pid;
+       if (xidentP) {
+               /*
+                * A pointer to the xid entry was supplied from the caller.
+                * Actually only LockAcquire can do it.
+                */
+               result = xidentP;
+       } else {
+               /* ---------------------
+                * read my own statistics from the xid table.  If there
+                * isn't an entry, then we'll just add one.
+                *
+                * Zero out the tag, this clears the padding bytes for long
+                * word alignment and ensures hashing consistency.
+                * ------------------
+                */
+               MemSet(&item, 0, XID_TAGSIZE);
+               item.tag.lock = MAKE_OFFSET(lock);
+#ifdef USE_XIDTAG_LOCKMETHOD
+               item.tag.lockmethod = lockmethod;
+#endif
+#ifdef USER_LOCKS
+               is_user_lock = (lockmethod == 2);
+               if (is_user_lock) {
+                       item.tag.pid = MyProcPid;
+                       item.tag.xid = 0;
+               } else {
+                       TransactionIdStore(xid, &item.tag.xid);
+               }
+#else
+               TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-       if (!(result = (XIDLookupEnt *)
-                 hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
-       {
-               elog(NOTICE, "LockResolveConflicts: xid table corrupted");
-               return (STATUS_ERROR);
-       }
-       myHolders = result->holders;
+               /*
+                * Find or create an xid entry with this tag
+                */
+               result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+                                                                                         HASH_ENTER, &found);
+               if (!result)
+               {
+                       elog(NOTICE, "LockResolveConflicts: xid table corrupted");
+                       return (STATUS_ERROR);
+               }
 
-       if (!found)
-       {
-               /* ---------------
-                * we're not holding any type of lock yet.  Clear
-                * the lock stats.
-                * ---------------
+               /*
+                * If not found initialize the new entry. THIS SHOULD NEVER HAPPEN,
+                * if we are trying to resolve a conflict we must already have
+                * allocated an xid entry for this lock.        dz 21-11-1997
                 */
-               MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
-               result->nHolding = 0;
+               if (!found)
+               {
+                       /* ---------------
+                        * we're not holding any type of lock yet.  Clear
+                        * the lock stats.
+                        * ---------------
+                        */
+                       MemSet(result->holders,0, numLockModes * sizeof(*(lock->holders)));
+                       result->nHolding = 0;
+                       XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result);
+               } else {
+                       XID_PRINT("LockResolveConflicts: found", result);
+               }
        }
+       Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
 
+       /*
+        * We can control runtime this option. Default is lockReadPriority=0
+        */
+       if (!lockReadPriority)
        {
                /* ------------------------
                 * If someone with a greater priority is waiting for the lock,
@@ -705,8 +822,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
                PROC_QUEUE *waitQueue = &(lock->waitProcs);
                PROC       *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
-               if (waitQueue->size && topproc->prio > myprio)
+               if (waitQueue->size && topproc->prio > myprio) {
+                       XID_PRINT("LockResolveConflicts: higher priority proc waiting",
+                                         result);
                        return STATUS_FOUND;
+               }
        }
 
        /* ----------------------------
@@ -721,12 +841,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
         */
        if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
        {
-
                result->holders[lockmode]++;
                result->nHolding++;
-
-               XID_PRINT("Conflict Resolved: updated xid entry stats", result);
-
+               XID_PRINT("LockResolveConflicts: no conflict", result);
+               Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
                return (STATUS_OK);
        }
 
@@ -736,6 +854,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
         * that does not reflect our own locks.
         * ------------------------
         */
+       myHolders = result->holders;
        bitmask = 0;
        tmpMask = 2;
        for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
@@ -753,27 +872,42 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
         */
        if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
        {
-
                /* no conflict. Get the lock and go on */
-
                result->holders[lockmode]++;
                result->nHolding++;
-
-               XID_PRINT("Conflict Resolved: updated xid entry stats", result);
-
+               XID_PRINT("LockResolveConflicts: resolved", result);
+               Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
                return (STATUS_OK);
-
        }
 
+       XID_PRINT("LockResolveConflicts: conflicting", result);
        return (STATUS_FOUND);
 }
 
+/*
+ * GrantLock -- update the lock data structure to show
+ *             the new lock holder.
+ */
+void
+GrantLock(LOCK *lock, LOCKMODE lockmode)
+{
+       lock->nActive++;
+       lock->activeHolders[lockmode]++;
+       lock->mask |= BITS_ON[lockmode];
+       LOCK_PRINT("GrantLock", lock, lockmode);
+       Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
+       Assert(lock->nActive <= lock->nHolding);
+}
+
 static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
+WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
+                  TransactionId xid)
 {
        PROC_QUEUE *waitQueue = &(lock->waitProcs);
        LOCKMETHODTABLE    *lockMethodTable = LockMethodTable[lockmethod];
        int                     prio = lockMethodTable->ctl->prio[lockmode];
+       char            old_status[64],
+                               new_status[64];
 
        Assert(lockmethod < NumLockMethods);
 
@@ -785,27 +919,38 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
         * synchronization for this queue.      That will not be true if/when
         * people can be deleted from the queue by a SIGINT or something.
         */
-       LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
+       LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
+       strcpy(old_status, PS_STATUS);
+       strcpy(new_status, PS_STATUS);
+       strcat(new_status, " waiting");
+       PS_SET_STATUS(new_status);
        if (ProcSleep(waitQueue,
                                  lockMethodTable->ctl->masterLock,
                                  lockmode,
                                  prio,
-                                 lock) != NO_ERROR)
+                                 lock,
+                                 xid) != NO_ERROR)
        {
                /* -------------------
-                * This could have happend as a result of a deadlock, see HandleDeadLock()
-                * Decrement the lock nHolding and holders fields as we are no longer
-                * waiting on this lock.
+                * This could have happend as a result of a deadlock,
+                * see HandleDeadLock().
+                * Decrement the lock nHolding and holders fields as
+                * we are no longer waiting on this lock.
                 * -------------------
                 */
                lock->nHolding--;
                lock->holders[lockmode]--;
-               LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockmode);
+               LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
+               Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
+               Assert(lock->nActive <= lock->nHolding);
                SpinRelease(lockMethodTable->ctl->masterLock);
                elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
+
+               /* not reached */
        }
 
-       LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
+       PS_SET_STATUS(old_status);
+       LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
        return (STATUS_OK);
 }
 
@@ -829,25 +974,34 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        XIDLookupEnt *result,
                                item;
        HTAB       *xidTable;
+       TransactionId xid;
        bool            wakeupNeeded = true;
+       int                     trace_flag;
 
 #ifdef USER_LOCKS
        int                     is_user_lock;
 
-       is_user_lock = (lockmethod == 0);
+       is_user_lock = (lockmethod == USER_LOCKMETHOD);
        if (is_user_lock)
        {
-               lockmethod = 1;
-#ifdef USER_LOCKS_DEBUG
-               elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
-                        locktag->tupleId.ip_posid,
-                        ((locktag->tupleId.ip_blkid.bi_hi << 16) +
-                         locktag->tupleId.ip_blkid.bi_lo),
-                        lockmode);
-#endif
+               TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u,%u] %d",
+                               locktag->tupleId.ip_posid,
+                               ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+                                locktag->tupleId.ip_blkid.bi_lo),
+                               lockmode);
        }
 #endif
 
+       /* ???????? This must be changed when short term locks will be used */
+       locktag->lockmethod = lockmethod;
+
+#ifdef USER_LOCKS
+       trace_flag = \
+               (lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS;
+#else
+       trace_flag = TRACE_LOCKS;
+#endif
+
        Assert(lockmethod < NumLockMethods);
        lockMethodTable = LockMethodTable[lockmethod];
        if (!lockMethodTable)
@@ -859,34 +1013,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        if (LockingIsDisabled)
                return (TRUE);
 
-       LOCK_PRINT("Release", locktag, lockmode);
-
        masterLock = lockMethodTable->ctl->masterLock;
-       xidTable = lockMethodTable->xidHash;
-
        SpinAcquire(masterLock);
 
-       Assert(lockMethodTable->lockHash->hash == tag_hash);
-       lock = (LOCK *)
-               hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND_SAVE, &found);
-
-#ifdef USER_LOCKS
-
        /*
-        * If the entry is not found hash_search returns TRUE instead of NULL,
-        * so we must check it explicitly.
+        * Find a lock with this tag
         */
-       if ((is_user_lock) && (lock == (LOCK *) TRUE))
-       {
-               SpinRelease(masterLock);
-               elog(NOTICE, "LockRelease: there are no locks with this tag");
-               return (FALSE);
-       }
-#endif
+       Assert(lockMethodTable->lockHash->hash == tag_hash);
+       lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+                                                               HASH_FIND, &found);
 
        /*
-        * let the caller print its own error message, too. Do not
-        * elog(ERROR).
+        * let the caller print its own error message, too. Do not elog(ERROR).
         */
        if (!lock)
        {
@@ -898,52 +1036,20 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        if (!found)
        {
                SpinRelease(masterLock);
+#ifdef USER_LOCKS
+               if (is_user_lock) {
+                       TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
+                       return (FALSE);
+               }
+#endif
                elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
                return (FALSE);
        }
+       LOCK_PRINT("LockRelease: found", lock, lockmode);
+       Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
+       Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
+       Assert(lock->nActive <= lock->nHolding);
 
-       Assert(lock->nHolding > 0);
-
-#ifdef USER_LOCKS
-
-       /*
-        * If this is an user lock it can be removed only after checking that
-        * it was acquired by the current process, so this code is skipped and
-        * executed later.
-        */
-       if (!is_user_lock)
-       {
-#endif
-
-               /*
-                * fix the general lock stats
-                */
-               lock->nHolding--;
-               lock->holders[lockmode]--;
-               lock->nActive--;
-               lock->activeHolders[lockmode]--;
-
-               Assert(lock->nActive >= 0);
-
-               if (!lock->nHolding)
-               {
-                       /* ------------------
-                        * if there's no one waiting in the queue,
-                        * we just released the last lock.
-                        * Delete it from the lock table.
-                        * ------------------
-                        */
-                       Assert(lockMethodTable->lockHash->hash == tag_hash);
-                       lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-                                                                               (Pointer) &(lock->tag),
-                                                                               HASH_REMOVE_SAVED,
-                                                                               &found);
-                       Assert(lock && found);
-                       wakeupNeeded = false;
-               }
-#ifdef USER_LOCKS
-       }
-#endif
 
        /* ------------------
         * Zero out all of the tag bytes (this clears the padding bytes for long
@@ -951,42 +1057,102 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
         * ------------------
         */
        MemSet(&item, 0, XID_TAGSIZE);
-
-       TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
        item.tag.lock = MAKE_OFFSET(lock);
-#if 0
-       item.tag.pid = MyPid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+       item.tag.lockmethod = lockmethod;
 #endif
-
 #ifdef USER_LOCKS
-       if (is_user_lock)
-       {
+       if (is_user_lock) {
                item.tag.pid = MyProcPid;
-               item.tag.xid = 0;
-#ifdef USER_LOCKS_DEBUG
-               elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
-                        item.tag.lock, item.tag.pid, item.tag.xid);
-#endif
+               item.tag.xid = xid = 0;
+       } else {
+               xid = GetCurrentTransactionId();
+               TransactionIdStore(xid, &item.tag.xid);
        }
+#else
+       xid = GetCurrentTransactionId();
+       TransactionIdStore(xid, &item.tag.xid);
 #endif
 
-       if (!(result = (XIDLookupEnt *) hash_search(xidTable,
-                                                                                               (Pointer) &item,
-                                                                                               HASH_FIND_SAVE,
-                                                                                               &found))
-               || !found)
+       /*
+        * Find an xid entry with this tag
+        */
+       xidTable = lockMethodTable->xidHash;
+       result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
+                                                                                 HASH_FIND_SAVE, &found);
+       if (!result || !found)
        {
                SpinRelease(masterLock);
 #ifdef USER_LOCKS
-               if ((is_user_lock) && (result))
-                       elog(NOTICE, "LockRelease: you don't have a lock on this tag");
-               else
-                       elog(NOTICE, "LockRelease: find xid, table corrupted");
-#else
-               elog(NOTICE, "LockReplace: xid table corrupted");
+               if (!found && is_user_lock) {
+                       TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
+               } else
 #endif
+               elog(NOTICE, "LockReplace: xid table corrupted");
                return (FALSE);
        }
+       XID_PRINT("LockRelease: found", result);
+       Assert(result->tag.lock == MAKE_OFFSET(lock));
+
+       /*
+        * Check that we are actually holding a lock of the type we want
+        * to release.
+        */
+       if (!(result->holders[lockmode] > 0)) {
+               SpinRelease(masterLock);
+               XID_PRINT_AUX("LockAcquire: WRONGTYPE", result);
+               elog(NOTICE, "LockRelease: you don't own a lock of type %s",
+                        lock_types[lockmode]);
+               Assert(result->holders[lockmode] >= 0);
+               return (FALSE);
+       }
+       Assert(result->nHolding > 0);
+
+       /*
+        * fix the general lock stats
+        */
+       lock->nHolding--;
+       lock->holders[lockmode]--;
+       lock->nActive--;
+    lock->activeHolders[lockmode]--;
+
+       /* --------------------------
+        * If there are still active locks of the type I just released, no one
+        * should be woken up.  Whoever is asleep will still conflict
+        * with the remaining locks.
+        * --------------------------
+        */
+       if (lock->activeHolders[lockmode])
+       {
+               wakeupNeeded = false;
+       }
+       else
+       {
+               /* change the conflict mask.  No more of this lock type. */
+               lock->mask &= BITS_OFF[lockmode];
+       }
+
+       LOCK_PRINT("LockRelease: updated", lock, lockmode);
+       Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
+       Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
+       Assert(lock->nActive <= lock->nHolding);
+
+       if (!lock->nHolding)
+       {
+               /* ------------------
+                * if there's no one waiting in the queue,
+                * we just released the last lock.
+                * Delete it from the lock table.
+                * ------------------
+                */
+               Assert(lockMethodTable->lockHash->hash == tag_hash);
+               lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+                                                                       (Pointer) &(lock->tag),
+                                                                       HASH_REMOVE,
+                                                                       &found);
+               Assert(lock && found);
+               wakeupNeeded = false;
+       }
 
        /*
         * now check to see if I have any private locks.  If I do, decrement
@@ -994,8 +1160,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
         */
        result->holders[lockmode]--;
        result->nHolding--;
-
-       XID_PRINT("LockRelease updated xid stats", result);
+       XID_PRINT("LockRelease: updated", result);
+       Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
 
        /*
         * If this was my last hold on this lock, delete my entry in the XID
@@ -1003,78 +1169,25 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
         */
        if (!result->nHolding)
        {
-#ifdef USER_LOCKS
-               if (result->queue.prev == INVALID_OFFSET)
+               if (result->queue.prev == INVALID_OFFSET) {
                        elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
-               if (result->queue.next == INVALID_OFFSET)
+               }
+               if (result->queue.next == INVALID_OFFSET) {
                        elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
-#endif
+               }
                if (result->queue.next != INVALID_OFFSET)
                        SHMQueueDelete(&result->queue);
-               if (!(result = (XIDLookupEnt *)
-                         hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
-                       !found)
+               XID_PRINT("LockRelease: deleting", result);
+               result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
+                                                                                         HASH_REMOVE_SAVED, &found);
+               if (!result || !found)
                {
                        SpinRelease(masterLock);
-#ifdef USER_LOCKS
                        elog(NOTICE, "LockRelease: remove xid, table corrupted");
-#else
-                       elog(NOTICE, "LockReplace: xid table corrupted");
-#endif
                        return (FALSE);
                }
        }
 
-#ifdef USER_LOCKS
-
-       /*
-        * If this is an user lock remove it now, after the corresponding xid
-        * entry has been found and deleted.
-        */
-       if (is_user_lock)
-       {
-
-               /*
-                * fix the general lock stats
-                */
-               lock->nHolding--;
-               lock->holders[lockmode]--;
-               lock->nActive--;
-               lock->activeHolders[lockmode]--;
-
-               Assert(lock->nActive >= 0);
-
-               if (!lock->nHolding)
-               {
-                       /* ------------------
-                        * if there's no one waiting in the queue,
-                        * we just released the last lock.
-                        * Delete it from the lock table.
-                        * ------------------
-                        */
-                       Assert(lockMethodTable->lockHash->hash == tag_hash);
-                       lock = (LOCK *) hash_search(lockMethodTable->lockHash,
-                                                                               (Pointer) &(lock->tag),
-                                                                               HASH_REMOVE,
-                                                                               &found);
-                       Assert(lock && found);
-                       wakeupNeeded = false;
-               }
-       }
-#endif
-
-       /* --------------------------
-        * If there are still active locks of the type I just released, no one
-        * should be woken up.  Whoever is asleep will still conflict
-        * with the remaining locks.
-        * --------------------------
-        */
-       if (!(lock->activeHolders[lockmode]))
-       {
-               /* change the conflict mask.  No more of this lock type. */
-               lock->mask &= BITS_OFF[lockmode];
-       }
-
        if (wakeupNeeded)
        {
                /* --------------------------
@@ -1085,35 +1198,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
                 */
                ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
        }
+       else
+       {
+               LOCK_TPRINTF(lock, "LockRelease: no wakeup needed");
+       }
 
        SpinRelease(masterLock);
        return (TRUE);
 }
 
-/*
- * GrantLock -- update the lock data structure to show
- *             the new lock holder.
- */
-void
-GrantLock(LOCK *lock, LOCKMODE lockmode)
-{
-       lock->nActive++;
-       lock->activeHolders[lockmode]++;
-       lock->mask |= BITS_ON[lockmode];
-}
-
-#ifdef USER_LOCKS
 /*
  * LockReleaseAll -- Release all locks in a process lock queue.
- *
- * Note: This code is a little complicated by the presence in the
- *              same queue of user locks which can't be removed from the
- *              normal lock queue at the end of a transaction. They must
- *              however be removed when the backend exits.
- *              A dummy lockmethod 0 is used to indicate that we are releasing
- *              the user locks, from the code added to ProcKill().
  */
-#endif
 bool
 LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 {
@@ -1121,6 +1217,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
        int                     done;
        XIDLookupEnt *xidLook = NULL;
        XIDLookupEnt *tmp = NULL;
+       XIDLookupEnt *result;
        SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
        SPINLOCK        masterLock;
        LOCKMETHODTABLE    *lockMethodTable;
@@ -1128,45 +1225,52 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                                numLockModes;
        LOCK       *lock;
        bool            found;
+       int                     trace_flag;
+       int                     xidtag_lockmethod;
 
 #ifdef USER_LOCKS
        int                     is_user_lock_table,
                                count,
-                               nskip;
+                               nleft;
 
-       is_user_lock_table = (lockmethod == 0);
-#ifdef USER_LOCKS_DEBUG
-       elog(NOTICE, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid);
-#endif
-       if (is_user_lock_table)
-               lockmethod = 1;
+       count = nleft = 0;
+
+       is_user_lock_table = (lockmethod == USER_LOCKMETHOD);
+       trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS;
+#else
+       trace_flag = TRACE_LOCKS;
 #endif
+       TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d",
+                       lockmethod, MyProcPid);
 
        Assert(lockmethod < NumLockMethods);
        lockMethodTable = LockMethodTable[lockmethod];
-       if (!lockMethodTable)
+       if (!lockMethodTable) {
+               elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
                return (FALSE);
-
-       numLockModes = lockMethodTable->ctl->numLockModes;
-       masterLock = lockMethodTable->ctl->masterLock;
+       }
 
        if (SHMQueueEmpty(lockQueue))
                return TRUE;
 
-#ifdef USER_LOCKS
+       numLockModes = lockMethodTable->ctl->numLockModes;
+       masterLock = lockMethodTable->ctl->masterLock;
+
        SpinAcquire(masterLock);
-#endif
        SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
 
-       XID_PRINT("LockReleaseAll", xidLook);
-
-#ifndef USER_LOCKS
-       SpinAcquire(masterLock);
-#else
-       count = nskip = 0;
-#endif
        for (;;)
        {
+               /*
+                * Sometimes the queue appears to be messed up.
+                */
+               if (count++ > 1000)
+               {
+                       elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
+                       nleft = 0;
+                       break;
+               }
+
                /* ---------------------------
                 * XXX Here we assume the shared memory queue is circular and
                 * that we know its internal structure.  Should have some sort of
@@ -1176,72 +1280,73 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                done = (xidLook->queue.next == end);
                lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-               LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);
-
-#ifdef USER_LOCKS
+               xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
+               if ((xidtag_lockmethod == lockmethod) || (trace_flag >= 2)) {
+                       XID_PRINT("LockReleaseAll", xidLook);
+                       LOCK_PRINT("LockReleaseAll", lock, 0);
+               }
 
-               /*
-                * Sometimes the queue appears to be messed up.
-                */
-               if (count++ > 2000)
-               {
-                       elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
-                       nskip = 0;
-                       break;
+#ifdef USE_XIDTAG_LOCKMETHOD
+               if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
+                       elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
+                                xidtag_lockmethod, lock->tag.lockmethod);
+#endif
+               if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2)) {
+                       TPRINTF(trace_flag, "LockReleaseAll: skipping other table");
+                       nleft++;
+                       goto next_item;
                }
+
+               Assert(lock->nHolding > 0);
+               Assert(lock->nActive > 0);
+               Assert(lock->nActive <= lock->nHolding);
+               Assert(xidLook->nHolding >= 0);
+               Assert(xidLook->nHolding <= lock->nHolding);
+
+#ifdef USER_LOCKS
                if (is_user_lock_table)
                {
                        if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
                        {
-#ifdef USER_LOCKS_DEBUG
-                               elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
-                                 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-                               nskip++;
+                               TPRINTF(TRACE_USERLOCKS,
+                                               "LockReleaseAll: skiping normal lock [%d,%d,%d]",
+                                               xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+                               nleft++;
                                goto next_item;
                        }
                        if (xidLook->tag.pid != MyProcPid)
                        {
-                               /* This should never happen */
-#ifdef USER_LOCKS_DEBUG
+                               /* Should never happen */
                                elog(NOTICE,
-                                        "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
+                                        "LockReleaseAll: INVALID PID: [%u,%u] [%d,%d,%d]",
                                         lock->tag.tupleId.ip_posid,
                                         ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
                                          lock->tag.tupleId.ip_blkid.bi_lo),
-                                 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-                               nskip++;
+                                        xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+                               nleft++;
                                goto next_item;
                        }
-#ifdef USER_LOCKS_DEBUG
-                       elog(NOTICE,
-                                "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
-                                lock->tag.tupleId.ip_posid,
-                                ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
-                                 lock->tag.tupleId.ip_blkid.bi_lo),
-                                xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
+                       TPRINTF(TRACE_USERLOCKS,
+                                       "LockReleaseAll: releasing user lock [%u,%u] [%d,%d,%d]",
+                                       lock->tag.tupleId.ip_posid,
+                                       ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
+                                        lock->tag.tupleId.ip_blkid.bi_lo),
+                                       xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
                }
                else
                {
-                       if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
+                       /* Can't check xidLook->tag.xid, can be 0 also for normal locks */
+                       if (xidLook->tag.pid != 0)
                        {
-#ifdef USER_LOCKS_DEBUG
-                               elog(NOTICE,
-                                        "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
-                                        lock->tag.tupleId.ip_posid,
-                                        ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
-                                         lock->tag.tupleId.ip_blkid.bi_lo),
-                                 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
-                               nskip++;
+                               TPRINTF(TRACE_LOCKS,
+                                               "LockReleaseAll: skiping user lock [%u,%u] [%d,%d,%d]",
+                                               lock->tag.tupleId.ip_posid,
+                                               ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
+                                                lock->tag.tupleId.ip_blkid.bi_lo),
+                                               xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
+                               nleft++;
                                goto next_item;
                        }
-#ifdef USER_LOCKS_DEBUG
-                       elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
-                                xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif
                }
 #endif
 
@@ -1251,16 +1356,20 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                 */
                if (lock->nHolding != xidLook->nHolding)
                {
-                       lock->nHolding -= xidLook->nHolding;
-                       lock->nActive -= xidLook->nHolding;
-                       Assert(lock->nActive >= 0);
                        for (i = 1; i <= numLockModes; i++)
                        {
+                               Assert(xidLook->holders[i] >= 0);
                                lock->holders[i] -= xidLook->holders[i];
                                lock->activeHolders[i] -= xidLook->holders[i];
+                               Assert((lock->holders[i] >= 0) \
+                                          && (lock->activeHolders[i] >= 0));
                                if (!lock->activeHolders[i])
                                        lock->mask &= BITS_OFF[i];
                        }
+                       lock->nHolding -= xidLook->nHolding;
+                       lock->nActive -= xidLook->nHolding;
+                       Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
+                       Assert(lock->nActive <= lock->nHolding);
                }
                else
                {
@@ -1270,16 +1379,30 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                         * --------------
                         */
                        lock->nHolding = 0;
+                       /* Fix the lock status, just for next LOCK_PRINT message. */
+                       for (i=1; i<=numLockModes; i++) {
+                               Assert(lock->holders[i] == lock->activeHolders[i]);
+                               lock->holders[i] = lock->activeHolders[i] = 0;
+                       }
                }
+               LOCK_PRINT("LockReleaseAll: updated", lock, 0);
+
+               /*
+                * Remove the xid from the process lock queue
+                */
+               SHMQueueDelete(&xidLook->queue);
+
                /* ----------------
                 * always remove the xidLookup entry, we're done with it now
                 * ----------------
                 */
-#ifdef USER_LOCKS
-               SHMQueueDelete(&xidLook->queue);
-#endif
-               if ((!hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
-                       || !found)
+
+               XID_PRINT("LockReleaseAll: deleting", xidLook);
+               result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
+                                                                                         (Pointer) xidLook, 
+                                                                                         HASH_REMOVE,
+                                                                                         &found);
+               if (!result || !found)
                {
                        SpinRelease(masterLock);
                        elog(NOTICE, "LockReleaseAll: xid table corrupted");
@@ -1293,10 +1416,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                         * the last lock.
                         * --------------------
                         */
-
+                       LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
                        Assert(lockMethodTable->lockHash->hash == tag_hash);
-                       lock = (LOCK *)
-                               hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
+                       lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+                                                                               (Pointer) &(lock->tag),
+                                                                               HASH_REMOVE, &found);
                        if ((!lock) || (!found))
                        {
                                SpinRelease(masterLock);
@@ -1324,15 +1448,18 @@ next_item:
                SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
                xidLook = tmp;
        }
-       SpinRelease(masterLock);
-#ifdef USER_LOCKS
 
        /*
         * Reinitialize the queue only if nothing has been left in.
         */
-       if (nskip == 0)
-#endif
+       if (nleft == 0) {
+               TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue");
                SHMQueueInit(lockQueue);
+       }
+
+       SpinRelease(masterLock);
+       TPRINTF(trace_flag, "LockReleaseAll: done");
+
        return TRUE;
 }
 
@@ -1420,7 +1547,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
                checked_procs[0] = MyProc;
                nprocs = 1;
 
-               lockMethodTable = LockMethodTable[1];
+               lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD];
                xidTable = lockMethodTable->xidHash;
 
                MemSet(&item, 0, XID_TAGSIZE);
@@ -1456,7 +1583,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
                done = (xidLook->queue.next == end);
                lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-               LOCK_PRINT("DeadLockCheck", (&lock->tag), 0);
+               LOCK_PRINT("DeadLockCheck", lock, 0);
 
                /*
                 * This is our only check to see if we found the lock we want.
@@ -1560,9 +1687,190 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
        return false;
 }
 
+/*
+ * Return an array with the pids of all processes owning a lock.
+ * This works only for user locks because normal locks have no
+ * pid information in the corresponding XIDLookupEnt.
+ */
+ArrayType *
+LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag)
+{
+       XIDLookupEnt *xidLook = NULL;
+       SPINLOCK        masterLock;
+       LOCK            *lock;
+       SHMEM_OFFSET lock_offset;
+       int             count = 0;
+       LOCKMETHODTABLE    *lockMethodTable;
+       HTAB            *xidTable;
+       bool            found;
+       int                     ndims,
+                               nitems,
+                               hdrlen,
+                               size;
+       int                     lbounds[1],
+                               hbounds[1];
+       ArrayType       *array;
+       int                     *data_ptr;
+
+       /* Assume that no one will modify the result */
+       static int      empty_array[] = { 20, 1, 0, 0, 0 };
+
+#ifdef USER_LOCKS
+       int                     is_user_lock;
+
+       is_user_lock = (lockmethod == USER_LOCKMETHOD);
+       if (is_user_lock)
+       {
+               TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u,%u]",
+                               locktag->tupleId.ip_posid,
+                               ((locktag->tupleId.ip_blkid.bi_hi << 16) +
+                                locktag->tupleId.ip_blkid.bi_lo));
+       }
+#endif
+
+       /* This must be changed when short term locks will be used */
+       locktag->lockmethod = lockmethod;
+
+       Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods));
+       lockMethodTable = LockMethodTable[lockmethod];
+       if (!lockMethodTable)
+       {
+               elog(NOTICE, "lockMethodTable is null in LockOwners");
+               return ((ArrayType *) &empty_array);
+       }
+
+       if (LockingIsDisabled)
+       {
+               return ((ArrayType *) &empty_array);
+       }
+
+       masterLock = lockMethodTable->ctl->masterLock;
+       SpinAcquire(masterLock);
+
+       /*
+        * Find a lock with this tag
+        */
+       Assert(lockMethodTable->lockHash->hash == tag_hash);
+       lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
+                                                               HASH_FIND, &found);
+
+       /*
+        * let the caller print its own error message, too. Do not elog(WARN).
+        */
+       if (!lock)
+       {
+               SpinRelease(masterLock);
+               elog(NOTICE, "LockOwners: locktable corrupted");
+               return ((ArrayType *) &empty_array);
+       }
+
+       if (!found)
+       {
+               SpinRelease(masterLock);
+#ifdef USER_LOCKS
+               if (is_user_lock) {
+                       TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag");
+                       return ((ArrayType *) &empty_array);
+               }
+#endif
+               elog(NOTICE, "LockOwners: locktable lookup failed, no lock");
+               return ((ArrayType *) &empty_array);
+       }
+       LOCK_PRINT("LockOwners: found", lock, 0);
+       Assert((lock->nHolding > 0) && (lock->nActive > 0));
+       Assert(lock->nActive <= lock->nHolding);
+       lock_offset = MAKE_OFFSET(lock);
+
+       /* Construct a 1-dimensional array */
+       ndims = 1;
+       hdrlen = ARR_OVERHEAD(ndims);
+       lbounds[0] = 0;
+       hbounds[0] = lock->nActive;
+       size = hdrlen + sizeof(int) * hbounds[0];
+       array = (ArrayType *) palloc(size);
+       MemSet(array, 0, size);
+       memmove((char *) array, (char *) &size, sizeof(int));
+       memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int));
+       memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
+       memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int));
+       SET_LO_FLAG(false, array);
+       data_ptr = (int *) ARR_DATA_PTR(array);
+
+       xidTable = lockMethodTable->xidHash;
+       hash_seq(NULL);
+       nitems = 0;
+       while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
+                  (xidLook != (XIDLookupEnt *)TRUE)) {
+               if (count++ > 1000) {
+                       elog(NOTICE,"LockOwners: possible loop, giving up");
+                       break;
+               }
+
+               if (xidLook->tag.pid == 0) {
+                       XID_PRINT("LockOwners: no pid", xidLook);
+                       continue;
+               }
+
+               if (!xidLook->tag.lock) {
+                       XID_PRINT("LockOwners: NULL LOCK", xidLook);
+                       continue;
+               }
+
+               if (xidLook->tag.lock != lock_offset) {
+                       XID_PRINT("LockOwners: different lock", xidLook);
+                       continue;
+               }
+
+               if (LOCK_LOCKMETHOD(*lock) != lockmethod) {
+                       XID_PRINT("LockOwners: other table", xidLook);
+                       continue;
+               }
+
+               if (xidLook->nHolding <= 0) {
+                       XID_PRINT("LockOwners: not holding", xidLook);
+                       continue;
+               }
+
+               if (nitems >= hbounds[0]) {
+                       elog(NOTICE,"LockOwners: array size exceeded");
+                       break;
+               }
+
+               /*
+                * Check that the holding process is still alive by sending
+                * him an unused (ignored) signal. If the kill fails the
+                * process is not alive.
+                */
+               if ((xidLook->tag.pid != MyProcPid) \
+                       && (kill(xidLook->tag.pid, SIGCHLD)) != 0)
+               {
+                       /* Return a negative pid to signal that process is dead */
+                       data_ptr[nitems++] = - (xidLook->tag.pid);
+                       XID_PRINT("LockOwners: not alive", xidLook);
+                       /* XXX - TODO: remove this entry and update lock stats */
+                       continue;
+               }
+
+               /* Found a process holding the lock */
+               XID_PRINT("LockOwners: holding", xidLook);
+               data_ptr[nitems++] = xidLook->tag.pid;
+       }
+
+       SpinRelease(masterLock);
+
+       /* Adjust the actual size of the array */
+       hbounds[0] = nitems;
+       size = hdrlen + sizeof(int) * hbounds[0];
+       memmove((char *) array, (char *) &size, sizeof(int));
+       memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
+
+       return (array);
+}
+
 #ifdef DEADLOCK_DEBUG
 /*
- * Dump all locks. Must have already acquired the masterLock.
+ * Dump all locks in the proc->lockQueue. Must have already acquired
+ * the masterLock.
  */
 void
 DumpLocks()
@@ -1577,9 +1885,8 @@ DumpLocks()
        SPINLOCK        masterLock;
        int                     numLockModes;
        LOCK       *lock;
-
-       count;
-       int                     lockmethod = 1;
+       int                     count = 0;
+       int                     lockmethod = DEFAULT_LOCKMETHOD;
        LOCKMETHODTABLE    *lockMethodTable;
 
        ShmemPIDLookup(MyProcPid, &location);
@@ -1604,11 +1911,18 @@ DumpLocks()
        SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
        end = MAKE_OFFSET(lockQueue);
 
-       LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
-       XID_PRINT("DumpLocks", xidLook);
+       if (MyProc->waitLock) {
+               LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0);
+       }
 
-       for (count = 0;;)
+       for (;;)
        {
+               if (count++ > 2000)
+               {
+                       elog(NOTICE, "DumpLocks: xid loop detected, giving up");
+                       break;
+               }
+
                /* ---------------------------
                 * XXX Here we assume the shared memory queue is circular and
                 * that we know its internal structure.  Should have some sort of
@@ -1618,19 +1932,68 @@ DumpLocks()
                done = (xidLook->queue.next == end);
                lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
-               LOCK_DUMP("DumpLocks", lock, 0);
-
-               if (count++ > 2000)
-               {
-                       elog(NOTICE, "DumpLocks: xid loop detected, giving up");
-                       break;
-               }
+               XID_PRINT_AUX("DumpLocks", xidLook);
+               LOCK_PRINT_AUX("DumpLocks", lock, 0);
 
                if (done)
                        break;
+
                SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
                xidLook = tmp;
        }
 }
 
+/*
+ * Dump all postgres locks. Must have already acquired the masterLock.
+ */
+void
+DumpAllLocks()
+{
+       SHMEM_OFFSET location;
+       PROC            *proc;
+       XIDLookupEnt *xidLook = NULL;
+       LOCK            *lock;
+       int             pid;
+       int             count = 0;
+       int                     lockmethod = DEFAULT_LOCKMETHOD;
+       LOCKMETHODTABLE         *lockMethodTable;
+       HTAB            *xidTable;
+
+       pid = getpid();
+       ShmemPIDLookup(pid,&location);
+       if (location == INVALID_OFFSET)
+               return;
+       proc = (PROC *) MAKE_PTR(location);
+       if (proc != MyProc)
+               return;
+
+       Assert(lockmethod < NumLockMethods);
+       lockMethodTable = LockMethodTable[lockmethod];
+       if (!lockMethodTable)
+               return;
+
+       xidTable = lockMethodTable->xidHash;
+
+       if (MyProc->waitLock) {
+               LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock,0);
+       }
+
+       hash_seq(NULL);
+       while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
+                  (xidLook != (XIDLookupEnt *)TRUE)) {
+               XID_PRINT_AUX("DumpAllLocks", xidLook);
+
+               if (xidLook->tag.lock) {
+                       lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+                       LOCK_PRINT_AUX("DumpAllLocks", lock, 0);
+               } else {
+                       elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
+               }
+
+               if (count++ > 2000) {
+                       elog(NOTICE,"DumpAllLocks: possible loop, giving up");
+                       break;
+               }
+       }
+}
 #endif
index 0998389ffe91ec373a01760c722c3b68fb9f493b..3887a8e37d16b7051cb739a8342b511d15db13d2 100644 (file)
@@ -12,7 +12,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.22 1998/08/19 02:02:44 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.23 1998/08/25 21:20:28 scrappy Exp $
  *
  * NOTES:
  *      (1) The lock.c module assumes that the caller here is doing
 #include "utils/rel.h"
 #include "miscadmin.h"                 /* MyDatabaseId */
 
-static bool
-MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
-                        PG_LOCK_LEVEL level);
-static bool
-MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
-                        PG_LOCK_LEVEL level);
+static bool MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag,
+                                                LOCKMODE lockmode, PG_LOCK_LEVEL level);
+static bool MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag,
+                                                LOCKMODE lockmode, PG_LOCK_LEVEL level);
 
 #ifdef LowLevelLocking
 
@@ -130,6 +128,7 @@ static int  MultiPrios[] = {
  * lock table is ONE lock table, not three.
  */
 LOCKMETHOD MultiTableId = (LOCKMETHOD) NULL;
+LOCKMETHOD LongTermTableId = (LOCKMETHOD) NULL;
 #ifdef NOT_USED
 LOCKMETHOD ShortTermTableId = (LOCKMETHOD) NULL;
 #endif
@@ -150,12 +149,25 @@ InitMultiLevelLocks()
        /* -----------------------
         * No short term lock table for now.  -Jeff 15 July 1991
         *
-        * ShortTermTableId = LockTableRename(lockmethod);
+        * ShortTermTableId = LockMethodTableRename(lockmethod);
         * if (! (ShortTermTableId)) {
         *       elog(ERROR,"InitMultiLocks: couldnt rename lock table");
         * }
         * -----------------------
         */
+
+#ifdef USER_LOCKS
+       /*
+        * Allocate another tableId for long-term locks
+        */
+       LongTermTableId = LockMethodTableRename(MultiTableId);
+       if (!(LongTermTableId))
+       {
+               elog(ERROR,
+                        "InitMultiLevelLocks: couldn't rename long-term lock table");
+       }
+#endif
+
        return MultiTableId;
 }
 
index fb052582e79d7fc0ba43338270620225fdf4bc64..4cd9602a8b51dae5ecf3beb29407761da02427fb 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *             This is so that we can support more backends. (system-wide semaphore
  *             sets run out pretty fast.)                                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
  */
 #include <sys/time.h>
 #include <unistd.h>
 #include "storage/shmem.h"
 #include "storage/spin.h"
 #include "storage/proc.h"
+#include "utils/trace.h"
 
 static void HandleDeadLock(int sig);
 static PROC *ProcWakeup(PROC *proc, int errType);
 
+#define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
+
 /* --------------------
  * Spin lock for manipulating the shared process data structure:
  * ProcGlobal.... Adding an extra spin lock seemed like the smallest
@@ -247,10 +250,7 @@ InitProcess(IPCKey key)
         */
        SpinRelease(ProcStructLock);
 
-       MyProc->pid = 0;
-#if 0
        MyProc->pid = MyProcPid;
-#endif
        MyProc->xid = InvalidTransactionId;
 #ifdef LowLevelLocking
        MyProc->xmin = InvalidTransactionId;
@@ -361,10 +361,13 @@ ProcKill(int exitStatus, int pid)
         * ---------------
         */
        ProcReleaseSpins(proc);
-       LockReleaseAll(1, &proc->lockQueue);
+       LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
 
 #ifdef USER_LOCKS
-       LockReleaseAll(0, &proc->lockQueue);
+       /*
+        * Assume we have a second lock table.
+        */
+       LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
 #endif
 
        /* ----------------
@@ -437,11 +440,12 @@ ProcQueueInit(PROC_QUEUE *queue)
  * NOTES: The process queue is now a priority queue for locking.
  */
 int
-ProcSleep(PROC_QUEUE *waitQueue,
+ProcSleep(PROC_QUEUE *waitQueue,               /* lock->waitProcs */
                  SPINLOCK spinlock,
-                 int token,
+                 int token,                                    /* lockmode */
                  int prio,
-                 LOCK *lock)
+                 LOCK *lock,
+                 TransactionId xid)                /* needed by user locks, see below */
 {
        int                     i;
        PROC       *proc;
@@ -470,7 +474,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
        proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
        /* If we are a reader, and they are writers, skip past them */
-
        for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
                proc = (PROC *) MAKE_PTR(proc->links.prev);
 
@@ -482,12 +485,22 @@ ProcSleep(PROC_QUEUE *waitQueue,
        MyProc->token = token;
        MyProc->waitLock = lock;
 
+#ifdef USER_LOCKS
+       /* -------------------
+        * Currently, we only need this for the ProcWakeup routines.
+        * This must be 0 for user lock, so we can't just use the value
+        * from GetCurrentTransactionId().
+        * -------------------
+        */
+       TransactionIdStore(xid, &MyProc->xid);
+#else
 #ifndef LowLevelLocking
        /* -------------------
         * currently, we only need this for the ProcWakeup routines
         * -------------------
         */
        TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
+#endif
 #endif
 
        /* -------------------
@@ -510,7 +523,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
         * --------------
         */
        MemSet(&timeval, 0, sizeof(struct itimerval));
-       timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER;
+       timeval.it_value.tv_sec = \
+               (DeadlockCheckTimer ? DeadlockCheckTimer : DEADLOCK_CHECK_TIMER);
 
        do
        {
@@ -525,7 +539,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
                 * the semaphore implementation.
                 * --------------
                 */
-               IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+               IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum,
+                                                IpcExclusiveLock);
        } while (MyProc->errType == STATUS_NOT_FOUND);          /* sleep after deadlock
                                                                                                                 * check */
 
@@ -534,8 +549,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
         * ---------------
         */
        timeval.it_value.tv_sec = 0;
-
-
        if (setitimer(ITIMER_REAL, &timeval, &dummy))
                elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup");
 
@@ -546,6 +559,11 @@ ProcSleep(PROC_QUEUE *waitQueue,
         */
        SpinAcquire(spinlock);
 
+#ifdef LOCK_MGR_DEBUG
+       /* Just to get meaningful debug messages from DumpLocks() */
+       MyProc->waitLock = (LOCK *)NULL;
+#endif
+
        return (MyProc->errType);
 }
 
@@ -589,17 +607,39 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
 {
        PROC       *proc;
        int                     count;
+       int                     trace_flag;
+       int                     last_locktype = -1;
+       int                     queue_size = queue->size;
+
+       Assert(queue->size >= 0);
 
        if (!queue->size)
                return (STATUS_NOT_FOUND);
 
        proc = (PROC *) MAKE_PTR(queue->links.prev);
        count = 0;
-       while ((LockResolveConflicts(lockmethod,
+       while ((queue_size--) && (proc))
+       {
+               /*
+                * This proc will conflict as the previous one did, don't even try.
+                */
+               if (proc->token == last_locktype)
+               {
+                       continue;
+               }
+
+               /*
+                * This proc conflicts with locks held by others, ignored.
+                */
+               if (LockResolveConflicts(lockmethod,
                                                                 lock,
                                                                 proc->token,
-                                                                proc->xid) == STATUS_OK))
-       {
+                                                                proc->xid,
+                                                                (XIDLookupEnt *) NULL) != STATUS_OK)
+               {
+                       last_locktype = proc->token;
+                       continue;
+               }
 
                /*
                 * there was a waiting process, grant it the lock before waking it
@@ -608,24 +648,34 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
                 * time that the awoken process begins executing again.
                 */
                GrantLock(lock, proc->token);
-               queue->size--;
 
                /*
                 * ProcWakeup removes proc from the lock waiting process queue and
                 * returns the next proc in chain.
                 */
-               proc = ProcWakeup(proc, NO_ERROR);
 
                count++;
-               if (!proc || queue->size == 0)
-                       break;
+               queue->size--;
+               proc = ProcWakeup(proc, NO_ERROR);
        }
 
+       Assert(queue->size >= 0);
+
        if (count)
                return (STATUS_OK);
-       else
+       else {
                /* Something is still blocking us.      May have deadlocked. */
+               trace_flag = (lock->tag.lockmethod == USER_LOCKMETHOD) ? \
+                       TRACE_USERLOCKS : TRACE_LOCKS;
+               TPRINTF(trace_flag,
+                               "ProcLockWakeup: lock(%x) can't wake up any process",
+                               MAKE_OFFSET(lock));
+#ifdef DEADLOCK_DEBUG
+               if (pg_options[trace_flag] >= 2)
+                       DumpAllLocks();
+#endif
                return (STATUS_NOT_FOUND);
+       }
 }
 
 void
@@ -685,7 +735,7 @@ HandleDeadLock(int sig)
        }
 
 #ifdef DEADLOCK_DEBUG
-       DumpLocks();
+       DumpAllLocks();
 #endif
 
        if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
@@ -711,7 +761,8 @@ HandleDeadLock(int sig)
         * I was awoken by a signal, not by someone unlocking my semaphore.
         * ------------------
         */
-       IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+       IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum,
+                                          IpcExclusiveLock);
 
        /* -------------
         * Set MyProc->errType to STATUS_ERROR so that we abort after
index fd1f95aab988368eef0b1b5d5bed73f5d541384c..e989e57b4404ce8e47c8bf0b14bfbfb594cdd625 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.16 1998/08/01 15:26:37 vadim Exp $
+ * $Id: lock.h,v 1.17 1998/08/25 21:20:31 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -15,6 +15,8 @@
 
 #include <storage/shmem.h>
 #include <storage/itemptr.h>
+#include <storage/sinvaladt.h>
+#include <utils/array.h>
 
 extern SPINLOCK LockMgrLock;
 typedef int MASK;
@@ -32,7 +34,7 @@ typedef int MASK;
  * NLOCKENTS - The maximum number of lock entries in the lock table.
  * ----------------------
  */
-#define NBACKENDS 50
+#define NBACKENDS MaxBackendId
 #define NLOCKS_PER_XACT 40
 #define NLOCKENTS NLOCKS_PER_XACT*NBACKENDS
 
@@ -51,9 +53,14 @@ typedef int LOCKMETHOD;
  * CreateSpinLocks() or the number of shared memory locations allocated
  * for lock table spin locks in the case of machines with TAS instructions.
  */
-#define MAX_LOCK_METHODS 2
+#define MAX_LOCK_METHODS       3
 
-#define INVALID_TABLEID 0
+#define INVALID_TABLEID                0
+
+#define INVALID_LOCKMETHOD     INVALID_TABLEID
+#define DEFAULT_LOCKMETHOD     1
+#define USER_LOCKMETHOD                2
+#define MIN_LOCKMETHOD         DEFAULT_LOCKMETHOD
 
 /*typedef struct LOCK LOCK; */
 
@@ -63,9 +70,11 @@ typedef struct LTAG
        Oid                     relId;
        Oid                     dbId;
        ItemPointerData tupleId;
+       uint16          lockmethod;                             /* needed by user locks */
 } LOCKTAG;
 
 #define TAGSIZE (sizeof(LOCKTAG))
+#define LOCKTAG_LOCKMETHOD(locktag) ((locktag).lockmethod)
 
 /* This is the control structure for a lock table.     It
  * lives in shared memory:
@@ -143,8 +152,18 @@ typedef struct XIDTAG
        SHMEM_OFFSET lock;
        int                     pid;
        TransactionId xid;
+#ifdef USE_XIDTAG_LOCKMETHOD
+       uint16          lockmethod;                             /* for debug or consistency checking */
+#endif
 } XIDTAG;
 
+#ifdef USE_XIDTAG_LOCKMETHOD
+#define XIDTAG_LOCKMETHOD(xidtag) ((xidtag).lockmethod)
+#else
+#define XIDTAG_LOCKMETHOD(xidtag) \
+               (((LOCK*) MAKE_PTR((xidtag).lock))->tag.lockmethod)
+#endif
+
 typedef struct XIDLookupEnt
 {
        /* tag */
@@ -157,6 +176,7 @@ typedef struct XIDLookupEnt
 } XIDLookupEnt;
 
 #define XID_TAGSIZE (sizeof(XIDTAG))
+#define XIDENT_LOCKMETHOD(xident) (XIDTAG_LOCKMETHOD((xident).tag))
 
 /* originally in procq.h */
 typedef struct PROC_QUEUE
@@ -191,14 +211,16 @@ typedef struct LOCK
        int                     nActive;
 } LOCK;
 
-#define LockGetLock_nHolders(l) l->nHolders
+#define LOCK_LOCKMETHOD(lock) (LOCKTAG_LOCKMETHOD((lock).tag))
 
+#define LockGetLock_nHolders(l) l->nHolders
+#ifdef NOT_USED
 #define LockDecrWaitHolders(lock, lockmode) \
 ( \
   lock->nHolding--, \
   lock->holders[lockmode]-- \
 )
-
+#endif
 #define LockLockTable() SpinAcquire(LockMgrLock);
 #define UnlockLockTable() SpinRelease(LockMgrLock);
 
@@ -209,23 +231,27 @@ extern SPINLOCK LockMgrLock;
  */
 extern void InitLocks(void);
 extern void LockDisable(int status);
-extern LOCKMETHOD
-LockMethodTableInit(char *tabName, MASK *conflictsP, int *prioP,
-                       int numModes);
-extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode);
-extern int
-LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
-                                        TransactionId xid);
-extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode);
+extern LOCKMETHOD LockMethodTableInit(char *tabName, MASK *conflictsP,
+                                                                         int *prioP, int numModes);
+extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
+extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+                                               LOCKMODE lockmode);
+extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock,
+                                                               LOCKMODE lockmode, TransactionId xid,
+                                                               XIDLookupEnt *xidentP);
+extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+                                               LOCKMODE lockmode);
 extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
 extern int     LockShmemSize(void);
 extern bool LockingDisabled(void);
-extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check);
+extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock,
+                                                 bool skip_check);
+ArrayType* LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag);
 
 #ifdef DEADLOCK_DEBUG
 extern void DumpLocks(void);
-
+extern void DumpAllLocks(void);
 #endif
 
 #endif                                                 /* LOCK_H */
index 8b8b7e28488daea7a1956b5d886a9a3115dd0c67..50f7b03ef633d37b4208e0af2ee6593609c063d7 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.13 1998/07/27 19:38:38 vadim Exp $
+ * $Id: proc.h,v 1.14 1998/08/25 21:20:32 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -105,10 +105,10 @@ extern bool ProcRemove(int pid);
 /* make static in storage/lmgr/proc.c -- jolly */
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int
-ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
-                 int prio, LOCK *lock);
-extern int     ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock);
+extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
+                                        int prio, LOCK *lock, TransactionId xid);
+extern int     ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
+                                                  LOCK *lock);
 extern void ProcAddLock(SHM_QUEUE *elem);
 extern void ProcReleaseSpins(PROC *proc);
 extern void ProcFreeAllSemaphores(void);