]> granicus.if.org Git - postgresql/blob - src/backend/replication/walsender.c
Update copyright for 2014
[postgresql] / src / backend / replication / walsender.c
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, postmaster sends us SIGUSR2 after all
28  * regular backends have exited and the shutdown checkpoint has been written.
29  * This instructs walsender to send any outstanding WAL, including the
30  * shutdown checkpoint record, wait for it to be replicated to the standby,
31  * and then exit.
32  *
33  *
34  * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
35  *
36  * IDENTIFICATION
37  *        src/backend/replication/walsender.c
38  *
39  *-------------------------------------------------------------------------
40  */
41 #include "postgres.h"
42
43 #include <signal.h>
44 #include <unistd.h>
45
46 #include "access/timeline.h"
47 #include "access/transam.h"
48 #include "access/xlog_internal.h"
49 #include "catalog/pg_type.h"
50 #include "funcapi.h"
51 #include "libpq/libpq.h"
52 #include "libpq/pqformat.h"
53 #include "miscadmin.h"
54 #include "nodes/replnodes.h"
55 #include "replication/basebackup.h"
56 #include "replication/syncrep.h"
57 #include "replication/walreceiver.h"
58 #include "replication/walsender.h"
59 #include "replication/walsender_private.h"
60 #include "storage/fd.h"
61 #include "storage/ipc.h"
62 #include "storage/pmsignal.h"
63 #include "storage/proc.h"
64 #include "storage/procarray.h"
65 #include "tcop/tcopprot.h"
66 #include "utils/builtins.h"
67 #include "utils/guc.h"
68 #include "utils/memutils.h"
69 #include "utils/ps_status.h"
70 #include "utils/resowner.h"
71 #include "utils/timeout.h"
72 #include "utils/timestamp.h"
73
/*
 * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
 *
 * We don't have a good idea of what a good value would be; there's some
 * overhead per message in both walsender and walreceiver, but on the other
 * hand sending large batches makes walsender less responsive to signals
 * because signals are checked only between messages.  The value is sixteen
 * WAL blocks, i.e. 128kB with default 8k blocks, which seems like a
 * reasonable guess for now.
 */
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)

/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;

/* My slot in the shared memory array */
WalSnd	   *MyWalSnd = NULL;

/* Global state */
bool		am_walsender = false;		/* Am I a walsender process? */
bool		am_cascading_walsender = false;		/* Am I cascading WAL to
												 * another standby?  Set
												 * from RecoveryInProgress(). */

/* User-settable parameters for walsender */
int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
int			wal_sender_timeout = 60 * 1000;		/* maximum time to send one
												 * WAL data message
												 * (presumably milliseconds;
												 * confirm against the GUC
												 * definition) */

/*
 * State for WalSndWakeupRequest
 */
bool		wake_wal_senders = false;

/*
 * These variables are used similarly to openLogFile/Id/Seg/Off,
 * but for walsender to read the XLOG.  sendFile is -1 while no segment
 * file is open.
 */
static int	sendFile = -1;
static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;

/* Timeline ID of the currently open file */
static TimeLineID curFileTimeLine = 0;

/*
 * These variables keep track of the state of the timeline we're currently
 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
 * the timeline is not the latest timeline on this server, and the server's
 * history forked off from that timeline at sendTimeLineValidUpto.
 * sendTimeLineNextTLI is filled in by tliSwitchPoint() for a historic
 * timeline and is reported to the client as "next_tli" once streaming of
 * that timeline has finished.
 */
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;

/*
 * How far have we sent WAL already? This is also advertised in
 * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
 */
static XLogRecPtr sentPtr = 0;

/* Buffers for constructing outgoing messages and processing reply messages. */
static StringInfoData output_message;
static StringInfoData reply_message;
static StringInfoData tmpbuf;

/*
 * Timestamp of the last receipt of the reply from the standby.
 */
static TimestampTz last_reply_timestamp;

/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool ping_sent = false;

/*
 * While streaming WAL in Copy mode, streamingDoneSending is set to true
 * after we have sent CopyDone. We should not send any more CopyData messages
 * after that. streamingDoneReceiving is set to true when we receive CopyDone
 * from the other end. When both become true, it's time to exit Copy mode.
 * Both flags are reset to false each time START_REPLICATION is processed.
 */
static bool streamingDoneSending;
static bool streamingDoneReceiving;

/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t walsender_ready_to_stop = false;

/*
 * This is set while we are streaming. When not set, SIGUSR2 signal will be
 * handled like SIGTERM. When set, the main loop is responsible for checking
 * walsender_ready_to_stop and terminating when it's set (after streaming any
 * remaining WAL).
 */
static volatile sig_atomic_t replication_active = false;

/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);

/* Prototypes for private functions */
static void WalSndLoop(void);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
static void XLogSend(bool *caughtup);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(bool requestReply);
187
188 /* Initialize walsender process before entering the main command loop */
189 void
190 InitWalSender(void)
191 {
192         am_cascading_walsender = RecoveryInProgress();
193
194         /* Create a per-walsender data structure in shared memory */
195         InitWalSenderSlot();
196
197         /* Set up resource owner */
198         CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
199
200         /*
201          * Let postmaster know that we're a WAL sender. Once we've declared us as
202          * a WAL sender process, postmaster will let us outlive the bgwriter and
203          * kill us last in the shutdown sequence, so we get a chance to stream all
204          * remaining WAL at shutdown, including the shutdown checkpoint. Note that
205          * there's no going back, and we mustn't write any WAL records after this.
206          */
207         MarkPostmasterChildWalSender();
208         SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
209 }
210
211 /*
212  * Clean up after an error.
213  *
214  * WAL sender processes don't use transactions like regular backends do.
215  * This function does any cleanup requited after an error in a WAL sender
216  * process, similar to what transaction abort does in a regular backend.
217  */
218 void
219 WalSndErrorCleanup()
220 {
221         if (sendFile >= 0)
222         {
223                 close(sendFile);
224                 sendFile = -1;
225         }
226
227         replication_active = false;
228         if (walsender_ready_to_stop)
229                 proc_exit(0);
230
231         /* Revert back to startup state */
232         WalSndSetState(WALSNDSTATE_STARTUP);
233 }
234
235 /*
236  * Handle the IDENTIFY_SYSTEM command.
237  */
238 static void
239 IdentifySystem(void)
240 {
241         StringInfoData buf;
242         char            sysid[32];
243         char            tli[11];
244         char            xpos[MAXFNAMELEN];
245         XLogRecPtr      logptr;
246
247         /*
248          * Reply with a result set with one row, three columns. First col is
249          * system ID, second is timeline ID, and third is current xlog location.
250          */
251
252         snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
253                          GetSystemIdentifier());
254
255         am_cascading_walsender = RecoveryInProgress();
256         if (am_cascading_walsender)
257         {
258                 /* this also updates ThisTimeLineID */
259                 logptr = GetStandbyFlushRecPtr();
260         }
261         else
262                 logptr = GetInsertRecPtr();
263
264         snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
265
266         snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
267
268         /* Send a RowDescription message */
269         pq_beginmessage(&buf, 'T');
270         pq_sendint(&buf, 3, 2);         /* 3 fields */
271
272         /* first field */
273         pq_sendstring(&buf, "systemid");        /* col name */
274         pq_sendint(&buf, 0, 4);         /* table oid */
275         pq_sendint(&buf, 0, 2);         /* attnum */
276         pq_sendint(&buf, TEXTOID, 4);           /* type oid */
277         pq_sendint(&buf, -1, 2);        /* typlen */
278         pq_sendint(&buf, 0, 4);         /* typmod */
279         pq_sendint(&buf, 0, 2);         /* format code */
280
281         /* second field */
282         pq_sendstring(&buf, "timeline");        /* col name */
283         pq_sendint(&buf, 0, 4);         /* table oid */
284         pq_sendint(&buf, 0, 2);         /* attnum */
285         pq_sendint(&buf, INT4OID, 4);           /* type oid */
286         pq_sendint(&buf, 4, 2);         /* typlen */
287         pq_sendint(&buf, 0, 4);         /* typmod */
288         pq_sendint(&buf, 0, 2);         /* format code */
289
290         /* third field */
291         pq_sendstring(&buf, "xlogpos");
292         pq_sendint(&buf, 0, 4);
293         pq_sendint(&buf, 0, 2);
294         pq_sendint(&buf, TEXTOID, 4);
295         pq_sendint(&buf, -1, 2);
296         pq_sendint(&buf, 0, 4);
297         pq_sendint(&buf, 0, 2);
298         pq_endmessage(&buf);
299
300         /* Send a DataRow message */
301         pq_beginmessage(&buf, 'D');
302         pq_sendint(&buf, 3, 2);         /* # of columns */
303         pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
304         pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
305         pq_sendint(&buf, strlen(tli), 4);       /* col2 len */
306         pq_sendbytes(&buf, (char *) tli, strlen(tli));
307         pq_sendint(&buf, strlen(xpos), 4);      /* col3 len */
308         pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
309
310         pq_endmessage(&buf);
311 }
312
313
314 /*
315  * Handle TIMELINE_HISTORY command.
316  */
317 static void
318 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
319 {
320         StringInfoData buf;
321         char            histfname[MAXFNAMELEN];
322         char            path[MAXPGPATH];
323         int                     fd;
324         off_t           histfilelen;
325         off_t           bytesleft;
326
327         /*
328          * Reply with a result set with one row, and two columns. The first col is
329          * the name of the history file, 2nd is the contents.
330          */
331
332         TLHistoryFileName(histfname, cmd->timeline);
333         TLHistoryFilePath(path, cmd->timeline);
334
335         /* Send a RowDescription message */
336         pq_beginmessage(&buf, 'T');
337         pq_sendint(&buf, 2, 2);         /* 2 fields */
338
339         /* first field */
340         pq_sendstring(&buf, "filename");        /* col name */
341         pq_sendint(&buf, 0, 4);         /* table oid */
342         pq_sendint(&buf, 0, 2);         /* attnum */
343         pq_sendint(&buf, TEXTOID, 4);           /* type oid */
344         pq_sendint(&buf, -1, 2);        /* typlen */
345         pq_sendint(&buf, 0, 4);         /* typmod */
346         pq_sendint(&buf, 0, 2);         /* format code */
347
348         /* second field */
349         pq_sendstring(&buf, "content");         /* col name */
350         pq_sendint(&buf, 0, 4);         /* table oid */
351         pq_sendint(&buf, 0, 2);         /* attnum */
352         pq_sendint(&buf, BYTEAOID, 4);          /* type oid */
353         pq_sendint(&buf, -1, 2);        /* typlen */
354         pq_sendint(&buf, 0, 4);         /* typmod */
355         pq_sendint(&buf, 0, 2);         /* format code */
356         pq_endmessage(&buf);
357
358         /* Send a DataRow message */
359         pq_beginmessage(&buf, 'D');
360         pq_sendint(&buf, 2, 2);         /* # of columns */
361         pq_sendint(&buf, strlen(histfname), 4);         /* col1 len */
362         pq_sendbytes(&buf, histfname, strlen(histfname));
363
364         fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
365         if (fd < 0)
366                 ereport(ERROR,
367                                 (errcode_for_file_access(),
368                                  errmsg("could not open file \"%s\": %m", path)));
369
370         /* Determine file length and send it to client */
371         histfilelen = lseek(fd, 0, SEEK_END);
372         if (histfilelen < 0)
373                 ereport(ERROR,
374                                 (errcode_for_file_access(),
375                                  errmsg("could not seek to end of file \"%s\": %m", path)));
376         if (lseek(fd, 0, SEEK_SET) != 0)
377                 ereport(ERROR,
378                                 (errcode_for_file_access(),
379                         errmsg("could not seek to beginning of file \"%s\": %m", path)));
380
381         pq_sendint(&buf, histfilelen, 4);       /* col2 len */
382
383         bytesleft = histfilelen;
384         while (bytesleft > 0)
385         {
386                 char            rbuf[BLCKSZ];
387                 int                     nread;
388
389                 nread = read(fd, rbuf, sizeof(rbuf));
390                 if (nread <= 0)
391                         ereport(ERROR,
392                                         (errcode_for_file_access(),
393                                          errmsg("could not read file \"%s\": %m",
394                                                         path)));
395                 pq_sendbytes(&buf, rbuf, nread);
396                 bytesleft -= nread;
397         }
398         CloseTransientFile(fd);
399
400         pq_endmessage(&buf);
401 }
402
403 /*
404  * Handle START_REPLICATION command.
405  *
406  * At the moment, this never returns, but an ereport(ERROR) will take us back
407  * to the main loop.
408  */
409 static void
410 StartReplication(StartReplicationCmd *cmd)
411 {
412         StringInfoData buf;
413         XLogRecPtr      FlushPtr;
414
415         /*
416          * We assume here that we're logging enough information in the WAL for
417          * log-shipping, since this is checked in PostmasterMain().
418          *
419          * NOTE: wal_level can only change at shutdown, so in most cases it is
420          * difficult for there to be WAL data that we can still see that was
421          * written at wal_level='minimal'.
422          */
423
424         /*
425          * Select the timeline. If it was given explicitly by the client, use
426          * that. Otherwise use the timeline of the last replayed record, which is
427          * kept in ThisTimeLineID.
428          */
429         if (am_cascading_walsender)
430         {
431                 /* this also updates ThisTimeLineID */
432                 FlushPtr = GetStandbyFlushRecPtr();
433         }
434         else
435                 FlushPtr = GetFlushRecPtr();
436
437         if (cmd->timeline != 0)
438         {
439                 XLogRecPtr      switchpoint;
440
441                 sendTimeLine = cmd->timeline;
442                 if (sendTimeLine == ThisTimeLineID)
443                 {
444                         sendTimeLineIsHistoric = false;
445                         sendTimeLineValidUpto = InvalidXLogRecPtr;
446                 }
447                 else
448                 {
449                         List       *timeLineHistory;
450
451                         sendTimeLineIsHistoric = true;
452
453                         /*
454                          * Check that the timeline the client requested for exists, and
455                          * the requested start location is on that timeline.
456                          */
457                         timeLineHistory = readTimeLineHistory(ThisTimeLineID);
458                         switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
459                                                                                  &sendTimeLineNextTLI);
460                         list_free_deep(timeLineHistory);
461
462                         /*
463                          * Found the requested timeline in the history. Check that
464                          * requested startpoint is on that timeline in our history.
465                          *
466                          * This is quite loose on purpose. We only check that we didn't
467                          * fork off the requested timeline before the switchpoint. We
468                          * don't check that we switched *to* it before the requested
469                          * starting point. This is because the client can legitimately
470                          * request to start replication from the beginning of the WAL
471                          * segment that contains switchpoint, but on the new timeline, so
472                          * that it doesn't end up with a partial segment. If you ask for a
473                          * too old starting point, you'll get an error later when we fail
474                          * to find the requested WAL segment in pg_xlog.
475                          *
476                          * XXX: we could be more strict here and only allow a startpoint
477                          * that's older than the switchpoint, if it it's still in the same
478                          * WAL segment.
479                          */
480                         if (!XLogRecPtrIsInvalid(switchpoint) &&
481                                 switchpoint < cmd->startpoint)
482                         {
483                                 ereport(ERROR,
484                                                 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
485                                                                 (uint32) (cmd->startpoint >> 32),
486                                                                 (uint32) (cmd->startpoint),
487                                                                 cmd->timeline),
488                                                  errdetail("This server's history forked from timeline %u at %X/%X.",
489                                                                    cmd->timeline,
490                                                                    (uint32) (switchpoint >> 32),
491                                                                    (uint32) (switchpoint))));
492                         }
493                         sendTimeLineValidUpto = switchpoint;
494                 }
495         }
496         else
497         {
498                 sendTimeLine = ThisTimeLineID;
499                 sendTimeLineValidUpto = InvalidXLogRecPtr;
500                 sendTimeLineIsHistoric = false;
501         }
502
503         streamingDoneSending = streamingDoneReceiving = false;
504
505         /* If there is nothing to stream, don't even enter COPY mode */
506         if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
507         {
508                 /*
509                  * When we first start replication the standby will be behind the
510                  * primary. For some applications, for example, synchronous
511                  * replication, it is important to have a clear state for this initial
512                  * catchup mode, so we can trigger actions when we change streaming
513                  * state later. We may stay in this state for a long time, which is
514                  * exactly why we want to be able to monitor whether or not we are
515                  * still here.
516                  */
517                 WalSndSetState(WALSNDSTATE_CATCHUP);
518
519                 /* Send a CopyBothResponse message, and start streaming */
520                 pq_beginmessage(&buf, 'W');
521                 pq_sendbyte(&buf, 0);
522                 pq_sendint(&buf, 0, 2);
523                 pq_endmessage(&buf);
524                 pq_flush();
525
526                 /*
527                  * Don't allow a request to stream from a future point in WAL that
528                  * hasn't been flushed to disk in this server yet.
529                  */
530                 if (FlushPtr < cmd->startpoint)
531                 {
532                         ereport(ERROR,
533                                         (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
534                                                         (uint32) (cmd->startpoint >> 32),
535                                                         (uint32) (cmd->startpoint),
536                                                         (uint32) (FlushPtr >> 32),
537                                                         (uint32) (FlushPtr))));
538                 }
539
540                 /* Start streaming from the requested point */
541                 sentPtr = cmd->startpoint;
542
543                 /* Initialize shared memory status, too */
544                 {
545                         /* use volatile pointer to prevent code rearrangement */
546                         volatile WalSnd *walsnd = MyWalSnd;
547
548                         SpinLockAcquire(&walsnd->mutex);
549                         walsnd->sentPtr = sentPtr;
550                         SpinLockRelease(&walsnd->mutex);
551                 }
552
553                 SyncRepInitConfig();
554
555                 /* Main loop of walsender */
556                 replication_active = true;
557
558                 WalSndLoop();
559
560                 replication_active = false;
561                 if (walsender_ready_to_stop)
562                         proc_exit(0);
563                 WalSndSetState(WALSNDSTATE_STARTUP);
564
565                 Assert(streamingDoneSending && streamingDoneReceiving);
566         }
567
568         /*
569          * Copy is finished now. Send a single-row result set indicating the next
570          * timeline.
571          */
572         if (sendTimeLineIsHistoric)
573         {
574                 char            tli_str[11];
575                 char            startpos_str[8 + 1 + 8 + 1];
576
577                 snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
578                 snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
579                                  (uint32) (sendTimeLineValidUpto >> 32),
580                                  (uint32) sendTimeLineValidUpto);
581
582                 pq_beginmessage(&buf, 'T');             /* RowDescription */
583                 pq_sendint(&buf, 2, 2); /* 2 fields */
584
585                 /* Field header */
586                 pq_sendstring(&buf, "next_tli");
587                 pq_sendint(&buf, 0, 4); /* table oid */
588                 pq_sendint(&buf, 0, 2); /* attnum */
589
590                 /*
591                  * int8 may seem like a surprising data type for this, but in theory
592                  * int4 would not be wide enough for this, as TimeLineID is unsigned.
593                  */
594                 pq_sendint(&buf, INT8OID, 4);   /* type oid */
595                 pq_sendint(&buf, -1, 2);
596                 pq_sendint(&buf, 0, 4);
597                 pq_sendint(&buf, 0, 2);
598
599                 pq_sendstring(&buf, "next_tli_startpos");
600                 pq_sendint(&buf, 0, 4); /* table oid */
601                 pq_sendint(&buf, 0, 2); /* attnum */
602                 pq_sendint(&buf, TEXTOID, 4);   /* type oid */
603                 pq_sendint(&buf, -1, 2);
604                 pq_sendint(&buf, 0, 4);
605                 pq_sendint(&buf, 0, 2);
606                 pq_endmessage(&buf);
607
608                 /* Data row */
609                 pq_beginmessage(&buf, 'D');
610                 pq_sendint(&buf, 2, 2); /* number of columns */
611
612                 pq_sendint(&buf, strlen(tli_str), 4);   /* length */
613                 pq_sendbytes(&buf, tli_str, strlen(tli_str));
614
615                 pq_sendint(&buf, strlen(startpos_str), 4);              /* length */
616                 pq_sendbytes(&buf, startpos_str, strlen(startpos_str));
617
618                 pq_endmessage(&buf);
619         }
620
621         /* Send CommandComplete message */
622         pq_puttextmessage('C', "START_STREAMING");
623 }
624
625 /*
626  * Execute an incoming replication command.
627  */
628 void
629 exec_replication_command(const char *cmd_string)
630 {
631         int                     parse_rc;
632         Node       *cmd_node;
633         MemoryContext cmd_context;
634         MemoryContext old_context;
635
636         elog(DEBUG1, "received replication command: %s", cmd_string);
637
638         CHECK_FOR_INTERRUPTS();
639
640         cmd_context = AllocSetContextCreate(CurrentMemoryContext,
641                                                                                 "Replication command context",
642                                                                                 ALLOCSET_DEFAULT_MINSIZE,
643                                                                                 ALLOCSET_DEFAULT_INITSIZE,
644                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
645         old_context = MemoryContextSwitchTo(cmd_context);
646
647         replication_scanner_init(cmd_string);
648         parse_rc = replication_yyparse();
649         if (parse_rc != 0)
650                 ereport(ERROR,
651                                 (errcode(ERRCODE_SYNTAX_ERROR),
652                                  (errmsg_internal("replication command parser returned %d",
653                                                                   parse_rc))));
654
655         cmd_node = replication_parse_result;
656
657         switch (cmd_node->type)
658         {
659                 case T_IdentifySystemCmd:
660                         IdentifySystem();
661                         break;
662
663                 case T_StartReplicationCmd:
664                         StartReplication((StartReplicationCmd *) cmd_node);
665                         break;
666
667                 case T_BaseBackupCmd:
668                         SendBaseBackup((BaseBackupCmd *) cmd_node);
669                         break;
670
671                 case T_TimeLineHistoryCmd:
672                         SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
673                         break;
674
675                 default:
676                         elog(ERROR, "unrecognized replication command node tag: %u",
677                                  cmd_node->type);
678         }
679
680         /* done */
681         MemoryContextSwitchTo(old_context);
682         MemoryContextDelete(cmd_context);
683
684         /* Send CommandComplete message */
685         EndCommand("SELECT", DestRemote);
686 }
687
688 /*
689  * Process any incoming messages while streaming. Also checks if the remote
690  * end has closed the connection.
691  */
692 static void
693 ProcessRepliesIfAny(void)
694 {
695         unsigned char firstchar;
696         int                     r;
697         bool            received = false;
698
699         for (;;)
700         {
701                 r = pq_getbyte_if_available(&firstchar);
702                 if (r < 0)
703                 {
704                         /* unexpected error or EOF */
705                         ereport(COMMERROR,
706                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
707                                          errmsg("unexpected EOF on standby connection")));
708                         proc_exit(0);
709                 }
710                 if (r == 0)
711                 {
712                         /* no data available without blocking */
713                         break;
714                 }
715
716                 /*
717                  * If we already received a CopyDone from the frontend, the frontend
718                  * should not send us anything until we've closed our end of the COPY.
719                  * XXX: In theory, the frontend could already send the next command
720                  * before receiving the CopyDone, but libpq doesn't currently allow
721                  * that.
722                  */
723                 if (streamingDoneReceiving && firstchar != 'X')
724                         ereport(FATAL,
725                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
726                                          errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
727                                                         firstchar)));
728
729                 /* Handle the very limited subset of commands expected in this phase */
730                 switch (firstchar)
731                 {
732                                 /*
733                                  * 'd' means a standby reply wrapped in a CopyData packet.
734                                  */
735                         case 'd':
736                                 ProcessStandbyMessage();
737                                 received = true;
738                                 break;
739
740                                 /*
741                                  * CopyDone means the standby requested to finish streaming.
742                                  * Reply with CopyDone, if we had not sent that already.
743                                  */
744                         case 'c':
745                                 if (!streamingDoneSending)
746                                 {
747                                         pq_putmessage_noblock('c', NULL, 0);
748                                         streamingDoneSending = true;
749                                 }
750
751                                 /* consume the CopyData message */
752                                 resetStringInfo(&reply_message);
753                                 if (pq_getmessage(&reply_message, 0))
754                                 {
755                                         ereport(COMMERROR,
756                                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
757                                                          errmsg("unexpected EOF on standby connection")));
758                                         proc_exit(0);
759                                 }
760
761                                 streamingDoneReceiving = true;
762                                 received = true;
763                                 break;
764
765                                 /*
766                                  * 'X' means that the standby is closing down the socket.
767                                  */
768                         case 'X':
769                                 proc_exit(0);
770
771                         default:
772                                 ereport(FATAL,
773                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
774                                                  errmsg("invalid standby message type \"%c\"",
775                                                                 firstchar)));
776                 }
777         }
778
779         /*
780          * Save the last reply timestamp if we've received at least one reply.
781          */
782         if (received)
783         {
784                 last_reply_timestamp = GetCurrentTimestamp();
785                 ping_sent = false;
786         }
787 }
788
789 /*
790  * Process a status update message received from standby.
791  */
792 static void
793 ProcessStandbyMessage(void)
794 {
795         char            msgtype;
796
797         resetStringInfo(&reply_message);
798
799         /*
800          * Read the message contents.
801          */
802         if (pq_getmessage(&reply_message, 0))
803         {
804                 ereport(COMMERROR,
805                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
806                                  errmsg("unexpected EOF on standby connection")));
807                 proc_exit(0);
808         }
809
810         /*
811          * Check message type from the first byte.
812          */
813         msgtype = pq_getmsgbyte(&reply_message);
814
815         switch (msgtype)
816         {
817                 case 'r':
818                         ProcessStandbyReplyMessage();
819                         break;
820
821                 case 'h':
822                         ProcessStandbyHSFeedbackMessage();
823                         break;
824
825                 default:
826                         ereport(COMMERROR,
827                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
828                                          errmsg("unexpected message type \"%c\"", msgtype)));
829                         proc_exit(0);
830         }
831 }
832
833 /*
834  * Regular reply from standby advising of WAL positions on standby server.
835  */
836 static void
837 ProcessStandbyReplyMessage(void)
838 {
839         XLogRecPtr      writePtr,
840                                 flushPtr,
841                                 applyPtr;
842         bool            replyRequested;
843
844         /* the caller already consumed the msgtype byte */
845         writePtr = pq_getmsgint64(&reply_message);
846         flushPtr = pq_getmsgint64(&reply_message);
847         applyPtr = pq_getmsgint64(&reply_message);
848         (void) pq_getmsgint64(&reply_message);          /* sendTime; not used ATM */
849         replyRequested = pq_getmsgbyte(&reply_message);
850
851         elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
852                  (uint32) (writePtr >> 32), (uint32) writePtr,
853                  (uint32) (flushPtr >> 32), (uint32) flushPtr,
854                  (uint32) (applyPtr >> 32), (uint32) applyPtr,
855                  replyRequested ? " (reply requested)" : "");
856
857         /* Send a reply if the standby requested one. */
858         if (replyRequested)
859                 WalSndKeepalive(false);
860
861         /*
862          * Update shared state for this WalSender process based on reply data from
863          * standby.
864          */
865         {
866                 /* use volatile pointer to prevent code rearrangement */
867                 volatile WalSnd *walsnd = MyWalSnd;
868
869                 SpinLockAcquire(&walsnd->mutex);
870                 walsnd->write = writePtr;
871                 walsnd->flush = flushPtr;
872                 walsnd->apply = applyPtr;
873                 SpinLockRelease(&walsnd->mutex);
874         }
875
876         if (!am_cascading_walsender)
877                 SyncRepReleaseWaiters();
878 }
879
880 /*
881  * Hot Standby feedback
882  */
883 static void
884 ProcessStandbyHSFeedbackMessage(void)
885 {
886         TransactionId nextXid;
887         uint32          nextEpoch;
888         TransactionId feedbackXmin;
889         uint32          feedbackEpoch;
890
891         /*
892          * Decipher the reply message. The caller already consumed the msgtype
893          * byte.
894          */
895         (void) pq_getmsgint64(&reply_message);          /* sendTime; not used ATM */
896         feedbackXmin = pq_getmsgint(&reply_message, 4);
897         feedbackEpoch = pq_getmsgint(&reply_message, 4);
898
899         elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
900                  feedbackXmin,
901                  feedbackEpoch);
902
903         /* Unset WalSender's xmin if the feedback message value is invalid */
904         if (!TransactionIdIsNormal(feedbackXmin))
905         {
906                 MyPgXact->xmin = InvalidTransactionId;
907                 return;
908         }
909
910         /*
911          * Check that the provided xmin/epoch are sane, that is, not in the future
912          * and not so far back as to be already wrapped around.  Ignore if not.
913          *
914          * Epoch of nextXid should be same as standby, or if the counter has
915          * wrapped, then one greater than standby.
916          */
917         GetNextXidAndEpoch(&nextXid, &nextEpoch);
918
919         if (feedbackXmin <= nextXid)
920         {
921                 if (feedbackEpoch != nextEpoch)
922                         return;
923         }
924         else
925         {
926                 if (feedbackEpoch + 1 != nextEpoch)
927                         return;
928         }
929
930         if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
931                 return;                                 /* epoch OK, but it's wrapped around */
932
933         /*
934          * Set the WalSender's xmin equal to the standby's requested xmin, so that
935          * the xmin will be taken into account by GetOldestXmin.  This will hold
936          * back the removal of dead rows and thereby prevent the generation of
937          * cleanup conflicts on the standby server.
938          *
939          * There is a small window for a race condition here: although we just
940          * checked that feedbackXmin precedes nextXid, the nextXid could have
941          * gotten advanced between our fetching it and applying the xmin below,
942          * perhaps far enough to make feedbackXmin wrap around.  In that case the
943          * xmin we set here would be "in the future" and have no effect.  No point
944          * in worrying about this since it's too late to save the desired data
945          * anyway.      Assuming that the standby sends us an increasing sequence of
946          * xmins, this could only happen during the first reply cycle, else our
947          * own xmin would prevent nextXid from advancing so far.
948          *
949          * We don't bother taking the ProcArrayLock here.  Setting the xmin field
950          * is assumed atomic, and there's no real need to prevent a concurrent
951          * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
952          * safe, and if we're moving it backwards, well, the data is at risk
953          * already since a VACUUM could have just finished calling GetOldestXmin.)
954          */
955         MyPgXact->xmin = feedbackXmin;
956 }
957
958 /* Main loop of walsender process that streams the WAL over Copy messages. */
959 static void
960 WalSndLoop(void)
961 {
962         bool            caughtup = false;
963
964         /*
965          * Allocate buffers that will be used for each outgoing and incoming
966          * message.  We do this just once to reduce palloc overhead.
967          */
968         initStringInfo(&output_message);
969         initStringInfo(&reply_message);
970         initStringInfo(&tmpbuf);
971
972         /* Initialize the last reply timestamp */
973         last_reply_timestamp = GetCurrentTimestamp();
974         ping_sent = false;
975
976         /*
977          * Loop until we reach the end of this timeline or the client requests to
978          * stop streaming.
979          */
980         for (;;)
981         {
982                 /* Clear any already-pending wakeups */
983                 ResetLatch(&MyWalSnd->latch);
984
985                 /*
986                  * Emergency bailout if postmaster has died.  This is to avoid the
987                  * necessity for manual cleanup of all postmaster children.
988                  */
989                 if (!PostmasterIsAlive())
990                         exit(1);
991
992                 /* Process any requests or signals received recently */
993                 if (got_SIGHUP)
994                 {
995                         got_SIGHUP = false;
996                         ProcessConfigFile(PGC_SIGHUP);
997                         SyncRepInitConfig();
998                 }
999
1000                 CHECK_FOR_INTERRUPTS();
1001
1002                 /* Check for input from the client */
1003                 ProcessRepliesIfAny();
1004
1005                 /*
1006                  * If we have received CopyDone from the client, sent CopyDone
1007                  * ourselves, and the output buffer is empty, it's time to exit
1008                  * streaming.
1009                  */
1010                 if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
1011                         break;
1012
1013                 /*
1014                  * If we don't have any pending data in the output buffer, try to send
1015                  * some more.  If there is some, we don't bother to call XLogSend
1016                  * again until we've flushed it ... but we'd better assume we are not
1017                  * caught up.
1018                  */
1019                 if (!pq_is_send_pending())
1020                         XLogSend(&caughtup);
1021                 else
1022                         caughtup = false;
1023
1024                 /* Try to flush pending output to the client */
1025                 if (pq_flush_if_writable() != 0)
1026                         goto send_failure;
1027
1028                 /* If nothing remains to be sent right now ... */
1029                 if (caughtup && !pq_is_send_pending())
1030                 {
1031                         /*
1032                          * If we're in catchup state, move to streaming.  This is an
1033                          * important state change for users to know about, since before
1034                          * this point data loss might occur if the primary dies and we
1035                          * need to failover to the standby. The state change is also
1036                          * important for synchronous replication, since commits that
1037                          * started to wait at that point might wait for some time.
1038                          */
1039                         if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
1040                         {
1041                                 ereport(DEBUG1,
1042                                          (errmsg("standby \"%s\" has now caught up with primary",
1043                                                          application_name)));
1044                                 WalSndSetState(WALSNDSTATE_STREAMING);
1045                         }
1046
1047                         /*
1048                          * When SIGUSR2 arrives, we send any outstanding logs up to the
1049                          * shutdown checkpoint record (i.e., the latest record), wait
1050                          * for them to be replicated to the standby, and exit.
1051                          * This may be a normal termination at shutdown, or a promotion,
1052                          * the walsender is not sure which.
1053                          */
1054                         if (walsender_ready_to_stop)
1055                         {
1056                                 /* ... let's just be real sure we're caught up ... */
1057                                 XLogSend(&caughtup);
1058                                 if (caughtup && sentPtr == MyWalSnd->flush &&
1059                                         !pq_is_send_pending())
1060                                 {
1061                                         /* Inform the standby that XLOG streaming is done */
1062                                         EndCommand("COPY 0", DestRemote);
1063                                         pq_flush();
1064
1065                                         proc_exit(0);
1066                                 }
1067                         }
1068                 }
1069
1070                 /*
1071                  * We don't block if not caught up, unless there is unsent data
1072                  * pending in which case we'd better block until the socket is
1073                  * write-ready.  This test is only needed for the case where XLogSend
1074                  * loaded a subset of the available data but then pq_flush_if_writable
1075                  * flushed it all --- we should immediately try to send more.
1076                  */
1077                 if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
1078                 {
1079                         TimestampTz timeout = 0;
1080                         long            sleeptime = 10000;              /* 10 s */
1081                         int                     wakeEvents;
1082
1083                         wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
1084                                 WL_SOCKET_READABLE;
1085
1086                         if (pq_is_send_pending())
1087                                 wakeEvents |= WL_SOCKET_WRITEABLE;
1088                         else if (wal_sender_timeout > 0 && !ping_sent)
1089                         {
1090                                 /*
1091                                  * If half of wal_sender_timeout has lapsed without receiving
1092                                  * any reply from standby, send a keep-alive message to
1093                                  * standby requesting an immediate reply.
1094                                  */
1095                                 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
1096                                                                                                           wal_sender_timeout / 2);
1097                                 if (GetCurrentTimestamp() >= timeout)
1098                                 {
1099                                         WalSndKeepalive(true);
1100                                         ping_sent = true;
1101                                         /* Try to flush pending output to the client */
1102                                         if (pq_flush_if_writable() != 0)
1103                                                 break;
1104                                 }
1105                         }
1106
1107                         /* Determine time until replication timeout */
1108                         if (wal_sender_timeout > 0)
1109                         {
1110                                 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
1111                                                                                                           wal_sender_timeout);
1112                                 sleeptime = 1 + (wal_sender_timeout / 10);
1113                         }
1114
1115                         /* Sleep until something happens or we time out */
1116                         ImmediateInterruptOK = true;
1117                         CHECK_FOR_INTERRUPTS();
1118                         WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
1119                                                           MyProcPort->sock, sleeptime);
1120                         ImmediateInterruptOK = false;
1121
1122                         /*
1123                          * Check for replication timeout.  Note we ignore the corner case
1124                          * possibility that the client replied just as we reached the
1125                          * timeout ... he's supposed to reply *before* that.
1126                          */
1127                         if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
1128                         {
1129                                 /*
1130                                  * Since typically expiration of replication timeout means
1131                                  * communication problem, we don't send the error message to
1132                                  * the standby.
1133                                  */
1134                                 ereport(COMMERROR,
1135                                                 (errmsg("terminating walsender process due to replication timeout")));
1136                                 goto send_failure;
1137                         }
1138                 }
1139         }
1140         return;
1141
1142 send_failure:
1143
1144         /*
1145          * Get here on send failure.  Clean up and exit.
1146          *
1147          * Reset whereToSendOutput to prevent ereport from attempting to send any
1148          * more messages to the standby.
1149          */
1150         if (whereToSendOutput == DestRemote)
1151                 whereToSendOutput = DestNone;
1152
1153         proc_exit(0);
1154         abort();                                        /* keep the compiler quiet */
1155 }
1156
1157 /* Initialize a per-walsender data structure for this walsender process */
1158 static void
1159 InitWalSenderSlot(void)
1160 {
1161         int                     i;
1162
1163         /*
1164          * WalSndCtl should be set up already (we inherit this by fork() or
1165          * EXEC_BACKEND mechanism from the postmaster).
1166          */
1167         Assert(WalSndCtl != NULL);
1168         Assert(MyWalSnd == NULL);
1169
1170         /*
1171          * Find a free walsender slot and reserve it. If this fails, we must be
1172          * out of WalSnd structures.
1173          */
1174         for (i = 0; i < max_wal_senders; i++)
1175         {
1176                 /* use volatile pointer to prevent code rearrangement */
1177                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1178
1179                 SpinLockAcquire(&walsnd->mutex);
1180
1181                 if (walsnd->pid != 0)
1182                 {
1183                         SpinLockRelease(&walsnd->mutex);
1184                         continue;
1185                 }
1186                 else
1187                 {
1188                         /*
1189                          * Found a free slot. Reserve it for us.
1190                          */
1191                         walsnd->pid = MyProcPid;
1192                         walsnd->sentPtr = InvalidXLogRecPtr;
1193                         walsnd->state = WALSNDSTATE_STARTUP;
1194                         SpinLockRelease(&walsnd->mutex);
1195                         /* don't need the lock anymore */
1196                         OwnLatch((Latch *) &walsnd->latch);
1197                         MyWalSnd = (WalSnd *) walsnd;
1198
1199                         break;
1200                 }
1201         }
1202         if (MyWalSnd == NULL)
1203                 ereport(FATAL,
1204                                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
1205                                  errmsg("number of requested standby connections "
1206                                                 "exceeds max_wal_senders (currently %d)",
1207                                                 max_wal_senders)));
1208
1209         /* Arrange to clean up at walsender exit */
1210         on_shmem_exit(WalSndKill, 0);
1211 }
1212
1213 /* Destroy the per-walsender data structure for this walsender process */
1214 static void
1215 WalSndKill(int code, Datum arg)
1216 {
1217         Assert(MyWalSnd != NULL);
1218
1219         /*
1220          * Mark WalSnd struct no longer in use. Assume that no lock is required
1221          * for this.
1222          */
1223         MyWalSnd->pid = 0;
1224         DisownLatch(&MyWalSnd->latch);
1225
1226         /* WalSnd struct isn't mine anymore */
1227         MyWalSnd = NULL;
1228 }
1229
1230 /*
1231  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
1232  *
1233  * XXX probably this should be improved to suck data directly from the
1234  * WAL buffers when possible.
1235  *
1236  * Will open, and keep open, one WAL segment stored in the global file
1237  * descriptor sendFile. This means if XLogRead is used once, there will
1238  * always be one descriptor left open until the process ends, but never
1239  * more than one.
1240  */
1241 static void
1242 XLogRead(char *buf, XLogRecPtr startptr, Size count)
1243 {
1244         char       *p;
1245         XLogRecPtr      recptr;
1246         Size            nbytes;
1247         XLogSegNo       segno;
1248
1249 retry:
1250         p = buf;
1251         recptr = startptr;
1252         nbytes = count;
1253
1254         while (nbytes > 0)
1255         {
1256                 uint32          startoff;
1257                 int                     segbytes;
1258                 int                     readbytes;
1259
1260                 startoff = recptr % XLogSegSize;
1261
1262                 if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
1263                 {
1264                         char            path[MAXPGPATH];
1265
1266                         /* Switch to another logfile segment */
1267                         if (sendFile >= 0)
1268                                 close(sendFile);
1269
1270                         XLByteToSeg(recptr, sendSegNo);
1271
1272                         /*-------
1273                          * When reading from a historic timeline, and there is a timeline
1274                          * switch within this segment, read from the WAL segment belonging
1275                          * to the new timeline.
1276                          *
1277                          * For example, imagine that this server is currently on timeline
1278                          * 5, and we're streaming timeline 4. The switch from timeline 4
1279                          * to 5 happened at 0/13002088. In pg_xlog, we have these files:
1280                          *
1281                          * ...
1282                          * 000000040000000000000012
1283                          * 000000040000000000000013
1284                          * 000000050000000000000013
1285                          * 000000050000000000000014
1286                          * ...
1287                          *
1288                          * In this situation, when requested to send the WAL from
1289                          * segment 0x13, on timeline 4, we read the WAL from file
1290                          * 000000050000000000000013. Archive recovery prefers files from
1291                          * newer timelines, so if the segment was restored from the
1292                          * archive on this server, the file belonging to the old timeline,
1293                          * 000000040000000000000013, might not exist. Their contents are
1294                          * equal up to the switchpoint, because at a timeline switch, the
1295                          * used portion of the old segment is copied to the new file.
1296                          *-------
1297                          */
1298                         curFileTimeLine = sendTimeLine;
1299                         if (sendTimeLineIsHistoric)
1300                         {
1301                                 XLogSegNo       endSegNo;
1302
1303                                 XLByteToSeg(sendTimeLineValidUpto, endSegNo);
1304                                 if (sendSegNo == endSegNo)
1305                                         curFileTimeLine = sendTimeLineNextTLI;
1306                         }
1307
1308                         XLogFilePath(path, curFileTimeLine, sendSegNo);
1309
1310                         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1311                         if (sendFile < 0)
1312                         {
1313                                 /*
1314                                  * If the file is not found, assume it's because the standby
1315                                  * asked for a too old WAL segment that has already been
1316                                  * removed or recycled.
1317                                  */
1318                                 if (errno == ENOENT)
1319                                         ereport(ERROR,
1320                                                         (errcode_for_file_access(),
1321                                                          errmsg("requested WAL segment %s has already been removed",
1322                                                                 XLogFileNameP(curFileTimeLine, sendSegNo))));
1323                                 else
1324                                         ereport(ERROR,
1325                                                         (errcode_for_file_access(),
1326                                                          errmsg("could not open file \"%s\": %m",
1327                                                                         path)));
1328                         }
1329                         sendOff = 0;
1330                 }
1331
1332                 /* Need to seek in the file? */
1333                 if (sendOff != startoff)
1334                 {
1335                         if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
1336                                 ereport(ERROR,
1337                                                 (errcode_for_file_access(),
1338                                   errmsg("could not seek in log segment %s to offset %u: %m",
1339                                                  XLogFileNameP(curFileTimeLine, sendSegNo),
1340                                                  startoff)));
1341                         sendOff = startoff;
1342                 }
1343
1344                 /* How many bytes are within this segment? */
1345                 if (nbytes > (XLogSegSize - startoff))
1346                         segbytes = XLogSegSize - startoff;
1347                 else
1348                         segbytes = nbytes;
1349
1350                 readbytes = read(sendFile, p, segbytes);
1351                 if (readbytes <= 0)
1352                 {
1353                         ereport(ERROR,
1354                                         (errcode_for_file_access(),
1355                                          errmsg("could not read from log segment %s, offset %u, length %lu: %m",
1356                                                         XLogFileNameP(curFileTimeLine, sendSegNo),
1357                                                         sendOff, (unsigned long) segbytes)));
1358                 }
1359
1360                 /* Update state for read */
1361                 recptr += readbytes;
1362
1363                 sendOff += readbytes;
1364                 nbytes -= readbytes;
1365                 p += readbytes;
1366         }
1367
1368         /*
1369          * After reading into the buffer, check that what we read was valid. We do
1370          * this after reading, because even though the segment was present when we
1371          * opened it, it might get recycled or removed while we read it. The
1372          * read() succeeds in that case, but the data we tried to read might
1373          * already have been overwritten with new WAL records.
1374          */
1375         XLByteToSeg(startptr, segno);
1376         CheckXLogRemoved(segno, ThisTimeLineID);
1377
1378         /*
1379          * During recovery, the currently-open WAL file might be replaced with the
1380          * file of the same name retrieved from archive. So we always need to
1381          * check what we read was valid after reading into the buffer. If it's
1382          * invalid, we try to open and read the file again.
1383          */
1384         if (am_cascading_walsender)
1385         {
1386                 /* use volatile pointer to prevent code rearrangement */
1387                 volatile WalSnd *walsnd = MyWalSnd;
1388                 bool            reload;
1389
1390                 SpinLockAcquire(&walsnd->mutex);
1391                 reload = walsnd->needreload;
1392                 walsnd->needreload = false;
1393                 SpinLockRelease(&walsnd->mutex);
1394
1395                 if (reload && sendFile >= 0)
1396                 {
1397                         close(sendFile);
1398                         sendFile = -1;
1399
1400                         goto retry;
1401                 }
1402         }
1403 }
1404
1405 /*
1406  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
1407  * but not yet sent to the client, and buffer it in the libpq output
1408  * buffer.
1409  *
1410  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
1411  * *caughtup is set to false.
1412  */
1413 static void
1414 XLogSend(bool *caughtup)
1415 {
1416         XLogRecPtr      SendRqstPtr;
1417         XLogRecPtr      startptr;
1418         XLogRecPtr      endptr;
1419         Size            nbytes;
1420
1421         if (streamingDoneSending)
1422         {
1423                 *caughtup = true;
1424                 return;
1425         }
1426
1427         /* Figure out how far we can safely send the WAL. */
1428         if (sendTimeLineIsHistoric)
1429         {
1430                 /*
1431                  * Streaming an old timeline timeline that's in this server's history,
1432                  * but is not the one we're currently inserting or replaying. It can
1433                  * be streamed up to the point where we switched off that timeline.
1434                  */
1435                 SendRqstPtr = sendTimeLineValidUpto;
1436         }
1437         else if (am_cascading_walsender)
1438         {
1439                 /*
1440                  * Streaming the latest timeline on a standby.
1441                  *
1442                  * Attempt to send all WAL that has already been replayed, so that we
1443                  * know it's valid. If we're receiving WAL through streaming
1444                  * replication, it's also OK to send any WAL that has been received
1445                  * but not replayed.
1446                  *
1447                  * The timeline we're recovering from can change, or we can be
1448                  * promoted. In either case, the current timeline becomes historic. We
1449                  * need to detect that so that we don't try to stream past the point
1450                  * where we switched to another timeline. We check for promotion or
1451                  * timeline switch after calculating FlushPtr, to avoid a race
1452                  * condition: if the timeline becomes historic just after we checked
1453                  * that it was still current, it's still be OK to stream it up to the
1454                  * FlushPtr that was calculated before it became historic.
1455                  */
1456                 bool            becameHistoric = false;
1457
1458                 SendRqstPtr = GetStandbyFlushRecPtr();
1459
1460                 if (!RecoveryInProgress())
1461                 {
1462                         /*
1463                          * We have been promoted. RecoveryInProgress() updated
1464                          * ThisTimeLineID to the new current timeline.
1465                          */
1466                         am_cascading_walsender = false;
1467                         becameHistoric = true;
1468                 }
1469                 else
1470                 {
1471                         /*
1472                          * Still a cascading standby. But is the timeline we're sending
1473                          * still the one recovery is recovering from? ThisTimeLineID was
1474                          * updated by the GetStandbyFlushRecPtr() call above.
1475                          */
1476                         if (sendTimeLine != ThisTimeLineID)
1477                                 becameHistoric = true;
1478                 }
1479
1480                 if (becameHistoric)
1481                 {
1482                         /*
1483                          * The timeline we were sending has become historic. Read the
1484                          * timeline history file of the new timeline to see where exactly
1485                          * we forked off from the timeline we were sending.
1486                          */
1487                         List       *history;
1488
1489                         history = readTimeLineHistory(ThisTimeLineID);
1490                         sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
1491
1492                         Assert(sendTimeLine < sendTimeLineNextTLI);
1493                         list_free_deep(history);
1494
1495                         sendTimeLineIsHistoric = true;
1496
1497                         SendRqstPtr = sendTimeLineValidUpto;
1498                 }
1499         }
1500         else
1501         {
1502                 /*
1503                  * Streaming the current timeline on a master.
1504                  *
1505                  * Attempt to send all data that's already been written out and
1506                  * fsync'd to disk.  We cannot go further than what's been written out
1507                  * given the current implementation of XLogRead().      And in any case
1508                  * it's unsafe to send WAL that is not securely down to disk on the
1509                  * master: if the master subsequently crashes and restarts, slaves
1510                  * must not have applied any WAL that gets lost on the master.
1511                  */
1512                 SendRqstPtr = GetFlushRecPtr();
1513         }
1514
1515         /*
1516          * If this is a historic timeline and we've reached the point where we
1517          * forked to the next timeline, stop streaming.
1518          *
1519          * Note: We might already have sent WAL > sendTimeLineValidUpto. The
1520          * startup process will normally replay all WAL that has been received
1521          * from the master, before promoting, but if the WAL streaming is
1522          * terminated at a WAL page boundary, the valid portion of the timeline
1523          * might end in the middle of a WAL record. We might've already sent the
1524          * first half of that partial WAL record to the cascading standby, so that
1525          * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
1526          * replay the partial WAL record either, so it can still follow our
1527          * timeline switch.
1528          */
1529         if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
1530         {
1531                 /* close the current file. */
1532                 if (sendFile >= 0)
1533                         close(sendFile);
1534                 sendFile = -1;
1535
1536                 /* Send CopyDone */
1537                 pq_putmessage_noblock('c', NULL, 0);
1538                 streamingDoneSending = true;
1539
1540                 *caughtup = true;
1541
1542                 elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
1543                          (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
1544                          (uint32) (sentPtr >> 32), (uint32) sentPtr);
1545                 return;
1546         }
1547
1548         /* Do we have any work to do? */
1549         Assert(sentPtr <= SendRqstPtr);
1550         if (SendRqstPtr <= sentPtr)
1551         {
1552                 *caughtup = true;
1553                 return;
1554         }
1555
1556         /*
1557          * Figure out how much to send in one message. If there's no more than
1558          * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
1559          * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
1560          *
1561          * The rounding is not only for performance reasons. Walreceiver relies on
1562          * the fact that we never split a WAL record across two messages. Since a
1563          * long WAL record is split at page boundary into continuation records,
1564          * page boundary is always a safe cut-off point. We also assume that
1565          * SendRqstPtr never points to the middle of a WAL record.
1566          */
1567         startptr = sentPtr;
1568         endptr = startptr;
1569         endptr += MAX_SEND_SIZE;
1570
1571         /* if we went beyond SendRqstPtr, back off */
1572         if (SendRqstPtr <= endptr)
1573         {
1574                 endptr = SendRqstPtr;
1575                 if (sendTimeLineIsHistoric)
1576                         *caughtup = false;
1577                 else
1578                         *caughtup = true;
1579         }
1580         else
1581         {
1582                 /* round down to page boundary. */
1583                 endptr -= (endptr % XLOG_BLCKSZ);
1584                 *caughtup = false;
1585         }
1586
1587         nbytes = endptr - startptr;
1588         Assert(nbytes <= MAX_SEND_SIZE);
1589
1590         /*
1591          * OK to read and send the slice.
1592          */
1593         resetStringInfo(&output_message);
1594         pq_sendbyte(&output_message, 'w');
1595
1596         pq_sendint64(&output_message, startptr);        /* dataStart */
1597         pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
1598         pq_sendint64(&output_message, 0);       /* sendtime, filled in last */
1599
1600         /*
1601          * Read the log directly into the output buffer to avoid extra memcpy
1602          * calls.
1603          */
1604         enlargeStringInfo(&output_message, nbytes);
1605         XLogRead(&output_message.data[output_message.len], startptr, nbytes);
1606         output_message.len += nbytes;
1607         output_message.data[output_message.len] = '\0';
1608
1609         /*
1610          * Fill the send timestamp last, so that it is taken as late as possible.
1611          */
1612         resetStringInfo(&tmpbuf);
1613         pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
1614         memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
1615                    tmpbuf.data, sizeof(int64));
1616
1617         pq_putmessage_noblock('d', output_message.data, output_message.len);
1618
1619         sentPtr = endptr;
1620
1621         /* Update shared memory status */
1622         {
1623                 /* use volatile pointer to prevent code rearrangement */
1624                 volatile WalSnd *walsnd = MyWalSnd;
1625
1626                 SpinLockAcquire(&walsnd->mutex);
1627                 walsnd->sentPtr = sentPtr;
1628                 SpinLockRelease(&walsnd->mutex);
1629         }
1630
1631         /* Report progress of XLOG streaming in PS display */
1632         if (update_process_title)
1633         {
1634                 char            activitymsg[50];
1635
1636                 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1637                                  (uint32) (sentPtr >> 32), (uint32) sentPtr);
1638                 set_ps_display(activitymsg, false);
1639         }
1640
1641         return;
1642 }
1643
1644 /*
1645  * Returns the latest point in WAL that has been safely flushed to disk, and
1646  * can be sent to the standby. This should only be called when in recovery,
1647  * ie. we're streaming to a cascaded standby.
1648  *
1649  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
1650  * replayed WAL record.
1651  */
1652 static XLogRecPtr
1653 GetStandbyFlushRecPtr(void)
1654 {
1655         XLogRecPtr      replayPtr;
1656         TimeLineID      replayTLI;
1657         XLogRecPtr      receivePtr;
1658         TimeLineID      receiveTLI;
1659         XLogRecPtr      result;
1660
1661         /*
1662          * We can safely send what's already been replayed. Also, if walreceiver
1663          * is streaming WAL from the same timeline, we can send anything that it
1664          * has streamed, but hasn't been replayed yet.
1665          */
1666
1667         receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
1668         replayPtr = GetXLogReplayRecPtr(&replayTLI);
1669
1670         ThisTimeLineID = replayTLI;
1671
1672         result = replayPtr;
1673         if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
1674                 result = receivePtr;
1675
1676         return result;
1677 }
1678
1679 /*
1680  * Request walsenders to reload the currently-open WAL file
1681  */
1682 void
1683 WalSndRqstFileReload(void)
1684 {
1685         int                     i;
1686
1687         for (i = 0; i < max_wal_senders; i++)
1688         {
1689                 /* use volatile pointer to prevent code rearrangement */
1690                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1691
1692                 if (walsnd->pid == 0)
1693                         continue;
1694
1695                 SpinLockAcquire(&walsnd->mutex);
1696                 walsnd->needreload = true;
1697                 SpinLockRelease(&walsnd->mutex);
1698         }
1699 }
1700
1701 /* SIGHUP: set flag to re-read config file at next convenient time */
1702 static void
1703 WalSndSigHupHandler(SIGNAL_ARGS)
1704 {
1705         int                     save_errno = errno;
1706
1707         got_SIGHUP = true;
1708         if (MyWalSnd)
1709                 SetLatch(&MyWalSnd->latch);
1710
1711         errno = save_errno;
1712 }
1713
/*
 * SIGUSR1 handler: request to send WAL records. Delegates to
 * latch_sigusr1_handler() while preserving errno.
 */
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
{
	int			saved_errno = errno;

	latch_sigusr1_handler();

	/* Leave errno as the interrupted code expects to find it. */
	errno = saved_errno;
}
1724
1725 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
1726 static void
1727 WalSndLastCycleHandler(SIGNAL_ARGS)
1728 {
1729         int                     save_errno = errno;
1730
1731         /*
1732          * If replication has not yet started, die like with SIGTERM. If
1733          * replication is active, only set a flag and wake up the main loop. It
1734          * will send any outstanding WAL, wait for it to be replicated to
1735          * the standby, and then exit gracefully.
1736          */
1737         if (!replication_active)
1738                 kill(MyProcPid, SIGTERM);
1739
1740         walsender_ready_to_stop = true;
1741         if (MyWalSnd)
1742                 SetLatch(&MyWalSnd->latch);
1743
1744         errno = save_errno;
1745 }
1746
1747 /* Set up signal handlers */
1748 void
1749 WalSndSignals(void)
1750 {
1751         /* Set up signal handlers */
1752         pqsignal(SIGHUP, WalSndSigHupHandler);          /* set flag to read config
1753                                                                                                  * file */
1754         pqsignal(SIGINT, SIG_IGN);      /* not used */
1755         pqsignal(SIGTERM, die);         /* request shutdown */
1756         pqsignal(SIGQUIT, quickdie);    /* hard crash time */
1757         InitializeTimeouts();           /* establishes SIGALRM handler */
1758         pqsignal(SIGPIPE, SIG_IGN);
1759         pqsignal(SIGUSR1, WalSndXLogSendHandler);       /* request WAL sending */
1760         pqsignal(SIGUSR2, WalSndLastCycleHandler);      /* request a last cycle and
1761                                                                                                  * shutdown */
1762
1763         /* Reset some signals that are accepted by postmaster but not here */
1764         pqsignal(SIGCHLD, SIG_DFL);
1765         pqsignal(SIGTTIN, SIG_DFL);
1766         pqsignal(SIGTTOU, SIG_DFL);
1767         pqsignal(SIGCONT, SIG_DFL);
1768         pqsignal(SIGWINCH, SIG_DFL);
1769 }
1770
1771 /* Report shared-memory space needed by WalSndShmemInit */
1772 Size
1773 WalSndShmemSize(void)
1774 {
1775         Size            size = 0;
1776
1777         size = offsetof(WalSndCtlData, walsnds);
1778         size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
1779
1780         return size;
1781 }
1782
1783 /* Allocate and initialize walsender-related shared memory */
1784 void
1785 WalSndShmemInit(void)
1786 {
1787         bool            found;
1788         int                     i;
1789
1790         WalSndCtl = (WalSndCtlData *)
1791                 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
1792
1793         if (!found)
1794         {
1795                 /* First time through, so initialize */
1796                 MemSet(WalSndCtl, 0, WalSndShmemSize());
1797
1798                 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
1799                         SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
1800
1801                 for (i = 0; i < max_wal_senders; i++)
1802                 {
1803                         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
1804
1805                         SpinLockInit(&walsnd->mutex);
1806                         InitSharedLatch(&walsnd->latch);
1807                 }
1808         }
1809 }
1810
1811 /*
1812  * Wake up all walsenders
1813  *
1814  * This will be called inside critical sections, so throwing an error is not
1815  * adviseable.
1816  */
1817 void
1818 WalSndWakeup(void)
1819 {
1820         int                     i;
1821
1822         for (i = 0; i < max_wal_senders; i++)
1823                 SetLatch(&WalSndCtl->walsnds[i].latch);
1824 }
1825
1826 /* Set state for current walsender (only called in walsender) */
1827 void
1828 WalSndSetState(WalSndState state)
1829 {
1830         /* use volatile pointer to prevent code rearrangement */
1831         volatile WalSnd *walsnd = MyWalSnd;
1832
1833         Assert(am_walsender);
1834
1835         if (walsnd->state == state)
1836                 return;
1837
1838         SpinLockAcquire(&walsnd->mutex);
1839         walsnd->state = state;
1840         SpinLockRelease(&walsnd->mutex);
1841 }
1842
1843 /*
1844  * Return a string constant representing the state. This is used
1845  * in system views, and should *not* be translated.
1846  */
1847 static const char *
1848 WalSndGetStateString(WalSndState state)
1849 {
1850         switch (state)
1851         {
1852                 case WALSNDSTATE_STARTUP:
1853                         return "startup";
1854                 case WALSNDSTATE_BACKUP:
1855                         return "backup";
1856                 case WALSNDSTATE_CATCHUP:
1857                         return "catchup";
1858                 case WALSNDSTATE_STREAMING:
1859                         return "streaming";
1860         }
1861         return "UNKNOWN";
1862 }
1863
1864
1865 /*
1866  * Returns activity of walsenders, including pids and xlog locations sent to
1867  * standby servers.
1868  */
1869 Datum
1870 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
1871 {
1872 #define PG_STAT_GET_WAL_SENDERS_COLS    8
1873         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1874         TupleDesc       tupdesc;
1875         Tuplestorestate *tupstore;
1876         MemoryContext per_query_ctx;
1877         MemoryContext oldcontext;
1878         int                *sync_priority;
1879         int                     priority = 0;
1880         int                     sync_standby = -1;
1881         int                     i;
1882
1883         /* check to see if caller supports us returning a tuplestore */
1884         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1885                 ereport(ERROR,
1886                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1887                                  errmsg("set-valued function called in context that cannot accept a set")));
1888         if (!(rsinfo->allowedModes & SFRM_Materialize))
1889                 ereport(ERROR,
1890                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1891                                  errmsg("materialize mode required, but it is not " \
1892                                                 "allowed in this context")));
1893
1894         /* Build a tuple descriptor for our result type */
1895         if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1896                 elog(ERROR, "return type must be a row type");
1897
1898         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1899         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1900
1901         tupstore = tuplestore_begin_heap(true, false, work_mem);
1902         rsinfo->returnMode = SFRM_Materialize;
1903         rsinfo->setResult = tupstore;
1904         rsinfo->setDesc = tupdesc;
1905
1906         MemoryContextSwitchTo(oldcontext);
1907
1908         /*
1909          * Get the priorities of sync standbys all in one go, to minimise lock
1910          * acquisitions and to allow us to evaluate who is the current sync
1911          * standby. This code must match the code in SyncRepReleaseWaiters().
1912          */
1913         sync_priority = palloc(sizeof(int) * max_wal_senders);
1914         LWLockAcquire(SyncRepLock, LW_SHARED);
1915         for (i = 0; i < max_wal_senders; i++)
1916         {
1917                 /* use volatile pointer to prevent code rearrangement */
1918                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1919
1920                 if (walsnd->pid != 0)
1921                 {
1922                         /*
1923                          * Treat a standby such as a pg_basebackup background process
1924                          * which always returns an invalid flush location, as an
1925                          * asynchronous standby.
1926                          */
1927                         sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
1928                                 0 : walsnd->sync_standby_priority;
1929
1930                         if (walsnd->state == WALSNDSTATE_STREAMING &&
1931                                 walsnd->sync_standby_priority > 0 &&
1932                                 (priority == 0 ||
1933                                  priority > walsnd->sync_standby_priority) &&
1934                                 !XLogRecPtrIsInvalid(walsnd->flush))
1935                         {
1936                                 priority = walsnd->sync_standby_priority;
1937                                 sync_standby = i;
1938                         }
1939                 }
1940         }
1941         LWLockRelease(SyncRepLock);
1942
1943         for (i = 0; i < max_wal_senders; i++)
1944         {
1945                 /* use volatile pointer to prevent code rearrangement */
1946                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1947                 char            location[MAXFNAMELEN];
1948                 XLogRecPtr      sentPtr;
1949                 XLogRecPtr      write;
1950                 XLogRecPtr      flush;
1951                 XLogRecPtr      apply;
1952                 WalSndState state;
1953                 Datum           values[PG_STAT_GET_WAL_SENDERS_COLS];
1954                 bool            nulls[PG_STAT_GET_WAL_SENDERS_COLS];
1955
1956                 if (walsnd->pid == 0)
1957                         continue;
1958
1959                 SpinLockAcquire(&walsnd->mutex);
1960                 sentPtr = walsnd->sentPtr;
1961                 state = walsnd->state;
1962                 write = walsnd->write;
1963                 flush = walsnd->flush;
1964                 apply = walsnd->apply;
1965                 SpinLockRelease(&walsnd->mutex);
1966
1967                 memset(nulls, 0, sizeof(nulls));
1968                 values[0] = Int32GetDatum(walsnd->pid);
1969
1970                 if (!superuser())
1971                 {
1972                         /*
1973                          * Only superusers can see details. Other users only get the pid
1974                          * value to know it's a walsender, but no details.
1975                          */
1976                         MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
1977                 }
1978                 else
1979                 {
1980                         values[1] = CStringGetTextDatum(WalSndGetStateString(state));
1981
1982                         snprintf(location, sizeof(location), "%X/%X",
1983                                          (uint32) (sentPtr >> 32), (uint32) sentPtr);
1984                         values[2] = CStringGetTextDatum(location);
1985
1986                         if (write == 0)
1987                                 nulls[3] = true;
1988                         snprintf(location, sizeof(location), "%X/%X",
1989                                          (uint32) (write >> 32), (uint32) write);
1990                         values[3] = CStringGetTextDatum(location);
1991
1992                         if (flush == 0)
1993                                 nulls[4] = true;
1994                         snprintf(location, sizeof(location), "%X/%X",
1995                                          (uint32) (flush >> 32), (uint32) flush);
1996                         values[4] = CStringGetTextDatum(location);
1997
1998                         if (apply == 0)
1999                                 nulls[5] = true;
2000                         snprintf(location, sizeof(location), "%X/%X",
2001                                          (uint32) (apply >> 32), (uint32) apply);
2002                         values[5] = CStringGetTextDatum(location);
2003
2004                         values[6] = Int32GetDatum(sync_priority[i]);
2005
2006                         /*
2007                          * More easily understood version of standby state. This is purely
2008                          * informational, not different from priority.
2009                          */
2010                         if (sync_priority[i] == 0)
2011                                 values[7] = CStringGetTextDatum("async");
2012                         else if (i == sync_standby)
2013                                 values[7] = CStringGetTextDatum("sync");
2014                         else
2015                                 values[7] = CStringGetTextDatum("potential");
2016                 }
2017
2018                 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
2019         }
2020         pfree(sync_priority);
2021
2022         /* clean up and return the tuplestore */
2023         tuplestore_donestoring(tupstore);
2024
2025         return (Datum) 0;
2026 }
2027
2028 /*
2029   * This function is used to send keepalive message to standby.
2030   * If requestReply is set, sets a flag in the message requesting the standby
2031   * to send a message back to us, for heartbeat purposes.
2032   */
2033 static void
2034 WalSndKeepalive(bool requestReply)
2035 {
2036         elog(DEBUG2, "sending replication keepalive");
2037
2038         /* construct the message... */
2039         resetStringInfo(&output_message);
2040         pq_sendbyte(&output_message, 'k');
2041         pq_sendint64(&output_message, sentPtr);
2042         pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
2043         pq_sendbyte(&output_message, requestReply ? 1 : 0);
2044
2045         /* ... and send it wrapped in CopyData */
2046         pq_putmessage_noblock('d', output_message.data, output_message.len);
2047 }
2048
2049 /*
2050  * This isn't currently used for anything. Monitoring tools might be
2051  * interested in the future, and we'll need something like this in the
2052  * future for synchronous replication.
2053  */
2054 #ifdef NOT_USED
2055 /*
2056  * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
2057  * if none.
2058  */
2059 XLogRecPtr
2060 GetOldestWALSendPointer(void)
2061 {
2062         XLogRecPtr      oldest = {0, 0};
2063         int                     i;
2064         bool            found = false;
2065
2066         for (i = 0; i < max_wal_senders; i++)
2067         {
2068                 /* use volatile pointer to prevent code rearrangement */
2069                 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
2070                 XLogRecPtr      recptr;
2071
2072                 if (walsnd->pid == 0)
2073                         continue;
2074
2075                 SpinLockAcquire(&walsnd->mutex);
2076                 recptr = walsnd->sentPtr;
2077                 SpinLockRelease(&walsnd->mutex);
2078
2079                 if (recptr.xlogid == 0 && recptr.xrecoff == 0)
2080                         continue;
2081
2082                 if (!found || recptr < oldest)
2083                         oldest = recptr;
2084                 found = true;
2085         }
2086         return oldest;
2087 }
2088
2089 #endif