1 /*-------------------------------------------------------------------------
5 * Synchronous replication is new as of PostgreSQL 9.1.
7 * If requested, transaction commits wait until their commit LSN is
8 * acknowledged by the synchronous standby.
10 * This module contains the code for waiting and release of backends.
11 * All code in this module executes on the primary. The core streaming
12 * replication transport remains within WALreceiver/WALsender modules.
14 * The essence of this design is that it isolates all logic about
15 * waiting/releasing onto the primary. The primary defines which standbys
16 * it wishes to wait for. The standby is completely unaware of the
17 * durability requirements of transactions on the primary, reducing the
18 * complexity of the code and streamlining both standby operations and
19 * network bandwidth because there is no requirement to ship
20 * per-transaction state information.
22 * Replication is either synchronous or not synchronous (async). If it is
23 * async, we just fastpath out of here. If it is sync, then we wait for
24 * the write or flush location on the standby before releasing the waiting
25 * backend. Further complexity in that interaction is expected in later
28 * The best performing way to manage the waiting backends is to have a
29 * single ordered queue of waiting backends, so that we can avoid
30 * searching through all the waiters each time we receive a reply.
32 * In 9.1 we support only a single synchronous standby, chosen from a
33 * priority list of synchronous_standby_names. Before it can become the
34 * synchronous standby it must have caught up with the primary; that may
35 * take some time. Once caught up, the current highest priority standby
36 * will release waiters from the queue.
38 * Portions Copyright (c) 2010-2015, PostgreSQL Global Development Group
41 * src/backend/replication/syncrep.c
43 *-------------------------------------------------------------------------
49 #include "access/xact.h"
50 #include "miscadmin.h"
51 #include "replication/syncrep.h"
52 #include "replication/walsender.h"
53 #include "replication/walsender_private.h"
54 #include "storage/pmsignal.h"
55 #include "storage/proc.h"
56 #include "tcop/tcopprot.h"
57 #include "utils/builtins.h"
58 #include "utils/ps_status.h"
60 /* User-settable parameters for sync rep */
/*
 * Raw standby-name list. Elsewhere in this file a copy of it is split on
 * commas with SplitIdentifierString() and each element is compared with
 * application_name.
 */
61 char *SyncRepStandbyNames;
/* True when SyncRepStandbyNames is neither NULL nor the empty string. */
63 #define SyncStandbysDefined() \
64 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
/*
 * Controls the "is now the synchronous standby" message in
 * SyncRepReleaseWaiters(): set to true whenever another walsender is found
 * to be the sync standby, and cleared once the message has been emitted.
 */
66 static bool announce_next_takeover = true;
/*
 * Wait mode read by SyncRepWaitForLSN(); written only by
 * assign_synchronous_commit() in this file.
 */
68 static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
/* Prototypes for file-local helpers defined further down. */
70 static void SyncRepQueueInsert(int mode);
71 static void SyncRepCancelWait(void);
73 static int SyncRepGetStandbyPriority(void);
/* The queue-order checker is only declared in assert-enabled builds. */
75 #ifdef USE_ASSERT_CHECKING
76 static bool SyncRepQueueIsOrderedByLSN(int mode);
80 * ===========================================================
81 * Synchronous Replication functions for normal user backends
82 * ===========================================================
86 * Wait for synchronous replication, if requested by user.
88 * Initially backends start in state SYNC_REP_NOT_WAITING and then
89 * change that state to SYNC_REP_WAITING before adding themselves
90 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
91 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
92 * This backend then resets its state to SYNC_REP_NOT_WAITING.
/*
 * SyncRepWaitForLSN
 *
 * XactCommitLSN: the LSN the caller wants confirmed. It is stored in
 * MyProc->waitLSN and compared against WalSndCtl->lsn[mode], where mode is
 * a snapshot of SyncRepWaitMode taken on entry.
 *
 * Visible flow: fast-exit test, enqueue under SyncRepLock, optional ps
 * title update, latch-based wait with handling for die / cancel /
 * postmaster death, then state reset and ps title restore.
 *
 * NOTE(review): the embedded line numbers skip values, so this listing
 * appears to omit short lines (braces, loop header, early returns, some
 * declarations). The surviving statements are reproduced verbatim.
 */
95 SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
97 char *new_status = NULL;
98 const char *old_status;
99 int mode = SyncRepWaitMode;
102 * Fast exit if user has not requested sync replication, or there are no
103 * sync replication standby names defined. Note that those standbys don't
104 * need to be connected.
106 if (!SyncRepRequested() || !SyncStandbysDefined())
109 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
110 Assert(WalSndCtl != NULL);
112 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
113 Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
116 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
117 * set. See SyncRepUpdateSyncStandbysDefined.
119 * Also check that the standby hasn't already replied. Unlikely race
120 * condition but we'll be fetching that cache line anyway so it's likely
121 * to be a low cost check.
123 if (!WalSndCtl->sync_standbys_defined ||
124 XactCommitLSN <= WalSndCtl->lsn[mode])
126 LWLockRelease(SyncRepLock);
131 * Set our waitLSN so WALSender will know when to wake us, and add
132 * ourselves to the queue.
134 MyProc->waitLSN = XactCommitLSN;
135 MyProc->syncRepState = SYNC_REP_WAITING;
136 SyncRepQueueInsert(mode);
137 Assert(SyncRepQueueIsOrderedByLSN(mode));
138 LWLockRelease(SyncRepLock);
140 /* Alter ps display to show waiting for sync rep. */
141 if (update_process_title)
145 old_status = get_ps_display(&len);
/*
 * Allocate the old title plus 32 bytes for the " waiting for %X/%X" suffix
 * written by sprintf below, plus one byte for the terminator.
 */
146 new_status = (char *) palloc(len + 32 + 1);
147 memcpy(new_status, old_status, len);
148 sprintf(new_status + len, " waiting for %X/%X",
149 (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
150 set_ps_display(new_status, false);
151 new_status[len] = '\0'; /* truncate off " waiting ..." */
155 * Wait for specified LSN to be confirmed.
157 * Each proc has its own wait latch, so we perform a normal latch
158 * check/wait loop here.
164 /* Must reset the latch before testing state. */
165 ResetLatch(&MyProc->procLatch);
168 * Try checking the state without the lock first. There's no
169 * guarantee that we'll read the most up-to-date value, so if it looks
170 * like we're still waiting, recheck while holding the lock. But if
171 * it looks like we're done, we must really be done, because once
172 * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
173 * never update it again, so we can't be seeing a stale value in that
/*
 * NOTE(review): the text above describes a recheck while holding the lock,
 * but the two reads of MyProc->syncRepState below have no SyncRepLock
 * acquisition visible between them. Confirm against the complete file.
 */
176 syncRepState = MyProc->syncRepState;
177 if (syncRepState == SYNC_REP_WAITING)
178 syncRepState = MyProc->syncRepState;
179 if (syncRepState == SYNC_REP_WAIT_COMPLETE)
183 * If a wait for synchronous replication is pending, we can neither
184 * acknowledge the commit nor raise ERROR or FATAL. The latter would
185 * lead the client to believe that that the transaction aborted, which
186 * is not true: it's already committed locally. The former is no good
187 * either: the client has requested synchronous replication, and is
188 * entitled to assume that an acknowledged commit is also replicated,
189 * which might not be true. So in this case we issue a WARNING (which
190 * some clients may be able to interpret) and shut off further output.
191 * We do NOT reset ProcDiePending, so that the process will die after
192 * the commit is cleaned up.
197 (errcode(ERRCODE_ADMIN_SHUTDOWN),
198 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
199 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
/* Redirect output to DestNone: the "shut off further output" step. */
200 whereToSendOutput = DestNone;
206 * It's unclear what to do if a query cancel interrupt arrives. We
207 * can't actually abort at this point, but ignoring the interrupt
208 * altogether is not helpful, so we just terminate the wait with a
211 if (QueryCancelPending)
/* Consume the cancel request here; unlike ProcDiePending it is cleared. */
213 QueryCancelPending = false;
215 (errmsg("canceling wait for synchronous replication due to user request"),
216 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
222 * If the postmaster dies, we'll probably never get an
223 * acknowledgement, because all the wal sender processes will exit. So
226 if (!PostmasterIsAlive())
/* Flag this process to die and stop sending output to the client. */
228 ProcDiePending = true;
229 whereToSendOutput = DestNone;
235 * Wait on latch. Any condition that should wake us up will set the
236 * latch, so no need for timeout.
238 WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
242 * WalSender has checked our LSN and has removed us from queue. Clean up
243 * state and leave. It's OK to reset these shared memory fields without
244 * holding SyncRepLock, because any walsenders will ignore us anyway when
245 * we're not on the queue.
247 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
248 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
253 /* Reset ps display */
/*
 * new_status was cut back to the original title right after it was first
 * displayed, so passing it again restores the old title.
 */
254 set_ps_display(new_status, false);
260 * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
262 * Usually we will go at tail of queue, though it's possible that we arrive
263 * here out of order, so start at tail and work back to insertion point.
/*
 * SyncRepQueueInsert
 *
 * mode: index into WalSndCtl->SyncRepQueue[]; asserted to lie in
 * [0, NUM_SYNC_REP_WAIT_MODE).
 *
 * Links MyProc->syncRepLinks into the chosen queue. The scan starts from
 * the element before the queue head (the tail) and steps backwards with
 * SHMQueuePrev(), comparing each entry's waitLSN with MyProc->waitLSN.
 *
 * NOTE(review): the loop header, braces and the branch selecting between
 * the two SHMQueueInsertAfter() calls are not visible in this listing.
 */
266 SyncRepQueueInsert(int mode)
270 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
271 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
272 &(WalSndCtl->SyncRepQueue[mode]),
273 offsetof(PGPROC, syncRepLinks));
278 * Stop at the queue element that we should after to ensure the queue
/* Strictly smaller waitLSN: MyProc belongs right after this entry. */
281 if (proc->waitLSN < MyProc->waitLSN)
284 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
285 &(proc->syncRepLinks),
286 offsetof(PGPROC, syncRepLinks));
/* Insert directly behind the entry located by the backward scan. */
290 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
/*
 * Insert directly after the queue head, i.e. at the front; presumably
 * taken when the scan found no entry with a smaller waitLSN.
 */
292 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
296 * Acquire SyncRepLock and cancel any wait currently in progress.
/*
 * SyncRepCancelWait
 *
 * Takes SyncRepLock exclusively, unlinks MyProc from its wait queue if it
 * is still linked, and sets MyProc->syncRepState back to
 * SYNC_REP_NOT_WAITING before releasing the lock.
 */
299 SyncRepCancelWait(void)
301 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
/* Only unlink when actually queued. */
302 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
303 SHMQueueDelete(&(MyProc->syncRepLinks));
304 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
305 LWLockRelease(SyncRepLock);
/*
 * SyncRepCleanupAtProcExit
 *
 * If MyProc is still linked into a sync rep queue, remove it while holding
 * SyncRepLock exclusively. The detached test is made before the lock is
 * acquired, so the lock is only taken when there is something to unlink.
 * MyProc->syncRepState is not touched by the visible statements.
 */
309 SyncRepCleanupAtProcExit(void)
311 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
313 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
314 SHMQueueDelete(&(MyProc->syncRepLinks));
315 LWLockRelease(SyncRepLock);
320 * ===========================================================
321 * Synchronous Replication functions for wal sender processes
322 * ===========================================================
326 * Take any action required to initialise sync rep state from config
327 * data. Called at WALSender startup and after each SIGHUP.
/*
 * SyncRepInitConfig
 *
 * Recomputes this walsender's priority with SyncRepGetStandbyPriority().
 * When it differs from MyWalSnd->sync_standby_priority, the new value is
 * stored under SyncRepLock and a message naming application_name and the
 * priority is reported.
 */
330 SyncRepInitConfig(void)
335 * Determine if we are a potential sync standby and remember the result
336 * for handling replies from standby.
338 priority = SyncRepGetStandbyPriority();
/* The comparison is done without the lock; only the store is locked. */
339 if (MyWalSnd->sync_standby_priority != priority)
341 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
342 MyWalSnd->sync_standby_priority = priority;
343 LWLockRelease(SyncRepLock);
345 (errmsg("standby \"%s\" now has synchronous standby priority %u",
346 application_name, priority)));
351 * Find the WAL sender servicing the synchronous standby with the lowest
352 * priority value, or NULL if no synchronous standby is connected. If there
353 * are multiple standbys with the same lowest priority value, the first one
354 * found is selected. The caller must hold SyncRepLock.
/*
 * SyncRepGetSynchronousStandby
 *
 * Scans WalSndCtl->walsnds[0 .. max_wal_senders) and tracks the best
 * candidate in result / result_priority. Per the tests below, a candidate
 * must have a nonzero pid, be in WALSNDSTATE_STREAMING, have a nonzero
 * sync_standby_priority, beat the priority already held, and have a valid
 * flush position.
 *
 * NOTE(review): the statements that skip a slot and the return statements
 * are not visible in this listing.
 */
357 SyncRepGetSynchronousStandby(void)
359 WalSnd *result = NULL;
360 int result_priority = 0;
363 for (i = 0; i < max_wal_senders; i++)
365 /* Use volatile pointer to prevent code rearrangement */
366 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
/* A pid of zero is tested first, before any other field of the slot. */
370 if (walsnd->pid == 0)
373 /* Must be streaming */
374 if (walsnd->state != WALSNDSTATE_STREAMING)
377 /* Must be synchronous */
378 this_priority = walsnd->sync_standby_priority;
379 if (this_priority == 0)
382 /* Must have a lower priority value than any previous ones */
/* "<=" means an equal-priority walsender does not replace the earlier pick. */
383 if (result != NULL && result_priority <= this_priority)
386 /* Must have a valid flush position */
387 if (XLogRecPtrIsInvalid(walsnd->flush))
/* The cast drops the volatile qualifier for the value handed back. */
390 result = (WalSnd *) walsnd;
391 result_priority = this_priority;
394 * If priority is equal to 1, there cannot be any other WAL senders
395 * with a lower priority, so we're done.
397 if (this_priority == 1)
405 * Update the LSNs on each queue based upon our latest state. This
406 * implements a simple policy of first-valid-standby-releases-waiter.
408 * Other policies are possible, which would change what we do here and
409 * perhaps also which information we store as well.
/*
 * SyncRepReleaseWaiters
 *
 * Run by a walsender. If SyncRepGetSynchronousStandby() selects this
 * walsender (MyWalSnd), the shared write and flush LSNs in walsndctl->lsn[]
 * are advanced to MyWalSnd->write and MyWalSnd->flush, and the matching
 * queues are woken with SyncRepWakeQueue(false, ...). Otherwise the lock is
 * released and announce_next_takeover is re-armed.
 */
412 SyncRepReleaseWaiters(void)
414 volatile WalSndCtlData *walsndctl = WalSndCtl;
420 * If this WALSender is serving a standby that is not on the list of
421 * potential standbys then we have nothing to do. If we are still starting
422 * up, still running base backup or the current flush position is still
423 * invalid, then leave quickly also.
425 if (MyWalSnd->sync_standby_priority == 0 ||
426 MyWalSnd->state < WALSNDSTATE_STREAMING ||
427 XLogRecPtrIsInvalid(MyWalSnd->flush))
431 * We're a potential sync standby. Release waiters if we are the highest
434 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
435 syncWalSnd = SyncRepGetSynchronousStandby();
437 /* We should have found ourselves at least */
438 Assert(syncWalSnd != NULL);
441 * If we aren't managing the highest priority standby then just leave.
443 if (syncWalSnd != MyWalSnd)
445 LWLockRelease(SyncRepLock);
/* Re-arm the announcement for the next time this walsender takes over. */
446 announce_next_takeover = true;
451 * Set the lsn first so that when we wake backends they will release up to
/* The shared LSN only moves forward: it is updated when strictly behind. */
454 if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
456 walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
457 numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
/* Same forward-only rule for the flush position. */
459 if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
461 walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
462 numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
465 LWLockRelease(SyncRepLock);
/* Logging happens after the lock has been released. */
467 elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
468 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
469 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
472 * If we are managing the highest priority standby, though we weren't
473 * prior to this, then announce we are now the sync standby.
475 if (announce_next_takeover)
/* Clear the flag so the message is emitted once per takeover. */
477 announce_next_takeover = false;
479 (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
480 application_name, MyWalSnd->sync_standby_priority)));
485 * Check if we are in the list of sync standbys, and if so, determine
486 * priority sequence. Return priority if set, or zero to indicate that
487 * we are not a potential sync standby.
489 * Compare the parameter SyncRepStandbyNames against the application_name
490 * for this WALSender, or allow any name if we find a wildcard "*".
/*
 * SyncRepGetStandbyPriority
 *
 * Splits a private copy of SyncRepStandbyNames on commas and compares each
 * element with application_name, also accepting the literal "*".
 * Returns priority when a match was found, otherwise 0.
 *
 * NOTE(review): the list iteration and the statements that maintain
 * priority and found are not visible in this listing.
 */
493 SyncRepGetStandbyPriority(void)
502 * Since synchronous cascade replication is not allowed, we always set the
503 * priority of cascading walsender to zero.
505 if (am_cascading_walsender)
508 /* Need a modifiable copy of string */
/* The GUC string itself is left untouched; only this copy is parsed. */
509 rawstring = pstrdup(SyncRepStandbyNames);
511 /* Parse string into list of identifiers */
512 if (!SplitIdentifierString(rawstring, ',', &elemlist))
514 /* syntax error in list */
517 /* GUC machinery will have already complained - no need to do again */
523 char *standby_name = (char *) lfirst(l);
/* pg_strcasecmp: presumably a case-insensitive comparison; verify. */
527 if (pg_strcasecmp(standby_name, application_name) == 0 ||
528 pg_strcasecmp(standby_name, "*") == 0)
538 return (found ? priority : 0);
542 * Walk the specified queue from head. Set the state of any backends that
543 * need to be woken, remove them from the queue, and then wake them.
544 * Pass all = true to wake whole queue; otherwise, just wake up to
545 * the walsender's LSN.
547 * Must hold SyncRepLock.
/*
 * SyncRepWakeQueue
 *
 * all:  when true, the LSN test below never stops the walk.
 * mode: index into the SyncRepQueue[] / lsn[] arrays; asserted to lie in
 *       [0, NUM_SYNC_REP_WAIT_MODE).
 *
 * Walks the queue from its head with SHMQueueNext(). For each entry that
 * is released: set syncRepState to SYNC_REP_WAIT_COMPLETE, unlink it, then
 * set its latch.
 *
 * Callers in this file store the result in numwrite / numflush and log it
 * as "released %d procs", so the return value is presumably the number of
 * procs woken (the counter and return are not visible in this listing).
 */
550 SyncRepWakeQueue(bool all, int mode)
552 volatile WalSndCtlData *walsndctl = WalSndCtl;
554 PGPROC *thisproc = NULL;
557 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
558 Assert(SyncRepQueueIsOrderedByLSN(mode));
560 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
561 &(WalSndCtl->SyncRepQueue[mode]),
562 offsetof(PGPROC, syncRepLinks));
567 * Assume the queue is ordered by LSN
/* First entry whose waitLSN is past the confirmed LSN ends the walk. */
569 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
573 * Move to next proc, so we can delete thisproc from the queue.
574 * thisproc is valid, proc may be NULL after this.
577 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
578 &(proc->syncRepLinks),
579 offsetof(PGPROC, syncRepLinks));
582 * Set state to complete; see SyncRepWaitForLSN() for discussion of
583 * the various states.
585 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
588 * Remove thisproc from queue.
590 SHMQueueDelete(&(thisproc->syncRepLinks));
593 * Wake only when we have set state and removed from queue.
595 SetLatch(&(thisproc->procLatch));
604 * The checkpointer calls this as needed to update the shared
605 * sync_standbys_defined flag, so that backends don't remain permanently wedged
606 * if synchronous_standby_names is unset. It's safe to check the current value
607 * without the lock, because it's only ever updated by one process. But we
608 * must take the lock to change it.
/*
 * SyncRepUpdateSyncStandbysDefined
 *
 * Compares the local SyncStandbysDefined() result with the shared
 * WalSndCtl->sync_standbys_defined flag. On a mismatch it takes SyncRepLock
 * exclusively, wakes every queue with SyncRepWakeQueue(true, i) if the
 * setting is now empty, and then stores the new flag value.
 */
611 SyncRepUpdateSyncStandbysDefined(void)
613 bool sync_standbys_defined = SyncStandbysDefined();
/* Unlocked read of the shared flag; the lock is taken only to change it. */
615 if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
617 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
620 * If synchronous_standby_names has been reset to empty, it's futile
621 * for backends to continue to waiting. Since the user no longer
622 * wants synchronous replication, we'd better wake them up.
624 if (!sync_standbys_defined)
/* Wake every wait-mode queue unconditionally (all = true). */
628 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
629 SyncRepWakeQueue(true, i);
633 * Only allow people to join the queue when there are synchronous
634 * standbys defined. Without this interlock, there's a race
635 * condition: we might wake up all the current waiters; then, some
636 * backend that hasn't yet reloaded its config might go to sleep on
637 * the queue (and never wake up). This prevents that.
/* Stored while the lock is still held, after any wakeups above. */
639 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
641 LWLockRelease(SyncRepLock);
645 #ifdef USE_ASSERT_CHECKING
/*
 * SyncRepQueueIsOrderedByLSN
 *
 * mode: queue index, asserted to lie in [0, NUM_SYNC_REP_WAIT_MODE).
 *
 * Walks the queue from its head with SHMQueueNext(), comparing each
 * entry's waitLSN with the previous one kept in lastLSN. It is used only
 * inside Assert() in this file and is declared under USE_ASSERT_CHECKING.
 *
 * NOTE(review): the loop header, the initial value of lastLSN and the
 * return statements are not visible in this listing.
 */
647 SyncRepQueueIsOrderedByLSN(int mode)
652 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
656 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
657 &(WalSndCtl->SyncRepQueue[mode]),
658 offsetof(PGPROC, syncRepLinks));
663 * Check the queue is ordered by LSN and that multiple procs don't
/* "<=" also treats two entries with the same waitLSN as a violation. */
666 if (proc->waitLSN <= lastLSN)
669 lastLSN = proc->waitLSN;
671 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
672 &(proc->syncRepLinks),
673 offsetof(PGPROC, syncRepLinks));
681 * ===========================================================
682 * Synchronous Replication functions executed by any process
683 * ===========================================================
/*
 * check_synchronous_standby_names
 *
 * newval: pointer to the proposed setting string; a pstrdup'd copy is
 *         parsed, so *newval itself is not modified by the visible code.
 * extra, source: not referenced by the visible statements.
 *
 * Validates the value by splitting it on commas with
 * SplitIdentifierString(). On a parse failure the detail text
 * "List syntax is invalid." is registered through GUC_check_errdetail().
 *
 * NOTE(review): the return statements and cleanup calls are not visible in
 * this listing.
 */
687 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
692 /* Need a modifiable copy of string */
693 rawstring = pstrdup(*newval);
695 /* Parse string into list of identifiers */
696 if (!SplitIdentifierString(rawstring, ',', &elemlist))
698 /* syntax error in list */
699 GUC_check_errdetail("List syntax is invalid.");
706 * Any additional validation of standby names should go here.
708 * Don't attempt to set WALSender priority because this is executed by
709 * postmaster at startup, not WALSender, so the application_name is not
/*
 * assign_synchronous_commit
 *
 * newval: the new synchronous_commit level.
 * extra:  not referenced by the visible statements.
 *
 * Translates the setting into the file-local SyncRepWaitMode:
 *   SYNCHRONOUS_COMMIT_REMOTE_WRITE -> SYNC_REP_WAIT_WRITE
 *   SYNCHRONOUS_COMMIT_REMOTE_FLUSH -> SYNC_REP_WAIT_FLUSH
 * The final assignment to SYNC_REP_NO_WAIT presumably sits under a default
 * label covering every other value (the label is not visible here).
 */
720 assign_synchronous_commit(int newval, void *extra)
724 case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
725 SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
727 case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
728 SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
731 SyncRepWaitMode = SYNC_REP_NO_WAIT;