]> granicus.if.org Git - postgresql/commitdiff
Add assertions that we hold some relevant lock during relation open.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 1 Oct 2018 16:43:21 +0000 (12:43 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 1 Oct 2018 16:43:21 +0000 (12:43 -0400)
Opening a relation with no lock at all is unsafe; there's no guarantee
that we'll see a consistent state of the relevant catalog entries.
While use of MVCC scans to read the catalogs partially addresses that
complaint, it's still possible to switch to a new catalog snapshot
partway through loading the relcache entry.  Moreover, whether or not
you trust the reasoning behind sometimes using less than
AccessExclusiveLock for ALTER TABLE, that reasoning is certainly not
valid if concurrent users of the table don't hold a lock corresponding
to the operation they want to perform.

Hence, add some assertion-build-only checks that require any caller
of relation_open(x, NoLock) to hold at least AccessShareLock.  This
isn't a full solution, since we can't verify that the lock level is
semantically appropriate for the action --- but it's definitely of
some use, because it's already caught two bugs.

We can also assert that callers of addRangeTableEntryForRelation()
hold at least the lock level specified for the new RTE.

Amit Langote and Tom Lane

Discussion: https://postgr.es/m/16565.1538327894@sss.pgh.pa.us

src/backend/access/heap/heapam.c
src/backend/parser/parse_relation.c
src/backend/storage/lmgr/lmgr.c
src/backend/storage/lmgr/lock.c
src/include/storage/lmgr.h
src/include/storage/lock.h
src/include/storage/lockdefs.h

index 3395445a9a432621ea1000d44b3d0c50a3bd59c7..5f1a69ca53c6ea8f6b4a13bf4d6d8705cd22555b 100644 (file)
@@ -1137,6 +1137,14 @@ relation_open(Oid relationId, LOCKMODE lockmode)
        if (!RelationIsValid(r))
                elog(ERROR, "could not open relation with OID %u", relationId);
 
+       /*
+        * If we didn't get the lock ourselves, assert that caller holds one,
+        * except in bootstrap mode where no locks are used.
+        */
+       Assert(lockmode != NoLock ||
+                  IsBootstrapProcessingMode() ||
+                  CheckRelationLockedByMe(r, AccessShareLock, true));
+
        /* Make note that we've accessed a temporary relation */
        if (RelationUsesLocalBuffers(r))
                MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
@@ -1183,6 +1191,10 @@ try_relation_open(Oid relationId, LOCKMODE lockmode)
        if (!RelationIsValid(r))
                elog(ERROR, "could not open relation with OID %u", relationId);
 
+       /* If we didn't get the lock ourselves, assert that caller holds one */
+       Assert(lockmode != NoLock ||
+                  CheckRelationLockedByMe(r, AccessShareLock, true));
+
        /* Make note that we've accessed a temporary relation */
        if (RelationUsesLocalBuffers(r))
                MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPREL;
@@ -8084,6 +8096,8 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
 
        idx_rel = RelationIdGetRelation(replidindex);
 
+       Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
+
        /* deform tuple, so we have fast access to columns */
        heap_deform_tuple(tp, desc, values, nulls);
 
index da600dc0ff3eb4ce2d281ff924ea04815f300812..f115ed863af0d13edaf33ac65d9a9c75131d2561 100644 (file)
@@ -28,6 +28,7 @@
 #include "parser/parse_enr.h"
 #include "parser/parse_relation.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
@@ -1272,7 +1273,7 @@ addRangeTableEntry(ParseState *pstate,
  *
  * lockmode is the lock type required for query execution; it must be one
  * of AccessShareLock, RowShareLock, or RowExclusiveLock depending on the
- * RTE's role within the query.  The caller should always hold that lock mode
+ * RTE's role within the query.  The caller must hold that lock mode
  * or a stronger one.
  *
  * Note: properly, lockmode should be declared LOCKMODE not int, but that
@@ -1295,6 +1296,7 @@ addRangeTableEntryForRelation(ParseState *pstate,
        Assert(lockmode == AccessShareLock ||
                   lockmode == RowShareLock ||
                   lockmode == RowExclusiveLock);
+       Assert(CheckRelationLockedByMe(rel, lockmode, true));
 
        rte->rtekind = RTE_RELATION;
        rte->alias = alias;
index dc0a4396388ec7bd6d9bdf3c5a46df577667c9ce..3f57507bce8ce1a6d3caa3f9d94424faa7e82930 100644 (file)
@@ -287,6 +287,51 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
        LockRelease(&tag, lockmode, false);
 }
 
+/*
+ *             CheckRelationLockedByMe
+ *
+ * Returns true if current transaction holds a lock on 'relation' of mode
+ * 'lockmode'.  If 'orstronger' is true, a stronger lockmode is also OK.
+ * ("Stronger" is defined as "numerically higher", which is a bit
+ * semantically dubious but is OK for the purposes we use this for.)
+ */
+bool
+CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
+{
+       LOCKTAG         tag;
+
+       SET_LOCKTAG_RELATION(tag,
+                                                relation->rd_lockInfo.lockRelId.dbId,
+                                                relation->rd_lockInfo.lockRelId.relId);
+
+       if (LockHeldByMe(&tag, lockmode))
+               return true;
+
+       if (orstronger)
+       {
+               LOCKMODE        slockmode;
+
+               for (slockmode = lockmode + 1;
+                        slockmode <= MaxLockMode;
+                        slockmode++)
+               {
+                       if (LockHeldByMe(&tag, slockmode))
+                       {
+#ifdef NOT_USED
+                               /* Sometimes this might be useful for debugging purposes */
+                               elog(WARNING, "lock mode %s substituted for %s on relation %s",
+                                        GetLockmodeName(tag.locktag_lockmethodid, slockmode),
+                                        GetLockmodeName(tag.locktag_lockmethodid, lockmode),
+                                        RelationGetRelationName(relation));
+#endif
+                               return true;
+                       }
+               }
+       }
+
+       return false;
+}
+
 /*
  *             LockHasWaitersRelation
  *
index 831ae62525806d58119d22e7425b028b2c4eaf59..10f6f60f1e26ece304625cb6790f182aa4712d16 100644 (file)
@@ -563,6 +563,30 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
        return false;
 }
 
+/*
+ * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode'
+ *             by the current transaction
+ */
+bool
+LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
+{
+       LOCALLOCKTAG localtag;
+       LOCALLOCK  *locallock;
+
+       /*
+        * See if there is a LOCALLOCK entry for this lock and lockmode
+        */
+       MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
+       localtag.lock = *locktag;
+       localtag.mode = lockmode;
+
+       locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
+                                                                                 (void *) &localtag,
+                                                                                 HASH_FIND, NULL);
+
+       return (locallock && locallock->nLocks > 0);
+}
+
 /*
  * LockHasWaiters -- look up 'locktag' and check if releasing this
  *             lock would wake up other processes waiting for it.
index a217de971667d31f6fbea725eb962403295e2727..e5356b7d54d0393f91e15af931c8da284f8ff4c4 100644 (file)
@@ -45,6 +45,8 @@ extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
 extern void LockRelation(Relation relation, LOCKMODE lockmode);
 extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
 extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
+extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode,
+                                               bool orstronger);
 extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
 
 extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
index ff4df7fec513c11ff048aa7c977ea040f56284b5..a37fda7b638c1e8017d01298e6c5fb8f61d1e4ed 100644 (file)
@@ -540,6 +540,7 @@ extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
 extern void LockReleaseSession(LOCKMETHODID lockmethodid);
 extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
 extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
+extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
 extern bool LockHasWaiters(const LOCKTAG *locktag,
                           LOCKMODE lockmode, bool sessionLock);
 extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
index 72eca39f17fb5c635efc6467cb6f2cb5361f8f1c..9c933fab3fb8a13d321f3e8d5e7095475fba0f75 100644 (file)
@@ -45,11 +45,15 @@ typedef int LOCKMODE;
 #define AccessExclusiveLock            8       /* ALTER TABLE, DROP TABLE, VACUUM FULL,
                                                                         * and unqualified LOCK TABLE */
 
+#define MaxLockMode                            8
+
+
+/* WAL representation of an AccessExclusiveLock on a table */
 typedef struct xl_standby_lock
 {
        TransactionId xid;                      /* xid of holder of AccessExclusiveLock */
-       Oid                     dbOid;
-       Oid                     relOid;
+       Oid                     dbOid;                  /* DB containing table */
+       Oid                     relOid;                 /* OID of table */
 } xl_standby_lock;
 
 #endif                                                 /* LOCKDEF_H_ */