]> granicus.if.org Git - postgresql/blob - src/backend/replication/walsender.c
Separate messages for standby replies and hot standby feedback.
[postgresql] / src / backend / replication / walsender.c
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  * It attempts to keep reading XLOG records from the disk and sending them
11  * to the standby server, as long as the connection is alive (i.e., like
12  * any backend, there is a one-to-one relationship between a connection
13  * and a walsender process).
14  *
15  * Normal termination is by SIGTERM, which instructs the walsender to
16  * close the connection and exit(0) at next convenient moment. Emergency
17  * termination is by SIGQUIT; like any backend, the walsender will simply
18  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
19  * are treated as not a crash but approximately normal termination;
20  * the walsender will exit quickly without sending any more XLOG records.
21  *
22  * If the server is shut down, postmaster sends us SIGUSR2 after all
23  * regular backends have exited and the shutdown checkpoint has been written.
24  * This instruct walsender to send any outstanding WAL, including the
25  * shutdown checkpoint record, and then exit.
26  *
27  *
28  * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
29  *
30  * IDENTIFICATION
31  *        src/backend/replication/walsender.c
32  *
33  *-------------------------------------------------------------------------
34  */
35 #include "postgres.h"
36
37 #include <signal.h>
38 #include <unistd.h>
39
40 #include "funcapi.h"
41 #include "access/xlog_internal.h"
42 #include "access/transam.h"
43 #include "catalog/pg_type.h"
44 #include "libpq/libpq.h"
45 #include "libpq/pqformat.h"
46 #include "libpq/pqsignal.h"
47 #include "miscadmin.h"
48 #include "replication/basebackup.h"
49 #include "replication/replnodes.h"
50 #include "replication/walprotocol.h"
51 #include "replication/walsender.h"
52 #include "storage/fd.h"
53 #include "storage/ipc.h"
54 #include "storage/pmsignal.h"
55 #include "storage/proc.h"
56 #include "storage/procarray.h"
57 #include "tcop/tcopprot.h"
58 #include "utils/builtins.h"
59 #include "utils/guc.h"
60 #include "utils/memutils.h"
61 #include "utils/ps_status.h"
62 #include "utils/resowner.h"
63
64
65 /* Array of WalSnds in shared memory */
66 WalSndCtlData *WalSndCtl = NULL;
67
68 /* My slot in the shared memory array */
69 static WalSnd *MyWalSnd = NULL;
70
71 /* Global state */
72 bool            am_walsender = false;           /* Am I a walsender process ? */
73
74 /* User-settable parameters for walsender */
75 int                     max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
76 int                     WalSndDelay = 200;      /* max sleep time between some actions */
77
78 /*
79  * These variables are used similarly to openLogFile/Id/Seg/Off,
80  * but for walsender to read the XLOG.
81  */
82 static int      sendFile = -1;
83 static uint32 sendId = 0;
84 static uint32 sendSeg = 0;
85 static uint32 sendOff = 0;
86
87 /*
88  * How far have we sent WAL already? This is also advertised in
89  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
90  */
91 static XLogRecPtr sentPtr = {0, 0};
92
93 /*
94  * Buffer for processing reply messages.
95  */
96 static StringInfoData reply_message;
97
98 /* Flags set by signal handlers for later service in main loop */
99 static volatile sig_atomic_t got_SIGHUP = false;
100 volatile sig_atomic_t walsender_shutdown_requested = false;
101 volatile sig_atomic_t walsender_ready_to_stop = false;
102
103 /* Signal handlers */
104 static void WalSndSigHupHandler(SIGNAL_ARGS);
105 static void WalSndShutdownHandler(SIGNAL_ARGS);
106 static void WalSndQuickDieHandler(SIGNAL_ARGS);
107 static void WalSndXLogSendHandler(SIGNAL_ARGS);
108 static void WalSndLastCycleHandler(SIGNAL_ARGS);
109
110 /* Prototypes for private functions */
111 static bool HandleReplicationCommand(const char *cmd_string);
112 static int      WalSndLoop(void);
113 static void InitWalSnd(void);
114 static void WalSndHandshake(void);
115 static void WalSndKill(int code, Datum arg);
116 static bool XLogSend(char *msgbuf, bool *caughtup);
117 static void IdentifySystem(void);
118 static void StartReplication(StartReplicationCmd * cmd);
119 static void ProcessStandbyMessage(void);
120 static void ProcessStandbyReplyMessage(void);
121 static void ProcessStandbyHSFeedbackMessage(void);
122 static void ProcessRepliesIfAny(void);
123
124
125 /* Main entry point for walsender process */
126 int
127 WalSenderMain(void)
128 {
129         MemoryContext walsnd_context;
130
131         if (RecoveryInProgress())
132                 ereport(FATAL,
133                                 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
134                                  errmsg("recovery is still in progress, can't accept WAL streaming connections")));
135
136         /* Create a per-walsender data structure in shared memory */
137         InitWalSnd();
138
139         /*
140          * Create a memory context that we will do all our work in.  We do this so
141          * that we can reset the context during error recovery and thereby avoid
142          * possible memory leaks.  Formerly this code just ran in
143          * TopMemoryContext, but resetting that would be a really bad idea.
144          *
145          * XXX: we don't actually attempt error recovery in walsender, we just
146          * close the connection and exit.
147          */
148         walsnd_context = AllocSetContextCreate(TopMemoryContext,
149                                                                                    "Wal Sender",
150                                                                                    ALLOCSET_DEFAULT_MINSIZE,
151                                                                                    ALLOCSET_DEFAULT_INITSIZE,
152                                                                                    ALLOCSET_DEFAULT_MAXSIZE);
153         MemoryContextSwitchTo(walsnd_context);
154
155         /* Set up resource owner */
156         CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
157
158         /* Unblock signals (they were blocked when the postmaster forked us) */
159         PG_SETMASK(&UnBlockSig);
160
161         /* Tell the standby that walsender is ready for receiving commands */
162         ReadyForQuery(DestRemote);
163
164         /* Handle handshake messages before streaming */
165         WalSndHandshake();
166
167         /* Initialize shared memory status */
168         {
169                 /* use volatile pointer to prevent code rearrangement */
170                 volatile WalSnd *walsnd = MyWalSnd;
171
172                 SpinLockAcquire(&walsnd->mutex);
173                 walsnd->sentPtr = sentPtr;
174                 SpinLockRelease(&walsnd->mutex);
175         }
176
177         /* Main loop of walsender */
178         return WalSndLoop();
179 }
180
181 /*
182  * Execute commands from walreceiver, until we enter streaming mode.
183  */
184 static void
185 WalSndHandshake(void)
186 {
187         StringInfoData input_message;
188         bool            replication_started = false;
189
190         initStringInfo(&input_message);
191
192         while (!replication_started)
193         {
194                 int                     firstchar;
195
196                 WalSndSetState(WALSNDSTATE_STARTUP);
197                 set_ps_display("idle", false);
198
199                 /* Wait for a command to arrive */
200                 firstchar = pq_getbyte();
201
202                 /*
203                  * Emergency bailout if postmaster has died.  This is to avoid the
204                  * necessity for manual cleanup of all postmaster children.
205                  */
206                 if (!PostmasterIsAlive(true))
207                         exit(1);
208
209                 /*
210                  * Check for any other interesting events that happened while we
211                  * slept.
212                  */
213                 if (got_SIGHUP)
214                 {
215                         got_SIGHUP = false;
216                         ProcessConfigFile(PGC_SIGHUP);
217                 }
218
219                 if (firstchar != EOF)
220                 {
221                         /*
222                          * Read the message contents. This is expected to be done without
223                          * blocking because we've been able to get message type code.
224                          */
225                         if (pq_getmessage(&input_message, 0))
226                                 firstchar = EOF;        /* suitable message already logged */
227                 }
228
229                 /* Handle the very limited subset of commands expected in this phase */
230                 switch (firstchar)
231                 {
232                         case 'Q':                       /* Query message */
233                                 {
234                                         const char *query_string;
235
236                                         query_string = pq_getmsgstring(&input_message);
237                                         pq_getmsgend(&input_message);
238
239                                         if (HandleReplicationCommand(query_string))
240                                                 replication_started = true;
241                                 }
242                                 break;
243
244                         case 'X':
245                                 /* standby is closing the connection */
246                                 proc_exit(0);
247
248                         case EOF:
249                                 /* standby disconnected unexpectedly */
250                                 ereport(COMMERROR,
251                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
252                                                  errmsg("unexpected EOF on standby connection")));
253                                 proc_exit(0);
254
255                         default:
256                                 ereport(FATAL,
257                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
258                                                  errmsg("invalid standby handshake message type %d", firstchar)));
259                 }
260         }
261 }
262
263 /*
264  * IDENTIFY_SYSTEM
265  */
266 static void
267 IdentifySystem(void)
268 {
269         StringInfoData buf;
270         char            sysid[32];
271         char            tli[11];
272         char            xpos[MAXFNAMELEN];
273         XLogRecPtr      logptr;
274
275         /*
276          * Reply with a result set with one row, three columns. First col is system
277          * ID, second is timeline ID, and third is current xlog location.
278          */
279
280         snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
281                          GetSystemIdentifier());
282         snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
283
284         logptr = GetInsertRecPtr();
285
286         snprintf(xpos, sizeof(xpos), "%X/%X",
287                          logptr.xlogid, logptr.xrecoff);
288
289         /* Send a RowDescription message */
290         pq_beginmessage(&buf, 'T');
291         pq_sendint(&buf, 3, 2);         /* 3 fields */
292
293         /* first field */
294         pq_sendstring(&buf, "systemid");        /* col name */
295         pq_sendint(&buf, 0, 4);         /* table oid */
296         pq_sendint(&buf, 0, 2);         /* attnum */
297         pq_sendint(&buf, TEXTOID, 4);           /* type oid */
298         pq_sendint(&buf, -1, 2);        /* typlen */
299         pq_sendint(&buf, 0, 4);         /* typmod */
300         pq_sendint(&buf, 0, 2);         /* format code */
301
302         /* second field */
303         pq_sendstring(&buf, "timeline");        /* col name */
304         pq_sendint(&buf, 0, 4);         /* table oid */
305         pq_sendint(&buf, 0, 2);         /* attnum */
306         pq_sendint(&buf, INT4OID, 4);           /* type oid */
307         pq_sendint(&buf, 4, 2);         /* typlen */
308         pq_sendint(&buf, 0, 4);         /* typmod */
309         pq_sendint(&buf, 0, 2);         /* format code */
310
311         /* third field */
312         pq_sendstring(&buf, "xlogpos");
313         pq_sendint(&buf, 0, 4);
314         pq_sendint(&buf, 0, 2);
315         pq_sendint(&buf, TEXTOID, 4);
316         pq_sendint(&buf, -1, 2);
317         pq_sendint(&buf, 0, 4);
318         pq_sendint(&buf, 0, 2);
319         pq_endmessage(&buf);
320
321         /* Send a DataRow message */
322         pq_beginmessage(&buf, 'D');
323         pq_sendint(&buf, 3, 2);         /* # of columns */
324         pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
325         pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
326         pq_sendint(&buf, strlen(tli), 4);       /* col2 len */
327         pq_sendbytes(&buf, (char *) tli, strlen(tli));
328         pq_sendint(&buf, strlen(xpos), 4);      /* col3 len */
329         pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
330
331         pq_endmessage(&buf);
332
333         /* Send CommandComplete and ReadyForQuery messages */
334         EndCommand("SELECT", DestRemote);
335         ReadyForQuery(DestRemote);
336         /* ReadyForQuery did pq_flush for us */
337 }
338
339 /*
340  * START_REPLICATION
341  */
342 static void
343 StartReplication(StartReplicationCmd * cmd)
344 {
345         StringInfoData buf;
346
347         /*
348          * Let postmaster know that we're streaming. Once we've declared us as
349          * a WAL sender process, postmaster will let us outlive the bgwriter and
350          * kill us last in the shutdown sequence, so we get a chance to stream
351          * all remaining WAL at shutdown, including the shutdown checkpoint.
352          * Note that there's no going back, and we mustn't write any WAL records
353          * after this.
354          */
355         MarkPostmasterChildWalSender();
356
357         /*
358          * Check that we're logging enough information in the WAL for
359          * log-shipping.
360          *
361          * NOTE: This only checks the current value of wal_level. Even if the
362          * current setting is not 'minimal', there can be old WAL in the pg_xlog
363          * directory that was created with 'minimal'. So this is not bulletproof,
364          * the purpose is just to give a user-friendly error message that hints
365          * how to configure the system correctly.
366          */
367         if (wal_level == WAL_LEVEL_MINIMAL)
368                 ereport(FATAL,
369                                 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
370                 errmsg("standby connections not allowed because wal_level=minimal")));
371
372         /* Send a CopyBothResponse message, and start streaming */
373         pq_beginmessage(&buf, 'W');
374         pq_sendbyte(&buf, 0);
375         pq_sendint(&buf, 0, 2);
376         pq_endmessage(&buf);
377         pq_flush();
378
379         /*
380          * Initialize position to the received one, then the xlog records begin to
381          * be shipped from that position
382          */
383         sentPtr = cmd->startpoint;
384 }
385
386 /*
387  * Execute an incoming replication command.
388  */
389 static bool
390 HandleReplicationCommand(const char *cmd_string)
391 {
392         bool            replication_started = false;
393         int                     parse_rc;
394         Node       *cmd_node;
395         MemoryContext cmd_context;
396         MemoryContext old_context;
397
398         elog(DEBUG1, "received replication command: %s", cmd_string);
399
400         cmd_context = AllocSetContextCreate(CurrentMemoryContext,
401                                                                                 "Replication command context",
402                                                                                 ALLOCSET_DEFAULT_MINSIZE,
403                                                                                 ALLOCSET_DEFAULT_INITSIZE,
404                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
405         old_context = MemoryContextSwitchTo(cmd_context);
406
407         replication_scanner_init(cmd_string);
408         parse_rc = replication_yyparse();
409         if (parse_rc != 0)
410                 ereport(ERROR,
411                                 (errcode(ERRCODE_SYNTAX_ERROR),
412                                  (errmsg_internal("replication command parser returned %d",
413                                                                   parse_rc))));
414
415         cmd_node = replication_parse_result;
416
417         switch (cmd_node->type)
418         {
419                 case T_IdentifySystemCmd:
420                         IdentifySystem();
421                         break;
422
423                 case T_StartReplicationCmd:
424                         StartReplication((StartReplicationCmd *) cmd_node);
425
426                         /* break out of the loop */
427                         replication_started = true;
428                         break;
429
430                 case T_BaseBackupCmd:
431                         SendBaseBackup((BaseBackupCmd *) cmd_node);
432
433                         /* Send CommandComplete and ReadyForQuery messages */
434                         EndCommand("SELECT", DestRemote);
435                         ReadyForQuery(DestRemote);
436                         /* ReadyForQuery did pq_flush for us */
437                         break;
438
439                 default:
440                         ereport(FATAL,
441                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
442                                          errmsg("invalid standby query string: %s", cmd_string)));
443         }
444
445         /* done */
446         MemoryContextSwitchTo(old_context);
447         MemoryContextDelete(cmd_context);
448
449         return replication_started;
450 }
451
452 /*
453  * Check if the remote end has closed the connection.
454  */
455 static void
456 ProcessRepliesIfAny(void)
457 {
458         unsigned char firstchar;
459         int                     r;
460
461         for (;;)
462         {
463                 r = pq_getbyte_if_available(&firstchar);
464                 if (r < 0)
465                 {
466                         /* unexpected error or EOF */
467                         ereport(COMMERROR,
468                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
469                                          errmsg("unexpected EOF on standby connection")));
470                         proc_exit(0);
471                 }
472                 if (r == 0)
473                 {
474                         /* no data available without blocking */
475                         return;
476                 }
477
478                 /* Handle the very limited subset of commands expected in this phase */
479                 switch (firstchar)
480                 {
481                                 /*
482                                  * 'd' means a standby reply wrapped in a CopyData packet.
483                                  */
484                         case 'd':
485                                 ProcessStandbyMessage();
486                                 break;
487
488                                 /*
489                                  * 'X' means that the standby is closing down the socket.
490                                  */
491                         case 'X':
492                                 proc_exit(0);
493
494                         default:
495                                 ereport(FATAL,
496                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
497                                                  errmsg("invalid standby closing message type %d",
498                                                                 firstchar)));
499                 }
500         }
501 }
502
503 /*
504  * Process a status update message received from standby.
505  */
506 static void
507 ProcessStandbyMessage(void)
508 {
509         char msgtype;
510
511         resetStringInfo(&reply_message);
512
513         /*
514          * Read the message contents.
515          */
516         if (pq_getmessage(&reply_message, 0))
517         {
518                 ereport(COMMERROR,
519                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
520                                  errmsg("unexpected EOF on standby connection")));
521                 proc_exit(0);
522         }
523
524         /*
525          * Check message type from the first byte. At the moment, there is only
526          * one type.
527          */
528         msgtype = pq_getmsgbyte(&reply_message);
529
530         switch (msgtype)
531         {
532                 case 'r':
533                         ProcessStandbyReplyMessage();
534                         break;
535
536                 case 'h':
537                         ProcessStandbyHSFeedbackMessage();
538                         break;
539
540                 default:
541                         ereport(COMMERROR,
542                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
543                                          errmsg("unexpected message type %c", msgtype)));
544                         proc_exit(0);
545         }
546 }
547
548 /*
549  * Regular reply from standby advising of WAL positions on standby server.
550  */
551 static void
552 ProcessStandbyReplyMessage(void)
553 {
554         StandbyReplyMessage     reply;
555
556         pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
557
558         elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
559                  reply.write.xlogid, reply.write.xrecoff,
560                  reply.flush.xlogid, reply.flush.xrecoff,
561                  reply.apply.xlogid, reply.apply.xrecoff);
562
563         /*
564          * Update shared state for this WalSender process
565          * based on reply data from standby.
566          */
567         {
568                 /* use volatile pointer to prevent code rearrangement */
569                 volatile WalSnd *walsnd = MyWalSnd;
570
571                 SpinLockAcquire(&walsnd->mutex);
572                 walsnd->write = reply.write;
573                 walsnd->flush = reply.flush;
574                 walsnd->apply = reply.apply;
575                 SpinLockRelease(&walsnd->mutex);
576         }
577 }
578
579 /*
580  * Hot Standby feedback
581  */
582 static void
583 ProcessStandbyHSFeedbackMessage(void)
584 {
585         StandbyHSFeedbackMessage        reply;
586         TransactionId newxmin = InvalidTransactionId;
587
588         pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
589
590         elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
591                  reply.xmin,
592                  reply.epoch);
593
594         /*
595          * Update the WalSender's proc xmin to allow it to be visible
596          * to snapshots. This will hold back the removal of dead rows
597          * and thereby prevent the generation of cleanup conflicts
598          * on the standby server.
599          */
600         if (TransactionIdIsValid(reply.xmin))
601         {
602                 TransactionId   nextXid;
603                 uint32                  nextEpoch;
604                 bool                    epochOK = false;
605
606                 GetNextXidAndEpoch(&nextXid, &nextEpoch);
607
608                 /*
609                  * Epoch of oldestXmin should be same as standby or
610                  * if the counter has wrapped, then one less than reply.
611                  */
612                 if (reply.xmin <= nextXid)
613                 {
614                         if (reply.epoch == nextEpoch)
615                                 epochOK = true;
616                 }
617                 else
618                 {
619                         if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
620                                 epochOK = true;
621                 }
622
623                 /*
624                  * Feedback from standby must not go backwards, nor should it go
625                  * forwards further than our most recent xid.
626                  */
627                 if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
628                 {
629                         if (!TransactionIdIsValid(MyProc->xmin))
630                         {
631                                 TransactionId oldestXmin = GetOldestXmin(true, true);
632                                 if (TransactionIdPrecedes(oldestXmin, reply.xmin))
633                                         newxmin = reply.xmin;
634                                 else
635                                         newxmin = oldestXmin;
636                         }
637                         else
638                         {
639                                 if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
640                                         newxmin = reply.xmin;
641                                 else
642                                         newxmin = MyProc->xmin; /* stay the same */
643                         }
644                 }
645         }
646
647         /*
648          * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
649          */
650         if (MyProc->xmin != newxmin)
651         {
652                 LWLockAcquire(ProcArrayLock, LW_SHARED);
653                 MyProc->xmin = newxmin;
654                 LWLockRelease(ProcArrayLock);
655         }
656 }
657
658 /* Main loop of walsender process */
659 static int
660 WalSndLoop(void)
661 {
662         char       *output_message;
663         bool            caughtup = false;
664
665         /*
666          * Allocate buffer that will be used for each output message.  We do this
667          * just once to reduce palloc overhead.  The buffer must be made large
668          * enough for maximum-sized messages.
669          */
670         output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
671
672         /*
673          * Allocate buffer that will be used for processing reply messages.  As
674          * above, do this just once to reduce palloc overhead.
675          */
676         initStringInfo(&reply_message);
677
678         /* Loop forever, unless we get an error */
679         for (;;)
680         {
681                 /*
682                  * Emergency bailout if postmaster has died.  This is to avoid the
683                  * necessity for manual cleanup of all postmaster children.
684                  */
685                 if (!PostmasterIsAlive(true))
686                         exit(1);
687
688                 /* Process any requests or signals received recently */
689                 if (got_SIGHUP)
690                 {
691                         got_SIGHUP = false;
692                         ProcessConfigFile(PGC_SIGHUP);
693                 }
694
695                 /*
696                  * When SIGUSR2 arrives, we send all outstanding logs up to the
697                  * shutdown checkpoint record (i.e., the latest record) and exit.
698                  */
699                 if (walsender_ready_to_stop)
700                 {
701                         if (!XLogSend(output_message, &caughtup))
702                                 break;
703                         ProcessRepliesIfAny();
704                         if (caughtup)
705                                 walsender_shutdown_requested = true;
706                 }
707
708                 /* Normal exit from the walsender is here */
709                 if (walsender_shutdown_requested)
710                 {
711                         /* Inform the standby that XLOG streaming was done */
712                         pq_puttextmessage('C', "COPY 0");
713                         pq_flush();
714
715                         proc_exit(0);
716                 }
717
718                 /*
719                  * If we had sent all accumulated WAL in last round, nap for the
720                  * configured time before retrying.
721                  */
722                 if (caughtup)
723                 {
724                         /*
725                          * Even if we wrote all the WAL that was available when we started
726                          * sending, more might have arrived while we were sending this
727                          * batch. We had the latch set while sending, so we have not
728                          * received any signals from that time. Let's arm the latch
729                          * again, and after that check that we're still up-to-date.
730                          */
731                         ResetLatch(&MyWalSnd->latch);
732
733                         if (!XLogSend(output_message, &caughtup))
734                                 break;
735                         if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
736                         {
737                                 /*
738                                  * XXX: We don't really need the periodic wakeups anymore,
739                                  * WaitLatchOrSocket should reliably wake up as soon as
740                                  * something interesting happens.
741                                  */
742
743                                 /* Sleep */
744                                 WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
745                                                                   WalSndDelay * 1000L);
746                         }
747                 }
748                 else
749                 {
750                         /* Attempt to send the log once every loop */
751                         if (!XLogSend(output_message, &caughtup))
752                                 break;
753                 }
754
755                 /* Update our state to indicate if we're behind or not */
756                 WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
757                 ProcessRepliesIfAny();
758         }
759
760         /*
761          * Get here on send failure.  Clean up and exit.
762          *
763          * Reset whereToSendOutput to prevent ereport from attempting to send any
764          * more messages to the standby.
765          */
766         if (whereToSendOutput == DestRemote)
767                 whereToSendOutput = DestNone;
768
769         proc_exit(0);
770         return 1;                                       /* keep the compiler quiet */
771 }
772
773 /* Initialize a per-walsender data structure for this walsender process */
774 static void
775 InitWalSnd(void)
776 {
777         int                     i;
778
779         /*
780          * WalSndCtl should be set up already (we inherit this by fork() or
781          * EXEC_BACKEND mechanism from the postmaster).
782          */
783         Assert(WalSndCtl != NULL);
784         Assert(MyWalSnd == NULL);
785
786         /*
787          * Find a free walsender slot and reserve it. If this fails, we must be
788          * out of WalSnd structures.
789          */
790         for (i = 0; i < max_wal_senders; i++)
791         {
792                 /* use volatile pointer to prevent code rearrangement */
793                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
794
795                 SpinLockAcquire(&walsnd->mutex);
796
797                 if (walsnd->pid != 0)
798                 {
799                         SpinLockRelease(&walsnd->mutex);
800                         continue;
801                 }
802                 else
803                 {
804                         /*
805                          * Found a free slot. Reserve it for us.
806                          */
807                         walsnd->pid = MyProcPid;
808                         MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
809                         walsnd->state = WALSNDSTATE_STARTUP;
810                         SpinLockRelease(&walsnd->mutex);
811                         /* don't need the lock anymore */
812                         OwnLatch((Latch *) &walsnd->latch);
813                         MyWalSnd = (WalSnd *) walsnd;
814
815                         break;
816                 }
817         }
818         if (MyWalSnd == NULL)
819                 ereport(FATAL,
820                                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
821                                  errmsg("number of requested standby connections "
822                                                 "exceeds max_wal_senders (currently %d)",
823                                                 max_wal_senders)));
824
825         /* Arrange to clean up at walsender exit */
826         on_shmem_exit(WalSndKill, 0);
827 }
828
829 /* Destroy the per-walsender data structure for this walsender process */
830 static void
831 WalSndKill(int code, Datum arg)
832 {
833         Assert(MyWalSnd != NULL);
834
835         /*
836          * Mark WalSnd struct no longer in use. Assume that no lock is required
837          * for this.
838          */
839         MyWalSnd->pid = 0;
840         DisownLatch(&MyWalSnd->latch);
841
842         /* WalSnd struct isn't mine anymore */
843         MyWalSnd = NULL;
844 }
845
846 /*
847  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
848  *
849  * XXX probably this should be improved to suck data directly from the
850  * WAL buffers when possible.
851  *
852  * Will open, and keep open, one WAL segment stored in the global file
853  * descriptor sendFile. This means if XLogRead is used once, there will
854  * always be one descriptor left open until the process ends, but never
855  * more than one.
856  */
857 void
858 XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
859 {
860         XLogRecPtr      startRecPtr = recptr;
861         char            path[MAXPGPATH];
862         uint32          lastRemovedLog;
863         uint32          lastRemovedSeg;
864         uint32          log;
865         uint32          seg;
866
867         while (nbytes > 0)
868         {
869                 uint32          startoff;
870                 int                     segbytes;
871                 int                     readbytes;
872
873                 startoff = recptr.xrecoff % XLogSegSize;
874
875                 if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
876                 {
877                         /* Switch to another logfile segment */
878                         if (sendFile >= 0)
879                                 close(sendFile);
880
881                         XLByteToSeg(recptr, sendId, sendSeg);
882                         XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
883
884                         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
885                         if (sendFile < 0)
886                         {
887                                 /*
888                                  * If the file is not found, assume it's because the standby
889                                  * asked for a too old WAL segment that has already been
890                                  * removed or recycled.
891                                  */
892                                 if (errno == ENOENT)
893                                 {
894                                         char            filename[MAXFNAMELEN];
895
896                                         XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
897                                         ereport(ERROR,
898                                                         (errcode_for_file_access(),
899                                                          errmsg("requested WAL segment %s has already been removed",
900                                                                         filename)));
901                                 }
902                                 else
903                                         ereport(ERROR,
904                                                         (errcode_for_file_access(),
905                                                          errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
906                                                                         path, sendId, sendSeg)));
907                         }
908                         sendOff = 0;
909                 }
910
911                 /* Need to seek in the file? */
912                 if (sendOff != startoff)
913                 {
914                         if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
915                                 ereport(ERROR,
916                                                 (errcode_for_file_access(),
917                                                  errmsg("could not seek in log file %u, segment %u to offset %u: %m",
918                                                                 sendId, sendSeg, startoff)));
919                         sendOff = startoff;
920                 }
921
922                 /* How many bytes are within this segment? */
923                 if (nbytes > (XLogSegSize - startoff))
924                         segbytes = XLogSegSize - startoff;
925                 else
926                         segbytes = nbytes;
927
928                 readbytes = read(sendFile, buf, segbytes);
929                 if (readbytes <= 0)
930                         ereport(ERROR,
931                                         (errcode_for_file_access(),
932                         errmsg("could not read from log file %u, segment %u, offset %u, "
933                                    "length %lu: %m",
934                                    sendId, sendSeg, sendOff, (unsigned long) segbytes)));
935
936                 /* Update state for read */
937                 XLByteAdvance(recptr, readbytes);
938
939                 sendOff += readbytes;
940                 nbytes -= readbytes;
941                 buf += readbytes;
942         }
943
944         /*
945          * After reading into the buffer, check that what we read was valid. We do
946          * this after reading, because even though the segment was present when we
947          * opened it, it might get recycled or removed while we read it. The
948          * read() succeeds in that case, but the data we tried to read might
949          * already have been overwritten with new WAL records.
950          */
951         XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
952         XLByteToSeg(startRecPtr, log, seg);
953         if (log < lastRemovedLog ||
954                 (log == lastRemovedLog && seg <= lastRemovedSeg))
955         {
956                 char            filename[MAXFNAMELEN];
957
958                 XLogFileName(filename, ThisTimeLineID, log, seg);
959                 ereport(ERROR,
960                                 (errcode_for_file_access(),
961                                  errmsg("requested WAL segment %s has already been removed",
962                                                 filename)));
963         }
964 }
965
966 /*
967  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
968  * but not yet sent to the client, and send it.
969  *
970  * msgbuf is a work area in which the output message is constructed.  It's
971  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
972  * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
973  *
974  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
975  * *caughtup is set to false.
976  *
977  * Returns true if OK, false if trouble.
978  */
979 static bool
980 XLogSend(char *msgbuf, bool *caughtup)
981 {
982         XLogRecPtr      SendRqstPtr;
983         XLogRecPtr      startptr;
984         XLogRecPtr      endptr;
985         Size            nbytes;
986         WalDataMessageHeader msghdr;
987
988         /*
989          * Attempt to send all data that's already been written out and fsync'd to
990          * disk.  We cannot go further than what's been written out given the
991          * current implementation of XLogRead().  And in any case it's unsafe to
992          * send WAL that is not securely down to disk on the master: if the master
993          * subsequently crashes and restarts, slaves must not have applied any WAL
994          * that gets lost on the master.
995          */
996         SendRqstPtr = GetFlushRecPtr();
997
998         /* Quick exit if nothing to do */
999         if (XLByteLE(SendRqstPtr, sentPtr))
1000         {
1001                 *caughtup = true;
1002                 return true;
1003         }
1004
1005         /*
1006          * Figure out how much to send in one message. If there's no more than
1007          * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
1008          * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
1009          *
1010          * The rounding is not only for performance reasons. Walreceiver relies on
1011          * the fact that we never split a WAL record across two messages. Since a
1012          * long WAL record is split at page boundary into continuation records,
1013          * page boundary is always a safe cut-off point. We also assume that
1014          * SendRqstPtr never points to the middle of a WAL record.
1015          */
1016         startptr = sentPtr;
1017         if (startptr.xrecoff >= XLogFileSize)
1018         {
1019                 /*
1020                  * crossing a logid boundary, skip the non-existent last log segment
1021                  * in previous logical log file.
1022                  */
1023                 startptr.xlogid += 1;
1024                 startptr.xrecoff = 0;
1025         }
1026
1027         endptr = startptr;
1028         XLByteAdvance(endptr, MAX_SEND_SIZE);
1029         if (endptr.xlogid != startptr.xlogid)
1030         {
1031                 /* Don't cross a logfile boundary within one message */
1032                 Assert(endptr.xlogid == startptr.xlogid + 1);
1033                 endptr.xlogid = startptr.xlogid;
1034                 endptr.xrecoff = XLogFileSize;
1035         }
1036
1037         /* if we went beyond SendRqstPtr, back off */
1038         if (XLByteLE(SendRqstPtr, endptr))
1039         {
1040                 endptr = SendRqstPtr;
1041                 *caughtup = true;
1042         }
1043         else
1044         {
1045                 /* round down to page boundary. */
1046                 endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
1047                 *caughtup = false;
1048         }
1049
1050         nbytes = endptr.xrecoff - startptr.xrecoff;
1051         Assert(nbytes <= MAX_SEND_SIZE);
1052
1053         /*
1054          * OK to read and send the slice.
1055          */
1056         msgbuf[0] = 'w';
1057
1058         /*
1059          * Read the log directly into the output buffer to avoid extra memcpy
1060          * calls.
1061          */
1062         XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
1063
1064         /*
1065          * We fill the message header last so that the send timestamp is taken as
1066          * late as possible.
1067          */
1068         msghdr.dataStart = startptr;
1069         msghdr.walEnd = SendRqstPtr;
1070         msghdr.sendTime = GetCurrentTimestamp();
1071
1072         memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
1073
1074         pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
1075
1076         /* Flush pending output to the client */
1077         if (pq_flush())
1078                 return false;
1079
1080         sentPtr = endptr;
1081
1082         /* Update shared memory status */
1083         {
1084                 /* use volatile pointer to prevent code rearrangement */
1085                 volatile WalSnd *walsnd = MyWalSnd;
1086
1087                 SpinLockAcquire(&walsnd->mutex);
1088                 walsnd->sentPtr = sentPtr;
1089                 SpinLockRelease(&walsnd->mutex);
1090         }
1091
1092         /* Report progress of XLOG streaming in PS display */
1093         if (update_process_title)
1094         {
1095                 char            activitymsg[50];
1096
1097                 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1098                                  sentPtr.xlogid, sentPtr.xrecoff);
1099                 set_ps_display(activitymsg, false);
1100         }
1101
1102         return true;
1103 }
1104
1105 /* SIGHUP: set flag to re-read config file at next convenient time */
1106 static void
1107 WalSndSigHupHandler(SIGNAL_ARGS)
1108 {
1109         got_SIGHUP = true;
1110         if (MyWalSnd)
1111                 SetLatch(&MyWalSnd->latch);
1112 }
1113
1114 /* SIGTERM: set flag to shut down */
1115 static void
1116 WalSndShutdownHandler(SIGNAL_ARGS)
1117 {
1118         walsender_shutdown_requested = true;
1119         if (MyWalSnd)
1120                 SetLatch(&MyWalSnd->latch);
1121 }
1122
1123 /*
1124  * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
1125  *
1126  * Some backend has bought the farm,
1127  * so we need to stop what we're doing and exit.
1128  */
1129 static void
1130 WalSndQuickDieHandler(SIGNAL_ARGS)
1131 {
1132         PG_SETMASK(&BlockSig);
1133
1134         /*
1135          * We DO NOT want to run proc_exit() callbacks -- we're here because
1136          * shared memory may be corrupted, so we don't want to try to clean up our
1137          * transaction.  Just nail the windows shut and get out of town.  Now that
1138          * there's an atexit callback to prevent third-party code from breaking
1139          * things by calling exit() directly, we have to reset the callbacks
1140          * explicitly to make this work as intended.
1141          */
1142         on_exit_reset();
1143
1144         /*
1145          * Note we do exit(2) not exit(0).      This is to force the postmaster into a
1146          * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
1147          * backend.  This is necessary precisely because we don't clean up our
1148          * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
1149          * should ensure the postmaster sees this as a crash, too, but no harm in
1150          * being doubly sure.)
1151          */
1152         exit(2);
1153 }
1154
1155 /* SIGUSR1: set flag to send WAL records */
1156 static void
1157 WalSndXLogSendHandler(SIGNAL_ARGS)
1158 {
1159         latch_sigusr1_handler();
1160 }
1161
1162 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
1163 static void
1164 WalSndLastCycleHandler(SIGNAL_ARGS)
1165 {
1166         walsender_ready_to_stop = true;
1167         if (MyWalSnd)
1168                 SetLatch(&MyWalSnd->latch);
1169 }
1170
1171 /* Set up signal handlers */
1172 void
1173 WalSndSignals(void)
1174 {
1175         /* Set up signal handlers */
1176         pqsignal(SIGHUP, WalSndSigHupHandler);          /* set flag to read config
1177                                                                                                  * file */
1178         pqsignal(SIGINT, SIG_IGN);      /* not used */
1179         pqsignal(SIGTERM, WalSndShutdownHandler);       /* request shutdown */
1180         pqsignal(SIGQUIT, WalSndQuickDieHandler);       /* hard crash time */
1181         pqsignal(SIGALRM, SIG_IGN);
1182         pqsignal(SIGPIPE, SIG_IGN);
1183         pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
1184         pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
1185                                                                                                  * shutdown */
1186
1187         /* Reset some signals that are accepted by postmaster but not here */
1188         pqsignal(SIGCHLD, SIG_DFL);
1189         pqsignal(SIGTTIN, SIG_DFL);
1190         pqsignal(SIGTTOU, SIG_DFL);
1191         pqsignal(SIGCONT, SIG_DFL);
1192         pqsignal(SIGWINCH, SIG_DFL);
1193 }
1194
1195 /* Report shared-memory space needed by WalSndShmemInit */
1196 Size
1197 WalSndShmemSize(void)
1198 {
1199         Size            size = 0;
1200
1201         size = offsetof(WalSndCtlData, walsnds);
1202         size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
1203
1204         return size;
1205 }
1206
1207 /* Allocate and initialize walsender-related shared memory */
1208 void
1209 WalSndShmemInit(void)
1210 {
1211         bool            found;
1212         int                     i;
1213
1214         WalSndCtl = (WalSndCtlData *)
1215                 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
1216
1217         if (!found)
1218         {
1219                 /* First time through, so initialize */
1220                 MemSet(WalSndCtl, 0, WalSndShmemSize());
1221
1222                 for (i = 0; i < max_wal_senders; i++)
1223                 {
1224                         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
1225
1226                         SpinLockInit(&walsnd->mutex);
1227                         InitSharedLatch(&walsnd->latch);
1228                 }
1229         }
1230 }
1231
1232 /* Wake up all walsenders */
1233 void
1234 WalSndWakeup(void)
1235 {
1236         int             i;
1237
1238         for (i = 0; i < max_wal_senders; i++)
1239                 SetLatch(&WalSndCtl->walsnds[i].latch);
1240 }
1241
1242 /* Set state for current walsender (only called in walsender) */
1243 void
1244 WalSndSetState(WalSndState state)
1245 {
1246         /* use volatile pointer to prevent code rearrangement */
1247         volatile WalSnd *walsnd = MyWalSnd;
1248
1249         Assert(am_walsender);
1250
1251         if (walsnd->state == state)
1252                 return;
1253
1254         SpinLockAcquire(&walsnd->mutex);
1255         walsnd->state = state;
1256         SpinLockRelease(&walsnd->mutex);
1257 }
1258
1259 /*
1260  * Return a string constant representing the state. This is used
1261  * in system views, and should *not* be translated.
1262  */
1263 static const char *
1264 WalSndGetStateString(WalSndState state)
1265 {
1266         switch (state)
1267         {
1268                 case WALSNDSTATE_STARTUP:
1269                         return "STARTUP";
1270                 case WALSNDSTATE_BACKUP:
1271                         return "BACKUP";
1272                 case WALSNDSTATE_CATCHUP:
1273                         return "CATCHUP";
1274                 case WALSNDSTATE_STREAMING:
1275                         return "STREAMING";
1276         }
1277         return "UNKNOWN";
1278 }
1279
1280
1281 /*
1282  * Returns activity of walsenders, including pids and xlog locations sent to
1283  * standby servers.
1284  */
1285 Datum
1286 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
1287 {
1288 #define PG_STAT_GET_WAL_SENDERS_COLS    6
1289         ReturnSetInfo      *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1290         TupleDesc                       tupdesc;
1291         Tuplestorestate    *tupstore;
1292         MemoryContext           per_query_ctx;
1293         MemoryContext           oldcontext;
1294         int                                     i;
1295
1296         /* check to see if caller supports us returning a tuplestore */
1297         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1298                 ereport(ERROR,
1299                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1300                                  errmsg("set-valued function called in context that cannot accept a set")));
1301         if (!(rsinfo->allowedModes & SFRM_Materialize))
1302                 ereport(ERROR,
1303                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1304                                  errmsg("materialize mode required, but it is not " \
1305                                                 "allowed in this context")));
1306
1307         /* Build a tuple descriptor for our result type */
1308         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1309                 elog(ERROR, "return type must be a row type");
1310
1311         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1312         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1313
1314         tupstore = tuplestore_begin_heap(true, false, work_mem);
1315         rsinfo->returnMode = SFRM_Materialize;
1316         rsinfo->setResult = tupstore;
1317         rsinfo->setDesc = tupdesc;
1318
1319         MemoryContextSwitchTo(oldcontext);
1320
1321         for (i = 0; i < max_wal_senders; i++)
1322         {
1323                 /* use volatile pointer to prevent code rearrangement */
1324                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1325                 char            location[MAXFNAMELEN];
1326                 XLogRecPtr      sentPtr;
1327                 XLogRecPtr      write;
1328                 XLogRecPtr      flush;
1329                 XLogRecPtr      apply;
1330                 WalSndState     state;
1331                 Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
1332                 bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
1333
1334                 if (walsnd->pid == 0)
1335                         continue;
1336
1337                 SpinLockAcquire(&walsnd->mutex);
1338                 sentPtr = walsnd->sentPtr;
1339                 state = walsnd->state;
1340                 write = walsnd->write;
1341                 flush = walsnd->flush;
1342                 apply = walsnd->apply;
1343                 SpinLockRelease(&walsnd->mutex);
1344
1345                 memset(nulls, 0, sizeof(nulls));
1346                 values[0] = Int32GetDatum(walsnd->pid);
1347
1348                 if (!superuser())
1349                 {
1350                         /*
1351                          * Only superusers can see details. Other users only get
1352                          * the pid value to know it's a walsender, but no details.
1353                          */
1354                         nulls[1] = true;
1355                         nulls[2] = true;
1356                         nulls[3] = true;
1357                         nulls[4] = true;
1358                         nulls[5] = true;
1359                 }
1360                 else
1361                 {
1362                         values[1] = CStringGetTextDatum(WalSndGetStateString(state));
1363
1364                         snprintf(location, sizeof(location), "%X/%X",
1365                                          sentPtr.xlogid, sentPtr.xrecoff);
1366                         values[2] = CStringGetTextDatum(location);
1367
1368                         if (write.xlogid == 0 && write.xrecoff == 0)
1369                                 nulls[3] = true;
1370                         snprintf(location, sizeof(location), "%X/%X",
1371                                          write.xlogid, write.xrecoff);
1372                         values[3] = CStringGetTextDatum(location);
1373
1374                         if (flush.xlogid == 0 && flush.xrecoff == 0)
1375                                 nulls[4] = true;
1376                         snprintf(location, sizeof(location), "%X/%X",
1377                                         flush.xlogid, flush.xrecoff);
1378                         values[4] = CStringGetTextDatum(location);
1379
1380                         if (apply.xlogid == 0 && apply.xrecoff == 0)
1381                                 nulls[5] = true;
1382                         snprintf(location, sizeof(location), "%X/%X",
1383                                          apply.xlogid, apply.xrecoff);
1384                         values[5] = CStringGetTextDatum(location);
1385                 }
1386
1387                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1388         }
1389
1390         /* clean up and return the tuplestore */
1391         tuplestore_donestoring(tupstore);
1392
1393         return (Datum) 0;
1394 }
1395
1396 /*
1397  * This isn't currently used for anything. Monitoring tools might be
1398  * interested in the future, and we'll need something like this in the
1399  * future for synchronous replication.
1400  */
1401 #ifdef NOT_USED
1402 /*
1403  * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
1404  * if none.
1405  */
1406 XLogRecPtr
1407 GetOldestWALSendPointer(void)
1408 {
1409         XLogRecPtr      oldest = {0, 0};
1410         int                     i;
1411         bool            found = false;
1412
1413         for (i = 0; i < max_wal_senders; i++)
1414         {
1415                 /* use volatile pointer to prevent code rearrangement */
1416                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1417                 XLogRecPtr      recptr;
1418
1419                 if (walsnd->pid == 0)
1420                         continue;
1421
1422                 SpinLockAcquire(&walsnd->mutex);
1423                 recptr = walsnd->sentPtr;
1424                 SpinLockRelease(&walsnd->mutex);
1425
1426                 if (recptr.xlogid == 0 && recptr.xrecoff == 0)
1427                         continue;
1428
1429                 if (!found || XLByteLT(recptr, oldest))
1430                         oldest = recptr;
1431                 found = true;
1432         }
1433         return oldest;
1434 }
1435
1436 #endif