]> granicus.if.org Git - postgresql/blob - src/backend/replication/syncrep.c
Add barriers to the latch code.
[postgresql] / src / backend / replication / syncrep.c
1 /*-------------------------------------------------------------------------
2  *
3  * syncrep.c
4  *
5  * Synchronous replication is new as of PostgreSQL 9.1.
6  *
7  * If requested, transaction commits wait until their commit LSN is
8  * acknowledged by the synchronous standby.
9  *
10  * This module contains the code for waiting and release of backends.
11  * All code in this module executes on the primary. The core streaming
12  * replication transport remains within WALreceiver/WALsender modules.
13  *
14  * The essence of this design is that it isolates all logic about
15  * waiting/releasing onto the primary. The primary defines which standbys
16  * it wishes to wait for. The standby is completely unaware of the
17  * durability requirements of transactions on the primary, reducing the
18  * complexity of the code and streamlining both standby operations and
19  * network bandwidth because there is no requirement to ship
20  * per-transaction state information.
21  *
22  * Replication is either synchronous or not synchronous (async). If it is
23  * async, we just fastpath out of here. If it is sync, then we wait for
24  * the write or flush location on the standby before releasing the waiting
25  * backend. Further complexity in that interaction is expected in later
26  * releases.
27  *
28  * The best performing way to manage the waiting backends is to have a
29  * single ordered queue of waiting backends, so that we can avoid
30  * searching the through all waiters each time we receive a reply.
31  *
32  * In 9.1 we support only a single synchronous standby, chosen from a
33  * priority list of synchronous_standby_names. Before it can become the
34  * synchronous standby it must have caught up with the primary; that may
35  * take some time. Once caught up, the current highest priority standby
36  * will release waiters from the queue.
37  *
38  * Portions Copyright (c) 2010-2015, PostgreSQL Global Development Group
39  *
40  * IDENTIFICATION
41  *        src/backend/replication/syncrep.c
42  *
43  *-------------------------------------------------------------------------
44  */
45 #include "postgres.h"
46
47 #include <unistd.h>
48
49 #include "access/xact.h"
50 #include "miscadmin.h"
51 #include "replication/syncrep.h"
52 #include "replication/walsender.h"
53 #include "replication/walsender_private.h"
54 #include "storage/pmsignal.h"
55 #include "storage/proc.h"
56 #include "tcop/tcopprot.h"
57 #include "utils/builtins.h"
58 #include "utils/ps_status.h"
59
60 /* User-settable parameters for sync rep */
61 char       *SyncRepStandbyNames;
62
63 #define SyncStandbysDefined() \
64         (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
65
66 static bool announce_next_takeover = true;
67
68 static int      SyncRepWaitMode = SYNC_REP_NO_WAIT;
69
70 static void SyncRepQueueInsert(int mode);
71 static void SyncRepCancelWait(void);
72
73 static int      SyncRepGetStandbyPriority(void);
74
75 #ifdef USE_ASSERT_CHECKING
76 static bool SyncRepQueueIsOrderedByLSN(int mode);
77 #endif
78
79 /*
80  * ===========================================================
81  * Synchronous Replication functions for normal user backends
82  * ===========================================================
83  */
84
85 /*
86  * Wait for synchronous replication, if requested by user.
87  *
88  * Initially backends start in state SYNC_REP_NOT_WAITING and then
89  * change that state to SYNC_REP_WAITING before adding ourselves
90  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
91  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
92  * This backend then resets its state to SYNC_REP_NOT_WAITING.
93  */
94 void
95 SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
96 {
97         char       *new_status = NULL;
98         const char *old_status;
99         int                     mode = SyncRepWaitMode;
100
101         /*
102          * Fast exit if user has not requested sync replication, or there are no
103          * sync replication standby names defined. Note that those standbys don't
104          * need to be connected.
105          */
106         if (!SyncRepRequested() || !SyncStandbysDefined())
107                 return;
108
109         Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
110         Assert(WalSndCtl != NULL);
111
112         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
113         Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
114
115         /*
116          * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
117          * set.  See SyncRepUpdateSyncStandbysDefined.
118          *
119          * Also check that the standby hasn't already replied. Unlikely race
120          * condition but we'll be fetching that cache line anyway so it's likely
121          * to be a low cost check.
122          */
123         if (!WalSndCtl->sync_standbys_defined ||
124                 XactCommitLSN <= WalSndCtl->lsn[mode])
125         {
126                 LWLockRelease(SyncRepLock);
127                 return;
128         }
129
130         /*
131          * Set our waitLSN so WALSender will know when to wake us, and add
132          * ourselves to the queue.
133          */
134         MyProc->waitLSN = XactCommitLSN;
135         MyProc->syncRepState = SYNC_REP_WAITING;
136         SyncRepQueueInsert(mode);
137         Assert(SyncRepQueueIsOrderedByLSN(mode));
138         LWLockRelease(SyncRepLock);
139
140         /* Alter ps display to show waiting for sync rep. */
141         if (update_process_title)
142         {
143                 int                     len;
144
145                 old_status = get_ps_display(&len);
146                 new_status = (char *) palloc(len + 32 + 1);
147                 memcpy(new_status, old_status, len);
148                 sprintf(new_status + len, " waiting for %X/%X",
149                                 (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
150                 set_ps_display(new_status, false);
151                 new_status[len] = '\0'; /* truncate off " waiting ..." */
152         }
153
154         /*
155          * Wait for specified LSN to be confirmed.
156          *
157          * Each proc has its own wait latch, so we perform a normal latch
158          * check/wait loop here.
159          */
160         for (;;)
161         {
162                 int                     syncRepState;
163
164                 /* Must reset the latch before testing state. */
165                 ResetLatch(&MyProc->procLatch);
166
167                 /*
168                  * Try checking the state without the lock first.  There's no
169                  * guarantee that we'll read the most up-to-date value, so if it looks
170                  * like we're still waiting, recheck while holding the lock.  But if
171                  * it looks like we're done, we must really be done, because once
172                  * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
173                  * never update it again, so we can't be seeing a stale value in that
174                  * case.
175                  */
176                 syncRepState = MyProc->syncRepState;
177                 if (syncRepState == SYNC_REP_WAITING)
178                         syncRepState = MyProc->syncRepState;
179                 if (syncRepState == SYNC_REP_WAIT_COMPLETE)
180                         break;
181
182                 /*
183                  * If a wait for synchronous replication is pending, we can neither
184                  * acknowledge the commit nor raise ERROR or FATAL.  The latter would
185                  * lead the client to believe that that the transaction aborted, which
186                  * is not true: it's already committed locally. The former is no good
187                  * either: the client has requested synchronous replication, and is
188                  * entitled to assume that an acknowledged commit is also replicated,
189                  * which might not be true. So in this case we issue a WARNING (which
190                  * some clients may be able to interpret) and shut off further output.
191                  * We do NOT reset ProcDiePending, so that the process will die after
192                  * the commit is cleaned up.
193                  */
194                 if (ProcDiePending)
195                 {
196                         ereport(WARNING,
197                                         (errcode(ERRCODE_ADMIN_SHUTDOWN),
198                                          errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
199                                          errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
200                         whereToSendOutput = DestNone;
201                         SyncRepCancelWait();
202                         break;
203                 }
204
205                 /*
206                  * It's unclear what to do if a query cancel interrupt arrives.  We
207                  * can't actually abort at this point, but ignoring the interrupt
208                  * altogether is not helpful, so we just terminate the wait with a
209                  * suitable warning.
210                  */
211                 if (QueryCancelPending)
212                 {
213                         QueryCancelPending = false;
214                         ereport(WARNING,
215                                         (errmsg("canceling wait for synchronous replication due to user request"),
216                                          errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
217                         SyncRepCancelWait();
218                         break;
219                 }
220
221                 /*
222                  * If the postmaster dies, we'll probably never get an
223                  * acknowledgement, because all the wal sender processes will exit. So
224                  * just bail out.
225                  */
226                 if (!PostmasterIsAlive())
227                 {
228                         ProcDiePending = true;
229                         whereToSendOutput = DestNone;
230                         SyncRepCancelWait();
231                         break;
232                 }
233
234                 /*
235                  * Wait on latch.  Any condition that should wake us up will set the
236                  * latch, so no need for timeout.
237                  */
238                 WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
239         }
240
241         /*
242          * WalSender has checked our LSN and has removed us from queue. Clean up
243          * state and leave.  It's OK to reset these shared memory fields without
244          * holding SyncRepLock, because any walsenders will ignore us anyway when
245          * we're not on the queue.
246          */
247         Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
248         MyProc->syncRepState = SYNC_REP_NOT_WAITING;
249         MyProc->waitLSN = 0;
250
251         if (new_status)
252         {
253                 /* Reset ps display */
254                 set_ps_display(new_status, false);
255                 pfree(new_status);
256         }
257 }
258
259 /*
260  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
261  *
262  * Usually we will go at tail of queue, though it's possible that we arrive
263  * here out of order, so start at tail and work back to insertion point.
264  */
265 static void
266 SyncRepQueueInsert(int mode)
267 {
268         PGPROC     *proc;
269
270         Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
271         proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
272                                                                    &(WalSndCtl->SyncRepQueue[mode]),
273                                                                    offsetof(PGPROC, syncRepLinks));
274
275         while (proc)
276         {
277                 /*
278                  * Stop at the queue element that we should after to ensure the queue
279                  * is ordered by LSN.
280                  */
281                 if (proc->waitLSN < MyProc->waitLSN)
282                         break;
283
284                 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
285                                                                            &(proc->syncRepLinks),
286                                                                            offsetof(PGPROC, syncRepLinks));
287         }
288
289         if (proc)
290                 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
291         else
292                 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
293 }
294
295 /*
296  * Acquire SyncRepLock and cancel any wait currently in progress.
297  */
298 static void
299 SyncRepCancelWait(void)
300 {
301         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
302         if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
303                 SHMQueueDelete(&(MyProc->syncRepLinks));
304         MyProc->syncRepState = SYNC_REP_NOT_WAITING;
305         LWLockRelease(SyncRepLock);
306 }
307
308 void
309 SyncRepCleanupAtProcExit(void)
310 {
311         if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
312         {
313                 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
314                 SHMQueueDelete(&(MyProc->syncRepLinks));
315                 LWLockRelease(SyncRepLock);
316         }
317 }
318
319 /*
320  * ===========================================================
321  * Synchronous Replication functions for wal sender processes
322  * ===========================================================
323  */
324
325 /*
326  * Take any action required to initialise sync rep state from config
327  * data. Called at WALSender startup and after each SIGHUP.
328  */
329 void
330 SyncRepInitConfig(void)
331 {
332         int                     priority;
333
334         /*
335          * Determine if we are a potential sync standby and remember the result
336          * for handling replies from standby.
337          */
338         priority = SyncRepGetStandbyPriority();
339         if (MyWalSnd->sync_standby_priority != priority)
340         {
341                 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
342                 MyWalSnd->sync_standby_priority = priority;
343                 LWLockRelease(SyncRepLock);
344                 ereport(DEBUG1,
345                         (errmsg("standby \"%s\" now has synchronous standby priority %u",
346                                         application_name, priority)));
347         }
348 }
349
350 /*
351  * Find the WAL sender servicing the synchronous standby with the lowest
352  * priority value, or NULL if no synchronous standby is connected. If there
353  * are multiple standbys with the same lowest priority value, the first one
354  * found is selected. The caller must hold SyncRepLock.
355  */
356 WalSnd *
357 SyncRepGetSynchronousStandby(void)
358 {
359         WalSnd     *result = NULL;
360         int                     result_priority = 0;
361         int                     i;
362
363         for (i = 0; i < max_wal_senders; i++)
364         {
365                 /* Use volatile pointer to prevent code rearrangement */
366                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
367                 int                     this_priority;
368
369                 /* Must be active */
370                 if (walsnd->pid == 0)
371                         continue;
372
373                 /* Must be streaming */
374                 if (walsnd->state != WALSNDSTATE_STREAMING)
375                         continue;
376
377                 /* Must be synchronous */
378                 this_priority = walsnd->sync_standby_priority;
379                 if (this_priority == 0)
380                         continue;
381
382                 /* Must have a lower priority value than any previous ones */
383                 if (result != NULL && result_priority <= this_priority)
384                         continue;
385
386                 /* Must have a valid flush position */
387                 if (XLogRecPtrIsInvalid(walsnd->flush))
388                         continue;
389
390                 result = (WalSnd *) walsnd;
391                 result_priority = this_priority;
392
393                 /*
394                  * If priority is equal to 1, there cannot be any other WAL senders
395                  * with a lower priority, so we're done.
396                  */
397                 if (this_priority == 1)
398                         return result;
399         }
400
401         return result;
402 }
403
404 /*
405  * Update the LSNs on each queue based upon our latest state. This
406  * implements a simple policy of first-valid-standby-releases-waiter.
407  *
408  * Other policies are possible, which would change what we do here and what
409  * perhaps also which information we store as well.
410  */
411 void
412 SyncRepReleaseWaiters(void)
413 {
414         volatile WalSndCtlData *walsndctl = WalSndCtl;
415         WalSnd     *syncWalSnd;
416         int                     numwrite = 0;
417         int                     numflush = 0;
418
419         /*
420          * If this WALSender is serving a standby that is not on the list of
421          * potential standbys then we have nothing to do. If we are still starting
422          * up, still running base backup or the current flush position is still
423          * invalid, then leave quickly also.
424          */
425         if (MyWalSnd->sync_standby_priority == 0 ||
426                 MyWalSnd->state < WALSNDSTATE_STREAMING ||
427                 XLogRecPtrIsInvalid(MyWalSnd->flush))
428                 return;
429
430         /*
431          * We're a potential sync standby. Release waiters if we are the highest
432          * priority standby.
433          */
434         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
435         syncWalSnd = SyncRepGetSynchronousStandby();
436
437         /* We should have found ourselves at least */
438         Assert(syncWalSnd != NULL);
439
440         /*
441          * If we aren't managing the highest priority standby then just leave.
442          */
443         if (syncWalSnd != MyWalSnd)
444         {
445                 LWLockRelease(SyncRepLock);
446                 announce_next_takeover = true;
447                 return;
448         }
449
450         /*
451          * Set the lsn first so that when we wake backends they will release up to
452          * this location.
453          */
454         if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
455         {
456                 walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
457                 numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
458         }
459         if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
460         {
461                 walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
462                 numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
463         }
464
465         LWLockRelease(SyncRepLock);
466
467         elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
468                  numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
469            numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
470
471         /*
472          * If we are managing the highest priority standby, though we weren't
473          * prior to this, then announce we are now the sync standby.
474          */
475         if (announce_next_takeover)
476         {
477                 announce_next_takeover = false;
478                 ereport(LOG,
479                                 (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
480                                                 application_name, MyWalSnd->sync_standby_priority)));
481         }
482 }
483
484 /*
485  * Check if we are in the list of sync standbys, and if so, determine
486  * priority sequence. Return priority if set, or zero to indicate that
487  * we are not a potential sync standby.
488  *
489  * Compare the parameter SyncRepStandbyNames against the application_name
490  * for this WALSender, or allow any name if we find a wildcard "*".
491  */
492 static int
493 SyncRepGetStandbyPriority(void)
494 {
495         char       *rawstring;
496         List       *elemlist;
497         ListCell   *l;
498         int                     priority = 0;
499         bool            found = false;
500
501         /*
502          * Since synchronous cascade replication is not allowed, we always set the
503          * priority of cascading walsender to zero.
504          */
505         if (am_cascading_walsender)
506                 return 0;
507
508         /* Need a modifiable copy of string */
509         rawstring = pstrdup(SyncRepStandbyNames);
510
511         /* Parse string into list of identifiers */
512         if (!SplitIdentifierString(rawstring, ',', &elemlist))
513         {
514                 /* syntax error in list */
515                 pfree(rawstring);
516                 list_free(elemlist);
517                 /* GUC machinery will have already complained - no need to do again */
518                 return 0;
519         }
520
521         foreach(l, elemlist)
522         {
523                 char       *standby_name = (char *) lfirst(l);
524
525                 priority++;
526
527                 if (pg_strcasecmp(standby_name, application_name) == 0 ||
528                         pg_strcasecmp(standby_name, "*") == 0)
529                 {
530                         found = true;
531                         break;
532                 }
533         }
534
535         pfree(rawstring);
536         list_free(elemlist);
537
538         return (found ? priority : 0);
539 }
540
541 /*
542  * Walk the specified queue from head.  Set the state of any backends that
543  * need to be woken, remove them from the queue, and then wake them.
544  * Pass all = true to wake whole queue; otherwise, just wake up to
545  * the walsender's LSN.
546  *
547  * Must hold SyncRepLock.
548  */
549 int
550 SyncRepWakeQueue(bool all, int mode)
551 {
552         volatile WalSndCtlData *walsndctl = WalSndCtl;
553         PGPROC     *proc = NULL;
554         PGPROC     *thisproc = NULL;
555         int                     numprocs = 0;
556
557         Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
558         Assert(SyncRepQueueIsOrderedByLSN(mode));
559
560         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
561                                                                    &(WalSndCtl->SyncRepQueue[mode]),
562                                                                    offsetof(PGPROC, syncRepLinks));
563
564         while (proc)
565         {
566                 /*
567                  * Assume the queue is ordered by LSN
568                  */
569                 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
570                         return numprocs;
571
572                 /*
573                  * Move to next proc, so we can delete thisproc from the queue.
574                  * thisproc is valid, proc may be NULL after this.
575                  */
576                 thisproc = proc;
577                 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
578                                                                            &(proc->syncRepLinks),
579                                                                            offsetof(PGPROC, syncRepLinks));
580
581                 /*
582                  * Set state to complete; see SyncRepWaitForLSN() for discussion of
583                  * the various states.
584                  */
585                 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
586
587                 /*
588                  * Remove thisproc from queue.
589                  */
590                 SHMQueueDelete(&(thisproc->syncRepLinks));
591
592                 /*
593                  * Wake only when we have set state and removed from queue.
594                  */
595                 SetLatch(&(thisproc->procLatch));
596
597                 numprocs++;
598         }
599
600         return numprocs;
601 }
602
603 /*
604  * The checkpointer calls this as needed to update the shared
605  * sync_standbys_defined flag, so that backends don't remain permanently wedged
606  * if synchronous_standby_names is unset.  It's safe to check the current value
607  * without the lock, because it's only ever updated by one process.  But we
608  * must take the lock to change it.
609  */
610 void
611 SyncRepUpdateSyncStandbysDefined(void)
612 {
613         bool            sync_standbys_defined = SyncStandbysDefined();
614
615         if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
616         {
617                 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
618
619                 /*
620                  * If synchronous_standby_names has been reset to empty, it's futile
621                  * for backends to continue to waiting.  Since the user no longer
622                  * wants synchronous replication, we'd better wake them up.
623                  */
624                 if (!sync_standbys_defined)
625                 {
626                         int                     i;
627
628                         for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
629                                 SyncRepWakeQueue(true, i);
630                 }
631
632                 /*
633                  * Only allow people to join the queue when there are synchronous
634                  * standbys defined.  Without this interlock, there's a race
635                  * condition: we might wake up all the current waiters; then, some
636                  * backend that hasn't yet reloaded its config might go to sleep on
637                  * the queue (and never wake up).  This prevents that.
638                  */
639                 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
640
641                 LWLockRelease(SyncRepLock);
642         }
643 }
644
645 #ifdef USE_ASSERT_CHECKING
646 static bool
647 SyncRepQueueIsOrderedByLSN(int mode)
648 {
649         PGPROC     *proc = NULL;
650         XLogRecPtr      lastLSN;
651
652         Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
653
654         lastLSN = 0;
655
656         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
657                                                                    &(WalSndCtl->SyncRepQueue[mode]),
658                                                                    offsetof(PGPROC, syncRepLinks));
659
660         while (proc)
661         {
662                 /*
663                  * Check the queue is ordered by LSN and that multiple procs don't
664                  * have matching LSNs
665                  */
666                 if (proc->waitLSN <= lastLSN)
667                         return false;
668
669                 lastLSN = proc->waitLSN;
670
671                 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
672                                                                            &(proc->syncRepLinks),
673                                                                            offsetof(PGPROC, syncRepLinks));
674         }
675
676         return true;
677 }
678 #endif
679
680 /*
681  * ===========================================================
682  * Synchronous Replication functions executed by any process
683  * ===========================================================
684  */
685
686 bool
687 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
688 {
689         char       *rawstring;
690         List       *elemlist;
691
692         /* Need a modifiable copy of string */
693         rawstring = pstrdup(*newval);
694
695         /* Parse string into list of identifiers */
696         if (!SplitIdentifierString(rawstring, ',', &elemlist))
697         {
698                 /* syntax error in list */
699                 GUC_check_errdetail("List syntax is invalid.");
700                 pfree(rawstring);
701                 list_free(elemlist);
702                 return false;
703         }
704
705         /*
706          * Any additional validation of standby names should go here.
707          *
708          * Don't attempt to set WALSender priority because this is executed by
709          * postmaster at startup, not WALSender, so the application_name is not
710          * yet correctly set.
711          */
712
713         pfree(rawstring);
714         list_free(elemlist);
715
716         return true;
717 }
718
719 void
720 assign_synchronous_commit(int newval, void *extra)
721 {
722         switch (newval)
723         {
724                 case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
725                         SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
726                         break;
727                 case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
728                         SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
729                         break;
730                 default:
731                         SyncRepWaitMode = SYNC_REP_NO_WAIT;
732                         break;
733         }
734 }