From 7dbcf31be29d2f4b57dbbb2c57a3c783e3595ac6 Mon Sep 17 00:00:00 2001 From: "Marc G. Fournier" Date: Tue, 25 Aug 1998 21:20:32 +0000 Subject: [PATCH] From: Massimo Dal Zotto lock.patch I have rewritten lock.c cleaning up the code and adding better assert checking I have also added some fields to the lock and xid tags for better support of user locks. There is also a new function which returns an array of pids owning a lock. I'm using this code from over six months and it works fine. --- src/backend/storage/lmgr/lock.c | 1267 +++++++++++++++++++----------- src/backend/storage/lmgr/multi.c | 28 +- src/backend/storage/lmgr/proc.c | 101 ++- src/include/storage/lock.h | 58 +- src/include/storage/proc.h | 10 +- 5 files changed, 958 insertions(+), 506 deletions(-) diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 4b83c9c0d1..b35008a3bb 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.32 1998/06/30 02:33:31 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.33 1998/08/25 21:20:26 scrappy Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -18,11 +18,9 @@ * * Interface: * - * LockAcquire(), LockRelease(), LockMethodTableInit(). - * - * LockReplace() is called only within this module and by the - * lkchain module. It releases a lock without looking - * the lock up in the lock table. + * LockAcquire(), LockRelease(), LockMethodTableInit(), + * LockMethodTableRename(), LockReleaseAll, LockOwners() + * LockResolveConflicts(), GrantLock() * * NOTE: This module is used to define new lock tables. The * multi-level lock table (multi.c) used by the heap @@ -35,6 +33,7 @@ #include #include #include +#include #include "postgres.h" #include "miscadmin.h" @@ -49,25 +48,110 @@ #include "utils/palloc.h" #include "access/xact.h" #include "access/transam.h" +#include "utils/trace.h" +#include "utils/ps_status.h" -static int -WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode); - -/*#define LOCK_MGR_DEBUG*/ - -#ifndef LOCK_MGR_DEBUG +static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, + TransactionId xid); -#define LOCK_PRINT(where,tag,type) -#define LOCK_DUMP(where,lock,type) -#define LOCK_DUMP_AUX(where,lock,type) +/* + * lockDebugRelation can be used to trace unconditionally a single relation, + * for example pg_listener, if you suspect there are locking problems. + * + * lockDebugOidMin is is used to avoid tracing postgres relations, which + * would produce a lot of output. Unfortunately most system relations are + * created after bootstrap and have oid greater than BootstrapObjectIdData. + * If you are using tprintf you should specify a value greater than the max + * oid of system relations, which can be found with the following query: + * + * select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_'; + * + * To get a useful lock trace you can use the following pg_options: + * + * -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500" + */ +#define LOCKDEBUG(lockmethod) (pg_options[TRACE_SHORTLOCKS+lockmethod]) +#define lockDebugRelation (pg_options[TRACE_LOCKRELATION]) +#define lockDebugOidMin (pg_options[TRACE_LOCKOIDMIN]) +#define lockReadPriority (pg_options[OPT_LOCKREADPRIORITY]) + +#ifdef LOCK_MGR_DEBUG +#define LOCK_PRINT(where,lock,type) \ + if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \ + && (lock->tag.relId >= lockDebugOidMin)) \ + || (lock->tag.relId == lockDebugRelation)) \ + LOCK_PRINT_AUX(where,lock,type) + +#define LOCK_PRINT_AUX(where,lock,type) \ + TPRINTF(TRACE_ALL, \ + "%s: lock(%x) tbl(%d) rel(%d) db(%d) tid(%d,%d) mask(%x) " \ + "hold(%d,%d,%d,%d,%d)=%d " \ + "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \ + where, \ + MAKE_OFFSET(lock), \ + lock->tag.lockmethod, \ + lock->tag.relId, \ + lock->tag.dbId, \ + ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+ \ + lock->tag.tupleId.ip_blkid.bi_lo), \ + lock->tag.tupleId.ip_posid, \ + lock->mask, \ + lock->holders[1], \ + lock->holders[2], \ + lock->holders[3], \ + lock->holders[4], \ + lock->holders[5], \ + lock->nHolding, \ + lock->activeHolders[1], \ + lock->activeHolders[2], \ + lock->activeHolders[3], \ + lock->activeHolders[4], \ + lock->activeHolders[5], \ + lock->nActive, \ + lock->waitProcs.size, \ + lock_types[type]) + +#define XID_PRINT(where,xidentP) \ + if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1) \ + && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \ + >= lockDebugOidMin)) \ + || (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \ + == lockDebugRelation)) \ + XID_PRINT_AUX(where,xidentP) + +#define XID_PRINT_AUX(where,xidentP) \ + TPRINTF(TRACE_ALL, \ + "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \ + "hold(%d,%d,%d,%d,%d)=%d", \ + where, \ + MAKE_OFFSET(xidentP), \ + xidentP->tag.lock, \ + XIDENT_LOCKMETHOD(*(xidentP)), \ + xidentP->tag.pid, \ + xidentP->tag.xid, \ + xidentP->holders[1], \ + xidentP->holders[2], \ + xidentP->holders[3], \ + xidentP->holders[4], \ + xidentP->holders[5], \ + xidentP->nHolding) + +#define LOCK_TPRINTF(lock, args...) \ + if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \ + && (lock->tag.relId >= lockDebugOidMin)) \ + || (lock->tag.relId == lockDebugRelation)) \ + TPRINTF(TRACE_ALL, args) + +#else /* !LOCK_MGR_DEBUG */ +#define LOCK_PRINT(where,lock,type) +#define LOCK_PRINT_AUX(where,lock,type) #define XID_PRINT(where,xidentP) +#define XID_PRINT_AUX(where,xidentP) +#define LOCK_TPRINTF(lock, args...) +#endif /* !LOCK_MGR_DEBUG */ -#else /* LOCK_MGR_DEBUG */ - -int lockDebug = 0; -unsigned int lock_debug_oid_min = BootstrapObjectIdData; static char *lock_types[] = { - "NONE", + "", "WRITE", "READ", "WRITE INTENT", @@ -75,59 +159,6 @@ static char *lock_types[] = { "EXTEND" }; -#define LOCK_PRINT(where,tag,type)\ - if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \ - elog(DEBUG, \ - "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \ - MyProcPid,\ - tag->relId, tag->dbId, \ - ((tag->tupleId.ip_blkid.bi_hi<<16)+\ - tag->tupleId.ip_blkid.bi_lo),\ - tag->tupleId.ip_posid, \ - lock_types[type]) - -#define LOCK_DUMP(where,lock,type)\ - if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \ - LOCK_DUMP_AUX(where,lock,type) - -#define LOCK_DUMP_AUX(where,lock,type)\ - elog(DEBUG, \ - "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\ - "holders (%d,%d,%d,%d,%d) type (%s)",where, \ - MyProcPid,\ - lock->tag.relId, lock->tag.dbId, \ - ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\ - lock->tag.tupleId.ip_blkid.bi_lo),\ - lock->tag.tupleId.ip_posid, \ - lock->nHolding,\ - lock->holders[1],\ - lock->holders[2],\ - lock->holders[3],\ - lock->holders[4],\ - lock->holders[5],\ - lock_types[type]) - -#define XID_PRINT(where,xidentP)\ - if ((lockDebug >= 2) && \ - (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \ - >= lock_debug_oid_min)) \ - elog(DEBUG,\ - "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\ - "holders (%d,%d,%d,%d,%d)",\ - where,\ - MyProcPid,\ - xidentP->tag.xid,\ - xidentP->tag.pid,\ - xidentP->tag.lock,\ - xidentP->nHolding,\ - xidentP->holders[1],\ - xidentP->holders[2],\ - xidentP->holders[3],\ - xidentP->holders[4],\ - xidentP->holders[5]) - -#endif /* LOCK_MGR_DEBUG */ - SPINLOCK LockMgrLock; /* in Shmem or created in * CreateSpinlocks() */ @@ -171,6 +202,16 @@ InitLocks() BITS_ON[i] = bit; BITS_OFF[i] = ~bit; } + +#ifdef LOCK_MGR_DEBUG + /* + * If lockDebugOidMin value has not been specified + * in pg_options set a default value. + */ + if (!lockDebugOidMin) { + lockDebugOidMin = BootstrapObjectIdData; + } +#endif } /* ------------------- @@ -234,7 +275,7 @@ LockMethodTableInit(char *tabName, { elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d", numModes, MAX_LOCKMODES); - return (INVALID_TABLEID); + return (INVALID_LOCKMETHOD); } /* allocate a string for the shmem index table lookup */ @@ -242,7 +283,7 @@ LockMethodTableInit(char *tabName, if (!shmemName) { elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName); - return (INVALID_TABLEID); + return (INVALID_LOCKMETHOD); } /* each lock table has a non-shared header */ @@ -251,7 +292,7 @@ LockMethodTableInit(char *tabName, { elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName); pfree(shmemName); - return (INVALID_TABLEID); + return (INVALID_LOCKMETHOD); } /* ------------------------ @@ -354,7 +395,7 @@ LockMethodTableInit(char *tabName, if (status) return (lockMethodTable->ctl->lockmethod); else - return (INVALID_TABLEID); + return (INVALID_LOCKMETHOD); } /* @@ -369,16 +410,16 @@ LockMethodTableInit(char *tabName, * client to use different lockmethods when acquiring/releasing * short term and long term locks. */ -#ifdef NOT_USED + LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod) { LOCKMETHOD newLockMethod; if (NumLockMethods >= MAX_LOCK_METHODS) - return (INVALID_TABLEID); - if (LockMethodTable[lockmethod] == INVALID_TABLEID) - return (INVALID_TABLEID); + return (INVALID_LOCKMETHOD); + if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD) + return (INVALID_LOCKMETHOD); /* other modules refer to the lock table by a lockmethod */ newLockMethod = NumLockMethods; @@ -387,7 +428,6 @@ LockMethodTableRename(LOCKMETHOD lockmethod) LockMethodTable[newLockMethod] = LockMethodTable[lockmethod]; return (newLockMethod); } -#endif /* * LockAcquire -- Check for lock conflicts, sleep if conflict found, @@ -398,8 +438,11 @@ LockMethodTableRename(LOCKMETHOD lockmethod) * Side Effects: The lock is always acquired. No way to abort * a lock acquisition other than aborting the transaction. * Lock is recorded in the lkchain. + * #ifdef USER_LOCKS + * * Note on User Locks: + * * User locks are handled totally on the application side as * long term cooperative locks which extend beyond the normal * transaction boundaries. Their purpose is to indicate to an @@ -421,24 +464,24 @@ LockMethodTableRename(LOCKMETHOD lockmethod) * acquired if already held by another process. They must be * released explicitly by the application but they are released * automatically when a backend terminates. - * They are indicated by a dummy lockmethod 0 which doesn't have - * any table allocated but uses the normal lock table, and are - * distinguished from normal locks for the following differences: + * They are indicated by a lockmethod 2 which is an alias for the + * normal lock table, and are distinguished from normal locks + * for the following differences: * * normal lock user lock * - * lockmethod 1 0 + * lockmethod 1 2 * tag.relId rel oid 0 * tag.ItemPointerData.ip_blkid block id lock id2 * tag.ItemPointerData.ip_posid tuple offset lock id1 * xid.pid 0 backend pid - * xid.xid current xid 0 + * xid.xid xid or 0 0 * persistence transaction user or backend * * The lockmode parameter can have the same values for normal locks * although probably only WRITE_LOCK can have some practical use. * - * DZ - 4 Oct 1996 + * DZ - 22 Nov 1997 #endif */ @@ -453,25 +496,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) SPINLOCK masterLock; LOCKMETHODTABLE *lockMethodTable; int status; - TransactionId myXid; + TransactionId xid; #ifdef USER_LOCKS int is_user_lock; - is_user_lock = (lockmethod == 0); + is_user_lock = (lockmethod == USER_LOCKMETHOD); if (is_user_lock) { - lockmethod = 1; #ifdef USER_LOCKS_DEBUG - elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d", - locktag->tupleId.ip_posid, - ((locktag->tupleId.ip_blkid.bi_hi << 16) + - locktag->tupleId.ip_blkid.bi_lo), - lockmode); + TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u,%u] %s", + locktag->tupleId.ip_posid, + ((locktag->tupleId.ip_blkid.bi_hi << 16) + + locktag->tupleId.ip_blkid.bi_lo), + lock_types[lockmode]); #endif } #endif + /* ???????? This must be changed when short term locks will be used */ + locktag->lockmethod = lockmethod; + Assert(lockmethod < NumLockMethods); lockMethodTable = LockMethodTable[lockmethod]; if (!lockMethodTable) @@ -483,14 +528,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) if (LockingIsDisabled) return (TRUE); - LOCK_PRINT("Acquire", locktag, lockmode); masterLock = lockMethodTable->ctl->masterLock; SpinAcquire(masterLock); + /* + * Find or create a lock with this tag + */ Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_ENTER, &found); - + lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, + HASH_ENTER, &found); if (!lock) { SpinRelease(masterLock); @@ -505,15 +552,19 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) if (!found) { lock->mask = 0; - ProcQueueInit(&(lock->waitProcs)); - MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES); - MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES); lock->nHolding = 0; lock->nActive = 0; - + MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES); + MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES); + ProcQueueInit(&(lock->waitProcs)); Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid), &(locktag->tupleId.ip_blkid))); - + LOCK_PRINT("LockAcquire: new", lock, lockmode); + } else { + LOCK_PRINT("LockAcquire: found", lock, lockmode); + Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0)); + Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0)); + Assert(lock->nActive <= lock->nHolding); } /* ------------------ @@ -522,7 +573,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * ------------------ */ xidTable = lockMethodTable->xidHash; - myXid = GetCurrentTransactionId(); /* ------------------ * Zero out all of the tag bytes (this clears the padding bytes for long @@ -530,36 +580,48 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * ------------------ */ MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */ - TransactionIdStore(myXid, &item.tag.xid); item.tag.lock = MAKE_OFFSET(lock); -#if 0 - item.tag.pid = MyPid; +#ifdef USE_XIDTAG_LOCKMETHOD + item.tag.lockmethod = lockmethod; #endif - #ifdef USER_LOCKS if (is_user_lock) { item.tag.pid = MyProcPid; - item.tag.xid = myXid = 0; -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]", - item.tag.lock, item.tag.pid, item.tag.xid); -#endif + item.tag.xid = xid = 0; + } else { + xid = GetCurrentTransactionId(); + TransactionIdStore(xid, &item.tag.xid); } +#else + xid = GetCurrentTransactionId(); + TransactionIdStore(xid, &item.tag.xid); #endif - result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found); + /* + * Find or create an xid entry with this tag + */ + result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, + HASH_ENTER, &found); if (!result) { elog(NOTICE, "LockAcquire: xid table corrupted"); return (STATUS_ERROR); } + + /* + * If not found initialize the new entry + */ if (!found) { - XID_PRINT("LockAcquire: queueing XidEnt", result); - ProcAddLock(&result->queue); result->nHolding = 0; MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES); + ProcAddLock(&result->queue); + XID_PRINT("LockAcquire: new", result); + } else { + XID_PRINT("LockAcquire: found", result); + Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0)); + Assert(result->nHolding <= lock->nActive); } /* ---------------- @@ -570,6 +632,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) */ lock->nHolding++; lock->holders[lockmode]++; + Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0)); /* -------------------- * If I'm the only one holding a lock, then there @@ -582,43 +645,59 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) { result->holders[lockmode]++; result->nHolding++; + XID_PRINT("LockAcquire: owning", result); + Assert((result->nHolding > 0) && (result->holders[lockmode] > 0)); GrantLock(lock, lockmode); SpinRelease(masterLock); return (TRUE); } - Assert(result->nHolding <= lock->nActive); - - status = LockResolveConflicts(lockmethod, lock, lockmode, myXid); - + status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result); if (status == STATUS_OK) GrantLock(lock, lockmode); else if (status == STATUS_FOUND) { #ifdef USER_LOCKS - /* - * User locks are non blocking. If we can't acquire a lock remove - * the xid entry and return FALSE without waiting. + * User locks are non blocking. If we can't acquire a lock we must + * remove the xid entry and return FALSE without waiting. */ if (is_user_lock) { if (!result->nHolding) { SHMQueueDelete(&result->queue); - hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found); + result = (XIDLookupEnt *) hash_search(xidTable, + (Pointer) &result, + HASH_REMOVE, &found); + if (!result || !found) { + elog(NOTICE, "LockAcquire: remove xid, table corrupted"); + } + } else { + XID_PRINT_AUX("LockAcquire: NHOLDING", result); } lock->nHolding--; lock->holders[lockmode]--; + LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode); + Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0)); + Assert(lock->nActive <= lock->nHolding); SpinRelease(masterLock); -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, "LockAcquire: user lock failed"); -#endif return (FALSE); } #endif - status = WaitOnLock(lockmethod, lock, lockmode); - XID_PRINT("Someone granted me the lock", result); + status = WaitOnLock(lockmethod, lock, lockmode, xid); + /* + * Check the xid entry status, in case something in the + * ipc communication doesn't work correctly. + */ + if (!((result->nHolding > 0) && (result->holders[lockmode] > 0))) { + XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result); + LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode); + /* Should we retry ? */ + return (FALSE); + } + XID_PRINT("LockAcquire: granted", result); + LOCK_PRINT("LockAcquire: granted", lock, lockmode); } SpinRelease(masterLock); @@ -646,7 +725,8 @@ int LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, - TransactionId xid) + TransactionId xid, + XIDLookupEnt *xidentP) /* xident ptr or NULL */ { XIDLookupEnt *result, item; @@ -657,44 +737,81 @@ LockResolveConflicts(LOCKMETHOD lockmethod, int bitmask; int i, tmpMask; +#ifdef USER_LOCKS + int is_user_lock; +#endif numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes; xidTable = LockMethodTable[lockmethod]->xidHash; - /* --------------------- - * read my own statistics from the xid table. If there - * isn't an entry, then we'll just add one. - * - * Zero out the tag, this clears the padding bytes for long - * word alignment and ensures hashing consistency. - * ------------------ - */ - MemSet(&item, 0, XID_TAGSIZE); - TransactionIdStore(xid, &item.tag.xid); - item.tag.lock = MAKE_OFFSET(lock); -#if 0 - item.tag.pid = pid; + if (xidentP) { + /* + * A pointer to the xid entry was supplied from the caller. + * Actually only LockAcquire can do it. + */ + result = xidentP; + } else { + /* --------------------- + * read my own statistics from the xid table. If there + * isn't an entry, then we'll just add one. + * + * Zero out the tag, this clears the padding bytes for long + * word alignment and ensures hashing consistency. + * ------------------ + */ + MemSet(&item, 0, XID_TAGSIZE); + item.tag.lock = MAKE_OFFSET(lock); +#ifdef USE_XIDTAG_LOCKMETHOD + item.tag.lockmethod = lockmethod; +#endif +#ifdef USER_LOCKS + is_user_lock = (lockmethod == 2); + if (is_user_lock) { + item.tag.pid = MyProcPid; + item.tag.xid = 0; + } else { + TransactionIdStore(xid, &item.tag.xid); + } +#else + TransactionIdStore(xid, &item.tag.xid); #endif - if (!(result = (XIDLookupEnt *) - hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found))) - { - elog(NOTICE, "LockResolveConflicts: xid table corrupted"); - return (STATUS_ERROR); - } - myHolders = result->holders; + /* + * Find or create an xid entry with this tag + */ + result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, + HASH_ENTER, &found); + if (!result) + { + elog(NOTICE, "LockResolveConflicts: xid table corrupted"); + return (STATUS_ERROR); + } - if (!found) - { - /* --------------- - * we're not holding any type of lock yet. Clear - * the lock stats. - * --------------- + /* + * If not found initialize the new entry. THIS SHOULD NEVER HAPPEN, + * if we are trying to resolve a conflict we must already have + * allocated an xid entry for this lock. dz 21-11-1997 */ - MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders))); - result->nHolding = 0; + if (!found) + { + /* --------------- + * we're not holding any type of lock yet. Clear + * the lock stats. + * --------------- + */ + MemSet(result->holders,0, numLockModes * sizeof(*(lock->holders))); + result->nHolding = 0; + XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result); + } else { + XID_PRINT("LockResolveConflicts: found", result); + } } + Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0)); + /* + * We can control runtime this option. Default is lockReadPriority=0 + */ + if (!lockReadPriority) { /* ------------------------ * If someone with a greater priority is waiting for the lock, @@ -705,8 +822,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod, PROC_QUEUE *waitQueue = &(lock->waitProcs); PROC *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev); - if (waitQueue->size && topproc->prio > myprio) + if (waitQueue->size && topproc->prio > myprio) { + XID_PRINT("LockResolveConflicts: higher priority proc waiting", + result); return STATUS_FOUND; + } } /* ---------------------------- @@ -721,12 +841,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod, */ if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask)) { - result->holders[lockmode]++; result->nHolding++; - - XID_PRINT("Conflict Resolved: updated xid entry stats", result); - + XID_PRINT("LockResolveConflicts: no conflict", result); + Assert((result->nHolding > 0) && (result->holders[lockmode] > 0)); return (STATUS_OK); } @@ -736,6 +854,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod, * that does not reflect our own locks. * ------------------------ */ + myHolders = result->holders; bitmask = 0; tmpMask = 2; for (i = 1; i <= numLockModes; i++, tmpMask <<= 1) @@ -753,27 +872,42 @@ LockResolveConflicts(LOCKMETHOD lockmethod, */ if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask)) { - /* no conflict. Get the lock and go on */ - result->holders[lockmode]++; result->nHolding++; - - XID_PRINT("Conflict Resolved: updated xid entry stats", result); - + XID_PRINT("LockResolveConflicts: resolved", result); + Assert((result->nHolding > 0) && (result->holders[lockmode] > 0)); return (STATUS_OK); - } + XID_PRINT("LockResolveConflicts: conflicting", result); return (STATUS_FOUND); } +/* + * GrantLock -- update the lock data structure to show + * the new lock holder. + */ +void +GrantLock(LOCK *lock, LOCKMODE lockmode) +{ + lock->nActive++; + lock->activeHolders[lockmode]++; + lock->mask |= BITS_ON[lockmode]; + LOCK_PRINT("GrantLock", lock, lockmode); + Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0)); + Assert(lock->nActive <= lock->nHolding); +} + static int -WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode) +WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, + TransactionId xid) { PROC_QUEUE *waitQueue = &(lock->waitProcs); LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod]; int prio = lockMethodTable->ctl->prio[lockmode]; + char old_status[64], + new_status[64]; Assert(lockmethod < NumLockMethods); @@ -785,27 +919,38 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode) * synchronization for this queue. That will not be true if/when * people can be deleted from the queue by a SIGINT or something. */ - LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockmode); + LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode); + strcpy(old_status, PS_STATUS); + strcpy(new_status, PS_STATUS); + strcat(new_status, " waiting"); + PS_SET_STATUS(new_status); if (ProcSleep(waitQueue, lockMethodTable->ctl->masterLock, lockmode, prio, - lock) != NO_ERROR) + lock, + xid) != NO_ERROR) { /* ------------------- - * This could have happend as a result of a deadlock, see HandleDeadLock() - * Decrement the lock nHolding and holders fields as we are no longer - * waiting on this lock. + * This could have happend as a result of a deadlock, + * see HandleDeadLock(). + * Decrement the lock nHolding and holders fields as + * we are no longer waiting on this lock. * ------------------- */ lock->nHolding--; lock->holders[lockmode]--; - LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockmode); + LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode); + Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0)); + Assert(lock->nActive <= lock->nHolding); SpinRelease(lockMethodTable->ctl->masterLock); elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction"); + + /* not reached */ } - LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockmode); + PS_SET_STATUS(old_status); + LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode); return (STATUS_OK); } @@ -829,25 +974,34 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) XIDLookupEnt *result, item; HTAB *xidTable; + TransactionId xid; bool wakeupNeeded = true; + int trace_flag; #ifdef USER_LOCKS int is_user_lock; - is_user_lock = (lockmethod == 0); + is_user_lock = (lockmethod == USER_LOCKMETHOD); if (is_user_lock) { - lockmethod = 1; -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d", - locktag->tupleId.ip_posid, - ((locktag->tupleId.ip_blkid.bi_hi << 16) + - locktag->tupleId.ip_blkid.bi_lo), - lockmode); -#endif + TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u,%u] %d", + locktag->tupleId.ip_posid, + ((locktag->tupleId.ip_blkid.bi_hi << 16) + + locktag->tupleId.ip_blkid.bi_lo), + lockmode); } #endif + /* ???????? This must be changed when short term locks will be used */ + locktag->lockmethod = lockmethod; + +#ifdef USER_LOCKS + trace_flag = \ + (lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS; +#else + trace_flag = TRACE_LOCKS; +#endif + Assert(lockmethod < NumLockMethods); lockMethodTable = LockMethodTable[lockmethod]; if (!lockMethodTable) @@ -859,34 +1013,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) if (LockingIsDisabled) return (TRUE); - LOCK_PRINT("Release", locktag, lockmode); - masterLock = lockMethodTable->ctl->masterLock; - xidTable = lockMethodTable->xidHash; - SpinAcquire(masterLock); - Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) - hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND_SAVE, &found); - -#ifdef USER_LOCKS - /* - * If the entry is not found hash_search returns TRUE instead of NULL, - * so we must check it explicitly. + * Find a lock with this tag */ - if ((is_user_lock) && (lock == (LOCK *) TRUE)) - { - SpinRelease(masterLock); - elog(NOTICE, "LockRelease: there are no locks with this tag"); - return (FALSE); - } -#endif + Assert(lockMethodTable->lockHash->hash == tag_hash); + lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, + HASH_FIND, &found); /* - * let the caller print its own error message, too. Do not - * elog(ERROR). + * let the caller print its own error message, too. Do not elog(ERROR). */ if (!lock) { @@ -898,52 +1036,20 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) if (!found) { SpinRelease(masterLock); +#ifdef USER_LOCKS + if (is_user_lock) { + TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag"); + return (FALSE); + } +#endif elog(NOTICE, "LockRelease: locktable lookup failed, no lock"); return (FALSE); } + LOCK_PRINT("LockRelease: found", lock, lockmode); + Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0)); + Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0)); + Assert(lock->nActive <= lock->nHolding); - Assert(lock->nHolding > 0); - -#ifdef USER_LOCKS - - /* - * If this is an user lock it can be removed only after checking that - * it was acquired by the current process, so this code is skipped and - * executed later. - */ - if (!is_user_lock) - { -#endif - - /* - * fix the general lock stats - */ - lock->nHolding--; - lock->holders[lockmode]--; - lock->nActive--; - lock->activeHolders[lockmode]--; - - Assert(lock->nActive >= 0); - - if (!lock->nHolding) - { - /* ------------------ - * if there's no one waiting in the queue, - * we just released the last lock. - * Delete it from the lock table. - * ------------------ - */ - Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) hash_search(lockMethodTable->lockHash, - (Pointer) &(lock->tag), - HASH_REMOVE_SAVED, - &found); - Assert(lock && found); - wakeupNeeded = false; - } -#ifdef USER_LOCKS - } -#endif /* ------------------ * Zero out all of the tag bytes (this clears the padding bytes for long @@ -951,42 +1057,102 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * ------------------ */ MemSet(&item, 0, XID_TAGSIZE); - - TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid); item.tag.lock = MAKE_OFFSET(lock); -#if 0 - item.tag.pid = MyPid; +#ifdef USE_XIDTAG_LOCKMETHOD + item.tag.lockmethod = lockmethod; #endif - #ifdef USER_LOCKS - if (is_user_lock) - { + if (is_user_lock) { item.tag.pid = MyProcPid; - item.tag.xid = 0; -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]", - item.tag.lock, item.tag.pid, item.tag.xid); -#endif + item.tag.xid = xid = 0; + } else { + xid = GetCurrentTransactionId(); + TransactionIdStore(xid, &item.tag.xid); } +#else + xid = GetCurrentTransactionId(); + TransactionIdStore(xid, &item.tag.xid); #endif - if (!(result = (XIDLookupEnt *) hash_search(xidTable, - (Pointer) &item, - HASH_FIND_SAVE, - &found)) - || !found) + /* + * Find an xid entry with this tag + */ + xidTable = lockMethodTable->xidHash; + result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, + HASH_FIND_SAVE, &found); + if (!result || !found) { SpinRelease(masterLock); #ifdef USER_LOCKS - if ((is_user_lock) && (result)) - elog(NOTICE, "LockRelease: you don't have a lock on this tag"); - else - elog(NOTICE, "LockRelease: find xid, table corrupted"); -#else - elog(NOTICE, "LockReplace: xid table corrupted"); + if (!found && is_user_lock) { + TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag"); + } else #endif + elog(NOTICE, "LockReplace: xid table corrupted"); return (FALSE); } + XID_PRINT("LockRelease: found", result); + Assert(result->tag.lock == MAKE_OFFSET(lock)); + + /* + * Check that we are actually holding a lock of the type we want + * to release. + */ + if (!(result->holders[lockmode] > 0)) { + SpinRelease(masterLock); + XID_PRINT_AUX("LockAcquire: WRONGTYPE", result); + elog(NOTICE, "LockRelease: you don't own a lock of type %s", + lock_types[lockmode]); + Assert(result->holders[lockmode] >= 0); + return (FALSE); + } + Assert(result->nHolding > 0); + + /* + * fix the general lock stats + */ + lock->nHolding--; + lock->holders[lockmode]--; + lock->nActive--; + lock->activeHolders[lockmode]--; + + /* -------------------------- + * If there are still active locks of the type I just released, no one + * should be woken up. Whoever is asleep will still conflict + * with the remaining locks. + * -------------------------- + */ + if (lock->activeHolders[lockmode]) + { + wakeupNeeded = false; + } + else + { + /* change the conflict mask. No more of this lock type. */ + lock->mask &= BITS_OFF[lockmode]; + } + + LOCK_PRINT("LockRelease: updated", lock, lockmode); + Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0)); + Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0)); + Assert(lock->nActive <= lock->nHolding); + + if (!lock->nHolding) + { + /* ------------------ + * if there's no one waiting in the queue, + * we just released the last lock. + * Delete it from the lock table. + * ------------------ + */ + Assert(lockMethodTable->lockHash->hash == tag_hash); + lock = (LOCK *) hash_search(lockMethodTable->lockHash, + (Pointer) &(lock->tag), + HASH_REMOVE, + &found); + Assert(lock && found); + wakeupNeeded = false; + } /* * now check to see if I have any private locks. If I do, decrement @@ -994,8 +1160,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) */ result->holders[lockmode]--; result->nHolding--; - - XID_PRINT("LockRelease updated xid stats", result); + XID_PRINT("LockRelease: updated", result); + Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0)); /* * If this was my last hold on this lock, delete my entry in the XID @@ -1003,78 +1169,25 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) */ if (!result->nHolding) { -#ifdef USER_LOCKS - if (result->queue.prev == INVALID_OFFSET) + if (result->queue.prev == INVALID_OFFSET) { elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET"); - if (result->queue.next == INVALID_OFFSET) + } + if (result->queue.next == INVALID_OFFSET) { elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET"); -#endif + } if (result->queue.next != INVALID_OFFSET) SHMQueueDelete(&result->queue); - if (!(result = (XIDLookupEnt *) - hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) || - !found) + XID_PRINT("LockRelease: deleting", result); + result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result, + HASH_REMOVE_SAVED, &found); + if (!result || !found) { SpinRelease(masterLock); -#ifdef USER_LOCKS elog(NOTICE, "LockRelease: remove xid, table corrupted"); -#else - elog(NOTICE, "LockReplace: xid table corrupted"); -#endif return (FALSE); } } -#ifdef USER_LOCKS - - /* - * If this is an user lock remove it now, after the corresponding xid - * entry has been found and deleted. - */ - if (is_user_lock) - { - - /* - * fix the general lock stats - */ - lock->nHolding--; - lock->holders[lockmode]--; - lock->nActive--; - lock->activeHolders[lockmode]--; - - Assert(lock->nActive >= 0); - - if (!lock->nHolding) - { - /* ------------------ - * if there's no one waiting in the queue, - * we just released the last lock. - * Delete it from the lock table. - * ------------------ - */ - Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) hash_search(lockMethodTable->lockHash, - (Pointer) &(lock->tag), - HASH_REMOVE, - &found); - Assert(lock && found); - wakeupNeeded = false; - } - } -#endif - - /* -------------------------- - * If there are still active locks of the type I just released, no one - * should be woken up. Whoever is asleep will still conflict - * with the remaining locks. - * -------------------------- - */ - if (!(lock->activeHolders[lockmode])) - { - /* change the conflict mask. No more of this lock type. */ - lock->mask &= BITS_OFF[lockmode]; - } - if (wakeupNeeded) { /* -------------------------- @@ -1085,35 +1198,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) */ ProcLockWakeup(&(lock->waitProcs), lockmethod, lock); } + else + { + LOCK_TPRINTF(lock, "LockRelease: no wakeup needed"); + } SpinRelease(masterLock); return (TRUE); } -/* - * GrantLock -- update the lock data structure to show - * the new lock holder. - */ -void -GrantLock(LOCK *lock, LOCKMODE lockmode) -{ - lock->nActive++; - lock->activeHolders[lockmode]++; - lock->mask |= BITS_ON[lockmode]; -} - -#ifdef USER_LOCKS /* * LockReleaseAll -- Release all locks in a process lock queue. - * - * Note: This code is a little complicated by the presence in the - * same queue of user locks which can't be removed from the - * normal lock queue at the end of a transaction. They must - * however be removed when the backend exits. - * A dummy lockmethod 0 is used to indicate that we are releasing - * the user locks, from the code added to ProcKill(). */ -#endif bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) { @@ -1121,6 +1217,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) int done; XIDLookupEnt *xidLook = NULL; XIDLookupEnt *tmp = NULL; + XIDLookupEnt *result; SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); SPINLOCK masterLock; LOCKMETHODTABLE *lockMethodTable; @@ -1128,45 +1225,52 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) numLockModes; LOCK *lock; bool found; + int trace_flag; + int xidtag_lockmethod; #ifdef USER_LOCKS int is_user_lock_table, count, - nskip; + nleft; - is_user_lock_table = (lockmethod == 0); -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid); -#endif - if (is_user_lock_table) - lockmethod = 1; + count = nleft = 0; + + is_user_lock_table = (lockmethod == USER_LOCKMETHOD); + trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS; +#else + trace_flag = TRACE_LOCKS; #endif + TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d", + lockmethod, MyProcPid); Assert(lockmethod < NumLockMethods); lockMethodTable = LockMethodTable[lockmethod]; - if (!lockMethodTable) + if (!lockMethodTable) { + elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod); return (FALSE); - - numLockModes = lockMethodTable->ctl->numLockModes; - masterLock = lockMethodTable->ctl->masterLock; + } if (SHMQueueEmpty(lockQueue)) return TRUE; -#ifdef USER_LOCKS + numLockModes = lockMethodTable->ctl->numLockModes; + masterLock = lockMethodTable->ctl->masterLock; + SpinAcquire(masterLock); -#endif SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue); - XID_PRINT("LockReleaseAll", xidLook); - -#ifndef USER_LOCKS - SpinAcquire(masterLock); -#else - count = nskip = 0; -#endif for (;;) { + /* + * Sometimes the queue appears to be messed up. + */ + if (count++ > 1000) + { + elog(NOTICE, "LockReleaseAll: xid loop detected, giving up"); + nleft = 0; + break; + } + /* --------------------------- * XXX Here we assume the shared memory queue is circular and * that we know its internal structure. Should have some sort of @@ -1176,72 +1280,73 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) done = (xidLook->queue.next == end); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); - LOCK_PRINT("LockReleaseAll", (&lock->tag), 0); - -#ifdef USER_LOCKS + xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook); + if ((xidtag_lockmethod == lockmethod) || (trace_flag >= 2)) { + XID_PRINT("LockReleaseAll", xidLook); + LOCK_PRINT("LockReleaseAll", lock, 0); + } - /* - * Sometimes the queue appears to be messed up. - */ - if (count++ > 2000) - { - elog(NOTICE, "LockReleaseAll: xid loop detected, giving up"); - nskip = 0; - break; +#ifdef USE_XIDTAG_LOCKMETHOD + if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock)) + elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d", + xidtag_lockmethod, lock->tag.lockmethod); +#endif + if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2)) { + TPRINTF(trace_flag, "LockReleaseAll: skipping other table"); + nleft++; + goto next_item; } + + Assert(lock->nHolding > 0); + Assert(lock->nActive > 0); + Assert(lock->nActive <= lock->nHolding); + Assert(xidLook->nHolding >= 0); + Assert(xidLook->nHolding <= lock->nHolding); + +#ifdef USER_LOCKS if (is_user_lock_table) { if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0)) { -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]", - xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); -#endif - nskip++; + TPRINTF(TRACE_USERLOCKS, + "LockReleaseAll: skiping normal lock [%d,%d,%d]", + xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); + nleft++; goto next_item; } if (xidLook->tag.pid != MyProcPid) { - /* This should never happen */ -#ifdef USER_LOCKS_DEBUG + /* Should never happen */ elog(NOTICE, - "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]", + "LockReleaseAll: INVALID PID: [%u,%u] [%d,%d,%d]", lock->tag.tupleId.ip_posid, ((lock->tag.tupleId.ip_blkid.bi_hi << 16) + lock->tag.tupleId.ip_blkid.bi_lo), - xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); -#endif - nskip++; + xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); + nleft++; goto next_item; } -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, - "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]", - lock->tag.tupleId.ip_posid, - ((lock->tag.tupleId.ip_blkid.bi_hi << 16) + - lock->tag.tupleId.ip_blkid.bi_lo), - xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); -#endif + TPRINTF(TRACE_USERLOCKS, + "LockReleaseAll: releasing user lock [%u,%u] [%d,%d,%d]", + lock->tag.tupleId.ip_posid, + ((lock->tag.tupleId.ip_blkid.bi_hi << 16) + + lock->tag.tupleId.ip_blkid.bi_lo), + xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); } else { - if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0)) + /* Can't check xidLook->tag.xid, can be 0 also for normal locks */ + if (xidLook->tag.pid != 0) { -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, - "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]", - lock->tag.tupleId.ip_posid, - ((lock->tag.tupleId.ip_blkid.bi_hi << 16) + - lock->tag.tupleId.ip_blkid.bi_lo), - xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); -#endif - nskip++; + TPRINTF(TRACE_LOCKS, + "LockReleaseAll: skiping user lock [%u,%u] [%d,%d,%d]", + lock->tag.tupleId.ip_posid, + ((lock->tag.tupleId.ip_blkid.bi_hi << 16) + + lock->tag.tupleId.ip_blkid.bi_lo), + xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); + nleft++; goto next_item; } -#ifdef USER_LOCKS_DEBUG - elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]", - xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); -#endif } #endif @@ -1251,16 +1356,20 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) */ if (lock->nHolding != xidLook->nHolding) { - lock->nHolding -= xidLook->nHolding; - lock->nActive -= xidLook->nHolding; - Assert(lock->nActive >= 0); for (i = 1; i <= numLockModes; i++) { + Assert(xidLook->holders[i] >= 0); lock->holders[i] -= xidLook->holders[i]; lock->activeHolders[i] -= xidLook->holders[i]; + Assert((lock->holders[i] >= 0) \ + && (lock->activeHolders[i] >= 0)); if (!lock->activeHolders[i]) lock->mask &= BITS_OFF[i]; } + lock->nHolding -= xidLook->nHolding; + lock->nActive -= xidLook->nHolding; + Assert((lock->nHolding >= 0) && (lock->nActive >= 0)); + Assert(lock->nActive <= lock->nHolding); } else { @@ -1270,16 +1379,30 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) * -------------- */ lock->nHolding = 0; + /* Fix the lock status, just for next LOCK_PRINT message. */ + for (i=1; i<=numLockModes; i++) { + Assert(lock->holders[i] == lock->activeHolders[i]); + lock->holders[i] = lock->activeHolders[i] = 0; + } } + LOCK_PRINT("LockReleaseAll: updated", lock, 0); + + /* + * Remove the xid from the process lock queue + */ + SHMQueueDelete(&xidLook->queue); + /* ---------------- * always remove the xidLookup entry, we're done with it now * ---------------- */ -#ifdef USER_LOCKS - SHMQueueDelete(&xidLook->queue); -#endif - if ((!hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found)) - || !found) + + XID_PRINT("LockReleaseAll: deleting", xidLook); + result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash, + (Pointer) xidLook, + HASH_REMOVE, + &found); + if (!result || !found) { SpinRelease(masterLock); elog(NOTICE, "LockReleaseAll: xid table corrupted"); @@ -1293,10 +1416,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) * the last lock. * -------------------- */ - + LOCK_PRINT("LockReleaseAll: deleting", lock, 0); Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) - hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found); + lock = (LOCK *) hash_search(lockMethodTable->lockHash, + (Pointer) &(lock->tag), + HASH_REMOVE, &found); if ((!lock) || (!found)) { SpinRelease(masterLock); @@ -1324,15 +1448,18 @@ next_item: SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue); xidLook = tmp; } - SpinRelease(masterLock); -#ifdef USER_LOCKS /* * Reinitialize the queue only if nothing has been left in. */ - if (nskip == 0) -#endif + if (nleft == 0) { + TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue"); SHMQueueInit(lockQueue); + } + + SpinRelease(masterLock); + TPRINTF(trace_flag, "LockReleaseAll: done"); + return TRUE; } @@ -1420,7 +1547,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) checked_procs[0] = MyProc; nprocs = 1; - lockMethodTable = LockMethodTable[1]; + lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD]; xidTable = lockMethodTable->xidHash; MemSet(&item, 0, XID_TAGSIZE); @@ -1456,7 +1583,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) done = (xidLook->queue.next == end); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); - LOCK_PRINT("DeadLockCheck", (&lock->tag), 0); + LOCK_PRINT("DeadLockCheck", lock, 0); /* * This is our only check to see if we found the lock we want. @@ -1560,9 +1687,190 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) return false; } +/* + * Return an array with the pids of all processes owning a lock. + * This works only for user locks because normal locks have no + * pid information in the corresponding XIDLookupEnt. + */ +ArrayType * +LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag) +{ + XIDLookupEnt *xidLook = NULL; + SPINLOCK masterLock; + LOCK *lock; + SHMEM_OFFSET lock_offset; + int count = 0; + LOCKMETHODTABLE *lockMethodTable; + HTAB *xidTable; + bool found; + int ndims, + nitems, + hdrlen, + size; + int lbounds[1], + hbounds[1]; + ArrayType *array; + int *data_ptr; + + /* Assume that no one will modify the result */ + static int empty_array[] = { 20, 1, 0, 0, 0 }; + +#ifdef USER_LOCKS + int is_user_lock; + + is_user_lock = (lockmethod == USER_LOCKMETHOD); + if (is_user_lock) + { + TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u,%u]", + locktag->tupleId.ip_posid, + ((locktag->tupleId.ip_blkid.bi_hi << 16) + + locktag->tupleId.ip_blkid.bi_lo)); + } +#endif + + /* This must be changed when short term locks will be used */ + locktag->lockmethod = lockmethod; + + Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods)); + lockMethodTable = LockMethodTable[lockmethod]; + if (!lockMethodTable) + { + elog(NOTICE, "lockMethodTable is null in LockOwners"); + return ((ArrayType *) &empty_array); + } + + if (LockingIsDisabled) + { + return ((ArrayType *) &empty_array); + } + + masterLock = lockMethodTable->ctl->masterLock; + SpinAcquire(masterLock); + + /* + * Find a lock with this tag + */ + Assert(lockMethodTable->lockHash->hash == tag_hash); + lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, + HASH_FIND, &found); + + /* + * let the caller print its own error message, too. Do not elog(WARN). + */ + if (!lock) + { + SpinRelease(masterLock); + elog(NOTICE, "LockOwners: locktable corrupted"); + return ((ArrayType *) &empty_array); + } + + if (!found) + { + SpinRelease(masterLock); +#ifdef USER_LOCKS + if (is_user_lock) { + TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag"); + return ((ArrayType *) &empty_array); + } +#endif + elog(NOTICE, "LockOwners: locktable lookup failed, no lock"); + return ((ArrayType *) &empty_array); + } + LOCK_PRINT("LockOwners: found", lock, 0); + Assert((lock->nHolding > 0) && (lock->nActive > 0)); + Assert(lock->nActive <= lock->nHolding); + lock_offset = MAKE_OFFSET(lock); + + /* Construct a 1-dimensional array */ + ndims = 1; + hdrlen = ARR_OVERHEAD(ndims); + lbounds[0] = 0; + hbounds[0] = lock->nActive; + size = hdrlen + sizeof(int) * hbounds[0]; + array = (ArrayType *) palloc(size); + MemSet(array, 0, size); + memmove((char *) array, (char *) &size, sizeof(int)); + memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int)); + memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int)); + memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int)); + SET_LO_FLAG(false, array); + data_ptr = (int *) ARR_DATA_PTR(array); + + xidTable = lockMethodTable->xidHash; + hash_seq(NULL); + nitems = 0; + while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) && + (xidLook != (XIDLookupEnt *)TRUE)) { + if (count++ > 1000) { + elog(NOTICE,"LockOwners: possible loop, giving up"); + break; + } + + if (xidLook->tag.pid == 0) { + XID_PRINT("LockOwners: no pid", xidLook); + continue; + } + + if (!xidLook->tag.lock) { + XID_PRINT("LockOwners: NULL LOCK", xidLook); + continue; + } + + if (xidLook->tag.lock != lock_offset) { + XID_PRINT("LockOwners: different lock", xidLook); + continue; + } + + if (LOCK_LOCKMETHOD(*lock) != lockmethod) { + XID_PRINT("LockOwners: other table", xidLook); + continue; + } + + if (xidLook->nHolding <= 0) { + XID_PRINT("LockOwners: not holding", xidLook); + continue; + } + + if (nitems >= hbounds[0]) { + elog(NOTICE,"LockOwners: array size exceeded"); + break; + } + + /* + * Check that the holding process is still alive by sending + * him an unused (ignored) signal. If the kill fails the + * process is not alive. + */ + if ((xidLook->tag.pid != MyProcPid) \ + && (kill(xidLook->tag.pid, SIGCHLD)) != 0) + { + /* Return a negative pid to signal that process is dead */ + data_ptr[nitems++] = - (xidLook->tag.pid); + XID_PRINT("LockOwners: not alive", xidLook); + /* XXX - TODO: remove this entry and update lock stats */ + continue; + } + + /* Found a process holding the lock */ + XID_PRINT("LockOwners: holding", xidLook); + data_ptr[nitems++] = xidLook->tag.pid; + } + + SpinRelease(masterLock); + + /* Adjust the actual size of the array */ + hbounds[0] = nitems; + size = hdrlen + sizeof(int) * hbounds[0]; + memmove((char *) array, (char *) &size, sizeof(int)); + memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int)); + + return (array); +} + #ifdef DEADLOCK_DEBUG /* - * Dump all locks. Must have already acquired the masterLock. + * Dump all locks in the proc->lockQueue. Must have already acquired + * the masterLock. */ void DumpLocks() @@ -1577,9 +1885,8 @@ DumpLocks() SPINLOCK masterLock; int numLockModes; LOCK *lock; - - count; - int lockmethod = 1; + int count = 0; + int lockmethod = DEFAULT_LOCKMETHOD; LOCKMETHODTABLE *lockMethodTable; ShmemPIDLookup(MyProcPid, &location); @@ -1604,11 +1911,18 @@ DumpLocks() SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue); end = MAKE_OFFSET(lockQueue); - LOCK_DUMP("DumpLocks", MyProc->waitLock, 0); - XID_PRINT("DumpLocks", xidLook); + if (MyProc->waitLock) { + LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0); + } - for (count = 0;;) + for (;;) { + if (count++ > 2000) + { + elog(NOTICE, "DumpLocks: xid loop detected, giving up"); + break; + } + /* --------------------------- * XXX Here we assume the shared memory queue is circular and * that we know its internal structure. Should have some sort of @@ -1618,19 +1932,68 @@ DumpLocks() done = (xidLook->queue.next == end); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); - LOCK_DUMP("DumpLocks", lock, 0); - - if (count++ > 2000) - { - elog(NOTICE, "DumpLocks: xid loop detected, giving up"); - break; - } + XID_PRINT_AUX("DumpLocks", xidLook); + LOCK_PRINT_AUX("DumpLocks", lock, 0); if (done) break; + SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue); xidLook = tmp; } } +/* + * Dump all postgres locks. Must have already acquired the masterLock. + */ +void +DumpAllLocks() +{ + SHMEM_OFFSET location; + PROC *proc; + XIDLookupEnt *xidLook = NULL; + LOCK *lock; + int pid; + int count = 0; + int lockmethod = DEFAULT_LOCKMETHOD; + LOCKMETHODTABLE *lockMethodTable; + HTAB *xidTable; + + pid = getpid(); + ShmemPIDLookup(pid,&location); + if (location == INVALID_OFFSET) + return; + proc = (PROC *) MAKE_PTR(location); + if (proc != MyProc) + return; + + Assert(lockmethod < NumLockMethods); + lockMethodTable = LockMethodTable[lockmethod]; + if (!lockMethodTable) + return; + + xidTable = lockMethodTable->xidHash; + + if (MyProc->waitLock) { + LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock,0); + } + + hash_seq(NULL); + while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) && + (xidLook != (XIDLookupEnt *)TRUE)) { + XID_PRINT_AUX("DumpAllLocks", xidLook); + + if (xidLook->tag.lock) { + lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); + LOCK_PRINT_AUX("DumpAllLocks", lock, 0); + } else { + elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL"); + } + + if (count++ > 2000) { + elog(NOTICE,"DumpAllLocks: possible loop, giving up"); + break; + } + } +} #endif diff --git a/src/backend/storage/lmgr/multi.c b/src/backend/storage/lmgr/multi.c index 0998389ffe..3887a8e37d 100644 --- a/src/backend/storage/lmgr/multi.c +++ b/src/backend/storage/lmgr/multi.c @@ -12,7 +12,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.22 1998/08/19 02:02:44 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.23 1998/08/25 21:20:28 scrappy Exp $ * * NOTES: * (1) The lock.c module assumes that the caller here is doing @@ -29,12 +29,10 @@ #include "utils/rel.h" #include "miscadmin.h" /* MyDatabaseId */ -static bool -MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode, - PG_LOCK_LEVEL level); -static bool -MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode, - PG_LOCK_LEVEL level); +static bool MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag, + LOCKMODE lockmode, PG_LOCK_LEVEL level); +static bool MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag, + LOCKMODE lockmode, PG_LOCK_LEVEL level); #ifdef LowLevelLocking @@ -130,6 +128,7 @@ static int MultiPrios[] = { * lock table is ONE lock table, not three. */ LOCKMETHOD MultiTableId = (LOCKMETHOD) NULL; +LOCKMETHOD LongTermTableId = (LOCKMETHOD) NULL; #ifdef NOT_USED LOCKMETHOD ShortTermTableId = (LOCKMETHOD) NULL; #endif @@ -150,12 +149,25 @@ InitMultiLevelLocks() /* ----------------------- * No short term lock table for now. -Jeff 15 July 1991 * - * ShortTermTableId = LockTableRename(lockmethod); + * ShortTermTableId = LockMethodTableRename(lockmethod); * if (! (ShortTermTableId)) { * elog(ERROR,"InitMultiLocks: couldnt rename lock table"); * } * ----------------------- */ + +#ifdef USER_LOCKS + /* + * Allocate another tableId for long-term locks + */ + LongTermTableId = LockMethodTableRename(MultiTableId); + if (!(LongTermTableId)) + { + elog(ERROR, + "InitMultiLevelLocks: couldn't rename long-term lock table"); + } +#endif + return MultiTableId; } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index fb052582e7..4cd9602a8b 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $ * *------------------------------------------------------------------------- */ @@ -46,7 +46,7 @@ * This is so that we can support more backends. (system-wide semaphore * sets run out pretty fast.) -ay 4/95 * - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $ */ #include #include @@ -75,10 +75,13 @@ #include "storage/shmem.h" #include "storage/spin.h" #include "storage/proc.h" +#include "utils/trace.h" static void HandleDeadLock(int sig); static PROC *ProcWakeup(PROC *proc, int errType); +#define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT] + /* -------------------- * Spin lock for manipulating the shared process data structure: * ProcGlobal.... Adding an extra spin lock seemed like the smallest @@ -247,10 +250,7 @@ InitProcess(IPCKey key) */ SpinRelease(ProcStructLock); - MyProc->pid = 0; -#if 0 MyProc->pid = MyProcPid; -#endif MyProc->xid = InvalidTransactionId; #ifdef LowLevelLocking MyProc->xmin = InvalidTransactionId; @@ -361,10 +361,13 @@ ProcKill(int exitStatus, int pid) * --------------- */ ProcReleaseSpins(proc); - LockReleaseAll(1, &proc->lockQueue); + LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue); #ifdef USER_LOCKS - LockReleaseAll(0, &proc->lockQueue); + /* + * Assume we have a second lock table. + */ + LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue); #endif /* ---------------- @@ -437,11 +440,12 @@ ProcQueueInit(PROC_QUEUE *queue) * NOTES: The process queue is now a priority queue for locking. */ int -ProcSleep(PROC_QUEUE *waitQueue, +ProcSleep(PROC_QUEUE *waitQueue, /* lock->waitProcs */ SPINLOCK spinlock, - int token, + int token, /* lockmode */ int prio, - LOCK *lock) + LOCK *lock, + TransactionId xid) /* needed by user locks, see below */ { int i; PROC *proc; @@ -470,7 +474,6 @@ ProcSleep(PROC_QUEUE *waitQueue, proc = (PROC *) MAKE_PTR(waitQueue->links.prev); /* If we are a reader, and they are writers, skip past them */ - for (i = 0; i < waitQueue->size && proc->prio > prio; i++) proc = (PROC *) MAKE_PTR(proc->links.prev); @@ -482,12 +485,22 @@ ProcSleep(PROC_QUEUE *waitQueue, MyProc->token = token; MyProc->waitLock = lock; +#ifdef USER_LOCKS + /* ------------------- + * Currently, we only need this for the ProcWakeup routines. + * This must be 0 for user lock, so we can't just use the value + * from GetCurrentTransactionId(). + * ------------------- + */ + TransactionIdStore(xid, &MyProc->xid); +#else #ifndef LowLevelLocking /* ------------------- * currently, we only need this for the ProcWakeup routines * ------------------- */ TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid); +#endif #endif /* ------------------- @@ -510,7 +523,8 @@ ProcSleep(PROC_QUEUE *waitQueue, * -------------- */ MemSet(&timeval, 0, sizeof(struct itimerval)); - timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER; + timeval.it_value.tv_sec = \ + (DeadlockCheckTimer ? DeadlockCheckTimer : DEADLOCK_CHECK_TIMER); do { @@ -525,7 +539,8 @@ ProcSleep(PROC_QUEUE *waitQueue, * the semaphore implementation. * -------------- */ - IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock); + IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, + IpcExclusiveLock); } while (MyProc->errType == STATUS_NOT_FOUND); /* sleep after deadlock * check */ @@ -534,8 +549,6 @@ ProcSleep(PROC_QUEUE *waitQueue, * --------------- */ timeval.it_value.tv_sec = 0; - - if (setitimer(ITIMER_REAL, &timeval, &dummy)) elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup"); @@ -546,6 +559,11 @@ ProcSleep(PROC_QUEUE *waitQueue, */ SpinAcquire(spinlock); +#ifdef LOCK_MGR_DEBUG + /* Just to get meaningful debug messages from DumpLocks() */ + MyProc->waitLock = (LOCK *)NULL; +#endif + return (MyProc->errType); } @@ -589,17 +607,39 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) { PROC *proc; int count; + int trace_flag; + int last_locktype = -1; + int queue_size = queue->size; + + Assert(queue->size >= 0); if (!queue->size) return (STATUS_NOT_FOUND); proc = (PROC *) MAKE_PTR(queue->links.prev); count = 0; - while ((LockResolveConflicts(lockmethod, + while ((queue_size--) && (proc)) + { + /* + * This proc will conflict as the previous one did, don't even try. + */ + if (proc->token == last_locktype) + { + continue; + } + + /* + * This proc conflicts with locks held by others, ignored. + */ + if (LockResolveConflicts(lockmethod, lock, proc->token, - proc->xid) == STATUS_OK)) - { + proc->xid, + (XIDLookupEnt *) NULL) != STATUS_OK) + { + last_locktype = proc->token; + continue; + } /* * there was a waiting process, grant it the lock before waking it @@ -608,24 +648,34 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) * time that the awoken process begins executing again. */ GrantLock(lock, proc->token); - queue->size--; /* * ProcWakeup removes proc from the lock waiting process queue and * returns the next proc in chain. */ - proc = ProcWakeup(proc, NO_ERROR); count++; - if (!proc || queue->size == 0) - break; + queue->size--; + proc = ProcWakeup(proc, NO_ERROR); } + Assert(queue->size >= 0); + if (count) return (STATUS_OK); - else + else { /* Something is still blocking us. May have deadlocked. */ + trace_flag = (lock->tag.lockmethod == USER_LOCKMETHOD) ? \ + TRACE_USERLOCKS : TRACE_LOCKS; + TPRINTF(trace_flag, + "ProcLockWakeup: lock(%x) can't wake up any process", + MAKE_OFFSET(lock)); +#ifdef DEADLOCK_DEBUG + if (pg_options[trace_flag] >= 2) + DumpAllLocks(); +#endif return (STATUS_NOT_FOUND); + } } void @@ -685,7 +735,7 @@ HandleDeadLock(int sig) } #ifdef DEADLOCK_DEBUG - DumpLocks(); + DumpAllLocks(); #endif if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true)) @@ -711,7 +761,8 @@ HandleDeadLock(int sig) * I was awoken by a signal, not by someone unlocking my semaphore. * ------------------ */ - IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock); + IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, + IpcExclusiveLock); /* ------------- * Set MyProc->errType to STATUS_ERROR so that we abort after diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index fd1f95aab9..e989e57b44 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -6,7 +6,7 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: lock.h,v 1.16 1998/08/01 15:26:37 vadim Exp $ + * $Id: lock.h,v 1.17 1998/08/25 21:20:31 scrappy Exp $ * *------------------------------------------------------------------------- */ @@ -15,6 +15,8 @@ #include #include +#include +#include extern SPINLOCK LockMgrLock; typedef int MASK; @@ -32,7 +34,7 @@ typedef int MASK; * NLOCKENTS - The maximum number of lock entries in the lock table. * ---------------------- */ -#define NBACKENDS 50 +#define NBACKENDS MaxBackendId #define NLOCKS_PER_XACT 40 #define NLOCKENTS NLOCKS_PER_XACT*NBACKENDS @@ -51,9 +53,14 @@ typedef int LOCKMETHOD; * CreateSpinLocks() or the number of shared memory locations allocated * for lock table spin locks in the case of machines with TAS instructions. */ -#define MAX_LOCK_METHODS 2 +#define MAX_LOCK_METHODS 3 -#define INVALID_TABLEID 0 +#define INVALID_TABLEID 0 + +#define INVALID_LOCKMETHOD INVALID_TABLEID +#define DEFAULT_LOCKMETHOD 1 +#define USER_LOCKMETHOD 2 +#define MIN_LOCKMETHOD DEFAULT_LOCKMETHOD /*typedef struct LOCK LOCK; */ @@ -63,9 +70,11 @@ typedef struct LTAG Oid relId; Oid dbId; ItemPointerData tupleId; + uint16 lockmethod; /* needed by user locks */ } LOCKTAG; #define TAGSIZE (sizeof(LOCKTAG)) +#define LOCKTAG_LOCKMETHOD(locktag) ((locktag).lockmethod) /* This is the control structure for a lock table. It * lives in shared memory: @@ -143,8 +152,18 @@ typedef struct XIDTAG SHMEM_OFFSET lock; int pid; TransactionId xid; +#ifdef USE_XIDTAG_LOCKMETHOD + uint16 lockmethod; /* for debug or consistency checking */ +#endif } XIDTAG; +#ifdef USE_XIDTAG_LOCKMETHOD +#define XIDTAG_LOCKMETHOD(xidtag) ((xidtag).lockmethod) +#else +#define XIDTAG_LOCKMETHOD(xidtag) \ + (((LOCK*) MAKE_PTR((xidtag).lock))->tag.lockmethod) +#endif + typedef struct XIDLookupEnt { /* tag */ @@ -157,6 +176,7 @@ typedef struct XIDLookupEnt } XIDLookupEnt; #define XID_TAGSIZE (sizeof(XIDTAG)) +#define XIDENT_LOCKMETHOD(xident) (XIDTAG_LOCKMETHOD((xident).tag)) /* originally in procq.h */ typedef struct PROC_QUEUE @@ -191,14 +211,16 @@ typedef struct LOCK int nActive; } LOCK; -#define LockGetLock_nHolders(l) l->nHolders +#define LOCK_LOCKMETHOD(lock) (LOCKTAG_LOCKMETHOD((lock).tag)) +#define LockGetLock_nHolders(l) l->nHolders +#ifdef NOT_USED #define LockDecrWaitHolders(lock, lockmode) \ ( \ lock->nHolding--, \ lock->holders[lockmode]-- \ ) - +#endif #define LockLockTable() SpinAcquire(LockMgrLock); #define UnlockLockTable() SpinRelease(LockMgrLock); @@ -209,23 +231,27 @@ extern SPINLOCK LockMgrLock; */ extern void InitLocks(void); extern void LockDisable(int status); -extern LOCKMETHOD -LockMethodTableInit(char *tabName, MASK *conflictsP, int *prioP, - int numModes); -extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode); -extern int -LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, - TransactionId xid); -extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode); +extern LOCKMETHOD LockMethodTableInit(char *tabName, MASK *conflictsP, + int *prioP, int numModes); +extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod); +extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, + LOCKMODE lockmode); +extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock, + LOCKMODE lockmode, TransactionId xid, + XIDLookupEnt *xidentP); +extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, + LOCKMODE lockmode); extern void GrantLock(LOCK *lock, LOCKMODE lockmode); extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue); extern int LockShmemSize(void); extern bool LockingDisabled(void); -extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check); +extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, + bool skip_check); +ArrayType* LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag); #ifdef DEADLOCK_DEBUG extern void DumpLocks(void); - +extern void DumpAllLocks(void); #endif #endif /* LOCK_H */ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 8b8b7e2848..50f7b03ef6 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -6,7 +6,7 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: proc.h,v 1.13 1998/07/27 19:38:38 vadim Exp $ + * $Id: proc.h,v 1.14 1998/08/25 21:20:32 scrappy Exp $ * *------------------------------------------------------------------------- */ @@ -105,10 +105,10 @@ extern bool ProcRemove(int pid); /* make static in storage/lmgr/proc.c -- jolly */ extern void ProcQueueInit(PROC_QUEUE *queue); -extern int -ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token, - int prio, LOCK *lock); -extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock); +extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token, + int prio, LOCK *lock, TransactionId xid); +extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, + LOCK *lock); extern void ProcAddLock(SHM_QUEUE *elem); extern void ProcReleaseSpins(PROC *proc); extern void ProcFreeAllSemaphores(void); -- 2.40.0