]> granicus.if.org Git - postgresql/commitdiff
Make group commit more effective.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 30 Jan 2012 14:40:58 +0000 (16:40 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 30 Jan 2012 14:53:48 +0000 (16:53 +0200)
When a backend needs to flush the WAL, and someone else is already flushing
the WAL, wait until it releases the WALWriteLock and check if we still need
to do the flush or if the other backend already did the work for us, before
acquiring WALWriteLock. This helps group commit, because when the WAL flush
finishes, all the backends that were waiting for it can be woken up in one
go, and they can all concurrently observe that they're done, rather than
waking them up one by one in a cascading fashion.

This is based on a new LWLock function, LWLockWaitUntilFree(), which has
peculiar semantics. If the lock is immediately free, it grabs the lock and
returns true. If it's not free, it waits until it is released, but then
returns false without grabbing the lock. This is used in XLogFlush(), so
that when the lock is acquired, the backend flushes the WAL, but if it's
not, the backend first checks the current flush location before retrying.

Original patch and benchmarking by Peter Geoghegan and Simon Riggs, although
this patch as committed ended up being very different from that.

src/backend/access/transam/twophase.c
src/backend/access/transam/xlog.c
src/backend/storage/lmgr/lwlock.c
src/backend/storage/lmgr/proc.c
src/backend/utils/probes.d
src/include/storage/lwlock.h
src/include/storage/proc.h

index 69af75c6b64551167d428baf30b459efd7290154..6e84cd0a21671486693e7f94d5fda8efdedf4bb4 100644 (file)
@@ -327,7 +327,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
        proc->databaseId = databaseid;
        proc->roleId = owner;
        proc->lwWaiting = false;
-       proc->lwExclusive = false;
+       proc->lwWaitMode = 0;
        proc->lwWaitLink = NULL;
        proc->waitLock = NULL;
        proc->waitProcLock = NULL;
index 4b273a8318f44324d911ed16a52907052fdeb56a..cce87a3cd30280a0b4f6a2cb68cb73a56a1e3d86 100644 (file)
@@ -2118,23 +2118,43 @@ XLogFlush(XLogRecPtr record)
        /* initialize to given target; may increase below */
        WriteRqstPtr = record;
 
-       /* read LogwrtResult and update local state */
+       /*
+        * Now wait until we get the write lock, or someone else does the
+        * flush for us.
+        */
+       for (;;)
        {
                /* use volatile pointer to prevent code rearrangement */
                volatile XLogCtlData *xlogctl = XLogCtl;
 
+               /* read LogwrtResult and update local state */
                SpinLockAcquire(&xlogctl->info_lck);
                if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
                        WriteRqstPtr = xlogctl->LogwrtRqst.Write;
                LogwrtResult = xlogctl->LogwrtResult;
                SpinLockRelease(&xlogctl->info_lck);
-       }
 
-       /* done already? */
-       if (!XLByteLE(record, LogwrtResult.Flush))
-       {
-               /* now wait for the write lock */
-               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+               /* done already? */
+               if (XLByteLE(record, LogwrtResult.Flush))
+                       break;
+
+               /*
+                * Try to get the write lock. If we can't get it immediately, wait
+                * until it's released, and recheck if we still need to do the flush
+                * or if the backend that held the lock did it for us already. This
+                * helps to maintain a good rate of group committing when the system
+                * is bottlenecked by the speed of fsyncing.
+                */
+               if (!LWLockWaitUntilFree(WALWriteLock, LW_EXCLUSIVE))
+               {
+                       /*
+                        * The lock is now free, but we didn't acquire it yet. Before we
+                        * do, loop back to check if someone else flushed the record for
+                        * us already.
+                        */
+                       continue;
+               }
+               /* Got the lock */
                LogwrtResult = XLogCtl->Write.LogwrtResult;
                if (!XLByteLE(record, LogwrtResult.Flush))
                {
@@ -2163,6 +2183,8 @@ XLogFlush(XLogRecPtr record)
                        XLogWrite(WriteRqst, false, false);
                }
                LWLockRelease(WALWriteLock);
+               /* done */
+               break;
        }
 
        END_CRIT_SECTION();
index cc4156826b59745545706dcaa660921486cbcdbc..bee35b8c1cf93c9a5c65ee2d76e05e2fa164e4eb 100644 (file)
@@ -430,7 +430,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
                        elog(PANIC, "cannot wait without a PGPROC structure");
 
                proc->lwWaiting = true;
-               proc->lwExclusive = (mode == LW_EXCLUSIVE);
+               proc->lwWaitMode = mode;
                proc->lwWaitLink = NULL;
                if (lock->head == NULL)
                        lock->head = proc;
@@ -564,6 +564,144 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
        return !mustwait;
 }
 
+/*
+ * LWLockWaitUntilFree - Wait until a lock is free
+ *
+ * The semantics of this function are a bit funky.  If the lock is currently
+ * free, it is acquired in the given mode, and the function returns true.  If
+ * the lock isn't immediately free, the function waits until it is released
+ * and returns false, but does not acquire the lock.
+ *
+ * This is currently used for WALWriteLock: when a backend flushes the WAL,
+ * holding WALWriteLock, it can flush the commit records of many other
+ * backends as a side-effect.  Those other backends need to wait until the
+ * flush finishes, but don't need to acquire the lock anymore.  They can just
+ * wake up, observe that their records have already been flushed, and return.
+ *
+ * Arguments: lockid is the lock to wait for; mode is the mode in which the
+ * lock is taken if it turns out to be immediately free.
+ *
+ * Returns true if the lock was acquired: it has then been recorded in
+ * held_lwlocks, and the interrupt holdoff taken here is still in effect on
+ * return.  Returns false if we had to wait: no lock is held, and the
+ * interrupt holdoff has been released again before returning.
+ *
+ * Raises ERROR if MAX_SIMUL_LWLOCKS locks are already held, and PANIC if we
+ * would have to wait but have no PGPROC.
+ */
+bool
+LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode)
+{
+       volatile LWLock *lock = &(LWLockArray[lockid].lock);
+       PGPROC     *proc = MyProc;
+       bool            mustwait;
+       int                     extraWaits = 0;
+
+       PRINT_LWDEBUG("LWLockWaitUntilFree", lockid, lock);
+
+       /* Ensure we will have room to remember the lock */
+       if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
+               elog(ERROR, "too many LWLocks taken");
+
+       /*
+        * Lock out cancel/die interrupts until we exit the code section protected
+        * by the LWLock.  This ensures that interrupts will not interfere with
+        * manipulations of data structures in shared memory.
+        */
+       HOLD_INTERRUPTS();
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&lock->mutex);
+
+       /* If I can get the lock, do so quickly. */
+       if (mode == LW_EXCLUSIVE)
+       {
+               if (lock->exclusive == 0 && lock->shared == 0)
+               {
+                       lock->exclusive++;
+                       mustwait = false;
+               }
+               else
+                       mustwait = true;
+       }
+       else
+       {
+               if (lock->exclusive == 0)
+               {
+                       lock->shared++;
+                       mustwait = false;
+               }
+               else
+                       mustwait = true;
+       }
+
+       if (mustwait)
+       {
+               /*
+                * Add myself to wait queue.  Note that we are queued with wait mode
+                * LW_WAIT_UNTIL_FREE regardless of the requested 'mode', because we
+                * only want to be woken up, not handed the lock.
+                *
+                * If we don't have a PGPROC structure, there's no way to wait.  This
+                * should never occur, since MyProc should only be null during shared
+                * memory initialization.
+                */
+               if (proc == NULL)
+                       elog(PANIC, "cannot wait without a PGPROC structure");
+
+               proc->lwWaiting = true;
+               proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+               proc->lwWaitLink = NULL;
+               if (lock->head == NULL)
+                       lock->head = proc;
+               else
+                       lock->tail->lwWaitLink = proc;
+               lock->tail = proc;
+
+               /* Can release the mutex now */
+               SpinLockRelease(&lock->mutex);
+
+               /*
+                * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
+                * wakeups, because we share the semaphore with ProcWaitForSignal.
+                * Each bogus wakeup is counted in extraWaits so that the semaphore
+                * count can be restored afterwards.
+                */
+               LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "waiting");
+
+#ifdef LWLOCK_STATS
+               block_counts[lockid]++;
+#endif
+
+               TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+
+               for (;;)
+               {
+                       /* "false" means cannot accept cancel/die interrupt here. */
+                       PGSemaphoreLock(&proc->sem, false);
+                       if (!proc->lwWaiting)
+                               break;
+                       extraWaits++;
+               }
+
+               TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+
+               LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "awakened");
+       }
+       else
+       {
+               /* We are done updating shared state of the lock itself. */
+               SpinLockRelease(&lock->mutex);
+       }
+
+       /*
+        * Fix the process wait semaphore's count for any absorbed wakeups.
+        */
+       while (extraWaits-- > 0)
+               PGSemaphoreUnlock(&proc->sem);
+
+       if (mustwait)
+       {
+               /* Failed to get lock, so release interrupt holdoff */
+               RESUME_INTERRUPTS();
+               LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "failed");
+               TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
+       }
+       else
+       {
+               /* Add lock to list of locks held by this backend */
+               held_lwlocks[num_held_lwlocks++] = lockid;
+               TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
+       }
+
+       return !mustwait;
+}
+
 /*
  * LWLockRelease - release a previously acquired lock
  */
@@ -618,20 +756,36 @@ LWLockRelease(LWLockId lockid)
                        /*
                         * Remove the to-be-awakened PGPROCs from the queue.  If the front
                         * waiter wants exclusive lock, awaken him only. Otherwise awaken
-                        * as many waiters as want shared access.
+                        * as many waiters as want shared access (or just want to be
+                        * woken up when the lock becomes free without acquiring it,
+                        * ie. LWLockWaitUntilFree).
                         */
+                       bool releaseOK = true;
+
                        proc = head;
-                       if (!proc->lwExclusive)
+                       if (proc->lwWaitMode != LW_EXCLUSIVE)
                        {
                                while (proc->lwWaitLink != NULL &&
-                                          !proc->lwWaitLink->lwExclusive)
+                                          proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
+                               {
                                        proc = proc->lwWaitLink;
+                                       if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+                                               releaseOK = false;
+                               }
                        }
                        /* proc is now the last PGPROC to be released */
                        lock->head = proc->lwWaitLink;
                        proc->lwWaitLink = NULL;
-                       /* prevent additional wakeups until retryer gets to run */
-                       lock->releaseOK = false;
+                       /*
+                        * Prevent additional wakeups until retryer gets to run. Backends
+                        * that are just waiting for the lock to become free don't prevent
+                        * wakeups, because they might decide that they don't want the
+                        * lock, after all.
+                        */
+                       if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+                               releaseOK = false;
+
+                       lock->releaseOK = releaseOK;
                }
                else
                {
index 4c2b6d480240777048d928ea500b5aa75693db4f..2196f514d8f7ba01a5c4017ba12f3b15426115f1 100644 (file)
@@ -362,7 +362,7 @@ InitProcess(void)
        if (IsAutoVacuumWorkerProcess())
                MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
        MyProc->lwWaiting = false;
-       MyProc->lwExclusive = false;
+       MyProc->lwWaitMode = 0;
        MyProc->lwWaitLink = NULL;
        MyProc->waitLock = NULL;
        MyProc->waitProcLock = NULL;
@@ -517,7 +517,7 @@ InitAuxiliaryProcess(void)
        MyPgXact->inCommit = false;
        MyPgXact->vacuumFlags = 0;
        MyProc->lwWaiting = false;
-       MyProc->lwExclusive = false;
+       MyProc->lwWaitMode = 0;
        MyProc->lwWaitLink = NULL;
        MyProc->waitLock = NULL;
        MyProc->waitProcLock = NULL;
index f6030973141fb2e473e5e339aeb7ccc04d343128..8ae8877d993b6a1faa52d6ee93012cbf97e10a54 100644 (file)
@@ -35,6 +35,8 @@ provider postgresql {
        probe lwlock__wait__done(LWLockId, LWLockMode);
        probe lwlock__condacquire(LWLockId, LWLockMode);
        probe lwlock__condacquire__fail(LWLockId, LWLockMode);
+       probe lwlock__wait__until__free(LWLockId, LWLockMode);
+       probe lwlock__wait__until__free__fail(LWLockId, LWLockMode);
 
        probe lock__wait__start(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
        probe lock__wait__done(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
index df3df298ae9cbc4927169856f01a13c4a87ccd45..c684964a34fe25cd07646b6d48c4ac87d37cd544 100644 (file)
@@ -94,7 +94,10 @@ typedef enum LWLockId
 typedef enum LWLockMode
 {
        LW_EXCLUSIVE,
-       LW_SHARED
+       LW_SHARED,
+       LW_WAIT_UNTIL_FREE      /* A special mode used in PGPROC->lwWaitMode, when
+                                                * waiting for lock to become free. Not to be used
+                                                * as LWLockAcquire argument */
 } LWLockMode;
 
 
@@ -105,6 +108,7 @@ extern bool Trace_lwlocks;
 extern LWLockId LWLockAssign(void);
 extern void LWLockAcquire(LWLockId lockid, LWLockMode mode);
 extern bool LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode);
+extern bool LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode);
 extern void LWLockRelease(LWLockId lockid);
 extern void LWLockReleaseAll(void);
 extern bool LWLockHeldByMe(LWLockId lockid);
index 9a6696a4ef04845da382e35591a3039002dc7323..415c0935ad611f4d4badcd09bac70554075731f4 100644 (file)
@@ -101,7 +101,7 @@ struct PGPROC
 
        /* Info about LWLock the process is currently waiting for, if any. */
        bool            lwWaiting;              /* true if waiting for an LW lock */
-       bool            lwExclusive;    /* true if waiting for exclusive access */
+       uint8           lwWaitMode;             /* lwlock mode being waited for */
        struct PGPROC *lwWaitLink;      /* next waiter for same LW lock */
 
        /* Info about lock the process is currently waiting for, if any. */