]> granicus.if.org Git - postgresql/blobdiff - src/backend/storage/lmgr/lmgr.c
Remove cvs keywords from all files.
[postgresql] / src / backend / storage / lmgr / lmgr.c
index 351ef27bee7d263074e361f05f4b1b066c0132eb..9f335ed4868f70129e81d8481a16b9ce03f66b68 100644 (file)
@@ -3,12 +3,12 @@
  * lmgr.c
  *       POSTGRES lock manager code
  *
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.76 2005/06/14 22:15:32 tgl Exp $
+ *       src/backend/storage/lmgr/lmgr.c
  *
  *-------------------------------------------------------------------------
  */
 
 
 /*
- * This conflict table defines the semantics of the various lock modes.
+ * RelationInitLockInfo
+ *             Initializes the lock information in a relation descriptor.
+ *
+ *             relcache.c must call this during creation of any reldesc.
  */
-static const LOCKMASK LockConflicts[] = {
-       0,
-
-       /* AccessShareLock */
-       (1 << AccessExclusiveLock),
-
-       /* RowShareLock */
-       (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
+void
+RelationInitLockInfo(Relation relation)
+{
+       Assert(RelationIsValid(relation));
+       Assert(OidIsValid(RelationGetRelid(relation)));
 
-       /* RowExclusiveLock */
-       (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
-       (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
+       relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
 
-       /* ShareUpdateExclusiveLock */
-       (1 << ShareUpdateExclusiveLock) |
-       (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
-       (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
+       if (relation->rd_rel->relisshared)
+               relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
+       else
+               relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
+}
 
-       /* ShareLock */
-       (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
-       (1 << ShareRowExclusiveLock) |
-       (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
+/*
+ * SetLocktagRelationOid
+ *             Set up a locktag for a relation, given only relation OID
+ */
+static inline void
+SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
+{
+       Oid                     dbid;
 
-       /* ShareRowExclusiveLock */
-       (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
-       (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
-       (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
+       if (IsSharedRelation(relid))
+               dbid = InvalidOid;
+       else
+               dbid = MyDatabaseId;
 
-       /* ExclusiveLock */
-       (1 << RowShareLock) |
-       (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
-       (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
-       (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
+       SET_LOCKTAG_RELATION(*tag, dbid, relid);
+}
 
-       /* AccessExclusiveLock */
-       (1 << AccessShareLock) | (1 << RowShareLock) |
-       (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
-       (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
-       (1 << ExclusiveLock) | (1 << AccessExclusiveLock)
+/*
+ *             LockRelationOid
+ *
+ * Lock a relation given only its OID. This should generally be used
+ * before attempting to open the relation's relcache entry.
+ */
+void
+LockRelationOid(Oid relid, LOCKMODE lockmode)
+{
+       LOCKTAG         tag;
+       LockAcquireResult res;
 
-};
+       SetLocktagRelationOid(&tag, relid);
 
-static LOCKMETHODID LockTableId = INVALID_LOCKMETHOD;
+       res = LockAcquire(&tag, lockmode, false, false);
 
+       /*
+        * Now that we have the lock, check for invalidation messages, so that we
+        * will update or flush any stale relcache entry before we try to use it.
+        * We can skip this in the not-uncommon case that we already had the same
+        * type of lock being requested, since then no one else could have
+        * modified the relcache entry in an undesirable way.  (In the case where
+        * our own xact modifies the rel, the relcache update happens via
+        * CommandCounterIncrement, not here.)
+        */
+       if (res != LOCKACQUIRE_ALREADY_HELD)
+               AcceptInvalidationMessages();
+}
 
 /*
- * Create the lock table described by LockConflicts
+ *             ConditionalLockRelationOid
+ *
+ * As above, but only lock if we can get the lock without blocking.
+ * Returns TRUE iff the lock was acquired.
+ *
+ * NOTE: we do not currently need conditional versions of all the
+ * LockXXX routines in this file, but they could easily be added if needed.
  */
-void
-InitLockTable(int maxBackends)
+bool
+ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 {
-       LOCKMETHODID LongTermTableId;
+       LOCKTAG         tag;
+       LockAcquireResult res;
 
-       /* there's no zero-th table */
-       NumLockMethods = 1;
+       SetLocktagRelationOid(&tag, relid);
+
+       res = LockAcquire(&tag, lockmode, false, true);
+
+       if (res == LOCKACQUIRE_NOT_AVAIL)
+               return false;
 
        /*
-        * Create the default lock method table
+        * Now that we have the lock, check for invalidation messages; see notes
+        * in LockRelationOid.
         */
+       if (res != LOCKACQUIRE_ALREADY_HELD)
+               AcceptInvalidationMessages();
 
-       /* number of lock modes is lengthof()-1 because of dummy zero */
-       LockTableId = LockMethodTableInit("LockTable",
-                                                                         LockConflicts,
-                                                                         lengthof(LockConflicts) - 1,
-                                                                         maxBackends);
-       if (!LockMethodIsValid(LockTableId))
-               elog(ERROR, "could not initialize lock table");
-       Assert(LockTableId == DEFAULT_LOCKMETHOD);
+       return true;
+}
 
-#ifdef USER_LOCKS
+/*
+ *             UnlockRelationId
+ *
+ * Unlock, given a LockRelId.  This is preferred over UnlockRelationOid
+ * for speed reasons.
+ */
+void
+UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
+{
+       LOCKTAG         tag;
 
-       /*
-        * Allocate another tableId for user locks (same shared hashtable though)
-        */
-       LongTermTableId = LockMethodTableRename(LockTableId);
-       if (!LockMethodIsValid(LongTermTableId))
-               elog(ERROR, "could not rename user lock table");
-       Assert(LongTermTableId == USER_LOCKMETHOD);
-#endif
+       SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
+
+       LockRelease(&tag, lockmode, false);
 }
 
 /*
- * RelationInitLockInfo
- *             Initializes the lock information in a relation descriptor.
+ *             UnlockRelationOid
  *
- *             relcache.c must call this during creation of any reldesc.
+ * Unlock, given only a relation Oid.  Use UnlockRelationId if you can.
  */
 void
-RelationInitLockInfo(Relation relation)
+UnlockRelationOid(Oid relid, LOCKMODE lockmode)
 {
-       Assert(RelationIsValid(relation));
-       Assert(OidIsValid(RelationGetRelid(relation)));
+       LOCKTAG         tag;
 
-       relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
+       SetLocktagRelationOid(&tag, relid);
 
-       if (relation->rd_rel->relisshared)
-               relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
-       else
-               relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
+       LockRelease(&tag, lockmode, false);
 }
 
 /*
  *             LockRelation
+ *
+ * This is a convenience routine for acquiring an additional lock on an
+ * already-open relation.  Never try to do "relation_open(foo, NoLock)"
+ * and then lock with this.
  */
 void
 LockRelation(Relation relation, LOCKMODE lockmode)
@@ -142,32 +171,22 @@ LockRelation(Relation relation, LOCKMODE lockmode)
                                                 relation->rd_lockInfo.lockRelId.dbId,
                                                 relation->rd_lockInfo.lockRelId.relId);
 
-       res = LockAcquire(LockTableId, &tag, relation->rd_istemp,
-                                         lockmode, false, false);
+       res = LockAcquire(&tag, lockmode, false, false);
 
        /*
-        * Check to see if the relcache entry has been invalidated while we
-        * were waiting to lock it.  If so, rebuild it, or ereport() trying.
-        * Increment the refcount to ensure that RelationFlushRelation will
-        * rebuild it and not just delete it.  We can skip this if the lock
-        * was already held, however.
+        * Now that we have the lock, check for invalidation messages; see notes
+        * in LockRelationOid.
         */
        if (res != LOCKACQUIRE_ALREADY_HELD)
-       {
-               RelationIncrementReferenceCount(relation);
                AcceptInvalidationMessages();
-               RelationDecrementReferenceCount(relation);
-       }
 }
 
 /*
  *             ConditionalLockRelation
  *
- * As above, but only lock if we can get the lock without blocking.
- * Returns TRUE iff the lock was acquired.
- *
- * NOTE: we do not currently need conditional versions of all the
- * LockXXX routines in this file, but they could easily be added if needed.
+ * This is a convenience routine for acquiring an additional lock on an
+ * already-open relation.  Never try to do "relation_open(foo, NoLock)"
+ * and then lock with this.
  */
 bool
 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
@@ -179,31 +198,26 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
                                                 relation->rd_lockInfo.lockRelId.dbId,
                                                 relation->rd_lockInfo.lockRelId.relId);
 
-       res = LockAcquire(LockTableId, &tag, relation->rd_istemp,
-                                         lockmode, false, true);
+       res = LockAcquire(&tag, lockmode, false, true);
 
        if (res == LOCKACQUIRE_NOT_AVAIL)
                return false;
 
        /*
-        * Check to see if the relcache entry has been invalidated while we
-        * were waiting to lock it.  If so, rebuild it, or ereport() trying.
-        * Increment the refcount to ensure that RelationFlushRelation will
-        * rebuild it and not just delete it.  We can skip this if the lock
-        * was already held, however.
+        * Now that we have the lock, check for invalidation messages; see notes
+        * in LockRelationOid.
         */
        if (res != LOCKACQUIRE_ALREADY_HELD)
-       {
-               RelationIncrementReferenceCount(relation);
                AcceptInvalidationMessages();
-               RelationDecrementReferenceCount(relation);
-       }
 
        return true;
 }
 
 /*
  *             UnlockRelation
+ *
+ * This is a convenience routine for unlocking a relation without also
+ * closing it.
  */
 void
 UnlockRelation(Relation relation, LOCKMODE lockmode)
@@ -214,15 +228,15 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
                                                 relation->rd_lockInfo.lockRelId.dbId,
                                                 relation->rd_lockInfo.lockRelId.relId);
 
-       LockRelease(LockTableId, &tag, lockmode, false);
+       LockRelease(&tag, lockmode, false);
 }
 
 /*
- *             LockRelationForSession
+ *             LockRelationIdForSession
  *
  * This routine grabs a session-level lock on the target relation.     The
  * session lock persists across transaction boundaries.  It will be removed
- * when UnlockRelationForSession() is called, or if an ereport(ERROR) occurs,
+ * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
  * or if the backend exits.
  *
  * Note that one should also grab a transaction-level lock on the rel
@@ -230,27 +244,26 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
  * relcache entry is up to date.
  */
 void
-LockRelationForSession(LockRelId *relid, bool istemprel, LOCKMODE lockmode)
+LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
 {
        LOCKTAG         tag;
 
        SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
 
-       (void) LockAcquire(LockTableId, &tag, istemprel,
-                                          lockmode, true, false);
+       (void) LockAcquire(&tag, lockmode, true, false);
 }
 
 /*
- *             UnlockRelationForSession
+ *             UnlockRelationIdForSession
  */
 void
-UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
+UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
 {
        LOCKTAG         tag;
 
        SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
 
-       LockRelease(LockTableId, &tag, lockmode, true);
+       LockRelease(&tag, lockmode, true);
 }
 
 /*
@@ -272,8 +285,7 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
                                                                relation->rd_lockInfo.lockRelId.dbId,
                                                                relation->rd_lockInfo.lockRelId.relId);
 
-       (void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
-                                          lockmode, false, false);
+       (void) LockAcquire(&tag, lockmode, false, false);
 }
 
 /*
@@ -288,7 +300,7 @@ UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
                                                                relation->rd_lockInfo.lockRelId.dbId,
                                                                relation->rd_lockInfo.lockRelId.relId);
 
-       LockRelease(LockTableId, &tag, lockmode, false);
+       LockRelease(&tag, lockmode, false);
 }
 
 /*
@@ -307,8 +319,7 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
                                         relation->rd_lockInfo.lockRelId.relId,
                                         blkno);
 
-       (void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
-                                          lockmode, false, false);
+       (void) LockAcquire(&tag, lockmode, false, false);
 }
 
 /*
@@ -327,8 +338,7 @@ ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
                                         relation->rd_lockInfo.lockRelId.relId,
                                         blkno);
 
-       return (LockAcquire(LockTableId, &tag, relation->rd_istemp,
-                                               lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
+       return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
 }
 
 /*
@@ -344,7 +354,7 @@ UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
                                         relation->rd_lockInfo.lockRelId.relId,
                                         blkno);
 
-       LockRelease(LockTableId, &tag, lockmode, false);
+       LockRelease(&tag, lockmode, false);
 }
 
 /*
@@ -365,8 +375,27 @@ LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
                                          ItemPointerGetBlockNumber(tid),
                                          ItemPointerGetOffsetNumber(tid));
 
-       (void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
-                                          lockmode, false, false);
+       (void) LockAcquire(&tag, lockmode, false, false);
+}
+
+/*
+ *             ConditionalLockTuple
+ *
+ * As above, but only lock if we can get the lock without blocking.
+ * Returns TRUE iff the lock was acquired.
+ */
+bool
+ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
+{
+       LOCKTAG         tag;
+
+       SET_LOCKTAG_TUPLE(tag,
+                                         relation->rd_lockInfo.lockRelId.dbId,
+                                         relation->rd_lockInfo.lockRelId.relId,
+                                         ItemPointerGetBlockNumber(tid),
+                                         ItemPointerGetOffsetNumber(tid));
+
+       return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
 }
 
 /*
@@ -383,15 +412,15 @@ UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
                                          ItemPointerGetBlockNumber(tid),
                                          ItemPointerGetOffsetNumber(tid));
 
-       LockRelease(LockTableId, &tag, lockmode, false);
+       LockRelease(&tag, lockmode, false);
 }
 
 /*
  *             XactLockTableInsert
  *
  * Insert a lock showing that the given transaction ID is running ---
- * this is done during xact startup.  The lock can then be used to wait
- * for the transaction to finish.
+ * this is done when an XID is acquired by a transaction or subtransaction.
+ * The lock can then be used to wait for the transaction to finish.
  */
 void
 XactLockTableInsert(TransactionId xid)
@@ -400,8 +429,7 @@ XactLockTableInsert(TransactionId xid)
 
        SET_LOCKTAG_TRANSACTION(tag, xid);
 
-       (void) LockAcquire(LockTableId, &tag, false,
-                                          ExclusiveLock, false, false);
+       (void) LockAcquire(&tag, ExclusiveLock, false, false);
 }
 
 /*
@@ -409,8 +437,7 @@ XactLockTableInsert(TransactionId xid)
  *
  * Delete the lock showing that the given transaction ID is running.
  * (This is never used for main transaction IDs; those locks are only
- * released implicitly at transaction end.  But we do use it for subtrans
- * IDs.)
+ * released implicitly at transaction end.     But we do use it for subtrans IDs.)
  */
 void
 XactLockTableDelete(TransactionId xid)
@@ -419,7 +446,7 @@ XactLockTableDelete(TransactionId xid)
 
        SET_LOCKTAG_TRANSACTION(tag, xid);
 
-       LockRelease(LockTableId, &tag, ExclusiveLock, false);
+       LockRelease(&tag, ExclusiveLock, false);
 }
 
 /*
@@ -431,7 +458,7 @@ XactLockTableDelete(TransactionId xid)
  * subtransaction, we will exit as soon as it aborts or its top parent commits.
  * It takes some extra work to ensure this, because to save on shared memory
  * the XID lock of a subtransaction is released when it ends, whether
- * successfully or unsuccessfully.  So we have to check if it's "still running"
+ * successfully or unsuccessfully.     So we have to check if it's "still running"
  * and if so wait for its parent.
  */
 void
@@ -442,26 +469,112 @@ XactLockTableWait(TransactionId xid)
        for (;;)
        {
                Assert(TransactionIdIsValid(xid));
-               Assert(!TransactionIdEquals(xid, GetTopTransactionId()));
+               Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
 
                SET_LOCKTAG_TRANSACTION(tag, xid);
 
-               (void) LockAcquire(LockTableId, &tag, false,
-                                                  ShareLock, false, false);
+               (void) LockAcquire(&tag, ShareLock, false, false);
 
-               LockRelease(LockTableId, &tag, ShareLock, false);
+               LockRelease(&tag, ShareLock, false);
 
                if (!TransactionIdIsInProgress(xid))
                        break;
                xid = SubTransGetParent(xid);
        }
+}
 
-       /*
-        * Transaction was committed/aborted/crashed - we have to update
-        * pg_clog if transaction is still marked as running.
-        */
-       if (!TransactionIdDidCommit(xid) && !TransactionIdDidAbort(xid))
-               TransactionIdAbort(xid);
+/*
+ *             ConditionalXactLockTableWait
+ *
+ * As above, but only lock if we can get the lock without blocking.
+ * Returns TRUE if the lock was acquired.
+ */
+bool
+ConditionalXactLockTableWait(TransactionId xid)
+{
+       LOCKTAG         tag;
+
+       for (;;)
+       {
+               Assert(TransactionIdIsValid(xid));
+               Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+               SET_LOCKTAG_TRANSACTION(tag, xid);
+
+               if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
+                       return false;
+
+               LockRelease(&tag, ShareLock, false);
+
+               if (!TransactionIdIsInProgress(xid))
+                       break;
+               xid = SubTransGetParent(xid);
+       }
+
+       return true;
+}
+
+
+/*
+ *             VirtualXactLockTableInsert
+ *
+ * Insert a lock showing that the given virtual transaction ID is running ---
+ * this is done at main transaction start when its VXID is assigned.
+ * The lock can then be used to wait for the transaction to finish.
+ */
+void
+VirtualXactLockTableInsert(VirtualTransactionId vxid)
+{
+       LOCKTAG         tag;
+
+       Assert(VirtualTransactionIdIsValid(vxid));
+
+       SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
+
+       (void) LockAcquire(&tag, ExclusiveLock, false, false);
+}
+
+/*
+ *             VirtualXactLockTableWait
+ *
+ * Waits until the lock on the given VXID is released, which shows that
+ * the top-level transaction owning the VXID has ended.
+ */
+void
+VirtualXactLockTableWait(VirtualTransactionId vxid)
+{
+       LOCKTAG         tag;
+
+       Assert(VirtualTransactionIdIsValid(vxid));
+
+       SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
+
+       (void) LockAcquire(&tag, ShareLock, false, false);
+
+       LockRelease(&tag, ShareLock, false);
+}
+
+/*
+ *             ConditionalVirtualXactLockTableWait
+ *
+ * As above, but only lock if we can get the lock without blocking.
+ * Returns TRUE if the lock was acquired.
+ */
+bool
+ConditionalVirtualXactLockTableWait(VirtualTransactionId vxid)
+{
+       LOCKTAG         tag;
+
+       Assert(VirtualTransactionIdIsValid(vxid));
+
+       SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
+
+       if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
+               return false;
+
+       LockRelease(&tag, ShareLock, false);
+
+       return true;
 }
 
 
@@ -471,9 +584,7 @@ XactLockTableWait(TransactionId xid)
  * Obtain a lock on a general object of the current database.  Don't use
  * this for shared objects (such as tablespaces).  It's unwise to apply it
  * to relations, also, since a lock taken this way will NOT conflict with
- * LockRelation, and also may be wrongly marked if the relation is temp.
- * (If we ever invent temp objects that aren't tables, we'll want to extend
- * the API of this routine to include an isTempObject flag.)
+ * locks taken via LockRelation and friends.
  */
 void
 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
@@ -487,8 +598,10 @@ LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
                                           objid,
                                           objsubid);
 
-       (void) LockAcquire(LockTableId, &tag, false,
-                                          lockmode, false, false);
+       (void) LockAcquire(&tag, lockmode, false, false);
+
+       /* Make sure syscaches are up-to-date with any changes we waited for */
+       AcceptInvalidationMessages();
 }
 
 /*
@@ -506,7 +619,7 @@ UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
                                           objid,
                                           objsubid);
 
-       LockRelease(LockTableId, &tag, lockmode, false);
+       LockRelease(&tag, lockmode, false);
 }
 
 /*
@@ -526,8 +639,10 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
                                           objid,
                                           objsubid);
 
-       (void) LockAcquire(LockTableId, &tag, false,
-                                          lockmode, false, false);
+       (void) LockAcquire(&tag, lockmode, false, false);
+
+       /* Make sure syscaches are up-to-date with any changes we waited for */
+       AcceptInvalidationMessages();
 }
 
 /*
@@ -545,5 +660,126 @@ UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
                                           objid,
                                           objsubid);
 
-       LockRelease(LockTableId, &tag, lockmode, false);
+       LockRelease(&tag, lockmode, false);
+}
+
+/*
+ *             LockSharedObjectForSession
+ *
+ * Obtain a session-level lock on a shared-across-databases object.
+ * See LockRelationIdForSession for notes about session-level locks.
+ */
+void
+LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
+                                                  LOCKMODE lockmode)
+{
+       LOCKTAG         tag;
+
+       SET_LOCKTAG_OBJECT(tag,
+                                          InvalidOid,
+                                          classid,
+                                          objid,
+                                          objsubid);
+
+       (void) LockAcquire(&tag, lockmode, true, false);
+}
+
+/*
+ *             UnlockSharedObjectForSession
+ */
+void
+UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
+                                                        LOCKMODE lockmode)
+{
+       LOCKTAG         tag;
+
+       SET_LOCKTAG_OBJECT(tag,
+                                          InvalidOid,
+                                          classid,
+                                          objid,
+                                          objsubid);
+
+       LockRelease(&tag, lockmode, true);
+}
+
+
+/*
+ * Append a description of a lockable object to buf.
+ *
+ * Ideally we would print names for the numeric values, but that requires
+ * getting locks on system tables, which might cause problems since this is
+ * typically used to report deadlock situations.
+ */
+void
+DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
+{
+       switch ((LockTagType) tag->locktag_type)
+       {
+               case LOCKTAG_RELATION:
+                       appendStringInfo(buf,
+                                                        _("relation %u of database %u"),
+                                                        tag->locktag_field2,
+                                                        tag->locktag_field1);
+                       break;
+               case LOCKTAG_RELATION_EXTEND:
+                       appendStringInfo(buf,
+                                                        _("extension of relation %u of database %u"),
+                                                        tag->locktag_field2,
+                                                        tag->locktag_field1);
+                       break;
+               case LOCKTAG_PAGE:
+                       appendStringInfo(buf,
+                                                        _("page %u of relation %u of database %u"),
+                                                        tag->locktag_field3,
+                                                        tag->locktag_field2,
+                                                        tag->locktag_field1);
+                       break;
+               case LOCKTAG_TUPLE:
+                       appendStringInfo(buf,
+                                                        _("tuple (%u,%u) of relation %u of database %u"),
+                                                        tag->locktag_field3,
+                                                        tag->locktag_field4,
+                                                        tag->locktag_field2,
+                                                        tag->locktag_field1);
+                       break;
+               case LOCKTAG_TRANSACTION:
+                       appendStringInfo(buf,
+                                                        _("transaction %u"),
+                                                        tag->locktag_field1);
+                       break;
+               case LOCKTAG_VIRTUALTRANSACTION:
+                       appendStringInfo(buf,
+                                                        _("virtual transaction %d/%u"),
+                                                        tag->locktag_field1,
+                                                        tag->locktag_field2);
+                       break;
+               case LOCKTAG_OBJECT:
+                       appendStringInfo(buf,
+                                                        _("object %u of class %u of database %u"),
+                                                        tag->locktag_field3,
+                                                        tag->locktag_field2,
+                                                        tag->locktag_field1);
+                       break;
+               case LOCKTAG_USERLOCK:
+                       /* reserved for old contrib code, now on pgfoundry */
+                       appendStringInfo(buf,
+                                                        _("user lock [%u,%u,%u]"),
+                                                        tag->locktag_field1,
+                                                        tag->locktag_field2,
+                                                        tag->locktag_field3);
+                       break;
+               case LOCKTAG_ADVISORY:
+                       appendStringInfo(buf,
+                                                        _("advisory lock [%u,%u,%u,%u]"),
+                                                        tag->locktag_field1,
+                                                        tag->locktag_field2,
+                                                        tag->locktag_field3,
+                                                        tag->locktag_field4);
+                       break;
+               default:
+                       appendStringInfo(buf,
+                                                        _("unrecognized locktag type %d"),
+                                                        (int) tag->locktag_type);
+                       break;
+       }
 }