]> granicus.if.org Git - postgresql/blob - src/backend/replication/syncrep.c
Measure WaitLatch's timeout parameter in milliseconds, not microseconds.
[postgresql] / src / backend / replication / syncrep.c
1 /*-------------------------------------------------------------------------
2  *
3  * syncrep.c
4  *
5  * Synchronous replication is new as of PostgreSQL 9.1.
6  *
7  * If requested, transaction commits wait until their commit LSN is
8  * acknowledged by the sync standby.
9  *
10  * This module contains the code for waiting and release of backends.
11  * All code in this module executes on the primary. The core streaming
12  * replication transport remains within WALreceiver/WALsender modules.
13  *
14  * The essence of this design is that it isolates all logic about
15  * waiting/releasing onto the primary. The primary defines which standbys
16  * it wishes to wait for. The standby is completely unaware of the
17  * durability requirements of transactions on the primary, reducing the
18  * complexity of the code and streamlining both standby operations and
19  * network bandwidth because there is no requirement to ship
20  * per-transaction state information.
21  *
22  * Replication is either synchronous or not synchronous (async). If it is
23  * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
24  * for the flush location on the standby before releasing the waiting backend.
25  * Further complexity in that interaction is expected in later releases.
26  *
27  * The best performing way to manage the waiting backends is to have a
28  * single ordered queue of waiting backends, so that we can avoid
29  * searching through all waiters each time we receive a reply.
30  *
31  * In 9.1 we support only a single synchronous standby, chosen from a
32  * priority list of synchronous_standby_names. Before it can become the
33  * synchronous standby it must have caught up with the primary; that may
34  * take some time. Once caught up, the current highest priority standby
35  * will release waiters from the queue.
36  *
37  * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
38  *
39  * IDENTIFICATION
40  *        src/backend/replication/syncrep.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45
46 #include <unistd.h>
47
48 #include "access/xact.h"
49 #include "access/xlog_internal.h"
50 #include "miscadmin.h"
51 #include "postmaster/autovacuum.h"
52 #include "replication/syncrep.h"
53 #include "replication/walsender.h"
54 #include "storage/latch.h"
55 #include "storage/ipc.h"
56 #include "storage/pmsignal.h"
57 #include "storage/proc.h"
58 #include "tcop/tcopprot.h"
59 #include "utils/builtins.h"
60 #include "utils/guc.h"
61 #include "utils/guc_tables.h"
62 #include "utils/memutils.h"
63 #include "utils/ps_status.h"
64
/*
 * GUC variable synchronous_standby_names: a comma-separated list of
 * standby application names, in priority order.
 */
char	   *SyncRepStandbyNames;

/* True when synchronous_standby_names is set and non-empty. */
#define SyncStandbysDefined() \
	(SyncRepStandbyNames != NULL && *SyncRepStandbyNames != '\0')

/*
 * Walsender-local flag: when true, the next time this walsender finds itself
 * to be the highest-priority standby it logs that it has taken over.
 */
static bool announce_next_takeover = true;

/* Backend-side queue helpers. */
static void SyncRepQueueInsert(void);
static void SyncRepCancelWait(void);

/* Walsender-side helper: our position in synchronous_standby_names. */
static int	SyncRepGetStandbyPriority(void);

#ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN(void);
#endif
81
82 /*
83  * ===========================================================
84  * Synchronous Replication functions for normal user backends
85  * ===========================================================
86  */
87
88 /*
89  * Wait for synchronous replication, if requested by user.
90  *
91  * Initially backends start in state SYNC_REP_NOT_WAITING and then
92  * change that state to SYNC_REP_WAITING before adding ourselves
93  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
94  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
95  * This backend then resets its state to SYNC_REP_NOT_WAITING.
96  */
97 void
98 SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
99 {
100         char       *new_status = NULL;
101         const char *old_status;
102
103         /*
104          * Fast exit if user has not requested sync replication, or there are no
105          * sync replication standby names defined. Note that those standbys don't
106          * need to be connected.
107          */
108         if (!SyncRepRequested() || !SyncStandbysDefined())
109                 return;
110
111         Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
112         Assert(WalSndCtl != NULL);
113
114         /* Reset the latch before adding ourselves to the queue. */
115         ResetLatch(&MyProc->waitLatch);
116
117         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
118         Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
119
120         /*
121          * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
122          * set.  See SyncRepUpdateSyncStandbysDefined.
123          *
124          * Also check that the standby hasn't already replied. Unlikely race
125          * condition but we'll be fetching that cache line anyway so its likely to
126          * be a low cost check.
127          */
128         if (!WalSndCtl->sync_standbys_defined ||
129                 XLByteLE(XactCommitLSN, WalSndCtl->lsn))
130         {
131                 LWLockRelease(SyncRepLock);
132                 return;
133         }
134
135         /*
136          * Set our waitLSN so WALSender will know when to wake us, and add
137          * ourselves to the queue.
138          */
139         MyProc->waitLSN = XactCommitLSN;
140         MyProc->syncRepState = SYNC_REP_WAITING;
141         SyncRepQueueInsert();
142         Assert(SyncRepQueueIsOrderedByLSN());
143         LWLockRelease(SyncRepLock);
144
145         /* Alter ps display to show waiting for sync rep. */
146         if (update_process_title)
147         {
148                 int                     len;
149
150                 old_status = get_ps_display(&len);
151                 new_status = (char *) palloc(len + 32 + 1);
152                 memcpy(new_status, old_status, len);
153                 sprintf(new_status + len, " waiting for %X/%X",
154                                 XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
155                 set_ps_display(new_status, false);
156                 new_status[len] = '\0'; /* truncate off " waiting ..." */
157         }
158
159         /*
160          * Wait for specified LSN to be confirmed.
161          *
162          * Each proc has its own wait latch, so we perform a normal latch
163          * check/wait loop here.
164          */
165         for (;;)
166         {
167                 int                     syncRepState;
168
169                 /* Must reset the latch before testing state. */
170                 ResetLatch(&MyProc->waitLatch);
171
172                 /*
173                  * Try checking the state without the lock first.  There's no
174                  * guarantee that we'll read the most up-to-date value, so if it looks
175                  * like we're still waiting, recheck while holding the lock.  But if
176                  * it looks like we're done, we must really be done, because once
177                  * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
178                  * never update it again, so we can't be seeing a stale value in that
179                  * case.
180                  *
181                  * Note: on machines with weak memory ordering, the acquisition of
182                  * the lock is essential to avoid race conditions: we cannot be sure
183                  * the sender's state update has reached main memory until we acquire
184                  * the lock.  We could get rid of this dance if SetLatch/ResetLatch
185                  * contained memory barriers.
186                  */
187                 syncRepState = MyProc->syncRepState;
188                 if (syncRepState == SYNC_REP_WAITING)
189                 {
190                         LWLockAcquire(SyncRepLock, LW_SHARED);
191                         syncRepState = MyProc->syncRepState;
192                         LWLockRelease(SyncRepLock);
193                 }
194                 if (syncRepState == SYNC_REP_WAIT_COMPLETE)
195                         break;
196
197                 /*
198                  * If a wait for synchronous replication is pending, we can neither
199                  * acknowledge the commit nor raise ERROR or FATAL.  The latter would
200                  * lead the client to believe that that the transaction aborted, which
201                  * is not true: it's already committed locally. The former is no good
202                  * either: the client has requested synchronous replication, and is
203                  * entitled to assume that an acknowledged commit is also replicated,
204                  * which might not be true. So in this case we issue a WARNING (which
205                  * some clients may be able to interpret) and shut off further output.
206                  * We do NOT reset ProcDiePending, so that the process will die after
207                  * the commit is cleaned up.
208                  */
209                 if (ProcDiePending)
210                 {
211                         ereport(WARNING,
212                                         (errcode(ERRCODE_ADMIN_SHUTDOWN),
213                                          errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
214                                          errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
215                         whereToSendOutput = DestNone;
216                         SyncRepCancelWait();
217                         break;
218                 }
219
220                 /*
221                  * It's unclear what to do if a query cancel interrupt arrives.  We
222                  * can't actually abort at this point, but ignoring the interrupt
223                  * altogether is not helpful, so we just terminate the wait with a
224                  * suitable warning.
225                  */
226                 if (QueryCancelPending)
227                 {
228                         QueryCancelPending = false;
229                         ereport(WARNING,
230                                         (errmsg("canceling wait for synchronous replication due to user request"),
231                                          errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
232                         SyncRepCancelWait();
233                         break;
234                 }
235
236                 /*
237                  * If the postmaster dies, we'll probably never get an
238                  * acknowledgement, because all the wal sender processes will exit. So
239                  * just bail out.
240                  */
241                 if (!PostmasterIsAlive())
242                 {
243                         ProcDiePending = true;
244                         whereToSendOutput = DestNone;
245                         SyncRepCancelWait();
246                         break;
247                 }
248
249                 /*
250                  * Wait on latch for up to 60 seconds. This allows us to check for
251                  * cancel/die signal or postmaster death regularly while waiting. Note
252                  * that timeout here does not necessarily release from loop.
253                  */
254                 WaitLatch(&MyProc->waitLatch, WL_LATCH_SET | WL_TIMEOUT, 60000L);
255         }
256
257         /*
258          * WalSender has checked our LSN and has removed us from queue. Clean up
259          * state and leave.  It's OK to reset these shared memory fields without
260          * holding SyncRepLock, because any walsenders will ignore us anyway when
261          * we're not on the queue.
262          */
263         Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
264         MyProc->syncRepState = SYNC_REP_NOT_WAITING;
265         MyProc->waitLSN.xlogid = 0;
266         MyProc->waitLSN.xrecoff = 0;
267
268         if (new_status)
269         {
270                 /* Reset ps display */
271                 set_ps_display(new_status, false);
272                 pfree(new_status);
273         }
274 }
275
276 /*
277  * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
278  *
279  * Usually we will go at tail of queue, though it's possible that we arrive
280  * here out of order, so start at tail and work back to insertion point.
281  */
282 static void
283 SyncRepQueueInsert(void)
284 {
285         PGPROC     *proc;
286
287         proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
288                                                                    &(WalSndCtl->SyncRepQueue),
289                                                                    offsetof(PGPROC, syncRepLinks));
290
291         while (proc)
292         {
293                 /*
294                  * Stop at the queue element that we should after to ensure the queue
295                  * is ordered by LSN.
296                  */
297                 if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
298                         break;
299
300                 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
301                                                                            &(proc->syncRepLinks),
302                                                                            offsetof(PGPROC, syncRepLinks));
303         }
304
305         if (proc)
306                 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
307         else
308                 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
309 }
310
311 /*
312  * Acquire SyncRepLock and cancel any wait currently in progress.
313  */
314 static void
315 SyncRepCancelWait(void)
316 {
317         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
318         if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
319                 SHMQueueDelete(&(MyProc->syncRepLinks));
320         MyProc->syncRepState = SYNC_REP_NOT_WAITING;
321         LWLockRelease(SyncRepLock);
322 }
323
324 void
325 SyncRepCleanupAtProcExit(int code, Datum arg)
326 {
327         if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
328         {
329                 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
330                 SHMQueueDelete(&(MyProc->syncRepLinks));
331                 LWLockRelease(SyncRepLock);
332         }
333
334         DisownLatch(&MyProc->waitLatch);
335 }
336
337 /*
338  * ===========================================================
339  * Synchronous Replication functions for wal sender processes
340  * ===========================================================
341  */
342
343 /*
344  * Take any action required to initialise sync rep state from config
345  * data. Called at WALSender startup and after each SIGHUP.
346  */
347 void
348 SyncRepInitConfig(void)
349 {
350         int                     priority;
351
352         /*
353          * Determine if we are a potential sync standby and remember the result
354          * for handling replies from standby.
355          */
356         priority = SyncRepGetStandbyPriority();
357         if (MyWalSnd->sync_standby_priority != priority)
358         {
359                 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
360                 MyWalSnd->sync_standby_priority = priority;
361                 LWLockRelease(SyncRepLock);
362                 ereport(DEBUG1,
363                         (errmsg("standby \"%s\" now has synchronous standby priority %u",
364                                         application_name, priority)));
365         }
366 }
367
368 /*
369  * Update the LSNs on each queue based upon our latest state. This
370  * implements a simple policy of first-valid-standby-releases-waiter.
371  *
372  * Other policies are possible, which would change what we do here and what
373  * perhaps also which information we store as well.
374  */
375 void
376 SyncRepReleaseWaiters(void)
377 {
378         volatile WalSndCtlData *walsndctl = WalSndCtl;
379         volatile WalSnd *syncWalSnd = NULL;
380         int                     numprocs = 0;
381         int                     priority = 0;
382         int                     i;
383
384         /*
385          * If this WALSender is serving a standby that is not on the list of
386          * potential standbys then we have nothing to do. If we are still starting
387          * up or still running base backup, then leave quickly also.
388          */
389         if (MyWalSnd->sync_standby_priority == 0 ||
390                 MyWalSnd->state < WALSNDSTATE_STREAMING)
391                 return;
392
393         /*
394          * We're a potential sync standby. Release waiters if we are the highest
395          * priority standby. If there are multiple standbys with same priorities
396          * then we use the first mentioned standby. If you change this, also
397          * change pg_stat_get_wal_senders().
398          */
399         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
400
401         for (i = 0; i < max_wal_senders; i++)
402         {
403                 /* use volatile pointer to prevent code rearrangement */
404                 volatile WalSnd *walsnd = &walsndctl->walsnds[i];
405
406                 if (walsnd->pid != 0 &&
407                         walsnd->sync_standby_priority > 0 &&
408                         (priority == 0 ||
409                          priority > walsnd->sync_standby_priority))
410                 {
411                         priority = walsnd->sync_standby_priority;
412                         syncWalSnd = walsnd;
413                 }
414         }
415
416         /*
417          * We should have found ourselves at least.
418          */
419         Assert(syncWalSnd);
420
421         /*
422          * If we aren't managing the highest priority standby then just leave.
423          */
424         if (syncWalSnd != MyWalSnd)
425         {
426                 LWLockRelease(SyncRepLock);
427                 announce_next_takeover = true;
428                 return;
429         }
430
431         if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
432         {
433                 /*
434                  * Set the lsn first so that when we wake backends they will release
435                  * up to this location.
436                  */
437                 walsndctl->lsn = MyWalSnd->flush;
438                 numprocs = SyncRepWakeQueue(false);
439         }
440
441         LWLockRelease(SyncRepLock);
442
443         elog(DEBUG3, "released %d procs up to %X/%X",
444                  numprocs,
445                  MyWalSnd->flush.xlogid,
446                  MyWalSnd->flush.xrecoff);
447
448         /*
449          * If we are managing the highest priority standby, though we weren't
450          * prior to this, then announce we are now the sync standby.
451          */
452         if (announce_next_takeover)
453         {
454                 announce_next_takeover = false;
455                 ereport(LOG,
456                                 (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
457                                                 application_name, MyWalSnd->sync_standby_priority)));
458         }
459 }
460
461 /*
462  * Check if we are in the list of sync standbys, and if so, determine
463  * priority sequence. Return priority if set, or zero to indicate that
464  * we are not a potential sync standby.
465  *
466  * Compare the parameter SyncRepStandbyNames against the application_name
467  * for this WALSender, or allow any name if we find a wildcard "*".
468  */
469 static int
470 SyncRepGetStandbyPriority(void)
471 {
472         char       *rawstring;
473         List       *elemlist;
474         ListCell   *l;
475         int                     priority = 0;
476         bool            found = false;
477
478         /*
479          * Since synchronous cascade replication is not allowed, we always
480          * set the priority of cascading walsender to zero.
481          */
482         if (am_cascading_walsender)
483                 return 0;
484
485         /* Need a modifiable copy of string */
486         rawstring = pstrdup(SyncRepStandbyNames);
487
488         /* Parse string into list of identifiers */
489         if (!SplitIdentifierString(rawstring, ',', &elemlist))
490         {
491                 /* syntax error in list */
492                 pfree(rawstring);
493                 list_free(elemlist);
494                 /* GUC machinery will have already complained - no need to do again */
495                 return 0;
496         }
497
498         foreach(l, elemlist)
499         {
500                 char       *standby_name = (char *) lfirst(l);
501
502                 priority++;
503
504                 if (pg_strcasecmp(standby_name, application_name) == 0 ||
505                         pg_strcasecmp(standby_name, "*") == 0)
506                 {
507                         found = true;
508                         break;
509                 }
510         }
511
512         pfree(rawstring);
513         list_free(elemlist);
514
515         return (found ? priority : 0);
516 }
517
518 /*
519  * Walk queue from head.  Set the state of any backends that need to be woken,
520  * remove them from the queue, and then wake them.      Pass all = true to wake
521  * whole queue; otherwise, just wake up to the walsender's LSN.
522  *
523  * Must hold SyncRepLock.
524  */
525 int
526 SyncRepWakeQueue(bool all)
527 {
528         volatile WalSndCtlData *walsndctl = WalSndCtl;
529         PGPROC     *proc = NULL;
530         PGPROC     *thisproc = NULL;
531         int                     numprocs = 0;
532
533         Assert(SyncRepQueueIsOrderedByLSN());
534
535         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
536                                                                    &(WalSndCtl->SyncRepQueue),
537                                                                    offsetof(PGPROC, syncRepLinks));
538
539         while (proc)
540         {
541                 /*
542                  * Assume the queue is ordered by LSN
543                  */
544                 if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN))
545                         return numprocs;
546
547                 /*
548                  * Move to next proc, so we can delete thisproc from the queue.
549                  * thisproc is valid, proc may be NULL after this.
550                  */
551                 thisproc = proc;
552                 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
553                                                                            &(proc->syncRepLinks),
554                                                                            offsetof(PGPROC, syncRepLinks));
555
556                 /*
557                  * Set state to complete; see SyncRepWaitForLSN() for discussion of
558                  * the various states.
559                  */
560                 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
561
562                 /*
563                  * Remove thisproc from queue.
564                  */
565                 SHMQueueDelete(&(thisproc->syncRepLinks));
566
567                 /*
568                  * Wake only when we have set state and removed from queue.
569                  */
570                 Assert(SHMQueueIsDetached(&(thisproc->syncRepLinks)));
571                 Assert(thisproc->syncRepState == SYNC_REP_WAIT_COMPLETE);
572                 SetLatch(&(thisproc->waitLatch));
573
574                 numprocs++;
575         }
576
577         return numprocs;
578 }
579
580 /*
581  * The background writer calls this as needed to update the shared
582  * sync_standbys_defined flag, so that backends don't remain permanently wedged
583  * if synchronous_standby_names is unset.  It's safe to check the current value
584  * without the lock, because it's only ever updated by one process.  But we
585  * must take the lock to change it.
586  */
587 void
588 SyncRepUpdateSyncStandbysDefined(void)
589 {
590         bool            sync_standbys_defined = SyncStandbysDefined();
591
592         if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
593         {
594                 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
595
596                 /*
597                  * If synchronous_standby_names has been reset to empty, it's futile
598                  * for backends to continue to waiting.  Since the user no longer
599                  * wants synchronous replication, we'd better wake them up.
600                  */
601                 if (!sync_standbys_defined)
602                         SyncRepWakeQueue(true);
603
604                 /*
605                  * Only allow people to join the queue when there are synchronous
606                  * standbys defined.  Without this interlock, there's a race
607                  * condition: we might wake up all the current waiters; then, some
608                  * backend that hasn't yet reloaded its config might go to sleep on
609                  * the queue (and never wake up).  This prevents that.
610                  */
611                 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
612
613                 LWLockRelease(SyncRepLock);
614         }
615 }
616
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: true iff the sync-rep queue, walked from its head, has
 * strictly increasing waitLSN values (ascending order, no duplicates).
 */
static bool
SyncRepQueueIsOrderedByLSN(void)
{
	XLogRecPtr	prev;
	PGPROC	   *cur;

	prev.xlogid = 0;
	prev.xrecoff = 0;

	for (cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
									   &(WalSndCtl->SyncRepQueue),
									   offsetof(PGPROC, syncRepLinks));
		 cur != NULL;
		 cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
									   &(cur->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks)))
	{
		/* Equal to or below the previous entry violates strict ordering. */
		if (XLByteLE(cur->waitLSN, prev))
			return false;

		prev = cur->waitLSN;
	}

	return true;
}
#endif
650
651 /*
652  * ===========================================================
653  * Synchronous Replication functions executed by any process
654  * ===========================================================
655  */
656
657 bool
658 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
659 {
660         char       *rawstring;
661         List       *elemlist;
662
663         /* Need a modifiable copy of string */
664         rawstring = pstrdup(*newval);
665
666         /* Parse string into list of identifiers */
667         if (!SplitIdentifierString(rawstring, ',', &elemlist))
668         {
669                 /* syntax error in list */
670                 GUC_check_errdetail("List syntax is invalid.");
671                 pfree(rawstring);
672                 list_free(elemlist);
673                 return false;
674         }
675
676         /*
677          * Any additional validation of standby names should go here.
678          *
679          * Don't attempt to set WALSender priority because this is executed by
680          * postmaster at startup, not WALSender, so the application_name is not
681          * yet correctly set.
682          */
683
684         pfree(rawstring);
685         list_free(elemlist);
686
687         return true;
688 }