1 /*-------------------------------------------------------------------------
5 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 * care of sending XLOG from the primary server to a single recipient.
7 * (Note that there can be more than one walsender process concurrently.)
8 * It is started by the postmaster when the walreceiver of a standby server
9 * connects to the primary server and requests XLOG streaming replication.
11 * A walsender is similar to a regular backend, ie. there is a one-to-one
12 * relationship between a connection and a walsender process, but instead
13 * of processing SQL queries, it understands a small set of special
14 * replication-mode commands. The START_REPLICATION command begins streaming
15 * WAL to the client. While streaming, the walsender keeps reading XLOG
16 * records from the disk and sends them to the standby server over the
17 * COPY protocol, until either side ends the replication by exiting COPY
18 * mode (or until the connection is closed).
20 * Normal termination is by SIGTERM, which instructs the walsender to
21 * close the connection and exit(0) at next convenient moment. Emergency
22 * termination is by SIGQUIT; like any backend, the walsender will simply
23 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 * are treated as not a crash but approximately normal termination;
25 * the walsender will exit quickly without sending any more XLOG records.
27 * If the server is shut down, postmaster sends us SIGUSR2 after all
28 * regular backends have exited and the shutdown checkpoint has been written.
29 * This instructs walsender to send any outstanding WAL, including the
30 * shutdown checkpoint record, and then exit.
33 * Portions Copyright (c) 2010-2013, PostgreSQL Global Development Group
36 * src/backend/replication/walsender.c
38 *-------------------------------------------------------------------------
45 #include "access/timeline.h"
46 #include "access/transam.h"
47 #include "access/xlog_internal.h"
48 #include "catalog/pg_type.h"
50 #include "libpq/libpq.h"
51 #include "libpq/pqformat.h"
52 #include "libpq/pqsignal.h"
53 #include "miscadmin.h"
54 #include "nodes/replnodes.h"
55 #include "replication/basebackup.h"
56 #include "replication/syncrep.h"
57 #include "replication/walreceiver.h"
58 #include "replication/walsender.h"
59 #include "replication/walsender_private.h"
60 #include "storage/fd.h"
61 #include "storage/ipc.h"
62 #include "storage/pmsignal.h"
63 #include "storage/proc.h"
64 #include "storage/procarray.h"
65 #include "tcop/tcopprot.h"
66 #include "utils/builtins.h"
67 #include "utils/guc.h"
68 #include "utils/memutils.h"
69 #include "utils/ps_status.h"
70 #include "utils/resowner.h"
71 #include "utils/timeout.h"
72 #include "utils/timestamp.h"
75 * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
77 * We don't have a good idea of what a good value would be; there's some
78 * overhead per message in both walsender and walreceiver, but on the other
79 * hand sending large batches makes walsender less responsive to signals
80 * because signals are checked only between messages. 128kB (with
81 * default 8k blocks) seems like a reasonable guess for now.
83 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
85 /* Array of WalSnds in shared memory */
86 WalSndCtlData *WalSndCtl = NULL;
88 /* My slot in the shared memory array */
89 WalSnd *MyWalSnd = NULL;
92 bool am_walsender = false; /* Am I a walsender process ? */
93 bool am_cascading_walsender = false; /* Am I cascading WAL to
94 * another standby ? */
96 /* User-settable parameters for walsender */
97 int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
98 int wal_sender_timeout = 60 * 1000; /* maximum time to send one
101 * State for WalSndWakeupRequest
103 bool wake_wal_senders = false;
106 * These variables are used similarly to openLogFile/Id/Seg/Off,
107 * but for walsender to read the XLOG.
109 static int sendFile = -1;
110 static XLogSegNo sendSegNo = 0;
111 static uint32 sendOff = 0;
113 /* Timeline ID of the currently open file */
114 static TimeLineID curFileTimeLine = 0;
117 * These variables keep track of the state of the timeline we're currently
118 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
119 * the timeline is not the latest timeline on this server, and the server's
120 * history forked off from that timeline at sendTimeLineValidUpto.
122 static TimeLineID sendTimeLine = 0;
123 static TimeLineID sendTimeLineNextTLI = 0;
124 static bool sendTimeLineIsHistoric = false;
125 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
128 * How far have we sent WAL already? This is also advertised in
129 * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
131 static XLogRecPtr sentPtr = 0;
133 /* Buffers for constructing outgoing messages and processing reply messages. */
134 static StringInfoData output_message;
135 static StringInfoData reply_message;
136 static StringInfoData tmpbuf;
139 * Timestamp of the last receipt of the reply from the standby.
141 static TimestampTz last_reply_timestamp;
142 /* Have we sent a heartbeat message asking for reply, since last reply? */
143 static bool ping_sent = false;
146 * While streaming WAL in Copy mode, streamingDoneSending is set to true
147 * after we have sent CopyDone. We should not send any more CopyData messages
148 * after that. streamingDoneReceiving is set to true when we receive CopyDone
149 * from the other end. When both become true, it's time to exit Copy mode.
151 static bool streamingDoneSending;
152 static bool streamingDoneReceiving;
154 /* Flags set by signal handlers for later service in main loop */
155 static volatile sig_atomic_t got_SIGHUP = false;
156 static volatile sig_atomic_t walsender_ready_to_stop = false;
159 * This is set while we are streaming. When not set, SIGUSR2 signal will be
160 * handled like SIGTERM. When set, the main loop is responsible for checking
161 * walsender_ready_to_stop and terminating when it's set (after streaming any
164 static volatile sig_atomic_t replication_active = false;
166 /* Signal handlers */
167 static void WalSndSigHupHandler(SIGNAL_ARGS);
168 static void WalSndXLogSendHandler(SIGNAL_ARGS);
169 static void WalSndLastCycleHandler(SIGNAL_ARGS);
171 /* Prototypes for private functions */
172 static void WalSndLoop(void);
173 static void InitWalSenderSlot(void);
174 static void WalSndKill(int code, Datum arg);
175 static void XLogSend(bool *caughtup);
176 static XLogRecPtr GetStandbyFlushRecPtr(void);
177 static void IdentifySystem(void);
178 static void StartReplication(StartReplicationCmd *cmd);
179 static void ProcessStandbyMessage(void);
180 static void ProcessStandbyReplyMessage(void);
181 static void ProcessStandbyHSFeedbackMessage(void);
182 static void ProcessRepliesIfAny(void);
183 static void WalSndKeepalive(bool requestReply);
186 /* Initialize walsender process before entering the main command loop */
190 am_cascading_walsender = RecoveryInProgress();
192 /* Create a per-walsender data structure in shared memory */
195 /* Set up resource owner */
196 CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
199 * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
200 * a WAL sender process, postmaster will let us outlive the bgwriter and
201 * kill us last in the shutdown sequence, so we get a chance to stream all
202 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
203 * there's no going back, and we mustn't write any WAL records after this.
205 MarkPostmasterChildWalSender();
206 SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
210 * Clean up after an error.
212 * WAL sender processes don't use transactions like regular backends do.
213 * This function does any cleanup required after an error in a WAL sender
214 * process, similar to what transaction abort does in a regular backend.
225 replication_active = false;
226 if (walsender_ready_to_stop)
229 /* Revert back to startup state */
230 WalSndSetState(WALSNDSTATE_STARTUP);
234 * Handle the IDENTIFY_SYSTEM command.
242 char xpos[MAXFNAMELEN];
246 * Reply with a result set with one row, three columns. First col is
247 * system ID, second is timeline ID, and third is current xlog location.
250 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
251 GetSystemIdentifier());
253 am_cascading_walsender = RecoveryInProgress();
254 if (am_cascading_walsender)
256 /* this also updates ThisTimeLineID */
257 logptr = GetStandbyFlushRecPtr();
260 logptr = GetInsertRecPtr();
262 snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
264 snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
266 /* Send a RowDescription message */
267 pq_beginmessage(&buf, 'T');
268 pq_sendint(&buf, 3, 2); /* 3 fields */
271 pq_sendstring(&buf, "systemid"); /* col name */
272 pq_sendint(&buf, 0, 4); /* table oid */
273 pq_sendint(&buf, 0, 2); /* attnum */
274 pq_sendint(&buf, TEXTOID, 4); /* type oid */
275 pq_sendint(&buf, -1, 2); /* typlen */
276 pq_sendint(&buf, 0, 4); /* typmod */
277 pq_sendint(&buf, 0, 2); /* format code */
280 pq_sendstring(&buf, "timeline"); /* col name */
281 pq_sendint(&buf, 0, 4); /* table oid */
282 pq_sendint(&buf, 0, 2); /* attnum */
283 pq_sendint(&buf, INT4OID, 4); /* type oid */
284 pq_sendint(&buf, 4, 2); /* typlen */
285 pq_sendint(&buf, 0, 4); /* typmod */
286 pq_sendint(&buf, 0, 2); /* format code */
289 pq_sendstring(&buf, "xlogpos");
290 pq_sendint(&buf, 0, 4);
291 pq_sendint(&buf, 0, 2);
292 pq_sendint(&buf, TEXTOID, 4);
293 pq_sendint(&buf, -1, 2);
294 pq_sendint(&buf, 0, 4);
295 pq_sendint(&buf, 0, 2);
298 /* Send a DataRow message */
299 pq_beginmessage(&buf, 'D');
300 pq_sendint(&buf, 3, 2); /* # of columns */
301 pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
302 pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
303 pq_sendint(&buf, strlen(tli), 4); /* col2 len */
304 pq_sendbytes(&buf, (char *) tli, strlen(tli));
305 pq_sendint(&buf, strlen(xpos), 4); /* col3 len */
306 pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
313 * Handle TIMELINE_HISTORY command.
316 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
319 char histfname[MAXFNAMELEN];
320 char path[MAXPGPATH];
326 * Reply with a result set with one row, and two columns. The first col
327 * is the name of the history file, 2nd is the contents.
330 TLHistoryFileName(histfname, cmd->timeline);
331 TLHistoryFilePath(path, cmd->timeline);
333 /* Send a RowDescription message */
334 pq_beginmessage(&buf, 'T');
335 pq_sendint(&buf, 2, 2); /* 2 fields */
338 pq_sendstring(&buf, "filename"); /* col name */
339 pq_sendint(&buf, 0, 4); /* table oid */
340 pq_sendint(&buf, 0, 2); /* attnum */
341 pq_sendint(&buf, TEXTOID, 4); /* type oid */
342 pq_sendint(&buf, -1, 2); /* typlen */
343 pq_sendint(&buf, 0, 4); /* typmod */
344 pq_sendint(&buf, 0, 2); /* format code */
347 pq_sendstring(&buf, "content"); /* col name */
348 pq_sendint(&buf, 0, 4); /* table oid */
349 pq_sendint(&buf, 0, 2); /* attnum */
350 pq_sendint(&buf, BYTEAOID, 4); /* type oid */
351 pq_sendint(&buf, -1, 2); /* typlen */
352 pq_sendint(&buf, 0, 4); /* typmod */
353 pq_sendint(&buf, 0, 2); /* format code */
356 /* Send a DataRow message */
357 pq_beginmessage(&buf, 'D');
358 pq_sendint(&buf, 2, 2); /* # of columns */
359 pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
360 pq_sendbytes(&buf, histfname, strlen(histfname));
362 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
365 (errcode_for_file_access(),
366 errmsg("could not open file \"%s\": %m", path)));
368 /* Determine file length and send it to client */
369 histfilelen = lseek(fd, 0, SEEK_END);
372 (errcode_for_file_access(),
373 errmsg("could not seek to end of file \"%s\": %m", path)));
374 if (lseek(fd, 0, SEEK_SET) != 0)
376 (errcode_for_file_access(),
377 errmsg("could not seek to beginning of file \"%s\": %m", path)));
379 pq_sendint(&buf, histfilelen, 4); /* col2 len */
381 bytesleft = histfilelen;
382 while (bytesleft > 0)
387 nread = read(fd, rbuf, sizeof(rbuf));
390 (errcode_for_file_access(),
391 errmsg("could not read file \"%s\": %m",
393 pq_sendbytes(&buf, rbuf, nread);
396 CloseTransientFile(fd);
402 * Handle START_REPLICATION command.
404 * At the moment, this never returns, but an ereport(ERROR) will take us back
408 StartReplication(StartReplicationCmd *cmd)
414 * We assume here that we're logging enough information in the WAL for
415 * log-shipping, since this is checked in PostmasterMain().
417 * NOTE: wal_level can only change at shutdown, so in most cases it is
418 * difficult for there to be WAL data that we can still see that was
419 * written at wal_level='minimal'.
423 * Select the timeline. If it was given explicitly by the client, use
424 * that. Otherwise use the timeline of the last replayed record, which
425 * is kept in ThisTimeLineID.
427 if (am_cascading_walsender)
429 /* this also updates ThisTimeLineID */
430 FlushPtr = GetStandbyFlushRecPtr();
433 FlushPtr = GetFlushRecPtr();
435 if (cmd->timeline != 0)
437 XLogRecPtr switchpoint;
439 sendTimeLine = cmd->timeline;
440 if (sendTimeLine == ThisTimeLineID)
442 sendTimeLineIsHistoric = false;
443 sendTimeLineValidUpto = InvalidXLogRecPtr;
447 List *timeLineHistory;
449 sendTimeLineIsHistoric = true;
452 * Check that the timeline the client requested exists, and the
453 * requested start location is on that timeline.
455 timeLineHistory = readTimeLineHistory(ThisTimeLineID);
456 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
457 &sendTimeLineNextTLI);
458 list_free_deep(timeLineHistory);
461 * Found the requested timeline in the history. Check that
462 * requested startpoint is on that timeline in our history.
464 * This is quite loose on purpose. We only check that we didn't
465 * fork off the requested timeline before the switchpoint. We don't
466 * check that we switched *to* it before the requested starting
467 * point. This is because the client can legitimately request to
468 * start replication from the beginning of the WAL segment that
469 * contains switchpoint, but on the new timeline, so that it
470 * doesn't end up with a partial segment. If you ask for a too old
471 * starting point, you'll get an error later when we fail to find
472 * the requested WAL segment in pg_xlog.
474 * XXX: we could be more strict here and only allow a startpoint
475 * that's older than the switchpoint, if it's still in the same
478 if (!XLogRecPtrIsInvalid(switchpoint) &&
479 switchpoint < cmd->startpoint)
482 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
483 (uint32) (cmd->startpoint >> 32),
484 (uint32) (cmd->startpoint),
486 errdetail("This server's history forked from timeline %u at %X/%X",
488 (uint32) (switchpoint >> 32),
489 (uint32) (switchpoint))));
491 sendTimeLineValidUpto = switchpoint;
496 sendTimeLine = ThisTimeLineID;
497 sendTimeLineValidUpto = InvalidXLogRecPtr;
498 sendTimeLineIsHistoric = false;
501 streamingDoneSending = streamingDoneReceiving = false;
503 /* If there is nothing to stream, don't even enter COPY mode */
504 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
507 * When we first start replication the standby will be behind the primary.
508 * For some applications, for example, synchronous replication, it is
509 * important to have a clear state for this initial catchup mode, so we
510 * can trigger actions when we change streaming state later. We may stay
511 * in this state for a long time, which is exactly why we want to be able
512 * to monitor whether or not we are still here.
514 WalSndSetState(WALSNDSTATE_CATCHUP);
516 /* Send a CopyBothResponse message, and start streaming */
517 pq_beginmessage(&buf, 'W');
518 pq_sendbyte(&buf, 0);
519 pq_sendint(&buf, 0, 2);
524 * Don't allow a request to stream from a future point in WAL that
525 * hasn't been flushed to disk in this server yet.
527 if (FlushPtr < cmd->startpoint)
530 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
531 (uint32) (cmd->startpoint >> 32),
532 (uint32) (cmd->startpoint),
533 (uint32) (FlushPtr >> 32),
534 (uint32) (FlushPtr))));
537 /* Start streaming from the requested point */
538 sentPtr = cmd->startpoint;
540 /* Initialize shared memory status, too */
542 /* use volatile pointer to prevent code rearrangement */
543 volatile WalSnd *walsnd = MyWalSnd;
545 SpinLockAcquire(&walsnd->mutex);
546 walsnd->sentPtr = sentPtr;
547 SpinLockRelease(&walsnd->mutex);
552 /* Main loop of walsender */
553 replication_active = true;
557 replication_active = false;
558 if (walsender_ready_to_stop)
560 WalSndSetState(WALSNDSTATE_STARTUP);
562 Assert(streamingDoneSending && streamingDoneReceiving);
566 * Copy is finished now. Send a single-row result set indicating the next
569 if (sendTimeLineIsHistoric)
572 snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
574 pq_beginmessage(&buf, 'T'); /* RowDescription */
575 pq_sendint(&buf, 1, 2); /* 1 field */
578 pq_sendstring(&buf, "next_tli");
579 pq_sendint(&buf, 0, 4); /* table oid */
580 pq_sendint(&buf, 0, 2); /* attnum */
582 * int8 may seem like a surprising data type for this, but in theory
583 * int4 would not be wide enough for this, as TimeLineID is unsigned.
585 pq_sendint(&buf, INT8OID, 4); /* type oid */
586 pq_sendint(&buf, -1, 2);
587 pq_sendint(&buf, 0, 4);
588 pq_sendint(&buf, 0, 2);
592 pq_beginmessage(&buf, 'D');
593 pq_sendint(&buf, 1, 2); /* number of columns */
594 pq_sendint(&buf, strlen(str), 4); /* length */
595 pq_sendbytes(&buf, str, strlen(str));
599 /* Send CommandComplete message */
600 pq_puttextmessage('C', "START_STREAMING");
604 * Execute an incoming replication command.
607 exec_replication_command(const char *cmd_string)
611 MemoryContext cmd_context;
612 MemoryContext old_context;
614 elog(DEBUG1, "received replication command: %s", cmd_string);
616 CHECK_FOR_INTERRUPTS();
618 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
619 "Replication command context",
620 ALLOCSET_DEFAULT_MINSIZE,
621 ALLOCSET_DEFAULT_INITSIZE,
622 ALLOCSET_DEFAULT_MAXSIZE);
623 old_context = MemoryContextSwitchTo(cmd_context);
625 replication_scanner_init(cmd_string);
626 parse_rc = replication_yyparse();
629 (errcode(ERRCODE_SYNTAX_ERROR),
630 (errmsg_internal("replication command parser returned %d",
633 cmd_node = replication_parse_result;
635 switch (cmd_node->type)
637 case T_IdentifySystemCmd:
641 case T_StartReplicationCmd:
642 StartReplication((StartReplicationCmd *) cmd_node);
645 case T_BaseBackupCmd:
646 SendBaseBackup((BaseBackupCmd *) cmd_node);
649 case T_TimeLineHistoryCmd:
650 SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
654 elog(ERROR, "unrecognized replication command node tag: %u",
659 MemoryContextSwitchTo(old_context);
660 MemoryContextDelete(cmd_context);
662 /* Send CommandComplete message */
663 EndCommand("SELECT", DestRemote);
667 * Process any incoming messages while streaming. Also checks if the remote
668 * end has closed the connection.
671 ProcessRepliesIfAny(void)
673 unsigned char firstchar;
675 bool received = false;
678 * If we already received a CopyDone from the frontend, any subsequent
679 * message is the beginning of a new command, and should be processed in
680 * the main processing loop.
682 while (!streamingDoneReceiving)
684 r = pq_getbyte_if_available(&firstchar);
687 /* unexpected error or EOF */
689 (errcode(ERRCODE_PROTOCOL_VIOLATION),
690 errmsg("unexpected EOF on standby connection")));
695 /* no data available without blocking */
699 /* Handle the very limited subset of commands expected in this phase */
703 * 'd' means a standby reply wrapped in a CopyData packet.
706 ProcessStandbyMessage();
711 * CopyDone means the standby requested to finish streaming.
712 * Reply with CopyDone, if we had not sent that already.
715 if (!streamingDoneSending)
717 pq_putmessage_noblock('c', NULL, 0);
718 streamingDoneSending = true;
721 /* consume the CopyData message */
722 resetStringInfo(&reply_message);
723 if (pq_getmessage(&reply_message, 0))
726 (errcode(ERRCODE_PROTOCOL_VIOLATION),
727 errmsg("unexpected EOF on standby connection")));
731 streamingDoneReceiving = true;
736 * 'X' means that the standby is closing down the socket.
743 (errcode(ERRCODE_PROTOCOL_VIOLATION),
744 errmsg("invalid standby message type \"%c\"",
750 * Save the last reply timestamp if we've received at least one reply.
754 last_reply_timestamp = GetCurrentTimestamp();
760 * Process a status update message received from standby.
763 ProcessStandbyMessage(void)
767 resetStringInfo(&reply_message);
770 * Read the message contents.
772 if (pq_getmessage(&reply_message, 0))
775 (errcode(ERRCODE_PROTOCOL_VIOLATION),
776 errmsg("unexpected EOF on standby connection")));
781 * Check message type from the first byte.
783 msgtype = pq_getmsgbyte(&reply_message);
788 ProcessStandbyReplyMessage();
792 ProcessStandbyHSFeedbackMessage();
797 (errcode(ERRCODE_PROTOCOL_VIOLATION),
798 errmsg("unexpected message type \"%c\"", msgtype)));
804 * Regular reply from standby advising of WAL positions on standby server.
807 ProcessStandbyReplyMessage(void)
814 /* the caller already consumed the msgtype byte */
815 writePtr = pq_getmsgint64(&reply_message);
816 flushPtr = pq_getmsgint64(&reply_message);
817 applyPtr = pq_getmsgint64(&reply_message);
818 (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
819 replyRequested = pq_getmsgbyte(&reply_message);
821 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
822 (uint32) (writePtr >> 32), (uint32) writePtr,
823 (uint32) (flushPtr >> 32), (uint32) flushPtr,
824 (uint32) (applyPtr >> 32), (uint32) applyPtr,
825 replyRequested ? " (reply requested)" : "");
827 /* Send a reply if the standby requested one. */
829 WalSndKeepalive(false);
832 * Update shared state for this WalSender process based on reply data from
836 /* use volatile pointer to prevent code rearrangement */
837 volatile WalSnd *walsnd = MyWalSnd;
839 SpinLockAcquire(&walsnd->mutex);
840 walsnd->write = writePtr;
841 walsnd->flush = flushPtr;
842 walsnd->apply = applyPtr;
843 SpinLockRelease(&walsnd->mutex);
846 if (!am_cascading_walsender)
847 SyncRepReleaseWaiters();
851 * Hot Standby feedback
854 ProcessStandbyHSFeedbackMessage(void)
856 TransactionId nextXid;
858 TransactionId feedbackXmin;
859 uint32 feedbackEpoch;
862 * Decipher the reply message. The caller already consumed the msgtype
865 (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
866 feedbackXmin = pq_getmsgint(&reply_message, 4);
867 feedbackEpoch = pq_getmsgint(&reply_message, 4);
869 elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
873 /* Unset WalSender's xmin if the feedback message value is invalid */
874 if (!TransactionIdIsNormal(feedbackXmin))
876 MyPgXact->xmin = InvalidTransactionId;
881 * Check that the provided xmin/epoch are sane, that is, not in the future
882 * and not so far back as to be already wrapped around. Ignore if not.
884 * Epoch of nextXid should be same as standby, or if the counter has
885 * wrapped, then one greater than standby.
887 GetNextXidAndEpoch(&nextXid, &nextEpoch);
889 if (feedbackXmin <= nextXid)
891 if (feedbackEpoch != nextEpoch)
896 if (feedbackEpoch + 1 != nextEpoch)
900 if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
901 return; /* epoch OK, but it's wrapped around */
904 * Set the WalSender's xmin equal to the standby's requested xmin, so that
905 * the xmin will be taken into account by GetOldestXmin. This will hold
906 * back the removal of dead rows and thereby prevent the generation of
907 * cleanup conflicts on the standby server.
909 * There is a small window for a race condition here: although we just
910 * checked that feedbackXmin precedes nextXid, the nextXid could have gotten
911 * advanced between our fetching it and applying the xmin below, perhaps
912 * far enough to make feedbackXmin wrap around. In that case the xmin we
913 * set here would be "in the future" and have no effect. No point in
914 * worrying about this since it's too late to save the desired data
915 * anyway. Assuming that the standby sends us an increasing sequence of
916 * xmins, this could only happen during the first reply cycle, else our
917 * own xmin would prevent nextXid from advancing so far.
919 * We don't bother taking the ProcArrayLock here. Setting the xmin field
920 * is assumed atomic, and there's no real need to prevent a concurrent
921 * GetOldestXmin. (If we're moving our xmin forward, this is obviously
922 * safe, and if we're moving it backwards, well, the data is at risk
923 * already since a VACUUM could have just finished calling GetOldestXmin.)
925 MyPgXact->xmin = feedbackXmin;
928 /* Main loop of walsender process that streams the WAL over Copy messages. */
932 bool caughtup = false;
935 * Allocate buffers that will be used for each outgoing and incoming
936 * message. We do this just once to reduce palloc overhead.
938 initStringInfo(&output_message);
939 initStringInfo(&reply_message);
940 initStringInfo(&tmpbuf);
942 /* Initialize the last reply timestamp */
943 last_reply_timestamp = GetCurrentTimestamp();
947 * Loop until we reach the end of this timeline or the client requests
952 /* Clear any already-pending wakeups */
953 ResetLatch(&MyWalSnd->latch);
956 * Emergency bailout if postmaster has died. This is to avoid the
957 * necessity for manual cleanup of all postmaster children.
959 if (!PostmasterIsAlive())
962 /* Process any requests or signals received recently */
966 ProcessConfigFile(PGC_SIGHUP);
970 CHECK_FOR_INTERRUPTS();
972 /* Check for input from the client */
973 ProcessRepliesIfAny();
976 * If we have received CopyDone from the client, sent CopyDone
977 * ourselves, and the output buffer is empty, it's time to exit
980 if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
984 * If we don't have any pending data in the output buffer, try to send
985 * some more. If there is some, we don't bother to call XLogSend
986 * again until we've flushed it ... but we'd better assume we are not
989 if (!pq_is_send_pending())
994 /* Try to flush pending output to the client */
995 if (pq_flush_if_writable() != 0)
998 /* If nothing remains to be sent right now ... */
999 if (caughtup && !pq_is_send_pending())
1002 * If we're in catchup state, move to streaming. This is an
1003 * important state change for users to know about, since before
1004 * this point data loss might occur if the primary dies and we
1005 * need to failover to the standby. The state change is also
1006 * important for synchronous replication, since commits that
1007 * started to wait at that point might wait for some time.
1009 if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
1012 (errmsg("standby \"%s\" has now caught up with primary",
1013 application_name)));
1014 WalSndSetState(WALSNDSTATE_STREAMING);
1018 * When SIGUSR2 arrives, we send any outstanding logs up to the
1019 * shutdown checkpoint record (i.e., the latest record) and exit.
1020 * This may be a normal termination at shutdown, or a promotion;
1021 * the walsender is not sure which.
1023 if (walsender_ready_to_stop)
1025 /* ... let's just be real sure we're caught up ... */
1026 XLogSend(&caughtup);
1027 if (caughtup && !pq_is_send_pending())
1029 /* Inform the standby that XLOG streaming is done */
1030 EndCommand("COPY 0", DestRemote);
1039 * We don't block if not caught up, unless there is unsent data
1040 * pending in which case we'd better block until the socket is
1041 * write-ready. This test is only needed for the case where XLogSend
1042 * loaded a subset of the available data but then pq_flush_if_writable
1043 * flushed it all --- we should immediately try to send more.
1045 if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
1047 TimestampTz timeout = 0;
1048 long sleeptime = 10000; /* 10 s */
1051 wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;
1053 if (!streamingDoneReceiving)
1054 wakeEvents |= WL_SOCKET_READABLE;
1056 if (pq_is_send_pending())
1057 wakeEvents |= WL_SOCKET_WRITEABLE;
1058 else if (wal_sender_timeout > 0 && !ping_sent)
1061 * If half of wal_sender_timeout has lapsed without receiving
1062 * any reply from standby, send a keep-alive message to standby
1063 * requesting an immediate reply.
1065 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
1066 wal_sender_timeout / 2);
1067 if (GetCurrentTimestamp() >= timeout)
1069 WalSndKeepalive(true);
1071 /* Try to flush pending output to the client */
1072 if (pq_flush_if_writable() != 0)
1077 /* Determine time until replication timeout */
1078 if (wal_sender_timeout > 0)
1080 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
1081 wal_sender_timeout);
1082 sleeptime = 1 + (wal_sender_timeout / 10);
1085 /* Sleep until something happens or we time out */
1086 ImmediateInterruptOK = true;
1087 CHECK_FOR_INTERRUPTS();
1088 WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
1089 MyProcPort->sock, sleeptime);
1090 ImmediateInterruptOK = false;
1093 * Check for replication timeout. Note we ignore the corner case
1094 * possibility that the client replied just as we reached the
1095 * timeout ... he's supposed to reply *before* that.
1097 if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
1100 * Since typically expiration of replication timeout means
1101 * communication problem, we don't send the error message to
1105 (errmsg("terminating walsender process due to replication timeout")));
1114 * Get here on send failure. Clean up and exit.
1116 * Reset whereToSendOutput to prevent ereport from attempting to send any
1117 * more messages to the standby.
1119 if (whereToSendOutput == DestRemote)
1120 whereToSendOutput = DestNone;
1123 abort(); /* keep the compiler quiet */
1126 /* Initialize a per-walsender data structure for this walsender process */
1128 InitWalSenderSlot(void)
1133 * WalSndCtl should be set up already (we inherit this by fork() or
1134 * EXEC_BACKEND mechanism from the postmaster).
1136 Assert(WalSndCtl != NULL);
1137 Assert(MyWalSnd == NULL);
1140 * Find a free walsender slot and reserve it. If this fails, we must be
1141 * out of WalSnd structures.
1143 for (i = 0; i < max_wal_senders; i++)
1145 /* use volatile pointer to prevent code rearrangement */
1146 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1148 SpinLockAcquire(&walsnd->mutex);
1150 if (walsnd->pid != 0)
1152 SpinLockRelease(&walsnd->mutex);
1158 * Found a free slot. Reserve it for us.
1160 walsnd->pid = MyProcPid;
1161 walsnd->sentPtr = InvalidXLogRecPtr;
1162 walsnd->state = WALSNDSTATE_STARTUP;
1163 SpinLockRelease(&walsnd->mutex);
1164 /* don't need the lock anymore */
1165 OwnLatch((Latch *) &walsnd->latch);
1166 MyWalSnd = (WalSnd *) walsnd;
1171 if (MyWalSnd == NULL)
1173 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
1174 errmsg("number of requested standby connections "
1175 "exceeds max_wal_senders (currently %d)",
1178 /* Arrange to clean up at walsender exit */
1179 on_shmem_exit(WalSndKill, 0);
1182 /* Destroy the per-walsender data structure for this walsender process */
1184 WalSndKill(int code, Datum arg)
1186 Assert(MyWalSnd != NULL);
1189 * Mark WalSnd struct no longer in use. Assume that no lock is required
1193 DisownLatch(&MyWalSnd->latch);
1195 /* WalSnd struct isn't mine anymore */
1200 * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
1202 * XXX probably this should be improved to suck data directly from the
1203 * WAL buffers when possible.
1205 * Will open, and keep open, one WAL segment stored in the global file
1206 * descriptor sendFile. This means if XLogRead is used once, there will
1207 * always be one descriptor left open until the process ends, but never
/*
 * Parameters: buf receives the bytes, startptr is the WAL position to start
 * reading at, and count is the number of bytes requested.
 * Open, seek and read failures are reported with errcode_for_file_access()
 * and the segment name in the message; a missing segment (ENOENT) gets a
 * dedicated message saying it has already been removed.
 * After the read, CheckXLogRemoved() is used to verify that the segment was
 * not removed or recycled while it was being read, and a cascading walsender
 * consumes its needreload flag.
 * NOTE(review): this listing omits several statements of the function (for
 * example the read loop header, the error-report call lines and the body of
 * the final reload branch); verify details against the complete source.
 */
1211 XLogRead(char *buf, XLogRecPtr startptr, Size count)
1229 startoff = recptr % XLogSegSize;
/*
 * Switch segments when nothing is open yet or when recptr has moved out of
 * the segment currently open in sendFile (tracked by sendSegNo).
 */
1231 if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
1233 char path[MAXPGPATH];
1235 /* Switch to another logfile segment */
1239 XLByteToSeg(recptr, sendSegNo);
1242 * When reading from a historic timeline, and there is a timeline
1243 * switch within this segment, read from the WAL segment belonging
1244 * to the new timeline.
1246 * For example, imagine that this server is currently on timeline
1247 * 5, and we're streaming timeline 4. The switch from timeline 4
1248 * to 5 happened at 0/13002088. In pg_xlog, we have these files:
1251 * 000000040000000000000012
1252 * 000000040000000000000013
1253 * 000000050000000000000013
1254 * 000000050000000000000014
1257 * In this situation, when requested to send the WAL from
1258 * segment 0x13, on timeline 4, we read the WAL from file
1259 * 000000050000000000000013. Archive recovery prefers files from
1260 * newer timelines, so if the segment was restored from the
1261 * archive on this server, the file belonging to the old timeline,
1262 * 000000040000000000000013, might not exist. Their contents are
1263 * equal up to the switchpoint, because at a timeline switch, the
1264 * used portion of the old segment is copied to the new file.
1267 curFileTimeLine = sendTimeLine;
1268 if (sendTimeLineIsHistoric)
1272 XLByteToSeg(sendTimeLineValidUpto, endSegNo);
1273 if (sendSegNo == endSegNo)
1274 curFileTimeLine = sendTimeLineNextTLI;
1277 XLogFilePath(path, curFileTimeLine, sendSegNo);
1279 sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1283 * If the file is not found, assume it's because the standby
1284 * asked for a too old WAL segment that has already been
1285 * removed or recycled.
1287 if (errno == ENOENT)
1289 (errcode_for_file_access(),
1290 errmsg("requested WAL segment %s has already been removed",
1291 XLogFileNameP(curFileTimeLine, sendSegNo))));
1294 (errcode_for_file_access(),
1295 errmsg("could not open file \"%s\": %m",
1301 /* Need to seek in the file? */
/* sendOff tracks the current file offset; it is advanced after each read below. */
1302 if (sendOff != startoff)
1304 if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
1306 (errcode_for_file_access(),
1307 errmsg("could not seek in log segment %s to offset %u: %m",
1308 XLogFileNameP(curFileTimeLine, sendSegNo),
1313 /* How many bytes are within this segment? */
/* Never read past the end of the current segment in a single read() call. */
1314 if (nbytes > (XLogSegSize - startoff))
1315 segbytes = XLogSegSize - startoff;
1319 readbytes = read(sendFile, p, segbytes);
1323 (errcode_for_file_access(),
1324 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
1325 XLogFileNameP(curFileTimeLine, sendSegNo),
1326 sendOff, (unsigned long) segbytes)));
1329 /* Update state for read */
1330 recptr += readbytes;
1332 sendOff += readbytes;
1333 nbytes -= readbytes;
1338 * After reading into the buffer, check that what we read was valid. We do
1339 * this after reading, because even though the segment was present when we
1340 * opened it, it might get recycled or removed while we read it. The
1341 * read() succeeds in that case, but the data we tried to read might
1342 * already have been overwritten with new WAL records.
1344 XLByteToSeg(startptr, segno);
1345 CheckXLogRemoved(segno, ThisTimeLineID);
/*
 * Cascading walsenders: needreload is set by WalSndRqstFileReload() and is
 * read and cleared below while holding the slot spinlock.
 */
1348 * During recovery, the currently-open WAL file might be replaced with the
1349 * file of the same name retrieved from archive. So we always need to
1350 * check what we read was valid after reading into the buffer. If it's
1351 * invalid, we try to open and read the file again.
1353 if (am_cascading_walsender)
1355 /* use volatile pointer to prevent code rearrangement */
1356 volatile WalSnd *walsnd = MyWalSnd;
1359 SpinLockAcquire(&walsnd->mutex);
1360 reload = walsnd->needreload;
1361 walsnd->needreload = false;
1362 SpinLockRelease(&walsnd->mutex);
1364 if (reload && sendFile >= 0)
/*
 * Overview of XLogSend():
 *  1. Choose SendRqstPtr, the limit up to which WAL may be sent: the switch
 *     point for a historic timeline, GetStandbyFlushRecPtr() on a cascading
 *     standby (switching to historic mode if the standby was promoted or its
 *     recovery timeline changed), or GetFlushRecPtr() on a primary.
 *  2. If a historic timeline has been sent up to its switch point, send a
 *     CopyDone message (type c, no payload) and set streamingDoneSending.
 *  3. Otherwise send at most MAX_SEND_SIZE bytes, cut back to a page boundary
 *     unless the whole remainder fits, as one CopyData message of type w.
 *  4. Publish sentPtr in the WalSnd slot and, if update_process_title is set,
 *     in the process title.
 * NOTE(review): this listing omits several statements (for example the
 * setting of *caughtup and the assignments of startptr and sentPtr);
 * verify details against the complete source.
 */
1375 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
1376 * but not yet sent to the client, and buffer it in the libpq output
1379 * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
1380 * *caughtup is set to false.
1383 XLogSend(bool *caughtup)
1385 XLogRecPtr SendRqstPtr;
1386 XLogRecPtr startptr;
1390 if (streamingDoneSending)
1396 /* Figure out how far we can safely send the WAL. */
1397 if (sendTimeLineIsHistoric)
1400 * Streaming an old timeline timeline that's in this server's history,
1401 * but is not the one we're currently inserting or replaying. It can
1402 * be streamed up to the point where we switched off that timeline.
1404 SendRqstPtr = sendTimeLineValidUpto;
1406 else if (am_cascading_walsender)
1409 * Streaming the latest timeline on a standby.
1411 * Attempt to send all WAL that has already been replayed, so that
1412 * we know it's valid. If we're receiving WAL through streaming
1413 * replication, it's also OK to send any WAL that has been received
1416 * The timeline we're recovering from can change, or we can be
1417 * promoted. In either case, the current timeline becomes historic.
1418 * We need to detect that so that we don't try to stream past the
1419 * point where we switched to another timeline. We check for promotion
1420 * or timeline switch after calculating FlushPtr, to avoid a race
1421 * condition: if the timeline becomes historic just after we checked
1422 * that it was still current, it's still be OK to stream it up to the
1423 * FlushPtr that was calculated before it became historic.
1425 bool becameHistoric = false;
1427 SendRqstPtr = GetStandbyFlushRecPtr();
1429 if (!RecoveryInProgress())
1432 * We have been promoted. RecoveryInProgress() updated
1433 * ThisTimeLineID to the new current timeline.
1435 am_cascading_walsender = false;
1436 becameHistoric = true;
1441 * Still a cascading standby. But is the timeline we're sending
1442 * still the one recovery is recovering from? ThisTimeLineID was
1443 * updated by the GetStandbyFlushRecPtr() call above.
1445 if (sendTimeLine != ThisTimeLineID)
1446 becameHistoric = true;
1452 * The timeline we were sending has become historic. Read the
1453 * timeline history file of the new timeline to see where exactly
1454 * we forked off from the timeline we were sending.
1458 history = readTimeLineHistory(ThisTimeLineID);
1459 sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
1460 Assert(sentPtr <= sendTimeLineValidUpto);
1461 Assert(sendTimeLine < sendTimeLineNextTLI);
1462 list_free_deep(history);
1464 /* the current send pointer should be <= the switchpoint */
1465 if (!(sentPtr <= sendTimeLineValidUpto))
1466 elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
1468 (uint32) (sendTimeLineValidUpto >> 32),
1469 (uint32) sendTimeLineValidUpto,
1470 (uint32) (sentPtr >> 32),
1473 sendTimeLineIsHistoric = true;
1475 SendRqstPtr = sendTimeLineValidUpto;
1481 * Streaming the current timeline on a master.
1483 * Attempt to send all data that's already been written out and
1484 * fsync'd to disk. We cannot go further than what's been written out
1485 * given the current implementation of XLogRead(). And in any case
1486 * it's unsafe to send WAL that is not securely down to disk on the
1487 * master: if the master subsequently crashes and restarts, slaves
1488 * must not have applied any WAL that gets lost on the master.
1490 SendRqstPtr = GetFlushRecPtr();
1494 * If this is a historic timeline and we've reached the point where we
1495 * forked to the next timeline, stop streaming.
1497 if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
1499 /* close the current file. */
1505 pq_putmessage_noblock('c', NULL, 0);
1506 streamingDoneSending = true;
1512 /* Do we have any work to do? */
1513 Assert(sentPtr <= SendRqstPtr);
1514 if (SendRqstPtr <= sentPtr)
1521 * Figure out how much to send in one message. If there's no more than
1522 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
1523 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
1525 * The rounding is not only for performance reasons. Walreceiver relies on
1526 * the fact that we never split a WAL record across two messages. Since a
1527 * long WAL record is split at page boundary into continuation records,
1528 * page boundary is always a safe cut-off point. We also assume that
1529 * SendRqstPtr never points to the middle of a WAL record.
1533 endptr += MAX_SEND_SIZE;
1535 /* if we went beyond SendRqstPtr, back off */
1536 if (SendRqstPtr <= endptr)
1538 endptr = SendRqstPtr;
1539 if (sendTimeLineIsHistoric)
1546 /* round down to page boundary. */
1547 endptr -= (endptr % XLOG_BLCKSZ);
1551 nbytes = endptr - startptr;
1552 Assert(nbytes <= MAX_SEND_SIZE);
1555 * OK to read and send the slice.
/*
 * Message layout: type byte w, dataStart (int64), walEnd (int64, set to
 * SendRqstPtr), sendtime (int64, placeholder filled in below), followed by
 * nbytes of WAL read straight into the buffer.
 */
1557 resetStringInfo(&output_message);
1558 pq_sendbyte(&output_message, 'w');
1560 pq_sendint64(&output_message, startptr); /* dataStart */
1561 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
1562 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
1565 * Read the log directly into the output buffer to avoid extra memcpy
1568 enlargeStringInfo(&output_message, nbytes);
1569 XLogRead(&output_message.data[output_message.len], startptr, nbytes);
1570 output_message.len += nbytes;
1571 output_message.data[output_message.len] = '\0';
1574 * Fill the send timestamp last, so that it is taken as late as possible.
1576 resetStringInfo(&tmpbuf);
1577 pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
/* The sendtime field starts after the type byte and the two int64 fields. */
1578 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
1579 tmpbuf.data, sizeof(int64));
1581 pq_putmessage_noblock('d', output_message.data, output_message.len);
1585 /* Update shared memory status */
/* The slot copy of sentPtr is read by other processes, e.g. pg_stat_get_wal_senders. */
1587 /* use volatile pointer to prevent code rearrangement */
1588 volatile WalSnd *walsnd = MyWalSnd;
1590 SpinLockAcquire(&walsnd->mutex);
1591 walsnd->sentPtr = sentPtr;
1592 SpinLockRelease(&walsnd->mutex);
1595 /* Report progress of XLOG streaming in PS display */
1596 if (update_process_title)
1598 char activitymsg[50];
1600 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1601 (uint32) (sentPtr >> 32), (uint32) sentPtr);
1602 set_ps_display(activitymsg, false);
1609 * Returns the latest point in WAL that has been safely flushed to disk, and
1610 * can be sent to the standby. This should only be called when in recovery,
1611 * ie. we're streaming to a cascaded standby.
1613 * As a side-effect, ThisTimeLineID is updated to the TLI of the last
1614 * replayed WAL record.
/*
 * The result is replayPtr, raised to the walreceiver write position when
 * walreceiver is on the same timeline as replay and is ahead of it.
 * NOTE(review): the declaration and initialisation of result and the return
 * statement are not visible in this listing; the description above assumes
 * result starts out as replayPtr.
 */
1617 GetStandbyFlushRecPtr(void)
1619 XLogRecPtr replayPtr;
1620 TimeLineID replayTLI;
1621 XLogRecPtr receivePtr;
1622 TimeLineID receiveTLI;
1626 * We can safely send what's already been replayed. Also, if walreceiver
1627 * is streaming WAL from the same timeline, we can send anything that
1628 * it has streamed, but hasn't been replayed yet.
1631 receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
1632 replayPtr = GetXLogReplayRecPtr(&replayTLI);
1634 ThisTimeLineID = replayTLI;
/* Received-but-unreplayed WAL is only trusted when it is on the replay timeline. */
1637 if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
1638 result = receivePtr;
1644 * Request walsenders to reload the currently-open WAL file
1647 WalSndRqstFileReload(void)
/*
 * Sets needreload in every occupied slot (pid != 0); cascading walsenders
 * check and clear the flag in XLogRead() after each read.  The pid check is
 * made without the spinlock, which only protects the flag update.
 */
1651 for (i = 0; i < max_wal_senders; i++)
1653 /* use volatile pointer to prevent code rearrangement */
1654 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1656 if (walsnd->pid == 0)
1659 SpinLockAcquire(&walsnd->mutex);
1660 walsnd->needreload = true;
1661 SpinLockRelease(&walsnd->mutex);
1665 /* SIGHUP: set flag to re-read config file at next convenient time */
1667 WalSndSigHupHandler(SIGNAL_ARGS)
1669 int save_errno = errno;
/*
 * Wake the walsender main loop through its slot latch so that it notices the
 * request promptly.  errno is saved on entry because code run inside a
 * signal handler may change it.
 * NOTE(review): the flag assignment named in the header comment and the
 * restore of errno are not visible in this listing.
 */
1673 SetLatch(&MyWalSnd->latch);
1678 /* SIGUSR1: set flag to send WAL records */
1680 WalSndXLogSendHandler(SIGNAL_ARGS)
1682 int save_errno = errno;
/*
 * Delegates to latch_sigusr1_handler().  NOTE(review): no flag assignment is
 * visible in this listing even though the header comment says the handler
 * sets a flag; confirm whether that comment is stale.
 */
1684 latch_sigusr1_handler();
1689 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
1691 WalSndLastCycleHandler(SIGNAL_ARGS)
1693 int save_errno = errno;
1696 * If replication has not yet started, die like with SIGTERM. If
1697 * replication is active, only set a flag and wake up the main loop. It
1698 * will send any outstanding WAL, and then exit gracefully.
/*
 * Sending SIGTERM to ourselves delegates to the SIGTERM handler (die, as
 * installed by the signal setup code), which is the normal shutdown path.
 */
1700 if (!replication_active)
1701 kill(MyProcPid, SIGTERM);
1703 walsender_ready_to_stop = true;
1705 SetLatch(&MyWalSnd->latch);
1710 /* Set up signal handlers */
1714 /* Set up signal handlers */
/*
 * Handler summary: SIGHUP requests a config reload, SIGTERM runs die,
 * SIGQUIT runs quickdie, SIGALRM is set up by InitializeTimeouts(), SIGUSR1
 * goes to WalSndXLogSendHandler (latch wakeup), and SIGUSR2 requests a final
 * send cycle before shutdown.  SIGINT and SIGPIPE are ignored; SIGCHLD,
 * SIGTTIN, SIGTTOU, SIGCONT and SIGWINCH are reset to SIG_DFL because the
 * postmaster accepts them but the walsender does not.
 */
1715 pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
1717 pqsignal(SIGINT, SIG_IGN); /* not used */
1718 pqsignal(SIGTERM, die); /* request shutdown */
1719 pqsignal(SIGQUIT, quickdie); /* hard crash time */
1720 InitializeTimeouts(); /* establishes SIGALRM handler */
1721 pqsignal(SIGPIPE, SIG_IGN);
1722 pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
1723 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
1726 /* Reset some signals that are accepted by postmaster but not here */
1727 pqsignal(SIGCHLD, SIG_DFL);
1728 pqsignal(SIGTTIN, SIG_DFL);
1729 pqsignal(SIGTTOU, SIG_DFL);
1730 pqsignal(SIGCONT, SIG_DFL);
1731 pqsignal(SIGWINCH, SIG_DFL);
1734 /* Report shared-memory space needed by WalSndShmemInit */
1736 WalSndShmemSize(void)
/*
 * Size = the fixed part of WalSndCtlData (everything before the walsnds
 * array) plus max_wal_senders WalSnd entries.  add_size() and mul_size()
 * are presumably the overflow-checked size helpers; confirm.
 */
1740 size = offsetof(WalSndCtlData, walsnds);
1741 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
1746 /* Allocate and initialize walsender-related shared memory */
1748 WalSndShmemInit(void)
/*
 * ShmemInitStruct() reports through found whether the structure already
 * existed; only on first creation is it zeroed and are the sync-rep wait
 * queues and each slot spinlock and shared latch initialized.
 * NOTE(review): the test on found is not visible in this listing.
 */
1753 WalSndCtl = (WalSndCtlData *)
1754 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
1758 /* First time through, so initialize */
1759 MemSet(WalSndCtl, 0, WalSndShmemSize());
1761 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
1762 SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
1764 for (i = 0; i < max_wal_senders; i++)
1766 WalSnd *walsnd = &WalSndCtl->walsnds[i];
1768 SpinLockInit(&walsnd->mutex);
1769 InitSharedLatch(&walsnd->latch);
1775 * Wake up all walsenders
1777 * This will be called inside critical sections, so throwing an error is not
/*
 * Every slot latch is set, with no check of whether the slot is in use;
 * SetLatch() is the only call made per slot.
 */
1785 for (i = 0; i < max_wal_senders; i++)
1786 SetLatch(&WalSndCtl->walsnds[i].latch);
1789 /* Set state for current walsender (only called in walsender) */
1791 WalSndSetState(WalSndState state)
1793 /* use volatile pointer to prevent code rearrangement */
1794 volatile WalSnd *walsnd = MyWalSnd;
1796 Assert(am_walsender);
/*
 * An unchanged state is detected before the spinlock is taken (the early
 * exit itself is not visible in this listing).  The unlocked read is
 * presumably safe because only this walsender writes its own state.
 */
1798 if (walsnd->state == state)
1801 SpinLockAcquire(&walsnd->mutex);
1802 walsnd->state = state;
1803 SpinLockRelease(&walsnd->mutex);
1807 * Return a string constant representing the state. This is used
1808 * in system views, and should *not* be translated.
1811 WalSndGetStateString(WalSndState state)
/*
 * Cases shown: STARTUP, BACKUP, CATCHUP and STREAMING.  The returned string
 * feeds the state column (values[1]) of pg_stat_get_wal_senders.
 * NOTE(review): the return statements are not visible in this listing.
 */
1815 case WALSNDSTATE_STARTUP:
1817 case WALSNDSTATE_BACKUP:
1819 case WALSNDSTATE_CATCHUP:
1821 case WALSNDSTATE_STREAMING:
1829 * Returns activity of walsenders, including pids and xlog locations sent to
1833 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
/*
 * Set-returning function in materialize mode: one row per occupied walsender
 * slot (pid != 0) with columns pid, state, sent, write, flush and apply
 * locations (each formatted as %X/%X), sync priority, and sync state (async,
 * sync or potential).  For non-superusers only the pid column is filled;
 * the remaining columns are NULL.
 */
1835 #define PG_STAT_GET_WAL_SENDERS_COLS 8
1836 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1838 Tuplestorestate *tupstore;
1839 MemoryContext per_query_ctx;
1840 MemoryContext oldcontext;
1843 int sync_standby = -1;
1846 /* check to see if caller supports us returning a tuplestore */
1847 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1849 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1850 errmsg("set-valued function called in context that cannot accept a set")));
1851 if (!(rsinfo->allowedModes & SFRM_Materialize))
1853 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1854 errmsg("materialize mode required, but it is not " \
1855 "allowed in this context")));
1857 /* Build a tuple descriptor for our result type */
1858 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1859 elog(ERROR, "return type must be a row type");
1861 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1862 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1864 tupstore = tuplestore_begin_heap(true, false, work_mem);
1865 rsinfo->returnMode = SFRM_Materialize;
1866 rsinfo->setResult = tupstore;
1867 rsinfo->setDesc = tupdesc;
1869 MemoryContextSwitchTo(oldcontext);
1872 * Get the priorities of sync standbys all in one go, to minimise lock
1873 * acquisitions and to allow us to evaluate who is the current sync
1874 * standby. This code must match the code in SyncRepReleaseWaiters().
/*
 * First pass, under SyncRepLock in shared mode: record the effective
 * priority of each occupied slot (0 when its flush position is invalid) and
 * find the streaming slot with the smallest positive priority and a valid
 * flush position (the assignment to sync_standby is not visible here).
 */
1876 sync_priority = palloc(sizeof(int) * max_wal_senders);
1877 LWLockAcquire(SyncRepLock, LW_SHARED);
1878 for (i = 0; i < max_wal_senders; i++)
1880 /* use volatile pointer to prevent code rearrangement */
1881 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1883 if (walsnd->pid != 0)
1886 * Treat a standby such as a pg_basebackup background process
1887 * which always returns an invalid flush location, as an
1888 * asynchronous standby.
1890 sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
1891 0 : walsnd->sync_standby_priority;
1893 if (walsnd->state == WALSNDSTATE_STREAMING &&
1894 walsnd->sync_standby_priority > 0 &&
1896 priority > walsnd->sync_standby_priority) &&
1897 !XLogRecPtrIsInvalid(walsnd->flush))
1899 priority = walsnd->sync_standby_priority;
1904 LWLockRelease(SyncRepLock);
1906 for (i = 0; i < max_wal_senders; i++)
1908 /* use volatile pointer to prevent code rearrangement */
1909 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1910 char location[MAXFNAMELEN];
1916 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
1917 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
1919 if (walsnd->pid == 0)
/* Copy the slot fields under its spinlock so each row is a consistent snapshot. */
1922 SpinLockAcquire(&walsnd->mutex);
1923 sentPtr = walsnd->sentPtr;
1924 state = walsnd->state;
1925 write = walsnd->write;
1926 flush = walsnd->flush;
1927 apply = walsnd->apply;
1928 SpinLockRelease(&walsnd->mutex);
1930 memset(nulls, 0, sizeof(nulls));
/* NOTE(review): pid is read here after the spinlock has been released. */
1931 values[0] = Int32GetDatum(walsnd->pid);
1936 * Only superusers can see details. Other users only get the pid
1937 * value to know it's a walsender, but no details.
1939 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
1943 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
1945 snprintf(location, sizeof(location), "%X/%X",
1946 (uint32) (sentPtr >> 32), (uint32) sentPtr);
1947 values[2] = CStringGetTextDatum(location);
1951 snprintf(location, sizeof(location), "%X/%X",
1952 (uint32) (write >> 32), (uint32) write);
1953 values[3] = CStringGetTextDatum(location);
1957 snprintf(location, sizeof(location), "%X/%X",
1958 (uint32) (flush >> 32), (uint32) flush);
1959 values[4] = CStringGetTextDatum(location);
1963 snprintf(location, sizeof(location), "%X/%X",
1964 (uint32) (apply >> 32), (uint32) apply);
1965 values[5] = CStringGetTextDatum(location);
1967 values[6] = Int32GetDatum(sync_priority[i]);
1970 * More easily understood version of standby state. This is purely
1971 * informational, not different from priority.
1973 if (sync_priority[i] == 0)
1974 values[7] = CStringGetTextDatum("async");
1975 else if (i == sync_standby)
1976 values[7] = CStringGetTextDatum("sync");
1978 values[7] = CStringGetTextDatum("potential");
1981 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1983 pfree(sync_priority);
1985 /* clean up and return the tuplestore */
1986 tuplestore_donestoring(tupstore);
1992 * This function is used to send keepalive message to standby.
1993 * If requestReply is set, sets a flag in the message requesting the standby
1994 * to send a message back to us, for heartbeat purposes.
1997 WalSndKeepalive(bool requestReply)
/*
 * Keepalive layout: type byte k, sentPtr (int64), current timestamp (int64),
 * and a one-byte flag (1 when requestReply is set, else 0).  It is sent with
 * pq_putmessage_noblock() as a CopyData message of type d.
 */
1999 elog(DEBUG2, "sending replication keepalive");
2001 /* construct the message... */
2002 resetStringInfo(&output_message);
2003 pq_sendbyte(&output_message, 'k');
2004 pq_sendint64(&output_message, sentPtr);
2005 pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
2006 pq_sendbyte(&output_message, requestReply ? 1 : 0);
2008 /* ... and send it wrapped in CopyData */
2009 pq_putmessage_noblock('d', output_message.data, output_message.len);
2013 * This isn't currently used for anything. Monitoring tools might be
2014 * interested in the future, and we'll need something like this in the
2015 * future for synchronous replication.
2019 * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
2023 GetOldestWALSendPointer(void)
2025 XLogRecPtr oldest = {0, 0};
2029 for (i = 0; i < max_wal_senders; i++)
2031 /* use volatile pointer to prevent code rearrangement */
2032 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
2035 if (walsnd->pid == 0)
2038 SpinLockAcquire(&walsnd->mutex);
2039 recptr = walsnd->sentPtr;
2040 SpinLockRelease(&walsnd->mutex);
2042 if (recptr.xlogid == 0 && recptr.xrecoff == 0)
2045 if (!found || recptr < oldest)