1 /*-------------------------------------------------------------------------
5 * Synchronous replication is new as of PostgreSQL 9.1.
7 * If requested, transaction commits wait until their commit LSN is
8 * acknowledged by the sync standby.
10 * This module contains the code for waiting and release of backends.
11 * All code in this module executes on the primary. The core streaming
12 * replication transport remains within WALreceiver/WALsender modules.
14 * The essence of this design is that it isolates all logic about
15 * waiting/releasing onto the primary. The primary defines which standbys
16 * it wishes to wait for. The standby is completely unaware of the
17 * durability requirements of transactions on the primary, reducing the
18 * complexity of the code and streamlining both standby operations and
19 * network bandwidth because there is no requirement to ship
20 * per-transaction state information.
22 * Replication is either synchronous or not synchronous (async). If it is
23 * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
24 * for the flush location on the standby before releasing the waiting backend.
25 * Further complexity in that interaction is expected in later releases.
27 * The best performing way to manage the waiting backends is to have a
28 * single ordered queue of waiting backends, so that we can avoid
29 * searching through all the waiters each time we receive a reply.
31 * In 9.1 we support only a single synchronous standby, chosen from a
32 * priority list of synchronous_standby_names. Before it can become the
33 * synchronous standby it must have caught up with the primary; that may
34 * take some time. Once caught up, the current highest priority standby
35 * will release waiters from the queue.
37 * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
40 * src/backend/replication/syncrep.c
42 *-------------------------------------------------------------------------
48 #include "access/xact.h"
49 #include "access/xlog_internal.h"
50 #include "miscadmin.h"
51 #include "postmaster/autovacuum.h"
52 #include "replication/syncrep.h"
53 #include "replication/walsender.h"
54 #include "storage/latch.h"
55 #include "storage/ipc.h"
56 #include "storage/pmsignal.h"
57 #include "storage/proc.h"
58 #include "tcop/tcopprot.h"
59 #include "utils/builtins.h"
60 #include "utils/guc.h"
61 #include "utils/guc_tables.h"
62 #include "utils/memutils.h"
63 #include "utils/ps_status.h"
65 /* User-settable parameters for sync rep */
66 char *SyncRepStandbyNames;
68 #define SyncStandbysDefined() \
69 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
71 static bool announce_next_takeover = true;
73 static void SyncRepQueueInsert(void);
74 static void SyncRepCancelWait(void);
76 static int SyncRepGetStandbyPriority(void);
78 #ifdef USE_ASSERT_CHECKING
79 static bool SyncRepQueueIsOrderedByLSN(void);
83 * ===========================================================
84 * Synchronous Replication functions for normal user backends
85 * ===========================================================
89 * Wait for synchronous replication, if requested by user.
91 * Initially backends start in state SYNC_REP_NOT_WAITING and then
92 * change that state to SYNC_REP_WAITING before adding ourselves
93 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
94 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
95 * This backend then resets its state to SYNC_REP_NOT_WAITING.
98 SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
100 char *new_status = NULL;
101 const char *old_status;
104 * Fast exit if user has not requested sync replication, or there are no
105 * sync replication standby names defined. Note that those standbys don't
106 * need to be connected.
108 if (!SyncRepRequested() || !SyncStandbysDefined())
111 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
112 Assert(WalSndCtl != NULL);
114 /* Reset the latch before adding ourselves to the queue. */
115 ResetLatch(&MyProc->waitLatch);
117 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
118 Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
121 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
122 * set. See SyncRepUpdateSyncStandbysDefined.
124 * Also check that the standby hasn't already replied. Unlikely race
125 * condition but we'll be fetching that cache line anyway so it's likely to
126 * be a low cost check.
128 if (!WalSndCtl->sync_standbys_defined ||
129 XLByteLE(XactCommitLSN, WalSndCtl->lsn))
131 LWLockRelease(SyncRepLock);
136 * Set our waitLSN so WALSender will know when to wake us, and add
137 * ourselves to the queue.
139 MyProc->waitLSN = XactCommitLSN;
140 MyProc->syncRepState = SYNC_REP_WAITING;
141 SyncRepQueueInsert();
142 Assert(SyncRepQueueIsOrderedByLSN());
143 LWLockRelease(SyncRepLock);
145 /* Alter ps display to show waiting for sync rep. */
146 if (update_process_title)
150 old_status = get_ps_display(&len);
151 new_status = (char *) palloc(len + 32 + 1);
152 memcpy(new_status, old_status, len);
153 sprintf(new_status + len, " waiting for %X/%X",
154 XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
155 set_ps_display(new_status, false);
156 new_status[len] = '\0'; /* truncate off " waiting ..." */
160 * Wait for specified LSN to be confirmed.
162 * Each proc has its own wait latch, so we perform a normal latch
163 * check/wait loop here.
169 /* Must reset the latch before testing state. */
170 ResetLatch(&MyProc->waitLatch);
173 * Try checking the state without the lock first. There's no
174 * guarantee that we'll read the most up-to-date value, so if it looks
175 * like we're still waiting, recheck while holding the lock. But if
176 * it looks like we're done, we must really be done, because once
177 * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
178 * never update it again, so we can't be seeing a stale value in that case.
181 * Note: on machines with weak memory ordering, the acquisition of
182 * the lock is essential to avoid race conditions: we cannot be sure
183 * the sender's state update has reached main memory until we acquire
184 * the lock. We could get rid of this dance if SetLatch/ResetLatch
185 * contained memory barriers.
187 syncRepState = MyProc->syncRepState;
188 if (syncRepState == SYNC_REP_WAITING)
190 LWLockAcquire(SyncRepLock, LW_SHARED);
191 syncRepState = MyProc->syncRepState;
192 LWLockRelease(SyncRepLock);
194 if (syncRepState == SYNC_REP_WAIT_COMPLETE)
198 * If a wait for synchronous replication is pending, we can neither
199 * acknowledge the commit nor raise ERROR or FATAL. The latter would
200 * lead the client to believe that the transaction aborted, which
201 * is not true: it's already committed locally. The former is no good
202 * either: the client has requested synchronous replication, and is
203 * entitled to assume that an acknowledged commit is also replicated,
204 * which might not be true. So in this case we issue a WARNING (which
205 * some clients may be able to interpret) and shut off further output.
206 * We do NOT reset ProcDiePending, so that the process will die after
207 * the commit is cleaned up.
212 (errcode(ERRCODE_ADMIN_SHUTDOWN),
213 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
214 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
215 whereToSendOutput = DestNone;
221 * It's unclear what to do if a query cancel interrupt arrives. We
222 * can't actually abort at this point, but ignoring the interrupt
223 * altogether is not helpful, so we just terminate the wait with a suitable message.
226 if (QueryCancelPending)
228 QueryCancelPending = false;
230 (errmsg("canceling wait for synchronous replication due to user request"),
231 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
237 * If the postmaster dies, we'll probably never get an
238 * acknowledgement, because all the wal sender processes will exit. So mark the process as dying and shut off further output.
241 if (!PostmasterIsAlive())
243 ProcDiePending = true;
244 whereToSendOutput = DestNone;
250 * Wait on latch for up to 60 seconds. This allows us to check for
251 * cancel/die signal or postmaster death regularly while waiting. Note
252 * that timeout here does not necessarily release from loop.
254 WaitLatch(&MyProc->waitLatch, WL_LATCH_SET | WL_TIMEOUT, 60000L);
258 * WalSender has checked our LSN and has removed us from queue. Clean up
259 * state and leave. It's OK to reset these shared memory fields without
260 * holding SyncRepLock, because any walsenders will ignore us anyway when
261 * we're not on the queue.
263 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
264 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
265 MyProc->waitLSN.xlogid = 0;
266 MyProc->waitLSN.xrecoff = 0;
270 /* Reset ps display */
271 set_ps_display(new_status, false);
277 * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
279 * Usually we will go at tail of queue, though it's possible that we arrive
280 * here out of order, so start at tail and work back to insertion point.
283 SyncRepQueueInsert(void)
287 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
288 &(WalSndCtl->SyncRepQueue),
289 offsetof(PGPROC, syncRepLinks));
294 * Stop at the element we should insert after, so the queue stays ordered by LSN.
297 if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
300 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
301 &(proc->syncRepLinks),
302 offsetof(PGPROC, syncRepLinks));
306 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
308 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
312 * Acquire SyncRepLock and cancel any wait currently in progress.
315 SyncRepCancelWait(void)
317 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
318 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
319 SHMQueueDelete(&(MyProc->syncRepLinks));
320 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
321 LWLockRelease(SyncRepLock);
325 SyncRepCleanupAtProcExit(int code, Datum arg)
327 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
329 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
330 SHMQueueDelete(&(MyProc->syncRepLinks));
331 LWLockRelease(SyncRepLock);
334 DisownLatch(&MyProc->waitLatch);
338 * ===========================================================
339 * Synchronous Replication functions for wal sender processes
340 * ===========================================================
344 * Take any action required to initialise sync rep state from config
345 * data. Called at WALSender startup and after each SIGHUP.
348 SyncRepInitConfig(void)
353 * Determine if we are a potential sync standby and remember the result
354 * for handling replies from standby.
356 priority = SyncRepGetStandbyPriority();
357 if (MyWalSnd->sync_standby_priority != priority)
359 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
360 MyWalSnd->sync_standby_priority = priority;
361 LWLockRelease(SyncRepLock);
363 (errmsg("standby \"%s\" now has synchronous standby priority %u",
364 application_name, priority)));
369 * Update the LSNs on each queue based upon our latest state. This
370 * implements a simple policy of first-valid-standby-releases-waiter.
372 * Other policies are possible, which would change what we do here and
373 * perhaps also which information we store.
376 SyncRepReleaseWaiters(void)
378 volatile WalSndCtlData *walsndctl = WalSndCtl;
379 volatile WalSnd *syncWalSnd = NULL;
385 * If this WALSender is serving a standby that is not on the list of
386 * potential standbys then we have nothing to do. If we are still starting
387 * up or still running base backup, then leave quickly also.
389 if (MyWalSnd->sync_standby_priority == 0 ||
390 MyWalSnd->state < WALSNDSTATE_STREAMING)
394 * We're a potential sync standby. Release waiters if we are the highest
395 * priority standby. If there are multiple standbys with the same priority
396 * then we use the first mentioned standby. If you change this, also
397 * change pg_stat_get_wal_senders().
399 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
401 for (i = 0; i < max_wal_senders; i++)
403 /* use volatile pointer to prevent code rearrangement */
404 volatile WalSnd *walsnd = &walsndctl->walsnds[i];
406 if (walsnd->pid != 0 &&
407 walsnd->sync_standby_priority > 0 &&
409 priority > walsnd->sync_standby_priority))
411 priority = walsnd->sync_standby_priority;
417 * We should have found ourselves at least.
422 * If we aren't managing the highest priority standby then just leave.
424 if (syncWalSnd != MyWalSnd)
426 LWLockRelease(SyncRepLock);
427 announce_next_takeover = true;
431 if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
434 * Set the lsn first so that when we wake backends they will release
435 * up to this location.
437 walsndctl->lsn = MyWalSnd->flush;
438 numprocs = SyncRepWakeQueue(false);
441 LWLockRelease(SyncRepLock);
443 elog(DEBUG3, "released %d procs up to %X/%X",
445 MyWalSnd->flush.xlogid,
446 MyWalSnd->flush.xrecoff);
449 * If we are managing the highest priority standby, though we weren't
450 * prior to this, then announce we are now the sync standby.
452 if (announce_next_takeover)
454 announce_next_takeover = false;
456 (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
457 application_name, MyWalSnd->sync_standby_priority)));
462 * Check if we are in the list of sync standbys, and if so, determine
463 * priority sequence. Return priority if set, or zero to indicate that
464 * we are not a potential sync standby.
466 * Compare the parameter SyncRepStandbyNames against the application_name
467 * for this WALSender, or allow any name if we find a wildcard "*".
470 SyncRepGetStandbyPriority(void)
479 * Since synchronous cascade replication is not allowed, we always
480 * set the priority of cascading walsender to zero.
482 if (am_cascading_walsender)
485 /* Need a modifiable copy of string */
486 rawstring = pstrdup(SyncRepStandbyNames);
488 /* Parse string into list of identifiers */
489 if (!SplitIdentifierString(rawstring, ',', &elemlist))
491 /* syntax error in list */
494 /* GUC machinery will have already complained - no need to do again */
500 char *standby_name = (char *) lfirst(l);
504 if (pg_strcasecmp(standby_name, application_name) == 0 ||
505 pg_strcasecmp(standby_name, "*") == 0)
515 return (found ? priority : 0);
519 * Walk queue from head. Set the state of any backends that need to be woken,
520 * remove them from the queue, and then wake them. Pass all = true to wake
521 * whole queue; otherwise, just wake up to the walsender's LSN.
523 * Must hold SyncRepLock.
526 SyncRepWakeQueue(bool all)
528 volatile WalSndCtlData *walsndctl = WalSndCtl;
530 PGPROC *thisproc = NULL;
533 Assert(SyncRepQueueIsOrderedByLSN());
535 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
536 &(WalSndCtl->SyncRepQueue),
537 offsetof(PGPROC, syncRepLinks));
542 * Assume the queue is ordered by LSN
544 if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN))
548 * Move to next proc, so we can delete thisproc from the queue.
549 * thisproc is valid, proc may be NULL after this.
552 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
553 &(proc->syncRepLinks),
554 offsetof(PGPROC, syncRepLinks));
557 * Set state to complete; see SyncRepWaitForLSN() for discussion of
558 * the various states.
560 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
563 * Remove thisproc from queue.
565 SHMQueueDelete(&(thisproc->syncRepLinks));
568 * Wake only when we have set state and removed from queue.
570 Assert(SHMQueueIsDetached(&(thisproc->syncRepLinks)));
571 Assert(thisproc->syncRepState == SYNC_REP_WAIT_COMPLETE);
572 SetLatch(&(thisproc->waitLatch));
581 * The background writer calls this as needed to update the shared
582 * sync_standbys_defined flag, so that backends don't remain permanently wedged
583 * if synchronous_standby_names is unset. It's safe to check the current value
584 * without the lock, because it's only ever updated by one process. But we
585 * must take the lock to change it.
588 SyncRepUpdateSyncStandbysDefined(void)
590 bool sync_standbys_defined = SyncStandbysDefined();
592 if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
594 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
597 * If synchronous_standby_names has been reset to empty, it's futile
598 * for backends to continue waiting. Since the user no longer
599 * wants synchronous replication, we'd better wake them up.
601 if (!sync_standbys_defined)
602 SyncRepWakeQueue(true);
605 * Only allow people to join the queue when there are synchronous
606 * standbys defined. Without this interlock, there's a race
607 * condition: we might wake up all the current waiters; then, some
608 * backend that hasn't yet reloaded its config might go to sleep on
609 * the queue (and never wake up). This prevents that.
611 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
613 LWLockRelease(SyncRepLock);
617 #ifdef USE_ASSERT_CHECKING
619 SyncRepQueueIsOrderedByLSN(void)
627 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
628 &(WalSndCtl->SyncRepQueue),
629 offsetof(PGPROC, syncRepLinks));
634 * Check the queue is ordered by LSN and that multiple procs don't have matching LSNs.
637 if (XLByteLE(proc->waitLSN, lastLSN))
640 lastLSN = proc->waitLSN;
642 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
643 &(proc->syncRepLinks),
644 offsetof(PGPROC, syncRepLinks));
652 * ===========================================================
653 * Synchronous Replication functions executed by any process
654 * ===========================================================
658 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
663 /* Need a modifiable copy of string */
664 rawstring = pstrdup(*newval);
666 /* Parse string into list of identifiers */
667 if (!SplitIdentifierString(rawstring, ',', &elemlist))
669 /* syntax error in list */
670 GUC_check_errdetail("List syntax is invalid.");
677 * Any additional validation of standby names should go here.
679 * Don't attempt to set WALSender priority because this is executed by
680 * postmaster at startup, not WALSender, so the application_name is not