Relax the requirement that all lwlocks be stored in a single array.
author Robert Haas <rhaas@postgresql.org>
Mon, 27 Jan 2014 16:07:44 +0000 (11:07 -0500)
committer Robert Haas <rhaas@postgresql.org>
Mon, 27 Jan 2014 16:07:44 +0000 (11:07 -0500)
This makes it possible to store lwlocks as part of some other data
structure in the main shared memory segment, or in a dynamic shared
memory segment.  There is still a main LWLock array, and this patch does
not move anything out of it, but it provides the necessary infrastructure
for doing that in the future.

This change is likely to increase the size of LWLockPadded on some
platforms, especially 32-bit platforms where it was previously only
16 bytes.

Patch by me.  Review by Andres Freund and KaiGai Kohei.
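
As a hedged illustration of what this infrastructure makes possible, here is
a minimal sketch of a module embedding an LWLock in its own shared-memory
structure via the tranche API added below.  The names MyShared, my_tranche,
and my_shmem_startup are hypothetical; only LWLockNewTrancheId,
LWLockRegisterTranche, LWLockInitialize, and the LWLockTranche struct come
from this patch.

    #include "postgres.h"
    #include "storage/lwlock.h"
    #include "storage/shmem.h"

    typedef struct MyShared
    {
        int         tranche_id;     /* saved so every backend can register */
        LWLock      lock;           /* embedded lock, not in the main array */
        int64       counter;        /* example state protected by the lock */
    } MyShared;

    static MyShared *my_shared;
    static LWLockTranche my_tranche;    /* must live for the backend's lifetime */

    static void
    my_shmem_startup(void)
    {
        bool        found;

        my_shared = ShmemInitStruct("my_shared", sizeof(MyShared), &found);
        if (!found)
        {
            my_shared->tranche_id = LWLockNewTrancheId();
            LWLockInitialize(&my_shared->lock, my_shared->tranche_id);
            my_shared->counter = 0;
        }

        /* Registration is per-process; repeat it in each backend. */
        my_tranche.name = "my_module";
        my_tranche.array_base = my_shared;
        my_tranche.array_stride = sizeof(MyShared);
        LWLockRegisterTranche(my_shared->tranche_id, &my_tranche);
    }

Callers then take the lock by pointer, e.g.
LWLockAcquire(&my_shared->lock, LW_EXCLUSIVE), instead of by LWLockId.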

18 files changed:
contrib/pg_buffercache/pg_buffercache_pages.c
contrib/pg_stat_statements/pg_stat_statements.c
doc/src/sgml/monitoring.sgml
src/backend/access/transam/slru.c
src/backend/postmaster/postmaster.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/ipc/ipci.c
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/lwlock.c
src/backend/storage/lmgr/predicate.c
src/backend/storage/lmgr/proc.c
src/backend/utils/probes.d
src/include/access/slru.h
src/include/storage/buf_internals.h
src/include/storage/lock.h
src/include/storage/lwlock.h
src/include/storage/proc.h
src/tools/pgindent/typedefs.list

index dbf8030f7c6f0924200cde1a16c881e75665716e..1e2d192f11949bc115593640e99e177cc9123116 100644 (file)
@@ -116,7 +116,7 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
                 * possible deadlocks.
                 */
                for (i = 0; i < NUM_BUFFER_PARTITIONS; i++)
-                       LWLockAcquire(FirstBufMappingLock + i, LW_SHARED);
+                       LWLockAcquire(BufMappingPartitionLockByIndex(i), LW_SHARED);
 
                /*
                 * Scan through all the buffers, saving the relevant fields in the
@@ -157,7 +157,7 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
                 * avoids O(N^2) behavior inside LWLockRelease.
                 */
                for (i = NUM_BUFFER_PARTITIONS; --i >= 0;)
-                       LWLockRelease(FirstBufMappingLock + i);
+                       LWLockRelease(BufMappingPartitionLockByIndex(i));
        }
 
        funcctx = SRF_PERCALL_SETUP();
index 2f069b768e1665ed4561b8eb47e0135e155cc67e..858cce34576e2bbeff29dd94d50baa52cad5db23 100644 (file)
@@ -150,7 +150,7 @@ typedef struct pgssEntry
  */
 typedef struct pgssSharedState
 {
-       LWLockId        lock;                   /* protects hashtable search/modification */
+       LWLock     *lock;                       /* protects hashtable search/modification */
        int                     query_size;             /* max query length in bytes */
        double          cur_median_usage;               /* current median usage in hashtable */
 } pgssSharedState;
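
Since LWLockAssign() now returns a pointer rather than an index into the main
array, the extension's startup code keeps the same shape; only the stored
type changes.  A sketch of the assumed idiom (illustrative, not a hunk from
this diff):

    /* in pgss_shmem_startup(), after allocating shared state (assumed) */
    pgss->lock = LWLockAssign();

    /* callers now pass the pointer directly */
    LWLockAcquire(pgss->lock, LW_SHARED);
    LWLockRelease(pgss->lock);
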
index 4ec6981ab8260e0c067c1bc7fdcc88ab08d95d94..82eaf89a6b625b5361f531cec16ecdbcf361e8b2 100644 (file)
@@ -2212,49 +2212,55 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
     </row>
     <row>
      <entry>lwlock-acquire</entry>
-     <entry>(LWLockId, LWLockMode)</entry>
+     <entry>(char *, int, LWLockMode)</entry>
      <entry>Probe that fires when an LWLock has been acquired.
-      arg0 is the LWLock's ID.
-      arg1 is the requested lock mode, either exclusive or shared.</entry>
+      arg0 is the LWLock's tranche.
+      arg1 is the LWLock's offset within its tranche.
+      arg2 is the requested lock mode, either exclusive or shared.</entry>
     </row>
     <row>
      <entry>lwlock-release</entry>
-     <entry>(LWLockId)</entry>
+     <entry>(char *, int)</entry>
      <entry>Probe that fires when an LWLock has been released (but note
       that any released waiters have not yet been awakened).
-      arg0 is the LWLock's ID.</entry>
+      arg0 is the LWLock's tranche.
+      arg1 is the LWLock's offset within its tranche.</entry>
     </row>
     <row>
      <entry>lwlock-wait-start</entry>
-     <entry>(LWLockId, LWLockMode)</entry>
+     <entry>(char *, int, LWLockMode)</entry>
      <entry>Probe that fires when an LWLock was not immediately available and
       a server process has begun to wait for the lock to become available.
-      arg0 is the LWLock's ID.
-      arg1 is the requested lock mode, either exclusive or shared.</entry>
+      arg0 is the LWLock's tranche.
+      arg1 is the LWLock's offset within its tranche.
+      arg2 is the requested lock mode, either exclusive or shared.</entry>
     </row>
     <row>
      <entry>lwlock-wait-done</entry>
-     <entry>(LWLockId, LWLockMode)</entry>
+     <entry>(char *, int, LWLockMode)</entry>
      <entry>Probe that fires when a server process has been released from its
       wait for an LWLock (it does not actually have the lock yet).
-      arg0 is the LWLock's ID.
-      arg1 is the requested lock mode, either exclusive or shared.</entry>
+      arg0 is the LWLock's tranche.
+      arg1 is the LWLock's offset within its tranche.
+      arg2 is the requested lock mode, either exclusive or shared.</entry>
     </row>
     <row>
      <entry>lwlock-condacquire</entry>
-     <entry>(LWLockId, LWLockMode)</entry>
+     <entry>(char *, int, LWLockMode)</entry>
      <entry>Probe that fires when an LWLock was successfully acquired when the
       caller specified no waiting.
-      arg0 is the LWLock's ID.
-      arg1 is the requested lock mode, either exclusive or shared.</entry>
+      arg0 is the LWLock's tranche.
+      arg1 is the LWLock's offset within its tranche.
+      arg2 is the requested lock mode, either exclusive or shared.</entry>
     </row>
     <row>
      <entry>lwlock-condacquire-fail</entry>
-     <entry>(LWLockId, LWLockMode)</entry>
+     <entry>(char *, int, LWLockMode)</entry>
      <entry>Probe that fires when an LWLock was not successfully acquired when
       the caller specified no waiting.
-      arg0 is the LWLock's ID.
-      arg1 is the requested lock mode, either exclusive or shared.</entry>
+      arg0 is the LWLock's tranche.
+      arg1 is the LWLock's offset within its tranche.
+      arg2 is the requested lock mode, either exclusive or shared.</entry>
     </row>
     <row>
      <entry>lock-wait-start</entry>
@@ -2299,10 +2305,6 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
      <entry>LocalTransactionId</entry>
      <entry>unsigned int</entry>
     </row>
-    <row>
-     <entry>LWLockId</entry>
-     <entry>int</entry>
-    </row>
     <row>
      <entry>LWLockMode</entry>
      <entry>int</entry>
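
The probe call sites in lwlock.c (see the hunks below) supply these arguments
through the T_NAME() and T_ID() macros, so the pair (tranche name, offset)
replaces the old single integer ID.  For built-in locks the tranche is named
"main" and the offset is simply the lock's index in MainLWLockArray:

    /* representative call site, as it appears later in this patch */
    TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), mode);
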
index f604aa9b60eb361c1fa470ca1f710a25984f198f..b90db9a417df30a3daa3e59ab2e09adfe5f43c1b 100644 (file)
@@ -151,7 +151,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
        sz += MAXALIGN(nslots * sizeof(bool));          /* page_dirty[] */
        sz += MAXALIGN(nslots * sizeof(int));           /* page_number[] */
        sz += MAXALIGN(nslots * sizeof(int));           /* page_lru_count[] */
-       sz += MAXALIGN(nslots * sizeof(LWLockId));      /* buffer_locks[] */
+       sz += MAXALIGN(nslots * sizeof(LWLock *));      /* buffer_locks[] */
 
        if (nlsns > 0)
                sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));    /* group_lsn[] */
@@ -161,7 +161,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
 
 void
 SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
-                         LWLockId ctllock, const char *subdir)
+                         LWLock *ctllock, const char *subdir)
 {
        SlruShared      shared;
        bool            found;
@@ -202,8 +202,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
                offset += MAXALIGN(nslots * sizeof(int));
                shared->page_lru_count = (int *) (ptr + offset);
                offset += MAXALIGN(nslots * sizeof(int));
-               shared->buffer_locks = (LWLockId *) (ptr + offset);
-               offset += MAXALIGN(nslots * sizeof(LWLockId));
+               shared->buffer_locks = (LWLock **) (ptr + offset);
+               offset += MAXALIGN(nslots * sizeof(LWLock *));
 
                if (nlsns > 0)
                {
index b807b064be3ae4c00febfe9b5524acb28b77239c..52f87895a66ad01d154bbf5aff89c927111f32f7 100644 (file)
@@ -448,8 +448,6 @@ typedef struct
 typedef int InheritableSocket;
 #endif
 
-typedef struct LWLock LWLock;  /* ugly kluge */
-
 /*
  * Structure contains all variables passed to exec:ed backends
  */
@@ -473,7 +471,7 @@ typedef struct
 #ifndef HAVE_SPINLOCKS
        PGSemaphore     SpinlockSemaArray;
 #endif
-       LWLock     *LWLockArray;
+       LWLock     *MainLWLockArray;
        slock_t    *ProcStructLock;
        PROC_HDR   *ProcGlobal;
        PGPROC     *AuxiliaryProcs;
@@ -5576,7 +5574,6 @@ PostmasterMarkPIDForWorkerNotify(int pid)
  * functions.  They are marked NON_EXEC_STATIC in their home modules.
  */
 extern slock_t *ShmemLock;
-extern LWLock *LWLockArray;
 extern slock_t *ProcStructLock;
 extern PGPROC *AuxiliaryProcs;
 extern PMSignalData *PMSignalState;
@@ -5625,7 +5622,7 @@ save_backend_variables(BackendParameters *param, Port *port,
 #ifndef HAVE_SPINLOCKS
        param->SpinlockSemaArray = SpinlockSemaArray;
 #endif
-       param->LWLockArray = LWLockArray;
+       param->MainLWLockArray = MainLWLockArray;
        param->ProcStructLock = ProcStructLock;
        param->ProcGlobal = ProcGlobal;
        param->AuxiliaryProcs = AuxiliaryProcs;
@@ -5856,7 +5853,7 @@ restore_backend_variables(BackendParameters *param, Port *port)
 #ifndef HAVE_SPINLOCKS
        SpinlockSemaArray = param->SpinlockSemaArray;
 #endif
-       LWLockArray = param->LWLockArray;
+       MainLWLockArray = param->MainLWLockArray;
        ProcStructLock = param->ProcStructLock;
        ProcGlobal = param->ProcGlobal;
        AuxiliaryProcs = param->AuxiliaryProcs;
index 91f0c7eb36e16b3c17e67d3bf7684daf8f2d9224..19eecab4c2862b7ee880b3eb07d6559c679d930f 100644 (file)
@@ -146,7 +146,7 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
        {
                BufferTag       newTag;         /* identity of requested block */
                uint32          newHash;        /* hash value for newTag */
-               LWLockId        newPartitionLock;       /* buffer partition lock for it */
+               LWLock     *newPartitionLock;   /* buffer partition lock for it */
                int                     buf_id;
 
                /* create a tag so we can lookup the buffer */
@@ -539,10 +539,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 {
        BufferTag       newTag;                 /* identity of requested block */
        uint32          newHash;                /* hash value for newTag */
-       LWLockId        newPartitionLock;               /* buffer partition lock for it */
+       LWLock     *newPartitionLock;           /* buffer partition lock for it */
        BufferTag       oldTag;                 /* previous identity of selected buffer */
        uint32          oldHash;                /* hash value for oldTag */
-       LWLockId        oldPartitionLock;               /* buffer partition lock for it */
+       LWLock     *oldPartitionLock;           /* buffer partition lock for it */
        BufFlags        oldFlags;
        int                     buf_id;
        volatile BufferDesc *buf;
@@ -891,7 +891,7 @@ InvalidateBuffer(volatile BufferDesc *buf)
 {
        BufferTag       oldTag;
        uint32          oldHash;                /* hash value for oldTag */
-       LWLockId        oldPartitionLock;               /* buffer partition lock for it */
+       LWLock     *oldPartitionLock;           /* buffer partition lock for it */
        BufFlags        oldFlags;
 
        /* Save the original buffer tag before dropping the spinlock */
index cc219237097f06a974657be05c488714b7d1d882..2e717457b123f600eb60d6315ac4624f78c8dacf 100644 (file)
@@ -182,8 +182,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
         * Now initialize LWLocks, which do shared memory allocation and are
         * needed for InitShmemIndex.
         */
-       if (!IsUnderPostmaster)
-               CreateLWLocks();
+       CreateLWLocks();
 
        /*
         * Set up shmem.c index hashtable
index 5c8b4b0656ce4b19e145b7f4d1149d2dd41fd4c6..6335129ac25e4856967177d01500080c4145cc3d 100644 (file)
@@ -565,7 +565,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        LOCALLOCK  *locallock;
        LOCK       *lock;
        PROCLOCK   *proclock;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        bool            hasWaiters = false;
 
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
@@ -702,7 +702,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
        bool            found;
        ResourceOwner owner;
        uint32          hashcode;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        int                     status;
        bool            log_lock = false;
 
@@ -1744,7 +1744,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
        LOCALLOCK  *locallock;
        LOCK       *lock;
        PROCLOCK   *proclock;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        bool            wakeupNeeded;
 
        if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
@@ -2096,10 +2096,12 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
         */
        for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
        {
-               LWLockId        partitionLock = FirstLockMgrLock + partition;
+               LWLock     *partitionLock;
                SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
                PROCLOCK   *nextplock;
 
+               partitionLock = LockHashPartitionLockByIndex(partition);
+
                /*
                 * If the proclock list for this partition is empty, we can skip
                 * acquiring the partition lock.  This optimization is trickier than
@@ -2475,7 +2477,7 @@ static bool
 FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
                                                          uint32 hashcode)
 {
-       LWLockId        partitionLock = LockHashPartitionLock(hashcode);
+       LWLock     *partitionLock = LockHashPartitionLock(hashcode);
        Oid                     relid = locktag->locktag_field2;
        uint32          i;
 
@@ -2565,7 +2567,7 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock)
        LockMethod      lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
        LOCKTAG    *locktag = &locallock->tag.lock;
        PROCLOCK   *proclock = NULL;
-       LWLockId        partitionLock = LockHashPartitionLock(locallock->hashcode);
+       LWLock     *partitionLock = LockHashPartitionLock(locallock->hashcode);
        Oid                     relid = locktag->locktag_field2;
        uint32          f;
 
@@ -2671,7 +2673,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
        SHM_QUEUE  *procLocks;
        PROCLOCK   *proclock;
        uint32          hashcode;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        int                     count = 0;
        int                     fast_count = 0;
 
@@ -2883,7 +2885,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
        PROCLOCKTAG proclocktag;
        uint32          hashcode;
        uint32          proclock_hashcode;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        bool            wakeupNeeded;
 
        hashcode = LockTagHashCode(locktag);
@@ -3159,10 +3161,12 @@ PostPrepare_Locks(TransactionId xid)
         */
        for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
        {
-               LWLockId        partitionLock = FirstLockMgrLock + partition;
+               LWLock     *partitionLock;
                SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
                PROCLOCK   *nextplock;
 
+               partitionLock = LockHashPartitionLockByIndex(partition);
+
                /*
                 * If the proclock list for this partition is empty, we can skip
                 * acquiring the partition lock.  This optimization is safer than the
@@ -3400,7 +3404,7 @@ GetLockStatusData(void)
         * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
         */
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-               LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+               LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
 
        /* Now we can safely count the number of proclocks */
        data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
@@ -3442,7 +3446,7 @@ GetLockStatusData(void)
         * behavior inside LWLockRelease.
         */
        for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
-               LWLockRelease(FirstLockMgrLock + i);
+               LWLockRelease(LockHashPartitionLockByIndex(i));
 
        Assert(el == data->nelements);
 
@@ -3477,7 +3481,7 @@ GetRunningTransactionLocks(int *nlocks)
         * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
         */
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-               LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+               LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
 
        /* Now we can safely count the number of proclocks */
        els = hash_get_num_entries(LockMethodProcLockHash);
@@ -3537,7 +3541,7 @@ GetRunningTransactionLocks(int *nlocks)
         * behavior inside LWLockRelease.
         */
        for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
-               LWLockRelease(FirstLockMgrLock + i);
+               LWLockRelease(LockHashPartitionLockByIndex(i));
 
        *nlocks = index;
        return accessExclusiveLocks;
@@ -3673,7 +3677,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
        uint32          hashcode;
        uint32          proclock_hashcode;
        int                     partition;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        LockMethod      lockMethodTable;
 
        Assert(len == sizeof(TwoPhaseLockRecord));
@@ -4044,7 +4048,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
        {
                PROCLOCK   *proclock;
                uint32          hashcode;
-               LWLockId        partitionLock;
+               LWLock     *partitionLock;
 
                hashcode = LockTagHashCode(&tag);
 
index 0e319a7e6acc4a713537ba096ee65bfc15024b6c..55d9d7837cabdc019b7f58237ebf8967ddb406d7 100644 (file)
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/spin.h"
+#include "utils/memutils.h"
+
+#ifdef LWLOCK_STATS
+#include "utils/hsearch.h"
+#endif
 
 
 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;
 
-
-typedef struct LWLock
-{
-       slock_t         mutex;                  /* Protects LWLock and queue of PGPROCs */
-       bool            releaseOK;              /* T if ok to release waiters */
-       char            exclusive;              /* # of exclusive holders (0 or 1) */
-       int                     shared;                 /* # of shared holders (0..MaxBackends) */
-       PGPROC     *head;                       /* head of list of waiting PGPROCs */
-       PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
-       /* tail is undefined when head is NULL */
-} LWLock;
-
 /*
- * All the LWLock structs are allocated as an array in shared memory.
- * (LWLockIds are indexes into the array.)     We force the array stride to
- * be a power of 2, which saves a few cycles in indexing, but more
- * importantly also ensures that individual LWLocks don't cross cache line
- * boundaries. This reduces cache contention problems, especially on AMD
- * Opterons.  (Of course, we have to also ensure that the array start
- * address is suitably aligned.)
- *
- * LWLock is between 16 and 32 bytes on all known platforms, so these two
- * cases are sufficient.
+ * This is indexed by tranche ID and stores metadata for all tranches known
+ * to the current backend.
  */
-#define LWLOCK_PADDED_SIZE     (sizeof(LWLock) <= 16 ? 16 : 32)
+static LWLockTranche **LWLockTrancheArray = NULL;
+static int LWLockTranchesAllocated = 0;
 
-typedef union LWLockPadded
-{
-       LWLock          lock;
-       char            pad[LWLOCK_PADDED_SIZE];
-} LWLockPadded;
+#define T_NAME(lock) \
+       (LWLockTrancheArray[(lock)->tranche]->name)
+#define T_ID(lock) \
+       ((int) ((((char *) lock) - \
+               ((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \
+               LWLockTrancheArray[(lock)->tranche]->array_stride))
 
 /*
- * This points to the array of LWLocks in shared memory.  Backends inherit
+ * This points to the main array of LWLocks in shared memory.  Backends inherit
  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
  * where we have special measures to pass it down).
  */
-NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
-
+LWLockPadded *MainLWLockArray = NULL;
+static LWLockTranche MainLWLockTranche;
 
 /*
  * We use this structure to keep track of locked LWLocks for release
@@ -85,58 +72,78 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
 #define MAX_SIMUL_LWLOCKS      100
 
 static int     num_held_lwlocks = 0;
-static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS];
 
 static int     lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
 
 #ifdef LWLOCK_STATS
+typedef struct lwlock_stats_key
+{
+       int             tranche;
+       int             instance;
+} lwlock_stats_key;
+
+typedef struct lwlock_stats
+{
+       lwlock_stats_key        key;
+       int             sh_acquire_count;
+       int             ex_acquire_count;
+       int             block_count;
+       int             spin_delay_count;
+} lwlock_stats;
+
 static int     counts_for_pid = 0;
-static int *sh_acquire_counts;
-static int *ex_acquire_counts;
-static int *block_counts;
-static int *spin_delay_counts;
+static HTAB *lwlock_stats_htab;
 #endif
 
 #ifdef LOCK_DEBUG
 bool           Trace_lwlocks = false;
 
 inline static void
-PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
+PRINT_LWDEBUG(const char *where, const volatile LWLock *lock)
 {
        if (Trace_lwlocks)
-               elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
-                        where, (int) lockid,
+               elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d",
+                        where, T_NAME(lock), T_ID(lock),
                         (int) lock->exclusive, lock->shared, lock->head,
                         (int) lock->releaseOK);
 }
 
 inline static void
-LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
+LOG_LWDEBUG(const char *where, const char *name, int index, const char *msg)
 {
        if (Trace_lwlocks)
-               elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
+               elog(LOG, "%s(%s %d): %s", where, name, index, msg);
 }
 #else                                                  /* not LOCK_DEBUG */
-#define PRINT_LWDEBUG(a,b,c)
-#define LOG_LWDEBUG(a,b,c)
+#define PRINT_LWDEBUG(a,b)
+#define LOG_LWDEBUG(a,b,c,d)
 #endif   /* LOCK_DEBUG */
 
 #ifdef LWLOCK_STATS
 
 static void init_lwlock_stats(void);
 static void print_lwlock_stats(int code, Datum arg);
+static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);
 
 static void
 init_lwlock_stats(void)
 {
-       int                *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
-       int                     numLocks = LWLockCounter[1];
+       HASHCTL         ctl;
+
+       if (lwlock_stats_htab != NULL)
+       {
+               hash_destroy(lwlock_stats_htab);
+               lwlock_stats_htab = NULL;
+       }
 
-       sh_acquire_counts = calloc(numLocks, sizeof(int));
-       ex_acquire_counts = calloc(numLocks, sizeof(int));
-       spin_delay_counts = calloc(numLocks, sizeof(int));
-       block_counts = calloc(numLocks, sizeof(int));
+       MemSet(&ctl, 0, sizeof(ctl));
+       ctl.keysize = sizeof(lwlock_stats_key);
+       ctl.entrysize = sizeof(lwlock_stats);
+       ctl.hash = tag_hash;
+       lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
+                                                                       HASH_ELEM | HASH_FUNCTION);
        counts_for_pid = MyProcPid;
        on_shmem_exit(print_lwlock_stats, 0);
 }
@@ -144,30 +151,58 @@ init_lwlock_stats(void)
 static void
 print_lwlock_stats(int code, Datum arg)
 {
-       int                     i;
-       int                *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
-       int                     numLocks = LWLockCounter[1];
+       HASH_SEQ_STATUS scan;
+       lwlock_stats *lwstats;
+
+       hash_seq_init(&scan, lwlock_stats_htab);
 
        /* Grab an LWLock to keep different backends from mixing reports */
-       LWLockAcquire(0, LW_EXCLUSIVE);
+       LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
 
-       for (i = 0; i < numLocks; i++)
+       while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
        {
-               if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i] || spin_delay_counts[i])
-                       fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u spindelay %u\n",
-                                       MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
-                                       block_counts[i], spin_delay_counts[i]);
+               fprintf(stderr,
+                               "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u\n",
+                               MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
+                               lwstats->key.instance, lwstats->sh_acquire_count,
+                               lwstats->ex_acquire_count, lwstats->block_count,
+                               lwstats->spin_delay_count);
        }
 
-       LWLockRelease(0);
+       LWLockRelease(&MainLWLockArray[0].lock);
+}
+
+static lwlock_stats *
+get_lwlock_stats_entry(LWLock *lock)
+{
+       lwlock_stats_key        key;
+       lwlock_stats *lwstats;
+       bool    found;
+
+       /* Set up local count state first time through in a given process */
+       if (counts_for_pid != MyProcPid)
+               init_lwlock_stats();
+
+       /* Fetch or create the entry. */
+       key.tranche = lock->tranche;
+       key.instance = T_ID(lock);
+       lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
+       if (!found)
+       {
+               lwstats->sh_acquire_count = 0;
+               lwstats->ex_acquire_count = 0;
+               lwstats->block_count = 0;
+               lwstats->spin_delay_count = 0;
+       }
+       return lwstats;
 }
 #endif   /* LWLOCK_STATS */
 
 
 /*
- * Compute number of LWLocks to allocate.
+ * Compute number of LWLocks to allocate in the main array.
  */
-int
+static int
 NumLWLocks(void)
 {
        int                     numLocks;
@@ -180,7 +215,7 @@ NumLWLocks(void)
         */
 
        /* Predefined LWLocks */
-       numLocks = (int) NumFixedLWLocks;
+       numLocks = NUM_FIXED_LWLOCKS;
 
        /* bufmgr.c needs two for each shared buffer */
        numLocks += 2 * NBuffers;
@@ -248,56 +283,67 @@ LWLockShmemSize(void)
        size = mul_size(numLocks, sizeof(LWLockPadded));
 
        /* Space for dynamic allocation counter, plus room for alignment. */
-       size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
+       size = add_size(size, 3 * sizeof(int) + LWLOCK_PADDED_SIZE);
 
        return size;
 }
 
 
 /*
- * Allocate shmem space for LWLocks and initialize the locks.
+ * Allocate shmem space for the main LWLock array and initialize it.  We also
+ * register the main tranche here.
  */
 void
 CreateLWLocks(void)
 {
-       int                     numLocks = NumLWLocks();
-       Size            spaceLocks = LWLockShmemSize();
-       LWLockPadded *lock;
-       int                *LWLockCounter;
-       char       *ptr;
-       int                     id;
+       if (!IsUnderPostmaster)
+       {
+               int                     numLocks = NumLWLocks();
+               Size            spaceLocks = LWLockShmemSize();
+               LWLockPadded *lock;
+               int                *LWLockCounter;
+               char       *ptr;
+               int                     id;
 
-       /* Allocate space */
-       ptr = (char *) ShmemAlloc(spaceLocks);
+               /* Allocate space */
+               ptr = (char *) ShmemAlloc(spaceLocks);
 
-       /* Leave room for dynamic allocation counter */
-       ptr += 2 * sizeof(int);
+               /* Leave room for dynamic allocation of locks and tranches */
+               ptr += 3 * sizeof(int);
 
-       /* Ensure desired alignment of LWLock array */
-       ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
+               /* Ensure desired alignment of LWLock array */
+               ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
 
-       LWLockArray = (LWLockPadded *) ptr;
+               MainLWLockArray = (LWLockPadded *) ptr;
 
-       /*
-        * Initialize all LWLocks to "unlocked" state
-        */
-       for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
+               /* Initialize all LWLocks in main array */
+               for (id = 0, lock = MainLWLockArray; id < numLocks; id++, lock++)
+                       LWLockInitialize(&lock->lock, 0);
+
+               /*
+                * Initialize the dynamic-allocation counters, which are stored just
+                * before the first LWLock.  LWLockCounter[0] is the allocation
+                * counter for lwlocks, LWLockCounter[1] is the maximum number that
+                * can be allocated from the main array, and LWLockCounter[2] is the
+                * allocation counter for tranches.
+                */
+               LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
+               LWLockCounter[0] = NUM_FIXED_LWLOCKS;
+               LWLockCounter[1] = numLocks;
+               LWLockCounter[2] = 1;                   /* 0 is the main array */
+       }
+
+       if (LWLockTrancheArray == NULL)
        {
-               SpinLockInit(&lock->lock.mutex);
-               lock->lock.releaseOK = true;
-               lock->lock.exclusive = 0;
-               lock->lock.shared = 0;
-               lock->lock.head = NULL;
-               lock->lock.tail = NULL;
+               LWLockTranchesAllocated = 16;
+               LWLockTrancheArray = MemoryContextAlloc(TopMemoryContext,
+                       LWLockTranchesAllocated * sizeof(LWLockTranche *));
        }
 
-       /*
-        * Initialize the dynamic-allocation counter, which is stored just before
-        * the first LWLock.
-        */
-       LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
-       LWLockCounter[0] = (int) NumFixedLWLocks;
-       LWLockCounter[1] = numLocks;
+       MainLWLockTranche.name = "main";
+       MainLWLockTranche.array_base = MainLWLockArray;
+       MainLWLockTranche.array_stride = sizeof(LWLockPadded);
+       LWLockRegisterTranche(0, &MainLWLockTranche);
 }
 
 
@@ -309,26 +355,86 @@ CreateLWLocks(void)
  * startup, but it is needed if any user-defined code tries to allocate
  * LWLocks after startup.
  */
-LWLockId
+LWLock *
 LWLockAssign(void)
 {
-       LWLockId        result;
+       LWLock     *result;
 
        /* use volatile pointer to prevent code rearrangement */
        volatile int *LWLockCounter;
 
-       LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
+       LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
        SpinLockAcquire(ShmemLock);
        if (LWLockCounter[0] >= LWLockCounter[1])
        {
                SpinLockRelease(ShmemLock);
-               elog(ERROR, "no more LWLockIds available");
+               elog(ERROR, "no more LWLocks available");
        }
-       result = (LWLockId) (LWLockCounter[0]++);
+       result = &MainLWLockArray[LWLockCounter[0]++].lock;
        SpinLockRelease(ShmemLock);
        return result;
 }
 
+/*
+ * Allocate a new tranche ID.
+ */
+int
+LWLockNewTrancheId(void)
+{
+       int                     result;
+
+       /* use volatile pointer to prevent code rearrangement */
+       volatile int *LWLockCounter;
+
+       LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
+       SpinLockAcquire(ShmemLock);
+       result = LWLockCounter[2]++;
+       SpinLockRelease(ShmemLock);
+
+       return result;
+}
+
+/*
+ * Register a tranche ID in the lookup table for the current process.  This
+ * routine will save a pointer to the tranche object passed as an argument,
+ * so that object should be allocated in a backend-lifetime context
+ * (TopMemoryContext, static variable, or similar).
+ */
+void
+LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche)
+{
+       Assert(LWLockTrancheArray != NULL);
+
+       if (tranche_id >= LWLockTranchesAllocated)
+       {
+               int             i = LWLockTranchesAllocated;
+
+       while (i <= tranche_id)
+                       i *= 2;
+
+               LWLockTrancheArray = repalloc(LWLockTrancheArray,
+                                                                         i * sizeof(LWLockTranche *));
+               LWLockTranchesAllocated = i;
+       }
+
+       LWLockTrancheArray[tranche_id] = tranche;
+}
+
+/*
+ * LWLockInitialize - initialize a new lwlock; it's initially unlocked
+ */
+void
+LWLockInitialize(LWLock *lock, int tranche_id)
+{
+       SpinLockInit(&lock->mutex);
+       lock->releaseOK = true;
+       lock->exclusive = 0;
+       lock->shared = 0;
+       lock->tranche = tranche_id;
+       lock->head = NULL;
+       lock->tail = NULL;
+}
+
 
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
@@ -338,24 +444,26 @@ LWLockAssign(void)
  * Side effect: cancel/die interrupts are held off until lock release.
  */
 void
-LWLockAcquire(LWLockId lockid, LWLockMode mode)
+LWLockAcquire(LWLock *l, LWLockMode mode)
 {
-       volatile LWLock *lock = &(LWLockArray[lockid].lock);
+       volatile LWLock *lock = l;
        PGPROC     *proc = MyProc;
        bool            retry = false;
        int                     extraWaits = 0;
+#ifdef LWLOCK_STATS
+       lwlock_stats *lwstats;
+#endif
 
-       PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
+       PRINT_LWDEBUG("LWLockAcquire", lock);
 
 #ifdef LWLOCK_STATS
-       /* Set up local count state first time through in a given process */
-       if (counts_for_pid != MyProcPid)
-               init_lwlock_stats();
+       lwstats = get_lwlock_stats_entry(l);
+
        /* Count lock acquisition attempts */
        if (mode == LW_EXCLUSIVE)
-               ex_acquire_counts[lockid]++;
+               lwstats->ex_acquire_count++;
        else
-               sh_acquire_counts[lockid]++;
+               lwstats->sh_acquire_count++;
 #endif   /* LWLOCK_STATS */
 
        /*
@@ -398,7 +506,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 
                /* Acquire mutex.  Time spent holding mutex should be short! */
 #ifdef LWLOCK_STATS
-               spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
+               lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
 #else
                SpinLockAcquire(&lock->mutex);
 #endif
@@ -466,13 +574,13 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
                 * so that the lock manager or signal manager will see the received
                 * signal when it next waits.
                 */
-               LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
+               LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "waiting");
 
 #ifdef LWLOCK_STATS
-               block_counts[lockid]++;
+               lwstats->block_count++;
 #endif
 
-               TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+               TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode);
 
                for (;;)
                {
@@ -483,9 +591,9 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
                        extraWaits++;
                }
 
-               TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+               TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode);
 
-               LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
+               LOG_LWDEBUG("LWLockAcquire", T_NAME(l), T_ID(l), "awakened");
 
                /* Now loop back and try to acquire lock again. */
                retry = true;
@@ -494,10 +602,10 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
        /* We are done updating shared state of the lock itself. */
        SpinLockRelease(&lock->mutex);
 
-       TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
+       TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), mode);
 
        /* Add lock to list of locks held by this backend */
-       held_lwlocks[num_held_lwlocks++] = lockid;
+       held_lwlocks[num_held_lwlocks++] = l;
 
        /*
         * Fix the process wait semaphore's count for any absorbed wakeups.
@@ -514,12 +622,12 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
  * If successful, cancel/die interrupts are held off until lock release.
  */
 bool
-LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
+LWLockConditionalAcquire(LWLock *l, LWLockMode mode)
 {
-       volatile LWLock *lock = &(LWLockArray[lockid].lock);
+       volatile LWLock *lock = l;
        bool            mustwait;
 
-       PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
+       PRINT_LWDEBUG("LWLockConditionalAcquire", lock);
 
        /* Ensure we will have room to remember the lock */
        if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
@@ -564,14 +672,14 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
        {
                /* Failed to get lock, so release interrupt holdoff */
                RESUME_INTERRUPTS();
-               LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
-               TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
+               LOG_LWDEBUG("LWLockConditionalAcquire", T_NAME(l), T_ID(l), "failed");
+               TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(l), T_ID(l), mode);
        }
        else
        {
                /* Add lock to list of locks held by this backend */
-               held_lwlocks[num_held_lwlocks++] = lockid;
-               TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
+               held_lwlocks[num_held_lwlocks++] = l;
+               TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(l), T_ID(l), mode);
        }
 
        return !mustwait;
@@ -592,19 +700,20 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
  * wake up, observe that their records have already been flushed, and return.
  */
 bool
-LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
+LWLockAcquireOrWait(LWLock *l, LWLockMode mode)
 {
-       volatile LWLock *lock = &(LWLockArray[lockid].lock);
+       volatile LWLock *lock = l;
        PGPROC     *proc = MyProc;
        bool            mustwait;
        int                     extraWaits = 0;
+#ifdef LWLOCK_STATS
+       lwlock_stats *lwstats;
+#endif
 
-       PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);
+       PRINT_LWDEBUG("LWLockAcquireOrWait", lock);
 
 #ifdef LWLOCK_STATS
-       /* Set up local count state first time through in a given process */
-       if (counts_for_pid != MyProcPid)
-               init_lwlock_stats();
+       lwstats = get_lwlock_stats_entry(l);
 #endif
 
        /* Ensure we will have room to remember the lock */
@@ -671,13 +780,13 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
                 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
                 * wakeups, because we share the semaphore with ProcWaitForSignal.
                 */
-               LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting");
+               LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "waiting");
 
 #ifdef LWLOCK_STATS
-               block_counts[lockid]++;
+               lwstats->block_count++;
 #endif
 
-               TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+               TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode);
 
                for (;;)
                {
@@ -688,9 +797,9 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
                        extraWaits++;
                }
 
-               TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+               TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode);
 
-               LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened");
+               LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "awakened");
        }
        else
        {
@@ -708,14 +817,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
        {
                /* Failed to get lock, so release interrupt holdoff */
                RESUME_INTERRUPTS();
-               LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed");
-               TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
+               LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(l), T_ID(l), "failed");
+               TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(T_NAME(l), T_ID(l), mode);
        }
        else
        {
                /* Add lock to list of locks held by this backend */
-               held_lwlocks[num_held_lwlocks++] = lockid;
-               TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
+               held_lwlocks[num_held_lwlocks++] = l;
+               TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(T_NAME(l), T_ID(l), mode);
        }
 
        return !mustwait;
@@ -725,14 +834,14 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
  * LWLockRelease - release a previously acquired lock
  */
 void
-LWLockRelease(LWLockId lockid)
+LWLockRelease(LWLock *l)
 {
-       volatile LWLock *lock = &(LWLockArray[lockid].lock);
+       volatile LWLock *lock = l;
        PGPROC     *head;
        PGPROC     *proc;
        int                     i;
 
-       PRINT_LWDEBUG("LWLockRelease", lockid, lock);
+       PRINT_LWDEBUG("LWLockRelease", lock);
 
        /*
         * Remove lock from list of locks held.  Usually, but not always, it will
@@ -740,11 +849,11 @@ LWLockRelease(LWLockId lockid)
         */
        for (i = num_held_lwlocks; --i >= 0;)
        {
-               if (lockid == held_lwlocks[i])
+               if (l == held_lwlocks[i])
                        break;
        }
        if (i < 0)
-               elog(ERROR, "lock %d is not held", (int) lockid);
+               elog(ERROR, "lock %s %d is not held", T_NAME(l), T_ID(l));
        num_held_lwlocks--;
        for (; i < num_held_lwlocks; i++)
                held_lwlocks[i] = held_lwlocks[i + 1];
@@ -824,14 +933,14 @@ LWLockRelease(LWLockId lockid)
        /* We are done updating shared state of the lock itself. */
        SpinLockRelease(&lock->mutex);
 
-       TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
+       TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(l), T_ID(l));
 
        /*
         * Awaken any waiters I removed from the queue.
         */
        while (head != NULL)
        {
-               LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
+               LOG_LWDEBUG("LWLockRelease", T_NAME(l), T_ID(l), "release waiter");
                proc = head;
                head = proc->lwWaitLink;
                proc->lwWaitLink = NULL;
@@ -874,13 +983,13 @@ LWLockReleaseAll(void)
  * lock is held shared or exclusive.
  */
 bool
-LWLockHeldByMe(LWLockId lockid)
+LWLockHeldByMe(LWLock *l)
 {
        int                     i;
 
        for (i = 0; i < num_held_lwlocks; i++)
        {
-               if (held_lwlocks[i] == lockid)
+               if (held_lwlocks[i] == l)
                        return true;
        }
        return false;
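
The T_NAME()/T_ID() macros above identify any lock purely from its tranche
registration.  Restated as a plain function (a sketch, not code from this
patch), the offset computation is:

    /*
     * Index of a lock within its tranche: its byte offset from the
     * registered array_base, divided by the registered array_stride.
     */
    static int
    lwlock_index_in_tranche(LWLock *lock, LWLockTranche *tranche)
    {
        return (int) (((char *) lock - (char *) tranche->array_base)
                      / tranche->array_stride);
    }
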
index e7f44cce841e2587e59b1c807bea6a7ae2b04e8d..67000720275050a7928287050d8783d2db67e4b6 100644 (file)
 #define PredicateLockHashPartition(hashcode) \
        ((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
 #define PredicateLockHashPartitionLock(hashcode) \
-       ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
+       (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \
+               PredicateLockHashPartition(hashcode)].lock)
+#define PredicateLockHashPartitionLockByIndex(i) \
+       (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)
 
 #define NPREDICATELOCKTARGETENTS() \
        mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
@@ -383,7 +386,7 @@ static SHM_QUEUE *FinishedSerializableTransactions;
  */
 static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0};
 static uint32 ScratchTargetTagHash;
-static int     ScratchPartitionLock;
+static LWLock *ScratchPartitionLock;
 
 /*
  * The local hash table used to determine when to combine multiple fine-
@@ -1398,7 +1401,7 @@ GetPredicateLockStatusData(void)
         * in ascending order, then SerializableXactHashLock.
         */
        for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
-               LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+               LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
        LWLockAcquire(SerializableXactHashLock, LW_SHARED);
 
        /* Get number of locks and allocate appropriately-sized arrays. */
@@ -1427,7 +1430,7 @@ GetPredicateLockStatusData(void)
        /* Release locks in reverse order */
        LWLockRelease(SerializableXactHashLock);
        for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
-               LWLockRelease(FirstPredicateLockMgrLock + i);
+               LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
 
        return data;
 }
@@ -1856,7 +1859,7 @@ PageIsPredicateLocked(Relation relation, BlockNumber blkno)
 {
        PREDICATELOCKTARGETTAG targettag;
        uint32          targettaghash;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        PREDICATELOCKTARGET *target;
 
        SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
@@ -2089,7 +2092,7 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
                if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
                {
                        uint32          oldtargettaghash;
-                       LWLockId        partitionLock;
+                       LWLock     *partitionLock;
                        PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY;
 
                        oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
@@ -2301,7 +2304,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
        PREDICATELOCKTARGET *target;
        PREDICATELOCKTAG locktag;
        PREDICATELOCK *lock;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        bool            found;
 
        partitionLock = PredicateLockHashPartitionLock(targettaghash);
@@ -2599,10 +2602,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
                                                                  bool removeOld)
 {
        uint32          oldtargettaghash;
-       LWLockId        oldpartitionLock;
+       LWLock     *oldpartitionLock;
        PREDICATELOCKTARGET *oldtarget;
        uint32          newtargettaghash;
-       LWLockId        newpartitionLock;
+       LWLock     *newpartitionLock;
        bool            found;
        bool            outOfShmem = false;
 
@@ -2858,7 +2861,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
        /* Acquire locks on all lock partitions */
        LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
        for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
-               LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
+               LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_EXCLUSIVE);
        LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
        /*
@@ -2996,7 +2999,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer)
        /* Release locks in reverse order */
        LWLockRelease(SerializableXactHashLock);
        for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
-               LWLockRelease(FirstPredicateLockMgrLock + i);
+               LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
        LWLockRelease(SerializablePredicateLockListLock);
 }
 
@@ -3611,7 +3614,7 @@ ClearOldPredicateLocks(void)
                        PREDICATELOCKTARGET *target;
                        PREDICATELOCKTARGETTAG targettag;
                        uint32          targettaghash;
-                       LWLockId        partitionLock;
+                       LWLock     *partitionLock;
 
                        tag = predlock->tag;
                        target = tag.myTarget;
@@ -3690,7 +3693,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
                PREDICATELOCKTARGET *target;
                PREDICATELOCKTARGETTAG targettag;
                uint32          targettaghash;
-               LWLockId        partitionLock;
+               LWLock     *partitionLock;
 
                nextpredlock = (PREDICATELOCK *)
                        SHMQueueNext(&(sxact->predicateLocks),
@@ -4068,7 +4071,7 @@ static void
 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 {
        uint32          targettaghash;
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        PREDICATELOCKTARGET *target;
        PREDICATELOCK *predlock;
        PREDICATELOCK *mypredlock = NULL;
@@ -4360,7 +4363,7 @@ CheckTableForSerializableConflictIn(Relation relation)
 
        LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
        for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
-               LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+               LWLockAcquire(PredicateLockHashPartitionLockByIndex(i), LW_SHARED);
        LWLockAcquire(SerializableXactHashLock, LW_SHARED);
 
        /* Scan through target list */
@@ -4407,7 +4410,7 @@ CheckTableForSerializableConflictIn(Relation relation)
        /* Release locks in reverse order */
        LWLockRelease(SerializableXactHashLock);
        for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
-               LWLockRelease(FirstPredicateLockMgrLock + i);
+               LWLockRelease(PredicateLockHashPartitionLockByIndex(i));
        LWLockRelease(SerializablePredicateLockListLock);
 }
 
index ee6c24cea7debab654bf3857a2fbda0f94100a8a..1a683b83361029c1783c115586698ca0aeac3902 100644 (file)
@@ -189,7 +189,8 @@ InitProcGlobal(void)
         */
        procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
        ProcGlobal->allProcs = procs;
-       ProcGlobal->allProcCount = TotalProcs;
+       /* XXX allProcCount isn't really all of them; it excludes prepared xacts */
+       ProcGlobal->allProcCount = MaxBackends + NUM_AUXILIARY_PROCS;
        if (!procs)
                ereport(FATAL,
                                (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -663,7 +664,7 @@ IsWaitingForLock(void)
 void
 LockErrorCleanup(void)
 {
-       LWLockId        partitionLock;
+       LWLock     *partitionLock;
        DisableTimeoutParams timeouts[2];
 
        AbortStrongLockAcquire();
@@ -942,7 +943,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
        LOCK       *lock = locallock->lock;
        PROCLOCK   *proclock = locallock->proclock;
        uint32          hashcode = locallock->hashcode;
-       LWLockId        partitionLock = LockHashPartitionLock(hashcode);
+       LWLock     *partitionLock = LockHashPartitionLock(hashcode);
        PROC_QUEUE *waitQueue = &(lock->waitProcs);
        LOCKMASK        myHeldLocks = MyProc->heldLocks;
        bool            early_deadlock = false;
@@ -1440,7 +1441,7 @@ CheckDeadLock(void)
         * interrupts.
         */
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-               LWLockAcquire(FirstLockMgrLock + i, LW_EXCLUSIVE);
+               LWLockAcquire(LockHashPartitionLockByIndex(i), LW_EXCLUSIVE);
 
        /*
         * Check to see if we've been awoken by anyone in the interim.
@@ -1522,7 +1523,7 @@ CheckDeadLock(void)
         */
 check_done:
        for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
-               LWLockRelease(FirstLockMgrLock + i);
+               LWLockRelease(LockHashPartitionLockByIndex(i));
 }
 
 
index 17c8e15f3589cead875b2cd5efe961b47d4f7450..804ba6ae62a66a8d430b3aa4db81a442fd503255 100644 (file)
@@ -15,7 +15,6 @@
  * in probe definitions, as they cause compilation errors on Mac OS X 10.5.
  */
 #define LocalTransactionId unsigned int
-#define LWLockId int
 #define LWLockMode int
 #define LOCKMODE int
 #define BlockNumber unsigned int
@@ -29,14 +28,14 @@ provider postgresql {
        probe transaction__commit(LocalTransactionId);
        probe transaction__abort(LocalTransactionId);
 
-       probe lwlock__acquire(LWLockId, LWLockMode);
-       probe lwlock__release(LWLockId);
-       probe lwlock__wait__start(LWLockId, LWLockMode);
-       probe lwlock__wait__done(LWLockId, LWLockMode);
-       probe lwlock__condacquire(LWLockId, LWLockMode);
-       probe lwlock__condacquire__fail(LWLockId, LWLockMode);
-       probe lwlock__wait__until__free(LWLockId, LWLockMode);
-       probe lwlock__wait__until__free__fail(LWLockId, LWLockMode);
+       probe lwlock__acquire(const char *, int, LWLockMode);
+       probe lwlock__release(const char *, int);
+       probe lwlock__wait__start(const char *, int, LWLockMode);
+       probe lwlock__wait__done(const char *, int, LWLockMode);
+       probe lwlock__condacquire(const char *, int, LWLockMode);
+       probe lwlock__condacquire__fail(const char *, int, LWLockMode);
+       probe lwlock__wait__until__free(const char *, int, LWLockMode);
+       probe lwlock__wait__until__free__fail(const char *, int, LWLockMode);
 
        probe lock__wait__start(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
        probe lock__wait__done(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
index 4ec11b1ec2db31a8adb493ca59bc291d244a9717..c7b4186ffa9aaa846dd042c0744255a7b1d2b806 100644 (file)
@@ -55,7 +55,7 @@ typedef enum
  */
 typedef struct SlruSharedData
 {
-       LWLockId        ControlLock;
+       LWLock     *ControlLock;
 
        /* Number of buffers managed by this SLRU structure */
        int                     num_slots;
@@ -69,7 +69,7 @@ typedef struct SlruSharedData
        bool       *page_dirty;
        int                *page_number;
        int                *page_lru_count;
-       LWLockId   *buffer_locks;
+       LWLock    **buffer_locks;
 
        /*
         * Optional array of WAL flush LSNs associated with entries in the SLRU
@@ -136,7 +136,7 @@ typedef SlruCtlData *SlruCtl;
 
 extern Size SimpleLruShmemSize(int nslots, int nlsns);
 extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
-                         LWLockId ctllock, const char *subdir);
+                         LWLock *ctllock, const char *subdir);
 extern int     SimpleLruZeroPage(SlruCtl ctl, int pageno);
 extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
                                  TransactionId xid);
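
Because the named fixed locks become LWLock * expressions under this patch,
SimpleLruInit() callers need no structural change.  A representative call
site (assumed to match clog.c, shown for illustration only):

    SimpleLruInit(ClogCtl, "CLOG Ctl", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
                  CLogControlLock, "pg_clog");
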
index 457390fc879a1be122e745ddaa2b9a3e9485dccc..93a0030c3ee9b97f79f1f9190372f92edccce7f3 100644 (file)
@@ -104,7 +104,10 @@ typedef struct buftag
 #define BufTableHashPartition(hashcode) \
        ((hashcode) % NUM_BUFFER_PARTITIONS)
 #define BufMappingPartitionLock(hashcode) \
-       ((LWLockId) (FirstBufMappingLock + BufTableHashPartition(hashcode)))
+       (&MainLWLockArray[BUFFER_MAPPING_LWLOCK_OFFSET + \
+               BufTableHashPartition(hashcode)].lock)
+#define BufMappingPartitionLockByIndex(i) \
+       (&MainLWLockArray[BUFFER_MAPPING_LWLOCK_OFFSET + (i)].lock)
 
 /*
  *     BufferDesc -- shared descriptor/state data for a single shared buffer.
@@ -144,8 +147,8 @@ typedef struct sbufdesc
        int                     buf_id;                 /* buffer's index number (from 0) */
        int                     freeNext;               /* link in freelist chain */
 
-       LWLockId        io_in_progress_lock;    /* to wait for I/O to complete */
-       LWLockId        content_lock;   /* to lock access to buffer contents */
+       LWLock     *io_in_progress_lock;        /* to wait for I/O to complete */
+       LWLock     *content_lock;       /* to lock access to buffer contents */
 } BufferDesc;
 
 #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index f6a2029e2aa4c4e2252eafe02ee318db9cab4031..ceeab9fc8a443262053631c24ef1ef62e56e3e89 100644 (file)
@@ -483,8 +483,10 @@ typedef enum
 #define LockHashPartition(hashcode) \
        ((hashcode) % NUM_LOCK_PARTITIONS)
 #define LockHashPartitionLock(hashcode) \
-       ((LWLockId) (FirstLockMgrLock + LockHashPartition(hashcode)))
-
+       (&MainLWLockArray[LOCK_MANAGER_LWLOCK_OFFSET + \
+               LockHashPartition(hashcode)].lock)
+#define LockHashPartitionLockByIndex(i) \
+       (&MainLWLockArray[LOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)
 
 /*
  * function prototypes
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index efdb8b5faf9342b25d1a59edb54ff046abbe6843..45079262740f7c8d93952502ccd4b841605e929e 100644 (file)
 #ifndef LWLOCK_H
 #define LWLOCK_H
 
+#include "storage/s_lock.h"
+
+struct PGPROC;
+
+/*
+ * It's occasionally necessary to identify a particular LWLock "by name"; e.g.
+ * because we wish to report the lock to dtrace.  We could store a name or
+ * other identifying information in the lock itself, but since it's common
+ * to have many nearly-identical locks (e.g. one per buffer) this would end
+ * up wasting significant amounts of memory.  Instead, each lwlock stores a
+ * tranche ID which tells us which array it's part of.  Based on that, we can
+ * figure out where the lwlock lies within the array using the data structure
+ * shown below; the lock is then identified based on the tranche name and
+ * computed array index.  We need the array stride because the array elements
+ * might not be bare lwlocks, but rather larger structures that each contain
+ * one or more lwlocks.
+ */
+typedef struct LWLockTranche
+{
+       const char *name;
+       void       *array_base;
+       Size            array_stride;
+} LWLockTranche;
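+
+/*
+ * For example, lwlock.c can recover a lock's identity with pointer
+ * arithmetic along these lines (a sketch; LWLockTrancheArray stands in for
+ * lwlock.c's internal tranche lookup table):
+ *
+ *     #define T_NAME(l)  (LWLockTrancheArray[(l)->tranche]->name)
+ *     #define T_ID(l) \
+ *             ((int) ((((char *) (l)) - \
+ *                     (char *) LWLockTrancheArray[(l)->tranche]->array_base) / \
+ *                     LWLockTrancheArray[(l)->tranche]->array_stride))
+ */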
+
+/*
+ * Code outside of lwlock.c should not manipulate the contents of this
+ * structure directly, but we have to declare it here to allow LWLocks to be
+ * incorporated into other data structures.
+ */
+typedef struct LWLock
+{
+       slock_t         mutex;                  /* Protects LWLock and queue of PGPROCs */
+       bool            releaseOK;              /* T if ok to release waiters */
+       char            exclusive;              /* # of exclusive holders (0 or 1) */
+       int                     shared;                 /* # of shared holders (0..MaxBackends) */
+       int                     tranche;                /* tranche ID */
+       struct PGPROC *head;                    /* head of list of waiting PGPROCs */
+       struct PGPROC *tail;                    /* tail of list of waiting PGPROCs */
+       /* tail is undefined when head is NULL */
+} LWLock;
+
+/*
+ * Prior to PostgreSQL 9.4, every lightweight lock in the system was stored
+ * in a single array.  For convenience and for compatibility with past
+ * releases, we still have a main array, but it's now also permissible to
+ * store LWLocks elsewhere in the main shared memory segment or in a dynamic
+ * shared memory segment.  In the main array, we force the array stride to
+ * be a power of 2, which saves a few cycles in indexing, but more importantly
+ * also ensures that individual LWLocks don't cross cache line boundaries.
+ * This reduces cache contention problems, especially on AMD Opterons.
+ * (Of course, we also have to ensure that the array's start address is
+ * suitably aligned.)
+ *
+ * Even on a 32-bit platform, an lwlock will be more than 16 bytes, because
+ * it contains 2 integers and 2 pointers, plus other stuff.  It should fit
+ * into 32 bytes, though, unless slock_t is really big.  On a 64-bit platform,
+ * it should fit into 32 bytes unless slock_t is larger than 4 bytes.  We
+ * allow for that just in case.
+ */
+#define LWLOCK_PADDED_SIZE     (sizeof(LWLock) <= 32 ? 32 : 64)
+
+typedef union LWLockPadded
+{
+       LWLock          lock;
+       char            pad[LWLOCK_PADDED_SIZE];
+} LWLockPadded;
+extern LWLockPadded *MainLWLockArray;
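+
+/*
+ * Shared-memory allocation must honor that alignment.  A minimal sketch of
+ * the idea (the real work happens in lwlock.c's CreateLWLocks; "ptr" is a
+ * raw shmem allocation with room reserved for the padding):
+ *
+ *     char *ptr = (char *) ShmemAlloc(LWLockShmemSize());
+ *     ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
+ *     MainLWLockArray = (LWLockPadded *) ptr;
+ */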
+
+/*
+ * Some commonly-used locks have predefined positions within MainLWLockArray;
+ * defining macros here makes it much easier to keep track of these.  If you
+ * add a lock, add it to the end to avoid renumbering the existing locks;
+ * if you remove a lock, consider leaving a gap in the numbering sequence for
+ * the benefit of DTrace and other external debugging scripts.
+ */
+#define BufFreelistLock                                (&MainLWLockArray[0].lock)
+#define ShmemIndexLock                         (&MainLWLockArray[1].lock)
+#define OidGenLock                                     (&MainLWLockArray[2].lock)
+#define XidGenLock                                     (&MainLWLockArray[3].lock)
+#define ProcArrayLock                          (&MainLWLockArray[4].lock)
+#define SInvalReadLock                         (&MainLWLockArray[5].lock)
+#define SInvalWriteLock                                (&MainLWLockArray[6].lock)
+#define WALBufMappingLock                      (&MainLWLockArray[7].lock)
+#define WALWriteLock                           (&MainLWLockArray[8].lock)
+#define ControlFileLock                                (&MainLWLockArray[9].lock)
+#define CheckpointLock                         (&MainLWLockArray[10].lock)
+#define CLogControlLock                                (&MainLWLockArray[11].lock)
+#define SubtransControlLock                    (&MainLWLockArray[12].lock)
+#define MultiXactGenLock                       (&MainLWLockArray[13].lock)
+#define MultiXactOffsetControlLock     (&MainLWLockArray[14].lock)
+#define MultiXactMemberControlLock     (&MainLWLockArray[15].lock)
+#define RelCacheInitLock                       (&MainLWLockArray[16].lock)
+#define CheckpointerCommLock           (&MainLWLockArray[17].lock)
+#define TwoPhaseStateLock                      (&MainLWLockArray[18].lock)
+#define TablespaceCreateLock           (&MainLWLockArray[19].lock)
+#define BtreeVacuumLock                                (&MainLWLockArray[20].lock)
+#define AddinShmemInitLock                     (&MainLWLockArray[21].lock)
+#define AutovacuumLock                         (&MainLWLockArray[22].lock)
+#define AutovacuumScheduleLock         (&MainLWLockArray[23].lock)
+#define SyncScanLock                           (&MainLWLockArray[24].lock)
+#define RelationMappingLock                    (&MainLWLockArray[25].lock)
+#define AsyncCtlLock                           (&MainLWLockArray[26].lock)
+#define AsyncQueueLock                         (&MainLWLockArray[27].lock)
+#define SerializableXactHashLock       (&MainLWLockArray[28].lock)
+#define SerializableFinishedListLock           (&MainLWLockArray[29].lock)
+#define SerializablePredicateLockListLock      (&MainLWLockArray[30].lock)
+#define OldSerXidLock                          (&MainLWLockArray[31].lock)
+#define SyncRepLock                                    (&MainLWLockArray[32].lock)
+#define BackgroundWorkerLock           (&MainLWLockArray[33].lock)
+#define DynamicSharedMemoryControlLock         (&MainLWLockArray[34].lock)
+#define AutoFileLock                           (&MainLWLockArray[35].lock)
+#define NUM_INDIVIDUAL_LWLOCKS         36
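+
+/*
+ * These are now ordinary LWLock pointers, so the usual API applies
+ * unchanged; for example:
+ *
+ *     LWLockAcquire(ProcArrayLock, LW_SHARED);
+ *     ...
+ *     LWLockRelease(ProcArrayLock);
+ */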
+
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
- * here, but we need them to set up enum LWLockId correctly, and having
- * this file include lock.h or bufmgr.h would be backwards.
+ * here, but we need them to figure out offsets within MainLWLockArray, and
+ * having this file include lock.h or bufmgr.h would be backwards.
  */
 
 /* Number of partitions of the shared buffer mapping hashtable */
 #define LOG2_NUM_PREDICATELOCK_PARTITIONS  4
 #define NUM_PREDICATELOCK_PARTITIONS  (1 << LOG2_NUM_PREDICATELOCK_PARTITIONS)
 
-/*
- * We have a number of predefined LWLocks, plus a bunch of LWLocks that are
- * dynamically assigned (e.g., for shared buffers).  The LWLock structures
- * live in shared memory (since they contain shared data) and are identified
- * by values of this enumerated type.  We abuse the notion of an enum somewhat
- * by allowing values not listed in the enum declaration to be assigned.
- * The extra value MaxDynamicLWLock is there to keep the compiler from
- * deciding that the enum can be represented as char or short ...
- *
- * If you remove a lock, please replace it with a placeholder. This retains
- * the lock numbering, which is helpful for DTrace and other external
- * debugging scripts.
- */
-typedef enum LWLockId
-{
-       BufFreelistLock,
-       ShmemIndexLock,
-       OidGenLock,
-       XidGenLock,
-       ProcArrayLock,
-       SInvalReadLock,
-       SInvalWriteLock,
-       WALBufMappingLock,
-       WALWriteLock,
-       ControlFileLock,
-       CheckpointLock,
-       CLogControlLock,
-       SubtransControlLock,
-       MultiXactGenLock,
-       MultiXactOffsetControlLock,
-       MultiXactMemberControlLock,
-       RelCacheInitLock,
-       CheckpointerCommLock,
-       TwoPhaseStateLock,
-       TablespaceCreateLock,
-       BtreeVacuumLock,
-       AddinShmemInitLock,
-       AutovacuumLock,
-       AutovacuumScheduleLock,
-       SyncScanLock,
-       RelationMappingLock,
-       AsyncCtlLock,
-       AsyncQueueLock,
-       SerializableXactHashLock,
-       SerializableFinishedListLock,
-       SerializablePredicateLockListLock,
-       OldSerXidLock,
-       SyncRepLock,
-       BackgroundWorkerLock,
-       DynamicSharedMemoryControlLock,
-       AutoFileLock,
-       /* Individual lock IDs end here */
-       FirstBufMappingLock,
-       FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
-       FirstPredicateLockMgrLock = FirstLockMgrLock + NUM_LOCK_PARTITIONS,
-
-       /* must be last except for MaxDynamicLWLock: */
-       NumFixedLWLocks = FirstPredicateLockMgrLock + NUM_PREDICATELOCK_PARTITIONS,
-
-       MaxDynamicLWLock = 1000000000
-} LWLockId;
-
+/* Offsets for various chunks of preallocated lwlocks. */
+#define BUFFER_MAPPING_LWLOCK_OFFSET   NUM_INDIVIDUAL_LWLOCKS
+#define LOCK_MANAGER_LWLOCK_OFFSET             \
+       (BUFFER_MAPPING_LWLOCK_OFFSET + NUM_BUFFER_PARTITIONS)
+#define PREDICATELOCK_MANAGER_LWLOCK_OFFSET    \
+	(LOCK_MANAGER_LWLOCK_OFFSET + NUM_LOCK_PARTITIONS)
+#define NUM_FIXED_LWLOCKS \
+       (PREDICATELOCK_MANAGER_LWLOCK_OFFSET + NUM_PREDICATELOCK_PARTITIONS)
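+
+/*
+ * The fixed part of MainLWLockArray is therefore laid out as follows:
+ *
+ *     0 .. NUM_INDIVIDUAL_LWLOCKS - 1            individual named locks
+ *     BUFFER_MAPPING_LWLOCK_OFFSET ..            buffer mapping partitions
+ *     LOCK_MANAGER_LWLOCK_OFFSET ..              lock manager partitions
+ *     PREDICATELOCK_MANAGER_LWLOCK_OFFSET ..     predicate lock partitions
+ *     NUM_FIXED_LWLOCKS ..                       dynamically assigned locks
+ */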
 
 typedef enum LWLockMode
 {
@@ -108,18 +167,47 @@ typedef enum LWLockMode
 extern bool Trace_lwlocks;
 #endif
 
-extern LWLockId LWLockAssign(void);
-extern void LWLockAcquire(LWLockId lockid, LWLockMode mode);
-extern bool LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode);
-extern bool LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode);
-extern void LWLockRelease(LWLockId lockid);
+extern void LWLockAcquire(LWLock *lock, LWLockMode mode);
+extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode);
+extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
+extern void LWLockRelease(LWLock *lock);
 extern void LWLockReleaseAll(void);
-extern bool LWLockHeldByMe(LWLockId lockid);
+extern bool LWLockHeldByMe(LWLock *lock);
 
-extern int     NumLWLocks(void);
 extern Size LWLockShmemSize(void);
 extern void CreateLWLocks(void);
 
+/*
+ * The traditional method for obtaining an lwlock for use by an extension is
+ * to call RequestAddinLWLocks() during postmaster startup; this will reserve
+ * space for the indicated number of locks in MainLWLockArray.  Subsequently,
+ * a lock can be allocated using LWLockAssign.
+ */
 extern void RequestAddinLWLocks(int n);
+extern LWLock *LWLockAssign(void);
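+
+/*
+ * A minimal sketch for an extension (the hook and state names here are
+ * hypothetical; cf. pg_stat_statements for a complete example):
+ *
+ *     void
+ *     _PG_init(void)
+ *     {
+ *             RequestAddinLWLocks(1);
+ *             prev_shmem_startup_hook = shmem_startup_hook;
+ *             shmem_startup_hook = my_shmem_startup;
+ *     }
+ *
+ *     static void
+ *     my_shmem_startup(void)
+ *     {
+ *             bool    found;
+ *
+ *             LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+ *             state = ShmemInitStruct("my_state", sizeof(*state), &found);
+ *             if (!found)
+ *                     state->lock = LWLockAssign();
+ *             LWLockRelease(AddinShmemInitLock);
+ *     }
+ */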
+
+/*
+ * There is another, more flexible method of obtaining lwlocks. First, call
+ * LWLockNewTrancheId just once to obtain a tranche ID; this allocates from
+ * a shared counter.  Next, each individual process using the tranche should
+ * call LWLockRegisterTranche() to associate that tranche ID with appropriate
+ * metadata.  Finally, LWLockInitialize should be called just once per lwlock,
+ * passing the tranche ID as an argument.
+ *
+ * It may seem strange that each process using the tranche must register it
+ * separately, but dynamic shared memory segments aren't guaranteed to be
+ * mapped at the same address in all coordinating backends, so storing the
+ * registration in the main shared memory segment wouldn't work for that case.
+ */
+extern int LWLockNewTrancheId(void);
+extern void LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche);
+extern void LWLockInitialize(LWLock *lock, int tranche_id);
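+
+/*
+ * A minimal sketch (all names other than the three functions above are
+ * hypothetical).  One backend obtains the ID; every backend registers it;
+ * each lock is initialized once:
+ *
+ *     static LWLockTranche my_tranche;
+ *
+ *     my_tranche_id = LWLockNewTrancheId();
+ *     my_tranche.name = "my_locks";
+ *     my_tranche.array_base = my_array;
+ *     my_tranche.array_stride = sizeof(MyElement);
+ *     LWLockRegisterTranche(my_tranche_id, &my_tranche);
+ *     for (i = 0; i < nelements; ++i)
+ *             LWLockInitialize(&my_array[i].lock, my_tranche_id);
+ */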
+
+/*
+ * Prior to PostgreSQL 9.4, we used an enum type called LWLockId to refer
+ * to LWLocks.  New code should instead use LWLock *.  However, for the
+ * convenience of third-party code, we include the following typedef.
+ */
+typedef LWLock *LWLockId;
 
 #endif   /* LWLOCK_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index acdc6788bc3134fd80adf703b096c84d8a07d969..a3cadd9a017917b1fe4f26bfa1e11fb16bc25540 100644 (file)
@@ -131,7 +131,7 @@ struct PGPROC
        struct XidCache subxids;        /* cache for subtransaction XIDs */
 
        /* Per-backend LWLock.  Protects fields below. */
-       LWLockId        backendLock;    /* protects the fields below */
+       LWLock     *backendLock;        /* protects the fields below */
 
        /* Lock manager data, recording fast-path locks taken by this backend. */
        uint64          fpLockBits;             /* lock modes held for each fast-path slot */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 1f735b70b7a55d37c0fadca456d6bb7567b92ca0..ad40735333b178fff7463d9a61ec3dff5fc151a1 100644 (file)
@@ -896,7 +896,6 @@ LPWSTR
 LSEG
 LVRelStats
 LWLock
-LWLockId
 LWLockMode
 LWLockPadded
 LabelProvider