]> granicus.if.org Git - postgresql/blob - src/backend/replication/walsender.c
Reset master xmin when hot_standby_feedback disabled.
[postgresql] / src / backend / replication / walsender.c
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until the either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, postmaster sends us SIGUSR2 after all
28  * regular backends have exited and the shutdown checkpoint has been written.
29  * This instruct walsender to send any outstanding WAL, including the
30  * shutdown checkpoint record, and then exit.
31  *
32  *
33  * Portions Copyright (c) 2010-2013, PostgreSQL Global Development Group
34  *
35  * IDENTIFICATION
36  *        src/backend/replication/walsender.c
37  *
38  *-------------------------------------------------------------------------
39  */
40 #include "postgres.h"
41
42 #include <signal.h>
43 #include <unistd.h>
44
45 #include "access/timeline.h"
46 #include "access/transam.h"
47 #include "access/xlog_internal.h"
48 #include "catalog/pg_type.h"
49 #include "funcapi.h"
50 #include "libpq/libpq.h"
51 #include "libpq/pqformat.h"
52 #include "libpq/pqsignal.h"
53 #include "miscadmin.h"
54 #include "nodes/replnodes.h"
55 #include "replication/basebackup.h"
56 #include "replication/syncrep.h"
57 #include "replication/walreceiver.h"
58 #include "replication/walsender.h"
59 #include "replication/walsender_private.h"
60 #include "storage/fd.h"
61 #include "storage/ipc.h"
62 #include "storage/pmsignal.h"
63 #include "storage/proc.h"
64 #include "storage/procarray.h"
65 #include "tcop/tcopprot.h"
66 #include "utils/builtins.h"
67 #include "utils/guc.h"
68 #include "utils/memutils.h"
69 #include "utils/ps_status.h"
70 #include "utils/resowner.h"
71 #include "utils/timeout.h"
72 #include "utils/timestamp.h"
73
74 /*
75  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
76  *
77  * We don't have a good idea of what a good value would be; there's some
78  * overhead per message in both walsender and walreceiver, but on the other
79  * hand sending large batches makes walsender less responsive to signals
80  * because signals are checked only between messages.  128kB (with
81  * default 8k blocks) seems like a reasonable guess for now.
82  */
83 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
84
85 /* Array of WalSnds in shared memory */
86 WalSndCtlData *WalSndCtl = NULL;
87
88 /* My slot in the shared memory array */
89 WalSnd     *MyWalSnd = NULL;
90
91 /* Global state */
92 bool            am_walsender = false;           /* Am I a walsender process ? */
93 bool            am_cascading_walsender = false;         /* Am I cascading WAL to
94                                                                                                  * another standby ? */
95
96 /* User-settable parameters for walsender */
97 int                     max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
98 int                     wal_sender_timeout = 60 * 1000; /* maximum time to send one
99                                                                                                  * WAL data message */
100 /*
101  * State for WalSndWakeupRequest
102  */
103 bool wake_wal_senders = false;
104
105 /*
106  * These variables are used similarly to openLogFile/Id/Seg/Off,
107  * but for walsender to read the XLOG.
108  */
109 static int      sendFile = -1;
110 static XLogSegNo sendSegNo = 0;
111 static uint32 sendOff = 0;
112
113 /* Timeline ID of the currently open file */
114 static TimeLineID       curFileTimeLine = 0;
115
116 /*
117  * These variables keep track of the state of the timeline we're currently
118  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
119  * the timeline is not the latest timeline on this server, and the server's
120  * history forked off from that timeline at sendTimeLineValidUpto.
121  */
122 static TimeLineID       sendTimeLine = 0;
123 static TimeLineID       sendTimeLineNextTLI = 0;
124 static bool                     sendTimeLineIsHistoric = false;
125 static XLogRecPtr       sendTimeLineValidUpto = InvalidXLogRecPtr;
126
127 /*
128  * How far have we sent WAL already? This is also advertised in
129  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
130  */
131 static XLogRecPtr sentPtr = 0;
132
133 /* Buffers for constructing outgoing messages and processing reply messages. */
134 static StringInfoData output_message;
135 static StringInfoData reply_message;
136 static StringInfoData tmpbuf;
137
138 /*
139  * Timestamp of the last receipt of the reply from the standby.
140  */
141 static TimestampTz last_reply_timestamp;
142 /* Have we sent a heartbeat message asking for reply, since last reply? */
143 static bool     ping_sent = false;
144
145 /*
146  * While streaming WAL in Copy mode, streamingDoneSending is set to true
147  * after we have sent CopyDone. We should not send any more CopyData messages
148  * after that. streamingDoneReceiving is set to true when we receive CopyDone
149  * from the other end. When both become true, it's time to exit Copy mode.
150  */
151 static bool     streamingDoneSending;
152 static bool     streamingDoneReceiving;
153
154 /* Flags set by signal handlers for later service in main loop */
155 static volatile sig_atomic_t got_SIGHUP = false;
156 static volatile sig_atomic_t walsender_ready_to_stop = false;
157
158 /*
159  * This is set while we are streaming. When not set, SIGUSR2 signal will be
160  * handled like SIGTERM. When set, the main loop is responsible for checking
161  * walsender_ready_to_stop and terminating when it's set (after streaming any
162  * remaining WAL).
163  */
164 static volatile sig_atomic_t replication_active = false;
165
166 /* Signal handlers */
167 static void WalSndSigHupHandler(SIGNAL_ARGS);
168 static void WalSndXLogSendHandler(SIGNAL_ARGS);
169 static void WalSndLastCycleHandler(SIGNAL_ARGS);
170
171 /* Prototypes for private functions */
172 static void WalSndLoop(void);
173 static void InitWalSenderSlot(void);
174 static void WalSndKill(int code, Datum arg);
175 static void XLogSend(bool *caughtup);
176 static XLogRecPtr GetStandbyFlushRecPtr(void);
177 static void IdentifySystem(void);
178 static void StartReplication(StartReplicationCmd *cmd);
179 static void ProcessStandbyMessage(void);
180 static void ProcessStandbyReplyMessage(void);
181 static void ProcessStandbyHSFeedbackMessage(void);
182 static void ProcessRepliesIfAny(void);
183 static void WalSndKeepalive(bool requestReply);
184
185
186 /* Initialize walsender process before entering the main command loop */
187 void
188 InitWalSender(void)
189 {
190         am_cascading_walsender = RecoveryInProgress();
191
192         /* Create a per-walsender data structure in shared memory */
193         InitWalSenderSlot();
194
195         /* Set up resource owner */
196         CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
197
198         /*
199          * Let postmaster know that we're a WAL sender. Once we've declared us as
200          * a WAL sender process, postmaster will let us outlive the bgwriter and
201          * kill us last in the shutdown sequence, so we get a chance to stream all
202          * remaining WAL at shutdown, including the shutdown checkpoint. Note that
203          * there's no going back, and we mustn't write any WAL records after this.
204          */
205         MarkPostmasterChildWalSender();
206         SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
207 }
208
209 /*
210  * Clean up after an error.
211  *
212  * WAL sender processes don't use transactions like regular backends do.
213  * This function does any cleanup requited after an error in a WAL sender
214  * process, similar to what transaction abort does in a regular backend.
215  */
216 void
217 WalSndErrorCleanup()
218 {
219         if (sendFile >= 0)
220         {
221                 close(sendFile);
222                 sendFile = -1;
223         }
224
225         replication_active = false;
226         if (walsender_ready_to_stop)
227                 proc_exit(0);
228
229         /* Revert back to startup state */
230         WalSndSetState(WALSNDSTATE_STARTUP);
231 }
232
233 /*
234  * Handle the IDENTIFY_SYSTEM command.
235  */
236 static void
237 IdentifySystem(void)
238 {
239         StringInfoData buf;
240         char            sysid[32];
241         char            tli[11];
242         char            xpos[MAXFNAMELEN];
243         XLogRecPtr      logptr;
244
245         /*
246          * Reply with a result set with one row, three columns. First col is
247          * system ID, second is timeline ID, and third is current xlog location.
248          */
249
250         snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
251                          GetSystemIdentifier());
252
253         am_cascading_walsender = RecoveryInProgress();
254         if (am_cascading_walsender)
255         {
256                 /* this also updates ThisTimeLineID */
257                 logptr = GetStandbyFlushRecPtr();
258         }
259         else
260                 logptr = GetInsertRecPtr();
261
262         snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
263
264         snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
265
266         /* Send a RowDescription message */
267         pq_beginmessage(&buf, 'T');
268         pq_sendint(&buf, 3, 2);         /* 3 fields */
269
270         /* first field */
271         pq_sendstring(&buf, "systemid");        /* col name */
272         pq_sendint(&buf, 0, 4);         /* table oid */
273         pq_sendint(&buf, 0, 2);         /* attnum */
274         pq_sendint(&buf, TEXTOID, 4);           /* type oid */
275         pq_sendint(&buf, -1, 2);        /* typlen */
276         pq_sendint(&buf, 0, 4);         /* typmod */
277         pq_sendint(&buf, 0, 2);         /* format code */
278
279         /* second field */
280         pq_sendstring(&buf, "timeline");        /* col name */
281         pq_sendint(&buf, 0, 4);         /* table oid */
282         pq_sendint(&buf, 0, 2);         /* attnum */
283         pq_sendint(&buf, INT4OID, 4);           /* type oid */
284         pq_sendint(&buf, 4, 2);         /* typlen */
285         pq_sendint(&buf, 0, 4);         /* typmod */
286         pq_sendint(&buf, 0, 2);         /* format code */
287
288         /* third field */
289         pq_sendstring(&buf, "xlogpos");
290         pq_sendint(&buf, 0, 4);
291         pq_sendint(&buf, 0, 2);
292         pq_sendint(&buf, TEXTOID, 4);
293         pq_sendint(&buf, -1, 2);
294         pq_sendint(&buf, 0, 4);
295         pq_sendint(&buf, 0, 2);
296         pq_endmessage(&buf);
297
298         /* Send a DataRow message */
299         pq_beginmessage(&buf, 'D');
300         pq_sendint(&buf, 3, 2);         /* # of columns */
301         pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
302         pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
303         pq_sendint(&buf, strlen(tli), 4);       /* col2 len */
304         pq_sendbytes(&buf, (char *) tli, strlen(tli));
305         pq_sendint(&buf, strlen(xpos), 4);      /* col3 len */
306         pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
307
308         pq_endmessage(&buf);
309 }
310
311
312 /*
313  * Handle TIMELINE_HISTORY command.
314  */
315 static void
316 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
317 {
318         StringInfoData buf;
319         char            histfname[MAXFNAMELEN];
320         char            path[MAXPGPATH];
321         int                     fd;
322         off_t           histfilelen;
323         off_t           bytesleft;
324
325         /*
326          * Reply with a result set with one row, and two columns. The first col
327          * is the name of the history file, 2nd is the contents.
328          */
329
330         TLHistoryFileName(histfname, cmd->timeline);
331         TLHistoryFilePath(path, cmd->timeline);
332
333         /* Send a RowDescription message */
334         pq_beginmessage(&buf, 'T');
335         pq_sendint(&buf, 2, 2);         /* 2 fields */
336
337         /* first field */
338         pq_sendstring(&buf, "filename");        /* col name */
339         pq_sendint(&buf, 0, 4);         /* table oid */
340         pq_sendint(&buf, 0, 2);         /* attnum */
341         pq_sendint(&buf, TEXTOID, 4);           /* type oid */
342         pq_sendint(&buf, -1, 2);        /* typlen */
343         pq_sendint(&buf, 0, 4);         /* typmod */
344         pq_sendint(&buf, 0, 2);         /* format code */
345
346         /* second field */
347         pq_sendstring(&buf, "content"); /* col name */
348         pq_sendint(&buf, 0, 4);         /* table oid */
349         pq_sendint(&buf, 0, 2);         /* attnum */
350         pq_sendint(&buf, BYTEAOID, 4);          /* type oid */
351         pq_sendint(&buf, -1, 2);        /* typlen */
352         pq_sendint(&buf, 0, 4);         /* typmod */
353         pq_sendint(&buf, 0, 2);         /* format code */
354         pq_endmessage(&buf);
355
356         /* Send a DataRow message */
357         pq_beginmessage(&buf, 'D');
358         pq_sendint(&buf, 2, 2);         /* # of columns */
359         pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
360         pq_sendbytes(&buf, histfname, strlen(histfname));
361
362         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
363         if (fd < 0)
364                 ereport(ERROR,
365                                 (errcode_for_file_access(),
366                                  errmsg("could not open file \"%s\": %m", path)));
367
368         /* Determine file length and send it to client */
369         histfilelen = lseek(fd, 0, SEEK_END);
370         if (histfilelen < 0)
371                 ereport(ERROR,
372                                 (errcode_for_file_access(),
373                                  errmsg("could not seek to end of file \"%s\": %m", path)));
374         if (lseek(fd, 0, SEEK_SET) != 0)
375                 ereport(ERROR,
376                                 (errcode_for_file_access(),
377                                  errmsg("could not seek to beginning of file \"%s\": %m", path)));
378
379         pq_sendint(&buf, histfilelen, 4);       /* col2 len */
380
381         bytesleft = histfilelen;
382         while (bytesleft > 0)
383         {
384                 char rbuf[BLCKSZ];
385                 int nread;
386
387                 nread = read(fd, rbuf, sizeof(rbuf));
388                 if (nread <= 0)
389                         ereport(ERROR,
390                                         (errcode_for_file_access(),
391                                          errmsg("could not read file \"%s\": %m",
392                                                         path)));
393                 pq_sendbytes(&buf, rbuf, nread);
394                 bytesleft -= nread;
395         }
396         CloseTransientFile(fd);
397
398         pq_endmessage(&buf);
399 }
400
401 /*
402  * Handle START_REPLICATION command.
403  *
404  * At the moment, this never returns, but an ereport(ERROR) will take us back
405  * to the main loop.
406  */
407 static void
408 StartReplication(StartReplicationCmd *cmd)
409 {
410         StringInfoData buf;
411         XLogRecPtr FlushPtr;
412
413         /*
414          * We assume here that we're logging enough information in the WAL for
415          * log-shipping, since this is checked in PostmasterMain().
416          *
417          * NOTE: wal_level can only change at shutdown, so in most cases it is
418          * difficult for there to be WAL data that we can still see that was
419          * written at wal_level='minimal'.
420          */
421
422         /*
423          * Select the timeline. If it was given explicitly by the client, use
424          * that. Otherwise use the timeline of the last replayed record, which
425          * is kept in ThisTimeLineID.
426          */
427         if (am_cascading_walsender)
428         {
429                 /* this also updates ThisTimeLineID */
430                 FlushPtr = GetStandbyFlushRecPtr();
431         }
432         else
433                 FlushPtr = GetFlushRecPtr();
434
435         if (cmd->timeline != 0)
436         {
437                 XLogRecPtr      switchpoint;
438
439                 sendTimeLine = cmd->timeline;
440                 if (sendTimeLine == ThisTimeLineID)
441                 {
442                         sendTimeLineIsHistoric = false;
443                         sendTimeLineValidUpto = InvalidXLogRecPtr;
444                 }
445                 else
446                 {
447                         List       *timeLineHistory;
448
449                         sendTimeLineIsHistoric = true;
450
451                         /*
452                          * Check that the timeline the client requested for exists, and the
453                          * requested start location is on that timeline.
454                          */
455                         timeLineHistory = readTimeLineHistory(ThisTimeLineID);
456                         switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
457                                                                                  &sendTimeLineNextTLI);
458                         list_free_deep(timeLineHistory);
459
460                         /*
461                          * Found the requested timeline in the history. Check that
462                          * requested startpoint is on that timeline in our history.
463                          *
464                          * This is quite loose on purpose. We only check that we didn't
465                          * fork off the requested timeline before the switchpoint. We don't
466                          * check that we switched *to* it before the requested starting
467                          * point. This is because the client can legitimately request to
468                          * start replication from the beginning of the WAL segment that
469                          * contains switchpoint, but on the new timeline, so that it
470                          * doesn't end up with a partial segment. If you ask for a too old
471                          * starting point, you'll get an error later when we fail to find
472                          * the requested WAL segment in pg_xlog.
473                          *
474                          * XXX: we could be more strict here and only allow a startpoint
475                          * that's older than the switchpoint, if it it's still in the same
476                          * WAL segment.
477                          */
478                         if (!XLogRecPtrIsInvalid(switchpoint) &&
479                                 switchpoint < cmd->startpoint)
480                         {
481                                 ereport(ERROR,
482                                                 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
483                                                                 (uint32) (cmd->startpoint >> 32),
484                                                                 (uint32) (cmd->startpoint),
485                                                                 cmd->timeline),
486                                                  errdetail("This server's history forked from timeline %u at %X/%X",
487                                                                    cmd->timeline,
488                                                                    (uint32) (switchpoint >> 32),
489                                                                    (uint32) (switchpoint))));
490                         }
491                         sendTimeLineValidUpto = switchpoint;
492                 }
493         }
494         else
495         {
496                 sendTimeLine = ThisTimeLineID;
497                 sendTimeLineValidUpto = InvalidXLogRecPtr;
498                 sendTimeLineIsHistoric = false;
499         }
500
501         streamingDoneSending = streamingDoneReceiving = false;
502
503         /* If there is nothing to stream, don't even enter COPY mode */
504         if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
505         {
506                 /*
507                  * When we first start replication the standby will be behind the primary.
508                  * For some applications, for example, synchronous replication, it is
509                  * important to have a clear state for this initial catchup mode, so we
510                  * can trigger actions when we change streaming state later. We may stay
511                  * in this state for a long time, which is exactly why we want to be able
512                  * to monitor whether or not we are still here.
513                  */
514                 WalSndSetState(WALSNDSTATE_CATCHUP);
515
516                 /* Send a CopyBothResponse message, and start streaming */
517                 pq_beginmessage(&buf, 'W');
518                 pq_sendbyte(&buf, 0);
519                 pq_sendint(&buf, 0, 2);
520                 pq_endmessage(&buf);
521                 pq_flush();
522
523                 /*
524                  * Don't allow a request to stream from a future point in WAL that
525                  * hasn't been flushed to disk in this server yet.
526                  */
527                 if (FlushPtr < cmd->startpoint)
528                 {
529                         ereport(ERROR,
530                                         (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
531                                                         (uint32) (cmd->startpoint >> 32),
532                                                         (uint32) (cmd->startpoint),
533                                                         (uint32) (FlushPtr >> 32),
534                                                         (uint32) (FlushPtr))));
535                 }
536
537                 /* Start streaming from the requested point */
538                 sentPtr = cmd->startpoint;
539
540                 /* Initialize shared memory status, too */
541                 {
542                         /* use volatile pointer to prevent code rearrangement */
543                         volatile WalSnd *walsnd = MyWalSnd;
544
545                         SpinLockAcquire(&walsnd->mutex);
546                         walsnd->sentPtr = sentPtr;
547                         SpinLockRelease(&walsnd->mutex);
548                 }
549
550                 SyncRepInitConfig();
551
552                 /* Main loop of walsender */
553                 replication_active = true;
554
555                 WalSndLoop();
556
557                 replication_active = false;
558                 if (walsender_ready_to_stop)
559                         proc_exit(0);
560                 WalSndSetState(WALSNDSTATE_STARTUP);
561
562                 Assert(streamingDoneSending && streamingDoneReceiving);
563         }
564
565         /*
566          * Copy is finished now. Send a single-row result set indicating the next
567          * timeline.
568          */
569         if (sendTimeLineIsHistoric)
570         {
571                 char            str[11];
572                 snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
573
574                 pq_beginmessage(&buf, 'T'); /* RowDescription */
575                 pq_sendint(&buf, 1, 2);         /* 1 field */
576
577                 /* Field header */
578                 pq_sendstring(&buf, "next_tli");
579                 pq_sendint(&buf, 0, 4);         /* table oid */
580                 pq_sendint(&buf, 0, 2);         /* attnum */
581                 /*
582                  * int8 may seem like a surprising data type for this, but in theory
583                  * int4 would not be wide enough for this, as TimeLineID is unsigned.
584                  */
585                 pq_sendint(&buf, INT8OID, 4);   /* type oid */
586                 pq_sendint(&buf, -1, 2);
587                 pq_sendint(&buf, 0, 4);
588                 pq_sendint(&buf, 0, 2);
589                 pq_endmessage(&buf);
590
591                 /* Data row */
592                 pq_beginmessage(&buf, 'D');
593                 pq_sendint(&buf, 1, 2);         /* number of columns */
594                 pq_sendint(&buf, strlen(str), 4);       /* length */
595                 pq_sendbytes(&buf, str, strlen(str));
596                 pq_endmessage(&buf);
597         }
598
599         /* Send CommandComplete message */
600         pq_puttextmessage('C', "START_STREAMING");
601 }
602
603 /*
604  * Execute an incoming replication command.
605  */
606 void
607 exec_replication_command(const char *cmd_string)
608 {
609         int                     parse_rc;
610         Node       *cmd_node;
611         MemoryContext cmd_context;
612         MemoryContext old_context;
613
614         elog(DEBUG1, "received replication command: %s", cmd_string);
615
616         CHECK_FOR_INTERRUPTS();
617
618         cmd_context = AllocSetContextCreate(CurrentMemoryContext,
619                                                                                 "Replication command context",
620                                                                                 ALLOCSET_DEFAULT_MINSIZE,
621                                                                                 ALLOCSET_DEFAULT_INITSIZE,
622                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
623         old_context = MemoryContextSwitchTo(cmd_context);
624
625         replication_scanner_init(cmd_string);
626         parse_rc = replication_yyparse();
627         if (parse_rc != 0)
628                 ereport(ERROR,
629                                 (errcode(ERRCODE_SYNTAX_ERROR),
630                                  (errmsg_internal("replication command parser returned %d",
631                                                                   parse_rc))));
632
633         cmd_node = replication_parse_result;
634
635         switch (cmd_node->type)
636         {
637                 case T_IdentifySystemCmd:
638                         IdentifySystem();
639                         break;
640
641                 case T_StartReplicationCmd:
642                         StartReplication((StartReplicationCmd *) cmd_node);
643                         break;
644
645                 case T_BaseBackupCmd:
646                         SendBaseBackup((BaseBackupCmd *) cmd_node);
647                         break;
648
649                 case T_TimeLineHistoryCmd:
650                         SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
651                         break;
652
653                 default:
654                         elog(ERROR, "unrecognized replication command node tag: %u",
655                                  cmd_node->type);
656         }
657
658         /* done */
659         MemoryContextSwitchTo(old_context);
660         MemoryContextDelete(cmd_context);
661
662         /* Send CommandComplete message */
663         EndCommand("SELECT", DestRemote);
664 }
665
666 /*
667  * Process any incoming messages while streaming. Also checks if the remote
668  * end has closed the connection.
669  */
670 static void
671 ProcessRepliesIfAny(void)
672 {
673         unsigned char firstchar;
674         int                     r;
675         bool            received = false;
676
677         /*
678          * If we already received a CopyDone from the frontend, any subsequent
679          * message is the beginning of a new command, and should be processed in
680          * the main processing loop.
681          */
682         while (!streamingDoneReceiving)
683         {
684                 r = pq_getbyte_if_available(&firstchar);
685                 if (r < 0)
686                 {
687                         /* unexpected error or EOF */
688                         ereport(COMMERROR,
689                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
690                                          errmsg("unexpected EOF on standby connection")));
691                         proc_exit(0);
692                 }
693                 if (r == 0)
694                 {
695                         /* no data available without blocking */
696                         break;
697                 }
698
699                 /* Handle the very limited subset of commands expected in this phase */
700                 switch (firstchar)
701                 {
702                                 /*
703                                  * 'd' means a standby reply wrapped in a CopyData packet.
704                                  */
705                         case 'd':
706                                 ProcessStandbyMessage();
707                                 received = true;
708                                 break;
709
710                                 /*
711                                  * CopyDone means the standby requested to finish streaming.
712                                  * Reply with CopyDone, if we had not sent that already.
713                                  */
714                         case 'c':
715                                 if (!streamingDoneSending)
716                                 {
717                                         pq_putmessage_noblock('c', NULL, 0);
718                                         streamingDoneSending = true;
719                                 }
720
721                                 /* consume the CopyData message */
722                                 resetStringInfo(&reply_message);
723                                 if (pq_getmessage(&reply_message, 0))
724                                 {
725                                         ereport(COMMERROR,
726                                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
727                                                          errmsg("unexpected EOF on standby connection")));
728                                         proc_exit(0);
729                                 }
730
731                                 streamingDoneReceiving = true;
732                                 received = true;
733                                 break;
734
735                                 /*
736                                  * 'X' means that the standby is closing down the socket.
737                                  */
738                         case 'X':
739                                 proc_exit(0);
740
741                         default:
742                                 ereport(FATAL,
743                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
744                                                  errmsg("invalid standby message type \"%c\"",
745                                                                 firstchar)));
746                 }
747         }
748
749         /*
750          * Save the last reply timestamp if we've received at least one reply.
751          */
752         if (received)
753         {
754                 last_reply_timestamp = GetCurrentTimestamp();
755                 ping_sent = false;
756         }
757 }
758
759 /*
760  * Process a status update message received from standby.
761  */
762 static void
763 ProcessStandbyMessage(void)
764 {
765         char            msgtype;
766
767         resetStringInfo(&reply_message);
768
769         /*
770          * Read the message contents.
771          */
772         if (pq_getmessage(&reply_message, 0))
773         {
774                 ereport(COMMERROR,
775                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
776                                  errmsg("unexpected EOF on standby connection")));
777                 proc_exit(0);
778         }
779
780         /*
781          * Check message type from the first byte.
782          */
783         msgtype = pq_getmsgbyte(&reply_message);
784
785         switch (msgtype)
786         {
787                 case 'r':
788                         ProcessStandbyReplyMessage();
789                         break;
790
791                 case 'h':
792                         ProcessStandbyHSFeedbackMessage();
793                         break;
794
795                 default:
796                         ereport(COMMERROR,
797                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
798                                          errmsg("unexpected message type \"%c\"", msgtype)));
799                         proc_exit(0);
800         }
801 }
802
803 /*
804  * Regular reply from standby advising of WAL positions on standby server.
805  */
806 static void
807 ProcessStandbyReplyMessage(void)
808 {
809         XLogRecPtr      writePtr,
810                                 flushPtr,
811                                 applyPtr;
812         bool            replyRequested;
813
814         /* the caller already consumed the msgtype byte */
815         writePtr = pq_getmsgint64(&reply_message);
816         flushPtr = pq_getmsgint64(&reply_message);
817         applyPtr = pq_getmsgint64(&reply_message);
818         (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
819         replyRequested = pq_getmsgbyte(&reply_message);
820
821         elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
822                  (uint32) (writePtr >> 32), (uint32) writePtr,
823                  (uint32) (flushPtr >> 32), (uint32) flushPtr,
824                  (uint32) (applyPtr >> 32), (uint32) applyPtr,
825                  replyRequested ? " (reply requested)" : "");
826
827         /* Send a reply if the standby requested one. */
828         if (replyRequested)
829                 WalSndKeepalive(false);
830
831         /*
832          * Update shared state for this WalSender process based on reply data from
833          * standby.
834          */
835         {
836                 /* use volatile pointer to prevent code rearrangement */
837                 volatile WalSnd *walsnd = MyWalSnd;
838
839                 SpinLockAcquire(&walsnd->mutex);
840                 walsnd->write = writePtr;
841                 walsnd->flush = flushPtr;
842                 walsnd->apply = applyPtr;
843                 SpinLockRelease(&walsnd->mutex);
844         }
845
846         if (!am_cascading_walsender)
847                 SyncRepReleaseWaiters();
848 }
849
850 /*
851  * Hot Standby feedback
852  */
853 static void
854 ProcessStandbyHSFeedbackMessage(void)
855 {
856         TransactionId nextXid;
857         uint32          nextEpoch;
858         TransactionId feedbackXmin;
859         uint32          feedbackEpoch;
860
861         /*
862          * Decipher the reply message. The caller already consumed the msgtype
863          * byte.
864          */
865         (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
866         feedbackXmin = pq_getmsgint(&reply_message, 4);
867         feedbackEpoch = pq_getmsgint(&reply_message, 4);
868
869         elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
870                  feedbackXmin,
871                  feedbackEpoch);
872
873         /* Unset WalSender's xmin if the feedback message value is invalid */
874         if (!TransactionIdIsNormal(feedbackXmin))
875         {
876                 MyPgXact->xmin = InvalidTransactionId;
877                 return;
878         }
879
880         /*
881          * Check that the provided xmin/epoch are sane, that is, not in the future
882          * and not so far back as to be already wrapped around.  Ignore if not.
883          *
884          * Epoch of nextXid should be same as standby, or if the counter has
885          * wrapped, then one greater than standby.
886          */
887         GetNextXidAndEpoch(&nextXid, &nextEpoch);
888
889         if (feedbackXmin <= nextXid)
890         {
891                 if (feedbackEpoch != nextEpoch)
892                         return;
893         }
894         else
895         {
896                 if (feedbackEpoch + 1 != nextEpoch)
897                         return;
898         }
899
900         if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
901                 return;                                 /* epoch OK, but it's wrapped around */
902
903         /*
904          * Set the WalSender's xmin equal to the standby's requested xmin, so that
905          * the xmin will be taken into account by GetOldestXmin.  This will hold
906          * back the removal of dead rows and thereby prevent the generation of
907          * cleanup conflicts on the standby server.
908          *
909          * There is a small window for a race condition here: although we just
910          * checked that feedbackXmin precedes nextXid, the nextXid could have gotten
911          * advanced between our fetching it and applying the xmin below, perhaps
912          * far enough to make feedbackXmin wrap around.  In that case the xmin we
913          * set here would be "in the future" and have no effect.  No point in
914          * worrying about this since it's too late to save the desired data
915          * anyway.      Assuming that the standby sends us an increasing sequence of
916          * xmins, this could only happen during the first reply cycle, else our
917          * own xmin would prevent nextXid from advancing so far.
918          *
919          * We don't bother taking the ProcArrayLock here.  Setting the xmin field
920          * is assumed atomic, and there's no real need to prevent a concurrent
921          * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
922          * safe, and if we're moving it backwards, well, the data is at risk
923          * already since a VACUUM could have just finished calling GetOldestXmin.)
924          */
925         MyPgXact->xmin = feedbackXmin;
926 }
927
928 /* Main loop of walsender process that streams the WAL over Copy messages. */
929 static void
930 WalSndLoop(void)
931 {
932         bool            caughtup = false;
933
934         /*
935          * Allocate buffers that will be used for each outgoing and incoming
936          * message.  We do this just once to reduce palloc overhead.
937          */
938         initStringInfo(&output_message);
939         initStringInfo(&reply_message);
940         initStringInfo(&tmpbuf);
941
942         /* Initialize the last reply timestamp */
943         last_reply_timestamp = GetCurrentTimestamp();
944         ping_sent = false;
945
946         /*
947          * Loop until we reach the end of this timeline or the client requests
948          * to stop streaming.
949          */
950         for (;;)
951         {
952                 /* Clear any already-pending wakeups */
953                 ResetLatch(&MyWalSnd->latch);
954
955                 /*
956                  * Emergency bailout if postmaster has died.  This is to avoid the
957                  * necessity for manual cleanup of all postmaster children.
958                  */
959                 if (!PostmasterIsAlive())
960                         exit(1);
961
962                 /* Process any requests or signals received recently */
963                 if (got_SIGHUP)
964                 {
965                         got_SIGHUP = false;
966                         ProcessConfigFile(PGC_SIGHUP);
967                         SyncRepInitConfig();
968                 }
969
970                 CHECK_FOR_INTERRUPTS();
971
972                 /* Check for input from the client */
973                 ProcessRepliesIfAny();
974
975                 /*
976                  * If we have received CopyDone from the client, sent CopyDone
977                  * ourselves, and the output buffer is empty, it's time to exit
978                  * streaming.
979                  */
980                 if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
981                         break;
982
983                 /*
984                  * If we don't have any pending data in the output buffer, try to send
985                  * some more.  If there is some, we don't bother to call XLogSend
986                  * again until we've flushed it ... but we'd better assume we are not
987                  * caught up.
988                  */
989                 if (!pq_is_send_pending())
990                         XLogSend(&caughtup);
991                 else
992                         caughtup = false;
993
994                 /* Try to flush pending output to the client */
995                 if (pq_flush_if_writable() != 0)
996                         goto send_failure;
997
998                 /* If nothing remains to be sent right now ... */
999                 if (caughtup && !pq_is_send_pending())
1000                 {
1001                         /*
1002                          * If we're in catchup state, move to streaming.  This is an
1003                          * important state change for users to know about, since before
1004                          * this point data loss might occur if the primary dies and we
1005                          * need to failover to the standby. The state change is also
1006                          * important for synchronous replication, since commits that
1007                          * started to wait at that point might wait for some time.
1008                          */
1009                         if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
1010                         {
1011                                 ereport(DEBUG1,
1012                                          (errmsg("standby \"%s\" has now caught up with primary",
1013                                                          application_name)));
1014                                 WalSndSetState(WALSNDSTATE_STREAMING);
1015                         }
1016
1017                         /*
1018                          * When SIGUSR2 arrives, we send any outstanding logs up to the
1019                          * shutdown checkpoint record (i.e., the latest record) and exit.
1020                          * This may be a normal termination at shutdown, or a promotion,
1021                          * the walsender is not sure which.
1022                          */
1023                         if (walsender_ready_to_stop)
1024                         {
1025                                 /* ... let's just be real sure we're caught up ... */
1026                                 XLogSend(&caughtup);
1027                                 if (caughtup && !pq_is_send_pending())
1028                                 {
1029                                         /* Inform the standby that XLOG streaming is done */
1030                                         EndCommand("COPY 0", DestRemote);
1031                                         pq_flush();
1032
1033                                         proc_exit(0);
1034                                 }
1035                         }
1036                 }
1037
1038                 /*
1039                  * We don't block if not caught up, unless there is unsent data
1040                  * pending in which case we'd better block until the socket is
1041                  * write-ready.  This test is only needed for the case where XLogSend
1042                  * loaded a subset of the available data but then pq_flush_if_writable
1043                  * flushed it all --- we should immediately try to send more.
1044                  */
1045                 if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
1046                 {
1047                         TimestampTz timeout = 0;
1048                         long            sleeptime = 10000;              /* 10 s */
1049                         int                     wakeEvents;
1050
1051                         wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;
1052
1053                         if (!streamingDoneReceiving)
1054                                 wakeEvents |= WL_SOCKET_READABLE;
1055
1056                         if (pq_is_send_pending())
1057                                 wakeEvents |= WL_SOCKET_WRITEABLE;
1058                         else if (wal_sender_timeout > 0 && !ping_sent)
1059                         {
1060                                 /*
1061                                  * If half of wal_sender_timeout has lapsed without receiving
1062                                  * any reply from standby, send a keep-alive message to standby
1063                                  * requesting an immediate reply.
1064                                  */
1065                                 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
1066                                                                                                           wal_sender_timeout / 2);
1067                                 if (GetCurrentTimestamp() >= timeout)
1068                                 {
1069                                         WalSndKeepalive(true);
1070                                         ping_sent = true;
1071                                         /* Try to flush pending output to the client */
1072                                         if (pq_flush_if_writable() != 0)
1073                                                 break;
1074                                 }
1075                         }
1076
1077                         /* Determine time until replication timeout */
1078                         if (wal_sender_timeout > 0)
1079                         {
1080                                 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
1081                                                                                                           wal_sender_timeout);
1082                                 sleeptime = 1 + (wal_sender_timeout / 10);
1083                         }
1084
1085                         /* Sleep until something happens or we time out */
1086                         ImmediateInterruptOK = true;
1087                         CHECK_FOR_INTERRUPTS();
1088                         WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
1089                                                           MyProcPort->sock, sleeptime);
1090                         ImmediateInterruptOK = false;
1091
1092                         /*
1093                          * Check for replication timeout.  Note we ignore the corner case
1094                          * possibility that the client replied just as we reached the
1095                          * timeout ... he's supposed to reply *before* that.
1096                          */
1097                         if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
1098                         {
1099                                 /*
1100                                  * Since typically expiration of replication timeout means
1101                                  * communication problem, we don't send the error message to
1102                                  * the standby.
1103                                  */
1104                                 ereport(COMMERROR,
1105                                                 (errmsg("terminating walsender process due to replication timeout")));
1106                                 goto send_failure;
1107                         }
1108                 }
1109         }
1110         return;
1111
1112 send_failure:
1113         /*
1114          * Get here on send failure.  Clean up and exit.
1115          *
1116          * Reset whereToSendOutput to prevent ereport from attempting to send any
1117          * more messages to the standby.
1118          */
1119         if (whereToSendOutput == DestRemote)
1120                 whereToSendOutput = DestNone;
1121
1122         proc_exit(0);
1123         abort();                                        /* keep the compiler quiet */
1124 }
1125
1126 /* Initialize a per-walsender data structure for this walsender process */
1127 static void
1128 InitWalSenderSlot(void)
1129 {
1130         int                     i;
1131
1132         /*
1133          * WalSndCtl should be set up already (we inherit this by fork() or
1134          * EXEC_BACKEND mechanism from the postmaster).
1135          */
1136         Assert(WalSndCtl != NULL);
1137         Assert(MyWalSnd == NULL);
1138
1139         /*
1140          * Find a free walsender slot and reserve it. If this fails, we must be
1141          * out of WalSnd structures.
1142          */
1143         for (i = 0; i < max_wal_senders; i++)
1144         {
1145                 /* use volatile pointer to prevent code rearrangement */
1146                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1147
1148                 SpinLockAcquire(&walsnd->mutex);
1149
1150                 if (walsnd->pid != 0)
1151                 {
1152                         SpinLockRelease(&walsnd->mutex);
1153                         continue;
1154                 }
1155                 else
1156                 {
1157                         /*
1158                          * Found a free slot. Reserve it for us.
1159                          */
1160                         walsnd->pid = MyProcPid;
1161                         walsnd->sentPtr = InvalidXLogRecPtr;
1162                         walsnd->state = WALSNDSTATE_STARTUP;
1163                         SpinLockRelease(&walsnd->mutex);
1164                         /* don't need the lock anymore */
1165                         OwnLatch((Latch *) &walsnd->latch);
1166                         MyWalSnd = (WalSnd *) walsnd;
1167
1168                         break;
1169                 }
1170         }
1171         if (MyWalSnd == NULL)
1172                 ereport(FATAL,
1173                                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
1174                                  errmsg("number of requested standby connections "
1175                                                 "exceeds max_wal_senders (currently %d)",
1176                                                 max_wal_senders)));
1177
1178         /* Arrange to clean up at walsender exit */
1179         on_shmem_exit(WalSndKill, 0);
1180 }
1181
1182 /* Destroy the per-walsender data structure for this walsender process */
1183 static void
1184 WalSndKill(int code, Datum arg)
1185 {
1186         Assert(MyWalSnd != NULL);
1187
1188         /*
1189          * Mark WalSnd struct no longer in use. Assume that no lock is required
1190          * for this.
1191          */
1192         MyWalSnd->pid = 0;
1193         DisownLatch(&MyWalSnd->latch);
1194
1195         /* WalSnd struct isn't mine anymore */
1196         MyWalSnd = NULL;
1197 }
1198
1199 /*
1200  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
1201  *
1202  * XXX probably this should be improved to suck data directly from the
1203  * WAL buffers when possible.
1204  *
1205  * Will open, and keep open, one WAL segment stored in the global file
1206  * descriptor sendFile. This means if XLogRead is used once, there will
1207  * always be one descriptor left open until the process ends, but never
1208  * more than one.
1209  */
1210 static void
1211 XLogRead(char *buf, XLogRecPtr startptr, Size count)
1212 {
1213         char       *p;
1214         XLogRecPtr      recptr;
1215         Size            nbytes;
1216         XLogSegNo       segno;
1217
1218 retry:
1219         p = buf;
1220         recptr = startptr;
1221         nbytes = count;
1222
1223         while (nbytes > 0)
1224         {
1225                 uint32          startoff;
1226                 int                     segbytes;
1227                 int                     readbytes;
1228
1229                 startoff = recptr % XLogSegSize;
1230
1231                 if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
1232                 {
1233                         char            path[MAXPGPATH];
1234
1235                         /* Switch to another logfile segment */
1236                         if (sendFile >= 0)
1237                                 close(sendFile);
1238
1239                         XLByteToSeg(recptr, sendSegNo);
1240
1241                         /*-------
1242                          * When reading from a historic timeline, and there is a timeline
1243                          * switch within this segment, read from the WAL segment belonging
1244                          * to the new timeline.
1245                          *
1246                          * For example, imagine that this server is currently on timeline
1247                          * 5, and we're streaming timeline 4. The switch from timeline 4
1248                          * to 5 happened at 0/13002088. In pg_xlog, we have these files:
1249                          *
1250                          * ...
1251                          * 000000040000000000000012
1252                          * 000000040000000000000013
1253                          * 000000050000000000000013
1254                          * 000000050000000000000014
1255                          * ...
1256                          *
1257                          * In this situation, when requested to send the WAL from
1258                          * segment 0x13, on timeline 4, we read the WAL from file
1259                          * 000000050000000000000013. Archive recovery prefers files from
1260                          * newer timelines, so if the segment was restored from the
1261                          * archive on this server, the file belonging to the old timeline,
1262                          * 000000040000000000000013, might not exist. Their contents are
1263                          * equal up to the switchpoint, because at a timeline switch, the
1264                          * used portion of the old segment is copied to the new file.
1265                          *-------
1266                          */
1267                         curFileTimeLine = sendTimeLine;
1268                         if (sendTimeLineIsHistoric)
1269                         {
1270                                 XLogSegNo endSegNo;
1271
1272                                 XLByteToSeg(sendTimeLineValidUpto, endSegNo);
1273                                 if (sendSegNo == endSegNo)
1274                                         curFileTimeLine = sendTimeLineNextTLI;
1275                         }
1276
1277                         XLogFilePath(path, curFileTimeLine, sendSegNo);
1278
1279                         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1280                         if (sendFile < 0)
1281                         {
1282                                 /*
1283                                  * If the file is not found, assume it's because the standby
1284                                  * asked for a too old WAL segment that has already been
1285                                  * removed or recycled.
1286                                  */
1287                                 if (errno == ENOENT)
1288                                         ereport(ERROR,
1289                                                         (errcode_for_file_access(),
1290                                                          errmsg("requested WAL segment %s has already been removed",
1291                                                                         XLogFileNameP(curFileTimeLine, sendSegNo))));
1292                                 else
1293                                         ereport(ERROR,
1294                                                         (errcode_for_file_access(),
1295                                                          errmsg("could not open file \"%s\": %m",
1296                                                                         path)));
1297                         }
1298                         sendOff = 0;
1299                 }
1300
1301                 /* Need to seek in the file? */
1302                 if (sendOff != startoff)
1303                 {
1304                         if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
1305                                 ereport(ERROR,
1306                                                 (errcode_for_file_access(),
1307                                                  errmsg("could not seek in log segment %s to offset %u: %m",
1308                                                                 XLogFileNameP(curFileTimeLine, sendSegNo),
1309                                                                 startoff)));
1310                         sendOff = startoff;
1311                 }
1312
1313                 /* How many bytes are within this segment? */
1314                 if (nbytes > (XLogSegSize - startoff))
1315                         segbytes = XLogSegSize - startoff;
1316                 else
1317                         segbytes = nbytes;
1318
1319                 readbytes = read(sendFile, p, segbytes);
1320                 if (readbytes <= 0)
1321                 {
1322                         ereport(ERROR,
1323                                         (errcode_for_file_access(),
1324                         errmsg("could not read from log segment %s, offset %u, length %lu: %m",
1325                                    XLogFileNameP(curFileTimeLine, sendSegNo),
1326                                    sendOff, (unsigned long) segbytes)));
1327                 }
1328
1329                 /* Update state for read */
1330                 recptr += readbytes;
1331
1332                 sendOff += readbytes;
1333                 nbytes -= readbytes;
1334                 p += readbytes;
1335         }
1336
1337         /*
1338          * After reading into the buffer, check that what we read was valid. We do
1339          * this after reading, because even though the segment was present when we
1340          * opened it, it might get recycled or removed while we read it. The
1341          * read() succeeds in that case, but the data we tried to read might
1342          * already have been overwritten with new WAL records.
1343          */
1344         XLByteToSeg(startptr, segno);
1345         CheckXLogRemoved(segno, ThisTimeLineID);
1346
1347         /*
1348          * During recovery, the currently-open WAL file might be replaced with the
1349          * file of the same name retrieved from archive. So we always need to
1350          * check what we read was valid after reading into the buffer. If it's
1351          * invalid, we try to open and read the file again.
1352          */
1353         if (am_cascading_walsender)
1354         {
1355                 /* use volatile pointer to prevent code rearrangement */
1356                 volatile WalSnd *walsnd = MyWalSnd;
1357                 bool            reload;
1358
1359                 SpinLockAcquire(&walsnd->mutex);
1360                 reload = walsnd->needreload;
1361                 walsnd->needreload = false;
1362                 SpinLockRelease(&walsnd->mutex);
1363
1364                 if (reload && sendFile >= 0)
1365                 {
1366                         close(sendFile);
1367                         sendFile = -1;
1368
1369                         goto retry;
1370                 }
1371         }
1372 }
1373
1374 /*
1375  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
1376  * but not yet sent to the client, and buffer it in the libpq output
1377  * buffer.
1378  *
1379  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
1380  * *caughtup is set to false.
1381  */
1382 static void
1383 XLogSend(bool *caughtup)
1384 {
1385         XLogRecPtr      SendRqstPtr;
1386         XLogRecPtr      startptr;
1387         XLogRecPtr      endptr;
1388         Size            nbytes;
1389
1390         if (streamingDoneSending)
1391         {
1392                 *caughtup = true;
1393                 return;
1394         }
1395
1396         /* Figure out how far we can safely send the WAL. */
1397         if (sendTimeLineIsHistoric)
1398         {
1399                 /*
1400                  * Streaming an old timeline timeline that's in this server's history,
1401                  * but is not the one we're currently inserting or replaying. It can
1402                  * be streamed up to the point where we switched off that timeline.
1403                  */
1404                 SendRqstPtr = sendTimeLineValidUpto;
1405         }
1406         else if (am_cascading_walsender)
1407         {
1408                 /*
1409                  * Streaming the latest timeline on a standby.
1410                  *
1411                  * Attempt to send all WAL that has already been replayed, so that
1412                  * we know it's valid. If we're receiving WAL through streaming
1413                  * replication, it's also OK to send any WAL that has been received
1414                  * but not replayed.
1415                  *
1416                  * The timeline we're recovering from can change, or we can be
1417                  * promoted. In either case, the current timeline becomes historic.
1418                  * We need to detect that so that we don't try to stream past the
1419                  * point where we switched to another timeline. We check for promotion
1420                  * or timeline switch after calculating FlushPtr, to avoid a race
1421                  * condition: if the timeline becomes historic just after we checked
1422                  * that it was still current, it's still be OK to stream it up to the
1423                  * FlushPtr that was calculated before it became historic.
1424                  */
1425                 bool            becameHistoric = false;
1426
1427                 SendRqstPtr = GetStandbyFlushRecPtr();
1428
1429                 if (!RecoveryInProgress())
1430                 {
1431                         /*
1432                          * We have been promoted. RecoveryInProgress() updated
1433                          * ThisTimeLineID to the new current timeline.
1434                          */
1435                         am_cascading_walsender = false;
1436                         becameHistoric = true;
1437                 }
1438                 else
1439                 {
1440                         /*
1441                          * Still a cascading standby. But is the timeline we're sending
1442                          * still the one recovery is recovering from? ThisTimeLineID was
1443                          * updated by the GetStandbyFlushRecPtr() call above.
1444                          */
1445                         if (sendTimeLine != ThisTimeLineID)
1446                                 becameHistoric = true;
1447                 }
1448
1449                 if (becameHistoric)
1450                 {
1451                         /*
1452                          * The timeline we were sending has become historic. Read the
1453                          * timeline history file of the new timeline to see where exactly
1454                          * we forked off from the timeline we were sending.
1455                          */
1456                         List       *history;
1457
1458                         history = readTimeLineHistory(ThisTimeLineID);
1459                         sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
1460                         Assert(sentPtr <= sendTimeLineValidUpto);
1461                         Assert(sendTimeLine < sendTimeLineNextTLI);
1462                         list_free_deep(history);
1463
1464                         /* the current send pointer should be <= the switchpoint */
1465                         if (!(sentPtr <= sendTimeLineValidUpto))
1466                                 elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
1467                                          sendTimeLine,
1468                                          (uint32) (sendTimeLineValidUpto >> 32),
1469                                          (uint32) sendTimeLineValidUpto,
1470                                          (uint32) (sentPtr >> 32),
1471                                          (uint32) sentPtr);
1472
1473                         sendTimeLineIsHistoric = true;
1474
1475                         SendRqstPtr = sendTimeLineValidUpto;
1476                 }
1477         }
1478         else
1479         {
1480                 /*
1481                  * Streaming the current timeline on a master.
1482                  *
1483                  * Attempt to send all data that's already been written out and
1484                  * fsync'd to disk.  We cannot go further than what's been written out
1485                  * given the current implementation of XLogRead().  And in any case
1486                  * it's unsafe to send WAL that is not securely down to disk on the
1487                  * master: if the master subsequently crashes and restarts, slaves
1488                  * must not have applied any WAL that gets lost on the master.
1489                  */
1490                 SendRqstPtr = GetFlushRecPtr();
1491         }
1492
1493         /*
1494          * If this is a historic timeline and we've reached the point where we
1495          * forked to the next timeline, stop streaming.
1496          */
1497         if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
1498         {
1499                 /* close the current file. */
1500                 if (sendFile >= 0)
1501                         close(sendFile);
1502                 sendFile = -1;
1503
1504                 /* Send CopyDone */
1505                 pq_putmessage_noblock('c', NULL, 0);
1506                 streamingDoneSending = true;
1507
1508                 *caughtup = true;
1509                 return;
1510         }
1511
1512         /* Do we have any work to do? */
1513         Assert(sentPtr <= SendRqstPtr);
1514         if (SendRqstPtr <= sentPtr)
1515         {
1516                 *caughtup = true;
1517                 return;
1518         }
1519
1520         /*
1521          * Figure out how much to send in one message. If there's no more than
1522          * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
1523          * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
1524          *
1525          * The rounding is not only for performance reasons. Walreceiver relies on
1526          * the fact that we never split a WAL record across two messages. Since a
1527          * long WAL record is split at page boundary into continuation records,
1528          * page boundary is always a safe cut-off point. We also assume that
1529          * SendRqstPtr never points to the middle of a WAL record.
1530          */
1531         startptr = sentPtr;
1532         endptr = startptr;
1533         endptr += MAX_SEND_SIZE;
1534
1535         /* if we went beyond SendRqstPtr, back off */
1536         if (SendRqstPtr <= endptr)
1537         {
1538                 endptr = SendRqstPtr;
1539                 if (sendTimeLineIsHistoric)
1540                         *caughtup = false;
1541                 else
1542                         *caughtup = true;
1543         }
1544         else
1545         {
1546                 /* round down to page boundary. */
1547                 endptr -= (endptr % XLOG_BLCKSZ);
1548                 *caughtup = false;
1549         }
1550
1551         nbytes = endptr - startptr;
1552         Assert(nbytes <= MAX_SEND_SIZE);
1553
1554         /*
1555          * OK to read and send the slice.
1556          */
1557         resetStringInfo(&output_message);
1558         pq_sendbyte(&output_message, 'w');
1559
1560         pq_sendint64(&output_message, startptr);        /* dataStart */
1561         pq_sendint64(&output_message, SendRqstPtr);     /* walEnd */
1562         pq_sendint64(&output_message, 0);                       /* sendtime, filled in last */
1563
1564         /*
1565          * Read the log directly into the output buffer to avoid extra memcpy
1566          * calls.
1567          */
1568         enlargeStringInfo(&output_message, nbytes);
1569         XLogRead(&output_message.data[output_message.len], startptr, nbytes);
1570         output_message.len += nbytes;
1571         output_message.data[output_message.len] = '\0';
1572
1573         /*
1574          * Fill the send timestamp last, so that it is taken as late as possible.
1575          */
1576         resetStringInfo(&tmpbuf);
1577         pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
1578         memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
1579                    tmpbuf.data, sizeof(int64));
1580
1581         pq_putmessage_noblock('d', output_message.data, output_message.len);
1582
1583         sentPtr = endptr;
1584
1585         /* Update shared memory status */
1586         {
1587                 /* use volatile pointer to prevent code rearrangement */
1588                 volatile WalSnd *walsnd = MyWalSnd;
1589
1590                 SpinLockAcquire(&walsnd->mutex);
1591                 walsnd->sentPtr = sentPtr;
1592                 SpinLockRelease(&walsnd->mutex);
1593         }
1594
1595         /* Report progress of XLOG streaming in PS display */
1596         if (update_process_title)
1597         {
1598                 char            activitymsg[50];
1599
1600                 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1601                                  (uint32) (sentPtr >> 32), (uint32) sentPtr);
1602                 set_ps_display(activitymsg, false);
1603         }
1604
1605         return;
1606 }
1607
1608 /*
1609  * Returns the latest point in WAL that has been safely flushed to disk, and
1610  * can be sent to the standby. This should only be called when in recovery,
1611  * ie. we're streaming to a cascaded standby.
1612  *
1613  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
1614  * replayed WAL record.
1615  */
1616 static XLogRecPtr
1617 GetStandbyFlushRecPtr(void)
1618 {
1619         XLogRecPtr replayPtr;
1620         TimeLineID replayTLI;
1621         XLogRecPtr receivePtr;
1622         TimeLineID receiveTLI;
1623         XLogRecPtr      result;
1624
1625         /*
1626          * We can safely send what's already been replayed. Also, if walreceiver
1627          * is streaming WAL from the same timeline, we can send anything that
1628          * it has streamed, but hasn't been replayed yet.
1629          */
1630
1631         receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
1632         replayPtr = GetXLogReplayRecPtr(&replayTLI);
1633
1634         ThisTimeLineID = replayTLI;
1635
1636         result = replayPtr;
1637         if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
1638                 result = receivePtr;
1639
1640         return result;
1641 }
1642
1643 /*
1644  * Request walsenders to reload the currently-open WAL file
1645  */
1646 void
1647 WalSndRqstFileReload(void)
1648 {
1649         int                     i;
1650
1651         for (i = 0; i < max_wal_senders; i++)
1652         {
1653                 /* use volatile pointer to prevent code rearrangement */
1654                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1655
1656                 if (walsnd->pid == 0)
1657                         continue;
1658
1659                 SpinLockAcquire(&walsnd->mutex);
1660                 walsnd->needreload = true;
1661                 SpinLockRelease(&walsnd->mutex);
1662         }
1663 }
1664
1665 /* SIGHUP: set flag to re-read config file at next convenient time */
1666 static void
1667 WalSndSigHupHandler(SIGNAL_ARGS)
1668 {
1669         int                     save_errno = errno;
1670
1671         got_SIGHUP = true;
1672         if (MyWalSnd)
1673                 SetLatch(&MyWalSnd->latch);
1674
1675         errno = save_errno;
1676 }
1677
1678 /* SIGUSR1: set flag to send WAL records */
1679 static void
1680 WalSndXLogSendHandler(SIGNAL_ARGS)
1681 {
1682         int                     save_errno = errno;
1683
1684         latch_sigusr1_handler();
1685
1686         errno = save_errno;
1687 }
1688
1689 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
1690 static void
1691 WalSndLastCycleHandler(SIGNAL_ARGS)
1692 {
1693         int                     save_errno = errno;
1694
1695         /*
1696          * If replication has not yet started, die like with SIGTERM. If
1697          * replication is active, only set a flag and wake up the main loop. It
1698          * will send any outstanding WAL, and then exit gracefully.
1699          */
1700         if (!replication_active)
1701                 kill(MyProcPid, SIGTERM);
1702
1703         walsender_ready_to_stop = true;
1704         if (MyWalSnd)
1705                 SetLatch(&MyWalSnd->latch);
1706
1707         errno = save_errno;
1708 }
1709
1710 /* Set up signal handlers */
1711 void
1712 WalSndSignals(void)
1713 {
1714         /* Set up signal handlers */
1715         pqsignal(SIGHUP, WalSndSigHupHandler);          /* set flag to read config
1716                                                                                                  * file */
1717         pqsignal(SIGINT, SIG_IGN);      /* not used */
1718         pqsignal(SIGTERM, die);                                         /* request shutdown */
1719         pqsignal(SIGQUIT, quickdie);                            /* hard crash time */
1720         InitializeTimeouts();           /* establishes SIGALRM handler */
1721         pqsignal(SIGPIPE, SIG_IGN);
1722         pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
1723         pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
1724                                                                                                  * shutdown */
1725
1726         /* Reset some signals that are accepted by postmaster but not here */
1727         pqsignal(SIGCHLD, SIG_DFL);
1728         pqsignal(SIGTTIN, SIG_DFL);
1729         pqsignal(SIGTTOU, SIG_DFL);
1730         pqsignal(SIGCONT, SIG_DFL);
1731         pqsignal(SIGWINCH, SIG_DFL);
1732 }
1733
1734 /* Report shared-memory space needed by WalSndShmemInit */
1735 Size
1736 WalSndShmemSize(void)
1737 {
1738         Size            size = 0;
1739
1740         size = offsetof(WalSndCtlData, walsnds);
1741         size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
1742
1743         return size;
1744 }
1745
1746 /* Allocate and initialize walsender-related shared memory */
1747 void
1748 WalSndShmemInit(void)
1749 {
1750         bool            found;
1751         int                     i;
1752
1753         WalSndCtl = (WalSndCtlData *)
1754                 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
1755
1756         if (!found)
1757         {
1758                 /* First time through, so initialize */
1759                 MemSet(WalSndCtl, 0, WalSndShmemSize());
1760
1761                 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
1762                         SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
1763
1764                 for (i = 0; i < max_wal_senders; i++)
1765                 {
1766                         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
1767
1768                         SpinLockInit(&walsnd->mutex);
1769                         InitSharedLatch(&walsnd->latch);
1770                 }
1771         }
1772 }
1773
1774 /*
1775  * Wake up all walsenders
1776  *
1777  * This will be called inside critical sections, so throwing an error is not
1778  * adviseable.
1779  */
1780 void
1781 WalSndWakeup(void)
1782 {
1783         int                     i;
1784
1785         for (i = 0; i < max_wal_senders; i++)
1786                 SetLatch(&WalSndCtl->walsnds[i].latch);
1787 }
1788
1789 /* Set state for current walsender (only called in walsender) */
1790 void
1791 WalSndSetState(WalSndState state)
1792 {
1793         /* use volatile pointer to prevent code rearrangement */
1794         volatile WalSnd *walsnd = MyWalSnd;
1795
1796         Assert(am_walsender);
1797
1798         if (walsnd->state == state)
1799                 return;
1800
1801         SpinLockAcquire(&walsnd->mutex);
1802         walsnd->state = state;
1803         SpinLockRelease(&walsnd->mutex);
1804 }
1805
1806 /*
1807  * Return a string constant representing the state. This is used
1808  * in system views, and should *not* be translated.
1809  */
1810 static const char *
1811 WalSndGetStateString(WalSndState state)
1812 {
1813         switch (state)
1814         {
1815                 case WALSNDSTATE_STARTUP:
1816                         return "startup";
1817                 case WALSNDSTATE_BACKUP:
1818                         return "backup";
1819                 case WALSNDSTATE_CATCHUP:
1820                         return "catchup";
1821                 case WALSNDSTATE_STREAMING:
1822                         return "streaming";
1823         }
1824         return "UNKNOWN";
1825 }
1826
1827
1828 /*
1829  * Returns activity of walsenders, including pids and xlog locations sent to
1830  * standby servers.
1831  */
1832 Datum
1833 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
1834 {
1835 #define PG_STAT_GET_WAL_SENDERS_COLS    8
1836         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1837         TupleDesc       tupdesc;
1838         Tuplestorestate *tupstore;
1839         MemoryContext per_query_ctx;
1840         MemoryContext oldcontext;
1841         int                *sync_priority;
1842         int                     priority = 0;
1843         int                     sync_standby = -1;
1844         int                     i;
1845
1846         /* check to see if caller supports us returning a tuplestore */
1847         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1848                 ereport(ERROR,
1849                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1850                                  errmsg("set-valued function called in context that cannot accept a set")));
1851         if (!(rsinfo->allowedModes & SFRM_Materialize))
1852                 ereport(ERROR,
1853                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1854                                  errmsg("materialize mode required, but it is not " \
1855                                                 "allowed in this context")));
1856
1857         /* Build a tuple descriptor for our result type */
1858         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1859                 elog(ERROR, "return type must be a row type");
1860
1861         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1862         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1863
1864         tupstore = tuplestore_begin_heap(true, false, work_mem);
1865         rsinfo->returnMode = SFRM_Materialize;
1866         rsinfo->setResult = tupstore;
1867         rsinfo->setDesc = tupdesc;
1868
1869         MemoryContextSwitchTo(oldcontext);
1870
1871         /*
1872          * Get the priorities of sync standbys all in one go, to minimise lock
1873          * acquisitions and to allow us to evaluate who is the current sync
1874          * standby. This code must match the code in SyncRepReleaseWaiters().
1875          */
1876         sync_priority = palloc(sizeof(int) * max_wal_senders);
1877         LWLockAcquire(SyncRepLock, LW_SHARED);
1878         for (i = 0; i < max_wal_senders; i++)
1879         {
1880                 /* use volatile pointer to prevent code rearrangement */
1881                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1882
1883                 if (walsnd->pid != 0)
1884                 {
1885                         /*
1886                          * Treat a standby such as a pg_basebackup background process
1887                          * which always returns an invalid flush location, as an
1888                          * asynchronous standby.
1889                          */
1890                         sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
1891                                 0 : walsnd->sync_standby_priority;
1892
1893                         if (walsnd->state == WALSNDSTATE_STREAMING &&
1894                                 walsnd->sync_standby_priority > 0 &&
1895                                 (priority == 0 ||
1896                                  priority > walsnd->sync_standby_priority) &&
1897                                 !XLogRecPtrIsInvalid(walsnd->flush))
1898                         {
1899                                 priority = walsnd->sync_standby_priority;
1900                                 sync_standby = i;
1901                         }
1902                 }
1903         }
1904         LWLockRelease(SyncRepLock);
1905
1906         for (i = 0; i < max_wal_senders; i++)
1907         {
1908                 /* use volatile pointer to prevent code rearrangement */
1909                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1910                 char            location[MAXFNAMELEN];
1911                 XLogRecPtr      sentPtr;
1912                 XLogRecPtr      write;
1913                 XLogRecPtr      flush;
1914                 XLogRecPtr      apply;
1915                 WalSndState state;
1916                 Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
1917                 bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
1918
1919                 if (walsnd->pid == 0)
1920                         continue;
1921
1922                 SpinLockAcquire(&walsnd->mutex);
1923                 sentPtr = walsnd->sentPtr;
1924                 state = walsnd->state;
1925                 write = walsnd->write;
1926                 flush = walsnd->flush;
1927                 apply = walsnd->apply;
1928                 SpinLockRelease(&walsnd->mutex);
1929
1930                 memset(nulls, 0, sizeof(nulls));
1931                 values[0] = Int32GetDatum(walsnd->pid);
1932
1933                 if (!superuser())
1934                 {
1935                         /*
1936                          * Only superusers can see details. Other users only get the pid
1937                          * value to know it's a walsender, but no details.
1938                          */
1939                         MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
1940                 }
1941                 else
1942                 {
1943                         values[1] = CStringGetTextDatum(WalSndGetStateString(state));
1944
1945                         snprintf(location, sizeof(location), "%X/%X",
1946                                          (uint32) (sentPtr >> 32), (uint32) sentPtr);
1947                         values[2] = CStringGetTextDatum(location);
1948
1949                         if (write == 0)
1950                                 nulls[3] = true;
1951                         snprintf(location, sizeof(location), "%X/%X",
1952                                          (uint32) (write >> 32), (uint32) write);
1953                         values[3] = CStringGetTextDatum(location);
1954
1955                         if (flush == 0)
1956                                 nulls[4] = true;
1957                         snprintf(location, sizeof(location), "%X/%X",
1958                                          (uint32) (flush >> 32), (uint32) flush);
1959                         values[4] = CStringGetTextDatum(location);
1960
1961                         if (apply == 0)
1962                                 nulls[5] = true;
1963                         snprintf(location, sizeof(location), "%X/%X",
1964                                          (uint32) (apply >> 32), (uint32) apply);
1965                         values[5] = CStringGetTextDatum(location);
1966
1967                         values[6] = Int32GetDatum(sync_priority[i]);
1968
1969                         /*
1970                          * More easily understood version of standby state. This is purely
1971                          * informational, not different from priority.
1972                          */
1973                         if (sync_priority[i] == 0)
1974                                 values[7] = CStringGetTextDatum("async");
1975                         else if (i == sync_standby)
1976                                 values[7] = CStringGetTextDatum("sync");
1977                         else
1978                                 values[7] = CStringGetTextDatum("potential");
1979                 }
1980
1981                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1982         }
1983         pfree(sync_priority);
1984
1985         /* clean up and return the tuplestore */
1986         tuplestore_donestoring(tupstore);
1987
1988         return (Datum) 0;
1989 }
1990
1991 /*
1992   * This function is used to send keepalive message to standby.
1993   * If requestReply is set, sets a flag in the message requesting the standby
1994   * to send a message back to us, for heartbeat purposes.
1995   */
1996 static void
1997 WalSndKeepalive(bool requestReply)
1998 {
1999         elog(DEBUG2, "sending replication keepalive");
2000
2001         /* construct the message... */
2002         resetStringInfo(&output_message);
2003         pq_sendbyte(&output_message, 'k');
2004         pq_sendint64(&output_message, sentPtr);
2005         pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
2006         pq_sendbyte(&output_message, requestReply ? 1 : 0);
2007
2008         /* ... and send it wrapped in CopyData */
2009         pq_putmessage_noblock('d', output_message.data, output_message.len);
2010 }
2011
2012 /*
2013  * This isn't currently used for anything. Monitoring tools might be
2014  * interested in the future, and we'll need something like this in the
2015  * future for synchronous replication.
2016  */
2017 #ifdef NOT_USED
2018 /*
2019  * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
2020  * if none.
2021  */
2022 XLogRecPtr
2023 GetOldestWALSendPointer(void)
2024 {
2025         XLogRecPtr      oldest = {0, 0};
2026         int                     i;
2027         bool            found = false;
2028
2029         for (i = 0; i < max_wal_senders; i++)
2030         {
2031                 /* use volatile pointer to prevent code rearrangement */
2032                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
2033                 XLogRecPtr      recptr;
2034
2035                 if (walsnd->pid == 0)
2036                         continue;
2037
2038                 SpinLockAcquire(&walsnd->mutex);
2039                 recptr = walsnd->sentPtr;
2040                 SpinLockRelease(&walsnd->mutex);
2041
2042                 if (recptr.xlogid == 0 && recptr.xrecoff == 0)
2043                         continue;
2044
2045                 if (!found || recptr < oldest)
2046                         oldest = recptr;
2047                 found = true;
2048         }
2049         return oldest;
2050 }
2051
2052 #endif