granicus.if.org Git - postgresql/commitdiff
Add new replication mode synchronous_commit = 'write'.
author: Simon Riggs <simon@2ndQuadrant.com>
Tue, 24 Jan 2012 20:22:37 +0000 (20:22 +0000)
committer: Simon Riggs <simon@2ndQuadrant.com>
Tue, 24 Jan 2012 20:22:37 +0000 (20:22 +0000)
Replication occurs only to memory on standby, not to disk,
so provides additional performance if user wishes to
reduce durability level slightly. Adds concept of multiple
independent sync rep queues.

Fujii Masao and Simon Riggs

doc/src/sgml/config.sgml
doc/src/sgml/high-availability.sgml
src/backend/replication/syncrep.c
src/backend/replication/walsender.c
src/backend/utils/misc/guc.c
src/include/access/xact.h
src/include/replication/syncrep.h
src/include/replication/walsender_private.h

index e55b5035e264970d585ea7e63edf6df0fce5467b..309b6a546158a82bd9b40f66e1e80da48f65138c 100644 (file)
@@ -1560,7 +1560,7 @@ SET ENABLE_SEQSCAN TO OFF;
        <para>
         Specifies whether transaction commit will wait for WAL records
         to be written to disk before the command returns a <quote>success</>
-        indication to the client.  Valid values are <literal>on</>,
+        indication to the client.  Valid values are <literal>on</>, <literal>write</>,
         <literal>local</>, and <literal>off</>.  The default, and safe, value
         is <literal>on</>.  When <literal>off</>, there can be a delay between
         when success is reported to the client and when the transaction is
@@ -1580,11 +1580,19 @@ SET ENABLE_SEQSCAN TO OFF;
         If <xref linkend="guc-synchronous-standby-names"> is set, this
         parameter also controls whether or not transaction commit will wait
         for the transaction's WAL records to be flushed to disk and replicated
-        to the standby server.  The commit wait will last until a reply from
-        the current synchronous standby indicates it has written the commit
-        record of the transaction to durable storage.  If synchronous
+        to the standby server.  When <literal>write</>, the commit wait will
+        last until a reply from the current synchronous standby indicates
+        it has received the commit record of the transaction into memory.
+        Normally this causes no data loss at the time of failover. However,
+        if both primary and standby crash, and the database cluster of
+        the primary gets corrupted, recent committed transactions might
+        be lost. When <literal>on</>, the commit wait will last until a reply
+        from the current synchronous standby indicates it has flushed
+        the commit record of the transaction to durable storage. This
+        avoids any data loss unless the database cluster of both primary and
+        standby gets corrupted simultaneously. If synchronous
         replication is in use, it will normally be sensible either to wait
-        both for WAL records to reach both the local and remote disks, or
+        for both local flush and replication of WAL records, or
         to allow the transaction to commit asynchronously.  However, the
         special value <literal>local</> is available for transactions that
         wish to wait for local flush to disk, but not synchronous replication.
index c5db6ef01f83a83141c99aee3a422f3d555d2e71..ed34dac023dd860b6d897d545b930780b4883700 100644 (file)
@@ -1010,6 +1010,16 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     standby servers using cascaded replication.
    </para>
 
+   <para>
+    Setting <varname>synchronous_commit</> to <literal>write</> will
+    cause each commit to wait for confirmation that the standby has received
+    the commit record into memory. This provides a lower level of durability
+    than <literal>on</> does. However, it's a practically useful setting
+    because it can decrease the response time for the transaction, and causes
+    no data loss unless both the primary and the standby crash and
+    the database of the primary gets corrupted at the same time.
+   </para>
+
    <para>
     Users will stop waiting if a fast shutdown is requested.  However, as
     when using asynchronous replication, the server will does not fully
@@ -1065,13 +1075,13 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
 
    <para>
     Commits made when <varname>synchronous_commit</> is set to <literal>on</>
-    will wait until the sync standby responds. The response may never occur
-    if the last, or only, standby should crash.
+    or <literal>write</> will wait until the synchronous standby responds. The response
+    may never occur if the last, or only, standby should crash.
    </para>
 
    <para>
     The best solution for avoiding data loss is to ensure you don't lose
-    your last remaining sync standby. This can be achieved by naming multiple
+    your last remaining synchronous standby. This can be achieved by naming multiple
     potential synchronous standbys using <varname>synchronous_standby_names</>.
     The first named standby will be used as the synchronous standby. Standbys
     listed after this will take over the role of synchronous standby if the
index 6bf69f0d35b41b4867bcea36498c0a27650f0f12..1273a8b9ebfe0fc40dd8c217ef0439e35d87c499 100644 (file)
@@ -20,8 +20,8 @@
  * per-transaction state information.
  *
  * Replication is either synchronous or not synchronous (async). If it is
- * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
- * for the flush location on the standby before releasing the waiting backend.
+ * async, we just fastpath out of here. If it is sync, then we wait for
+ * the write or flush location on the standby before releasing the waiting backend.
  * Further complexity in that interaction is expected in later releases.
  *
  * The best performing way to manage the waiting backends is to have a
@@ -67,13 +67,15 @@ char           *SyncRepStandbyNames;
 
 static bool announce_next_takeover = true;
 
-static void SyncRepQueueInsert(void);
+static int     SyncRepWaitMode = SYNC_REP_NO_WAIT;
+
+static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
 
 static int     SyncRepGetStandbyPriority(void);
 
 #ifdef USE_ASSERT_CHECKING
-static bool SyncRepQueueIsOrderedByLSN(void);
+static bool SyncRepQueueIsOrderedByLSN(int mode);
 #endif
 
 /*
@@ -120,7 +122,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
         * be a low cost check.
         */
        if (!WalSndCtl->sync_standbys_defined ||
-               XLByteLE(XactCommitLSN, WalSndCtl->lsn))
+               XLByteLE(XactCommitLSN, WalSndCtl->lsn[SyncRepWaitMode]))
        {
                LWLockRelease(SyncRepLock);
                return;
@@ -132,8 +134,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
         */
        MyProc->waitLSN = XactCommitLSN;
        MyProc->syncRepState = SYNC_REP_WAITING;
-       SyncRepQueueInsert();
-       Assert(SyncRepQueueIsOrderedByLSN());
+       SyncRepQueueInsert(SyncRepWaitMode);
+       Assert(SyncRepQueueIsOrderedByLSN(SyncRepWaitMode));
        LWLockRelease(SyncRepLock);
 
        /* Alter ps display to show waiting for sync rep. */
@@ -267,18 +269,19 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
 }
 
 /*
- * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
+ * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
  *
  * Usually we will go at tail of queue, though it's possible that we arrive
  * here out of order, so start at tail and work back to insertion point.
  */
 static void
-SyncRepQueueInsert(void)
+SyncRepQueueInsert(int mode)
 {
        PGPROC     *proc;
 
-       proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
-                                                                  &(WalSndCtl->SyncRepQueue),
+       Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+       proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
+                                                                  &(WalSndCtl->SyncRepQueue[mode]),
                                                                   offsetof(PGPROC, syncRepLinks));
 
        while (proc)
@@ -290,7 +293,7 @@ SyncRepQueueInsert(void)
                if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
                        break;
 
-               proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
+               proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
                                                                           &(proc->syncRepLinks),
                                                                           offsetof(PGPROC, syncRepLinks));
        }
@@ -298,7 +301,7 @@ SyncRepQueueInsert(void)
        if (proc)
                SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
        else
-               SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
+               SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
 }
 
 /*
@@ -368,7 +371,8 @@ SyncRepReleaseWaiters(void)
 {
        volatile WalSndCtlData *walsndctl = WalSndCtl;
        volatile WalSnd *syncWalSnd = NULL;
-       int                     numprocs = 0;
+       int                     numwrite = 0;
+       int                     numflush = 0;
        int                     priority = 0;
        int                     i;
 
@@ -419,20 +423,28 @@ SyncRepReleaseWaiters(void)
                return;
        }
 
-       if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
+       /*
+        * Set the lsn first so that when we wake backends they will release
+        * up to this location.
+        */
+       if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write))
        {
-               /*
-                * Set the lsn first so that when we wake backends they will release
-                * up to this location.
-                */
-               walsndctl->lsn = MyWalSnd->flush;
-               numprocs = SyncRepWakeQueue(false);
+               walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+               numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
+       }
+       if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush))
+       {
+               walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+               numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
        }
 
        LWLockRelease(SyncRepLock);
 
-       elog(DEBUG3, "released %d procs up to %X/%X",
-                numprocs,
+       elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+                numwrite,
+                MyWalSnd->write.xlogid,
+                MyWalSnd->write.xrecoff,
+                numflush,
                 MyWalSnd->flush.xlogid,
                 MyWalSnd->flush.xrecoff);
 
@@ -507,24 +519,26 @@ SyncRepGetStandbyPriority(void)
 }
 
 /*
- * Walk queue from head.  Set the state of any backends that need to be woken,
- * remove them from the queue, and then wake them.     Pass all = true to wake
- * whole queue; otherwise, just wake up to the walsender's LSN.
+ * Walk the specified queue from head.  Set the state of any backends that
+ * need to be woken, remove them from the queue, and then wake them.
+ * Pass all = true to wake whole queue; otherwise, just wake up to
+ * the walsender's LSN.
  *
  * Must hold SyncRepLock.
  */
 int
-SyncRepWakeQueue(bool all)
+SyncRepWakeQueue(bool all, int mode)
 {
        volatile WalSndCtlData *walsndctl = WalSndCtl;
        PGPROC     *proc = NULL;
        PGPROC     *thisproc = NULL;
        int                     numprocs = 0;
 
-       Assert(SyncRepQueueIsOrderedByLSN());
+       Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+       Assert(SyncRepQueueIsOrderedByLSN(mode));
 
-       proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
-                                                                  &(WalSndCtl->SyncRepQueue),
+       proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
+                                                                  &(WalSndCtl->SyncRepQueue[mode]),
                                                                   offsetof(PGPROC, syncRepLinks));
 
        while (proc)
@@ -532,7 +546,7 @@ SyncRepWakeQueue(bool all)
                /*
                 * Assume the queue is ordered by LSN
                 */
-               if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN))
+               if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN))
                        return numprocs;
 
                /*
@@ -540,7 +554,7 @@ SyncRepWakeQueue(bool all)
                 * thisproc is valid, proc may be NULL after this.
                 */
                thisproc = proc;
-               proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+               proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                                                           &(proc->syncRepLinks),
                                                                           offsetof(PGPROC, syncRepLinks));
 
@@ -588,7 +602,12 @@ SyncRepUpdateSyncStandbysDefined(void)
                 * wants synchronous replication, we'd better wake them up.
                 */
                if (!sync_standbys_defined)
-                       SyncRepWakeQueue(true);
+               {
+                       int     i;
+
+                       for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+                               SyncRepWakeQueue(true, i);
+               }
 
                /*
                 * Only allow people to join the queue when there are synchronous
@@ -605,16 +624,18 @@ SyncRepUpdateSyncStandbysDefined(void)
 
 #ifdef USE_ASSERT_CHECKING
 static bool
-SyncRepQueueIsOrderedByLSN(void)
+SyncRepQueueIsOrderedByLSN(int mode)
 {
        PGPROC     *proc = NULL;
        XLogRecPtr      lastLSN;
 
+       Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+
        lastLSN.xlogid = 0;
        lastLSN.xrecoff = 0;
 
-       proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
-                                                                  &(WalSndCtl->SyncRepQueue),
+       proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
+                                                                  &(WalSndCtl->SyncRepQueue[mode]),
                                                                   offsetof(PGPROC, syncRepLinks));
 
        while (proc)
@@ -628,7 +649,7 @@ SyncRepQueueIsOrderedByLSN(void)
 
                lastLSN = proc->waitLSN;
 
-               proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+               proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                                                           &(proc->syncRepLinks),
                                                                           offsetof(PGPROC, syncRepLinks));
        }
@@ -675,3 +696,20 @@ check_synchronous_standby_names(char **newval, void **extra, GucSource source)
 
        return true;
 }
+
+void
+assign_synchronous_commit(int newval, void *extra)	/* sets SyncRepWaitMode from the new synchronous_commit value; extra is unused */
+{
+       switch (newval)
+       {
+               case SYNCHRONOUS_COMMIT_REMOTE_WRITE:	/* commit waits on the standby's write location */
+                       SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
+                       break;
+               case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:	/* commit waits on the standby's flush location */
+                       SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
+                       break;
+               default:	/* every other level, e.g. off or local flush */
+                       SyncRepWaitMode = SYNC_REP_NO_WAIT;	/* -1: must never be used as a queue/lsn array index */
+                       break;
+       }
+}
index 3611713434a439bfb7d3d3481d561274498944f5..5f938124e726627f61d0c6649972b8501c8fc263 100644 (file)
@@ -1410,7 +1410,8 @@ WalSndShmemInit(void)
                /* First time through, so initialize */
                MemSet(WalSndCtl, 0, WalSndShmemSize());
 
-               SHMQueueInit(&(WalSndCtl->SyncRepQueue));
+               for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+                       SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
 
                for (i = 0; i < max_wal_senders; i++)
                {
index 9fc96b2126acc90498533484484c8da7f794eee3..ec8f2f2309b3168470d63b6d932b74afe24f0ca7 100644 (file)
@@ -370,11 +370,12 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
 };
 
 /*
- * Although only "on", "off", and "local" are documented, we
+ * Although only "on", "off", "write", and "local" are documented, we
  * accept all the likely variants of "on" and "off".
  */
 static const struct config_enum_entry synchronous_commit_options[] = {
        {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
+       {"write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
        {"on", SYNCHRONOUS_COMMIT_ON, false},
        {"off", SYNCHRONOUS_COMMIT_OFF, false},
        {"true", SYNCHRONOUS_COMMIT_ON, true},
@@ -3164,7 +3165,7 @@ static struct config_enum ConfigureNamesEnum[] =
                },
                &synchronous_commit,
                SYNCHRONOUS_COMMIT_ON, synchronous_commit_options,
-               NULL, NULL, NULL
+               NULL, assign_synchronous_commit, NULL
        },
 
        {
index 5f063a9f452bbc57a19185b16084b00e3b2057f1..20e344e5b73a7807368d9659d7e3b19e5eaf87d0 100644 (file)
@@ -55,6 +55,7 @@ typedef enum
 {
        SYNCHRONOUS_COMMIT_OFF,         /* asynchronous commit */
        SYNCHRONOUS_COMMIT_LOCAL_FLUSH,         /* wait for local flush only */
+       SYNCHRONOUS_COMMIT_REMOTE_WRITE,                /* wait for local flush and remote write */
        SYNCHRONOUS_COMMIT_REMOTE_FLUSH         /* wait for local and remote flush */
 }      SyncCommitLevel;
 
index 91446a8bebed15e4ede4d167d889c1ba09fba4b6..74820cbbb46ceb2bc95003ccf0563e1caabca0ce 100644 (file)
 
 #include "utils/guc.h"
 
+#define SyncRepRequested() \
+       (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
+
+/* SyncRepWaitMode */
+#define SYNC_REP_NO_WAIT               -1
+#define SYNC_REP_WAIT_WRITE            0
+#define SYNC_REP_WAIT_FLUSH            1
+
+#define NUM_SYNC_REP_WAIT_MODE 2
+
 /* syncRepState */
 #define SYNC_REP_NOT_WAITING           0
 #define SYNC_REP_WAITING                       1
@@ -37,8 +47,9 @@ extern void SyncRepReleaseWaiters(void);
 extern void SyncRepUpdateSyncStandbysDefined(void);
 
 /* called by various procs */
-extern int     SyncRepWakeQueue(bool all);
+extern int     SyncRepWakeQueue(bool all, int mode);
 
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+extern void assign_synchronous_commit(int newval, void *extra);
 
 #endif   /* _SYNCREP_H */
index 89666d738334bc4a46993d9a6e57e05468973d0b..f6cae84a6d47cadce07d7fe39e462d14bb5c2993 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "access/xlog.h"
 #include "nodes/nodes.h"
+#include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
@@ -68,15 +69,16 @@ extern WalSnd *MyWalSnd;
 typedef struct
 {
        /*
-        * Synchronous replication queue. Protected by SyncRepLock.
+        * Synchronous replication queue with one queue per request type.
+        * Protected by SyncRepLock.
         */
-       SHM_QUEUE       SyncRepQueue;
+       SHM_QUEUE       SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
 
        /*
         * Current location of the head of the queue. All waiters should have a
         * waitLSN that follows this value. Protected by SyncRepLock.
         */
-       XLogRecPtr      lsn;
+       XLogRecPtr      lsn[NUM_SYNC_REP_WAIT_MODE];
 
        /*
         * Are any sync standbys defined?  Waiting backends can't reload the