<para>
Specifies whether transaction commit will wait for WAL records
to be written to disk before the command returns a <quote>success</>
- indication to the client. Valid values are <literal>on</>,
+ indication to the client. Valid values are <literal>on</>, <literal>write</>,
<literal>local</>, and <literal>off</>. The default, and safe, value
is <literal>on</>. When <literal>off</>, there can be a delay between
when success is reported to the client and when the transaction is
If <xref linkend="guc-synchronous-standby-names"> is set, this
parameter also controls whether or not transaction commit will wait
for the transaction's WAL records to be flushed to disk and replicated
- to the standby server. The commit wait will last until a reply from
- the current synchronous standby indicates it has written the commit
- record of the transaction to durable storage. If synchronous
+ to the standby server. When <literal>write</>, the commit wait will
+ last until a reply from the current synchronous standby indicates
+ it has received the commit record of the transaction into memory.
+ Normally this causes no data loss at the time of failover. However,
+ if both the primary and the standby crash, and the database cluster of
+ the primary gets corrupted, recently committed transactions might
+ be lost. When <literal>on</>, the commit wait will last until a reply
+ from the current synchronous standby indicates it has flushed
+ the commit record of the transaction to durable storage. This
+ avoids any data loss unless the database clusters of both the primary
+ and the standby get corrupted simultaneously. If synchronous
replication is in use, it will normally be sensible either to wait
- both for WAL records to reach both the local and remote disks, or
+ for both local flush and replication of WAL records, or
to allow the transaction to commit asynchronously. However, the
special value <literal>local</> is available for transactions that
wish to wait for local flush to disk, but not synchronous replication.
* per-transaction state information.
*
* Replication is either synchronous or not synchronous (async). If it is
- * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
- * for the flush location on the standby before releasing the waiting backend.
+ * async, we just fastpath out of here. If it is sync, then we wait for
+ * the write or flush location on the standby before releasing the waiting
+ * backend.
* Further complexity in that interaction is expected in later releases.
*
* The best performing way to manage the waiting backends is to have a
static bool announce_next_takeover = true;
-static void SyncRepQueueInsert(void);
+static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
+
+static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
-static bool SyncRepQueueIsOrderedByLSN(void);
+static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
/*
* be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
- XLByteLE(XactCommitLSN, WalSndCtl->lsn))
+ XLByteLE(XactCommitLSN, WalSndCtl->lsn[SyncRepWaitMode]))
{
LWLockRelease(SyncRepLock);
return;
*/
MyProc->waitLSN = XactCommitLSN;
MyProc->syncRepState = SYNC_REP_WAITING;
- SyncRepQueueInsert();
- Assert(SyncRepQueueIsOrderedByLSN());
+ SyncRepQueueInsert(SyncRepWaitMode);
+ Assert(SyncRepQueueIsOrderedByLSN(SyncRepWaitMode));
LWLockRelease(SyncRepLock);
/* Alter ps display to show waiting for sync rep. */
}
/*
- * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
+ * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
*
* Usually we will go at tail of queue, though it's possible that we arrive
* here out of order, so start at tail and work back to insertion point.
*/
static void
-SyncRepQueueInsert(void)
+SyncRepQueueInsert(int mode)
{
PGPROC *proc;
- proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+ proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
break;
- proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
if (proc)
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
else
- SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
+ SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
}
/*
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
volatile WalSnd *syncWalSnd = NULL;
- int numprocs = 0;
+ int numwrite = 0;
+ int numflush = 0;
int priority = 0;
int i;
return;
}
- if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
+ /*
+ * Set the lsn first so that when we wake backends they will release
+ * up to this location.
+ */
+ if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write))
{
- /*
- * Set the lsn first so that when we wake backends they will release
- * up to this location.
- */
- walsndctl->lsn = MyWalSnd->flush;
- numprocs = SyncRepWakeQueue(false);
+ walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
+ numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
+ }
+ if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush))
+ {
+ walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
+ numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
LWLockRelease(SyncRepLock);
- elog(DEBUG3, "released %d procs up to %X/%X",
- numprocs,
+ elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+ numwrite,
+ MyWalSnd->write.xlogid,
+ MyWalSnd->write.xrecoff,
+ numflush,
MyWalSnd->flush.xlogid,
MyWalSnd->flush.xrecoff);
}
/*
- * Walk queue from head. Set the state of any backends that need to be woken,
- * remove them from the queue, and then wake them. Pass all = true to wake
- * whole queue; otherwise, just wake up to the walsender's LSN.
+ * Walk the specified queue from head. Set the state of any backends that
+ * need to be woken, remove them from the queue, and then wake them.
+ * Pass all = true to wake whole queue; otherwise, just wake up to
+ * the walsender's LSN.
*
* Must hold SyncRepLock.
*/
int
-SyncRepWakeQueue(bool all)
+SyncRepWakeQueue(bool all, int mode)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
PGPROC *proc = NULL;
PGPROC *thisproc = NULL;
int numprocs = 0;
- Assert(SyncRepQueueIsOrderedByLSN());
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+ Assert(SyncRepQueueIsOrderedByLSN(mode));
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
/*
* Assume the queue is ordered by LSN
*/
- if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN))
+ if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN))
return numprocs;
/*
* thisproc is valid, proc may be NULL after this.
*/
thisproc = proc;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
* wants synchronous replication, we'd better wake them up.
*/
if (!sync_standbys_defined)
- SyncRepWakeQueue(true);
+ {
+ int i;
+
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ SyncRepWakeQueue(true, i);
+ }
/*
* Only allow people to join the queue when there are synchronous
#ifdef USE_ASSERT_CHECKING
static bool
-SyncRepQueueIsOrderedByLSN(void)
+SyncRepQueueIsOrderedByLSN(int mode)
{
PGPROC *proc = NULL;
XLogRecPtr lastLSN;
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+
lastLSN.xlogid = 0;
lastLSN.xrecoff = 0;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
- &(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
+ &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
lastLSN = proc->waitLSN;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
+ proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
return true;
}
+
+/*
+ * Record the sync rep wait mode matching a synchronous_commit value.
+ *
+ * The remote-write and remote-flush settings select the write and flush
+ * wait modes respectively; any other value of newval selects
+ * SYNC_REP_NO_WAIT.  The extra argument is not used.
+ */
+void
+assign_synchronous_commit(int newval, void *extra)
+{
+	int			mode;
+
+	if (newval == SYNCHRONOUS_COMMIT_REMOTE_WRITE)
+		mode = SYNC_REP_WAIT_WRITE;
+	else if (newval == SYNCHRONOUS_COMMIT_REMOTE_FLUSH)
+		mode = SYNC_REP_WAIT_FLUSH;
+	else
+		mode = SYNC_REP_NO_WAIT;
+
+	SyncRepWaitMode = mode;
+}