1 /*-------------------------------------------------------------------------
5 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 * care of sending XLOG from the primary server to a single recipient.
7 * (Note that there can be more than one walsender process concurrently.)
8 * It is started by the postmaster when the walreceiver of a standby server
9 * connects to the primary server and requests XLOG streaming replication.
10 * It attempts to keep reading XLOG records from the disk and sending them
11 * to the standby server, as long as the connection is alive (i.e., like
12 * any backend, there is a one-to-one relationship between a connection
13 * and a walsender process).
15 * Normal termination is by SIGTERM, which instructs the walsender to
16 * close the connection and exit(0) at next convenient moment. Emergency
17 * termination is by SIGQUIT; like any backend, the walsender will simply
18 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
19 * are treated as not a crash but approximately normal termination;
20 * the walsender will exit quickly without sending any more XLOG records.
22 * If the server is shut down, postmaster sends us SIGUSR2 after all
23 * regular backends have exited and the shutdown checkpoint has been written.
24 * This instructs the walsender to send any outstanding WAL, including the
25 * shutdown checkpoint record, and then exit.
28 * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
31 * src/backend/replication/walsender.c
33 *-------------------------------------------------------------------------
41 #include "access/xlog_internal.h"
42 #include "access/transam.h"
43 #include "catalog/pg_type.h"
44 #include "libpq/libpq.h"
45 #include "libpq/pqformat.h"
46 #include "libpq/pqsignal.h"
47 #include "miscadmin.h"
48 #include "replication/basebackup.h"
49 #include "replication/replnodes.h"
50 #include "replication/walprotocol.h"
51 #include "replication/walsender.h"
52 #include "storage/fd.h"
53 #include "storage/ipc.h"
54 #include "storage/pmsignal.h"
55 #include "storage/proc.h"
56 #include "storage/procarray.h"
57 #include "tcop/tcopprot.h"
58 #include "utils/builtins.h"
59 #include "utils/guc.h"
60 #include "utils/memutils.h"
61 #include "utils/ps_status.h"
62 #include "utils/resowner.h"
65 /* Array of WalSnds in shared memory */
66 WalSndCtlData *WalSndCtl = NULL;
68 /* My slot in the shared memory array */
69 static WalSnd *MyWalSnd = NULL;
72 bool am_walsender = false; /* Am I a walsender process ? */
74 /* User-settable parameters for walsender */
75 int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
76 int WalSndDelay = 200; /* max sleep time between some actions */
79 * These variables are used similarly to openLogFile/Id/Seg/Off,
80 * but for walsender to read the XLOG.
82 static int sendFile = -1;
83 static uint32 sendId = 0;
84 static uint32 sendSeg = 0;
85 static uint32 sendOff = 0;
88 * How far have we sent WAL already? This is also advertised in
89 * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
91 static XLogRecPtr sentPtr = {0, 0};
94 * Buffer for processing reply messages.
96 static StringInfoData reply_message;
98 /* Flags set by signal handlers for later service in main loop */
99 static volatile sig_atomic_t got_SIGHUP = false;
100 volatile sig_atomic_t walsender_shutdown_requested = false;
101 volatile sig_atomic_t walsender_ready_to_stop = false;
103 /* Signal handlers */
104 static void WalSndSigHupHandler(SIGNAL_ARGS);
105 static void WalSndShutdownHandler(SIGNAL_ARGS);
106 static void WalSndQuickDieHandler(SIGNAL_ARGS);
107 static void WalSndXLogSendHandler(SIGNAL_ARGS);
108 static void WalSndLastCycleHandler(SIGNAL_ARGS);
110 /* Prototypes for private functions */
111 static bool HandleReplicationCommand(const char *cmd_string);
112 static int WalSndLoop(void);
113 static void InitWalSnd(void);
114 static void WalSndHandshake(void);
115 static void WalSndKill(int code, Datum arg);
116 static bool XLogSend(char *msgbuf, bool *caughtup);
117 static void IdentifySystem(void);
118 static void StartReplication(StartReplicationCmd * cmd);
119 static void ProcessStandbyReplyMessage(void);
120 static void ProcessRepliesIfAny(void);
123 /* Main entry point for walsender process */
127 MemoryContext walsnd_context;
129 if (RecoveryInProgress())
131 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
132 errmsg("recovery is still in progress, can't accept WAL streaming connections")));
134 /* Create a per-walsender data structure in shared memory */
138 * Create a memory context that we will do all our work in. We do this so
139 * that we can reset the context during error recovery and thereby avoid
140 * possible memory leaks. Formerly this code just ran in
141 * TopMemoryContext, but resetting that would be a really bad idea.
143 * XXX: we don't actually attempt error recovery in walsender, we just
144 * close the connection and exit.
146 walsnd_context = AllocSetContextCreate(TopMemoryContext,
148 ALLOCSET_DEFAULT_MINSIZE,
149 ALLOCSET_DEFAULT_INITSIZE,
150 ALLOCSET_DEFAULT_MAXSIZE);
151 MemoryContextSwitchTo(walsnd_context);
153 /* Set up resource owner */
154 CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
156 /* Unblock signals (they were blocked when the postmaster forked us) */
157 PG_SETMASK(&UnBlockSig);
159 /* Tell the standby that walsender is ready for receiving commands */
160 ReadyForQuery(DestRemote);
162 /* Handle handshake messages before streaming */
165 /* Initialize shared memory status */
167 /* use volatile pointer to prevent code rearrangement */
168 volatile WalSnd *walsnd = MyWalSnd;
170 SpinLockAcquire(&walsnd->mutex);
171 walsnd->sentPtr = sentPtr;
172 SpinLockRelease(&walsnd->mutex);
175 /* Main loop of walsender */
180 * Execute commands from walreceiver, until we enter streaming mode.
183 WalSndHandshake(void)
185 StringInfoData input_message;
186 bool replication_started = false;
188 initStringInfo(&input_message);
190 while (!replication_started)
194 WalSndSetState(WALSNDSTATE_STARTUP);
195 set_ps_display("idle", false);
197 /* Wait for a command to arrive */
198 firstchar = pq_getbyte();
201 * Emergency bailout if postmaster has died. This is to avoid the
202 * necessity for manual cleanup of all postmaster children.
204 if (!PostmasterIsAlive(true))
208 * Check for any other interesting events that happened while we
214 ProcessConfigFile(PGC_SIGHUP);
217 if (firstchar != EOF)
220 * Read the message contents. This is expected to be done without
221 * blocking because we've been able to get message type code.
223 if (pq_getmessage(&input_message, 0))
224 firstchar = EOF; /* suitable message already logged */
227 /* Handle the very limited subset of commands expected in this phase */
230 case 'Q': /* Query message */
232 const char *query_string;
234 query_string = pq_getmsgstring(&input_message);
235 pq_getmsgend(&input_message);
237 if (HandleReplicationCommand(query_string))
238 replication_started = true;
243 /* standby is closing the connection */
247 /* standby disconnected unexpectedly */
249 (errcode(ERRCODE_PROTOCOL_VIOLATION),
250 errmsg("unexpected EOF on standby connection")));
255 (errcode(ERRCODE_PROTOCOL_VIOLATION),
256 errmsg("invalid standby handshake message type %d", firstchar)));
270 char xpos[MAXFNAMELEN];
274 * Reply with a result set with one row, three columns. First col is system
275 * ID, second is timeline ID, and third is current xlog location.
278 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
279 GetSystemIdentifier());
280 snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
282 logptr = GetInsertRecPtr();
284 snprintf(xpos, sizeof(xpos), "%X/%X",
285 logptr.xlogid, logptr.xrecoff);
287 /* Send a RowDescription message */
288 pq_beginmessage(&buf, 'T');
289 pq_sendint(&buf, 3, 2); /* 3 fields */
292 pq_sendstring(&buf, "systemid"); /* col name */
293 pq_sendint(&buf, 0, 4); /* table oid */
294 pq_sendint(&buf, 0, 2); /* attnum */
295 pq_sendint(&buf, TEXTOID, 4); /* type oid */
296 pq_sendint(&buf, -1, 2); /* typlen */
297 pq_sendint(&buf, 0, 4); /* typmod */
298 pq_sendint(&buf, 0, 2); /* format code */
301 pq_sendstring(&buf, "timeline"); /* col name */
302 pq_sendint(&buf, 0, 4); /* table oid */
303 pq_sendint(&buf, 0, 2); /* attnum */
304 pq_sendint(&buf, INT4OID, 4); /* type oid */
305 pq_sendint(&buf, 4, 2); /* typlen */
306 pq_sendint(&buf, 0, 4); /* typmod */
307 pq_sendint(&buf, 0, 2); /* format code */
310 pq_sendstring(&buf, "xlogpos");
311 pq_sendint(&buf, 0, 4);
312 pq_sendint(&buf, 0, 2);
313 pq_sendint(&buf, TEXTOID, 4);
314 pq_sendint(&buf, -1, 2);
315 pq_sendint(&buf, 0, 4);
316 pq_sendint(&buf, 0, 2);
319 /* Send a DataRow message */
320 pq_beginmessage(&buf, 'D');
321 pq_sendint(&buf, 3, 2); /* # of columns */
322 pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
323 pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
324 pq_sendint(&buf, strlen(tli), 4); /* col2 len */
325 pq_sendbytes(&buf, (char *) tli, strlen(tli));
326 pq_sendint(&buf, strlen(xpos), 4); /* col3 len */
327 pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
331 /* Send CommandComplete and ReadyForQuery messages */
332 EndCommand("SELECT", DestRemote);
333 ReadyForQuery(DestRemote);
334 /* ReadyForQuery did pq_flush for us */
341 StartReplication(StartReplicationCmd * cmd)
346 * Let postmaster know that we're streaming. Once we've declared us as
347 * a WAL sender process, postmaster will let us outlive the bgwriter and
348 * kill us last in the shutdown sequence, so we get a chance to stream
349 * all remaining WAL at shutdown, including the shutdown checkpoint.
350 * Note that there's no going back, and we mustn't write any WAL records
353 MarkPostmasterChildWalSender();
356 * Check that we're logging enough information in the WAL for
359 * NOTE: This only checks the current value of wal_level. Even if the
360 * current setting is not 'minimal', there can be old WAL in the pg_xlog
361 * directory that was created with 'minimal'. So this is not bulletproof,
362 * the purpose is just to give a user-friendly error message that hints
363 * how to configure the system correctly.
365 if (wal_level == WAL_LEVEL_MINIMAL)
367 (errcode(ERRCODE_CANNOT_CONNECT_NOW),
368 errmsg("standby connections not allowed because wal_level=minimal")));
370 /* Send a CopyBothResponse message, and start streaming */
371 pq_beginmessage(&buf, 'W');
372 pq_sendbyte(&buf, 0);
373 pq_sendint(&buf, 0, 2);
378 * Initialize position to the received one, then the xlog records begin to
379 * be shipped from that position
381 sentPtr = cmd->startpoint;
385 * Execute an incoming replication command.
388 HandleReplicationCommand(const char *cmd_string)
390 bool replication_started = false;
393 MemoryContext cmd_context;
394 MemoryContext old_context;
396 elog(DEBUG1, "received replication command: %s", cmd_string);
398 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
399 "Replication command context",
400 ALLOCSET_DEFAULT_MINSIZE,
401 ALLOCSET_DEFAULT_INITSIZE,
402 ALLOCSET_DEFAULT_MAXSIZE);
403 old_context = MemoryContextSwitchTo(cmd_context);
405 replication_scanner_init(cmd_string);
406 parse_rc = replication_yyparse();
409 (errcode(ERRCODE_SYNTAX_ERROR),
410 (errmsg_internal("replication command parser returned %d",
413 cmd_node = replication_parse_result;
415 switch (cmd_node->type)
417 case T_IdentifySystemCmd:
421 case T_StartReplicationCmd:
422 StartReplication((StartReplicationCmd *) cmd_node);
424 /* break out of the loop */
425 replication_started = true;
428 case T_BaseBackupCmd:
429 SendBaseBackup((BaseBackupCmd *) cmd_node);
431 /* Send CommandComplete and ReadyForQuery messages */
432 EndCommand("SELECT", DestRemote);
433 ReadyForQuery(DestRemote);
434 /* ReadyForQuery did pq_flush for us */
439 (errcode(ERRCODE_PROTOCOL_VIOLATION),
440 errmsg("invalid standby query string: %s", cmd_string)));
444 MemoryContextSwitchTo(old_context);
445 MemoryContextDelete(cmd_context);
447 return replication_started;
451 * Check if the remote end has closed the connection.
454 ProcessRepliesIfAny(void)
456 unsigned char firstchar;
459 r = pq_getbyte_if_available(&firstchar);
462 /* unexpected error or EOF */
464 (errcode(ERRCODE_PROTOCOL_VIOLATION),
465 errmsg("unexpected EOF on standby connection")));
470 /* no data available without blocking */
474 /* Handle the very limited subset of commands expected in this phase */
478 * 'd' means a standby reply wrapped in a CopyData packet.
481 ProcessStandbyReplyMessage();
485 * 'X' means that the standby is closing down the socket.
492 (errcode(ERRCODE_PROTOCOL_VIOLATION),
493 errmsg("invalid standby closing message type %d",
499 * Process a status update message received from standby.
502 ProcessStandbyReplyMessage(void)
504 StandbyReplyMessage reply;
506 TransactionId newxmin = InvalidTransactionId;
508 resetStringInfo(&reply_message);
511 * Read the message contents.
513 if (pq_getmessage(&reply_message, 0))
516 (errcode(ERRCODE_PROTOCOL_VIOLATION),
517 errmsg("unexpected EOF on standby connection")));
522 * Check message type from the first byte. At the moment, there is only
525 msgtype = pq_getmsgbyte(&reply_message);
529 (errcode(ERRCODE_PROTOCOL_VIOLATION),
530 errmsg("unexpected message type %c", msgtype)));
534 pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
536 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
537 reply.write.xlogid, reply.write.xrecoff,
538 reply.flush.xlogid, reply.flush.xrecoff,
539 reply.apply.xlogid, reply.apply.xrecoff,
544 * Update shared state for this WalSender process
545 * based on reply data from standby.
548 /* use volatile pointer to prevent code rearrangement */
549 volatile WalSnd *walsnd = MyWalSnd;
551 SpinLockAcquire(&walsnd->mutex);
552 walsnd->write = reply.write;
553 walsnd->flush = reply.flush;
554 walsnd->apply = reply.apply;
555 SpinLockRelease(&walsnd->mutex);
559 * Update the WalSender's proc xmin to allow it to be visible
560 * to snapshots. This will hold back the removal of dead rows
561 * and thereby prevent the generation of cleanup conflicts
562 * on the standby server.
564 if (TransactionIdIsValid(reply.xmin))
566 TransactionId nextXid;
568 bool epochOK = false;
570 GetNextXidAndEpoch(&nextXid, &nextEpoch);
573 * Epoch of oldestXmin should be same as standby or
574 * if the counter has wrapped, then one less than reply.
576 if (reply.xmin <= nextXid)
578 if (reply.epoch == nextEpoch)
583 if (nextEpoch > 0 && reply.epoch == nextEpoch - 1)
588 * Feedback from standby must not go backwards, nor should it go
589 * forwards further than our most recent xid.
591 if (epochOK && TransactionIdPrecedesOrEquals(reply.xmin, nextXid))
593 if (!TransactionIdIsValid(MyProc->xmin))
595 TransactionId oldestXmin = GetOldestXmin(true, true);
596 if (TransactionIdPrecedes(oldestXmin, reply.xmin))
597 newxmin = reply.xmin;
599 newxmin = oldestXmin;
603 if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
604 newxmin = reply.xmin;
606 newxmin = MyProc->xmin; /* stay the same */
612 * Grab the ProcArrayLock to set xmin, or invalidate for bad reply
614 if (MyProc->xmin != newxmin)
616 LWLockAcquire(ProcArrayLock, LW_SHARED);
617 MyProc->xmin = newxmin;
618 LWLockRelease(ProcArrayLock);
622 /* Main loop of walsender process */
626 char *output_message;
627 bool caughtup = false;
630 * Allocate buffer that will be used for each output message. We do this
631 * just once to reduce palloc overhead. The buffer must be made large
632 * enough for maximum-sized messages.
634 output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
637 * Allocate buffer that will be used for processing reply messages. As
638 * above, do this just once to reduce palloc overhead.
640 initStringInfo(&reply_message);
642 /* Loop forever, unless we get an error */
646 * Emergency bailout if postmaster has died. This is to avoid the
647 * necessity for manual cleanup of all postmaster children.
649 if (!PostmasterIsAlive(true))
652 /* Process any requests or signals received recently */
656 ProcessConfigFile(PGC_SIGHUP);
660 * When SIGUSR2 arrives, we send all outstanding logs up to the
661 * shutdown checkpoint record (i.e., the latest record) and exit.
663 if (walsender_ready_to_stop)
665 if (!XLogSend(output_message, &caughtup))
667 ProcessRepliesIfAny();
669 walsender_shutdown_requested = true;
672 /* Normal exit from the walsender is here */
673 if (walsender_shutdown_requested)
675 /* Inform the standby that XLOG streaming was done */
676 pq_puttextmessage('C', "COPY 0");
683 * If we had sent all accumulated WAL in last round, nap for the
684 * configured time before retrying.
689 * Even if we wrote all the WAL that was available when we started
690 * sending, more might have arrived while we were sending this
691 * batch. We had the latch set while sending, so we have not
692 * received any signals from that time. Let's arm the latch
693 * again, and after that check that we're still up-to-date.
695 ResetLatch(&MyWalSnd->latch);
697 if (!XLogSend(output_message, &caughtup))
699 if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
702 * XXX: We don't really need the periodic wakeups anymore,
703 * WaitLatchOrSocket should reliably wake up as soon as
704 * something interesting happens.
708 WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
709 WalSndDelay * 1000L);
714 /* Attempt to send the log once every loop */
715 if (!XLogSend(output_message, &caughtup))
719 /* Update our state to indicate if we're behind or not */
720 WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
721 ProcessRepliesIfAny();
725 * Get here on send failure. Clean up and exit.
727 * Reset whereToSendOutput to prevent ereport from attempting to send any
728 * more messages to the standby.
730 if (whereToSendOutput == DestRemote)
731 whereToSendOutput = DestNone;
734 return 1; /* keep the compiler quiet */
737 /* Initialize a per-walsender data structure for this walsender process */
744 * WalSndCtl should be set up already (we inherit this by fork() or
745 * EXEC_BACKEND mechanism from the postmaster).
747 Assert(WalSndCtl != NULL);
748 Assert(MyWalSnd == NULL);
751 * Find a free walsender slot and reserve it. If this fails, we must be
752 * out of WalSnd structures.
754 for (i = 0; i < max_wal_senders; i++)
756 /* use volatile pointer to prevent code rearrangement */
757 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
759 SpinLockAcquire(&walsnd->mutex);
761 if (walsnd->pid != 0)
763 SpinLockRelease(&walsnd->mutex);
769 * Found a free slot. Reserve it for us.
771 walsnd->pid = MyProcPid;
772 MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
773 walsnd->state = WALSNDSTATE_STARTUP;
774 SpinLockRelease(&walsnd->mutex);
775 /* don't need the lock anymore */
776 OwnLatch((Latch *) &walsnd->latch);
777 MyWalSnd = (WalSnd *) walsnd;
782 if (MyWalSnd == NULL)
784 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
785 errmsg("number of requested standby connections "
786 "exceeds max_wal_senders (currently %d)",
789 /* Arrange to clean up at walsender exit */
790 on_shmem_exit(WalSndKill, 0);
793 /* Destroy the per-walsender data structure for this walsender process */
795 WalSndKill(int code, Datum arg)
797 Assert(MyWalSnd != NULL);
800 * Mark WalSnd struct no longer in use. Assume that no lock is required
804 DisownLatch(&MyWalSnd->latch);
806 /* WalSnd struct isn't mine anymore */
811 * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
813 * XXX probably this should be improved to suck data directly from the
814 * WAL buffers when possible.
816 * Will open, and keep open, one WAL segment stored in the global file
817 * descriptor sendFile. This means if XLogRead is used once, there will
818 * always be one descriptor left open until the process ends, but never
822 XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
824 XLogRecPtr startRecPtr = recptr;
825 char path[MAXPGPATH];
826 uint32 lastRemovedLog;
827 uint32 lastRemovedSeg;
837 startoff = recptr.xrecoff % XLogSegSize;
839 if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
841 /* Switch to another logfile segment */
845 XLByteToSeg(recptr, sendId, sendSeg);
846 XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
848 sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
852 * If the file is not found, assume it's because the standby
853 * asked for a too old WAL segment that has already been
854 * removed or recycled.
858 char filename[MAXFNAMELEN];
860 XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
862 (errcode_for_file_access(),
863 errmsg("requested WAL segment %s has already been removed",
868 (errcode_for_file_access(),
869 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
870 path, sendId, sendSeg)));
875 /* Need to seek in the file? */
876 if (sendOff != startoff)
878 if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
880 (errcode_for_file_access(),
881 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
882 sendId, sendSeg, startoff)));
886 /* How many bytes are within this segment? */
887 if (nbytes > (XLogSegSize - startoff))
888 segbytes = XLogSegSize - startoff;
892 readbytes = read(sendFile, buf, segbytes);
895 (errcode_for_file_access(),
896 errmsg("could not read from log file %u, segment %u, offset %u, "
898 sendId, sendSeg, sendOff, (unsigned long) segbytes)));
900 /* Update state for read */
901 XLByteAdvance(recptr, readbytes);
903 sendOff += readbytes;
909 * After reading into the buffer, check that what we read was valid. We do
910 * this after reading, because even though the segment was present when we
911 * opened it, it might get recycled or removed while we read it. The
912 * read() succeeds in that case, but the data we tried to read might
913 * already have been overwritten with new WAL records.
915 XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
916 XLByteToSeg(startRecPtr, log, seg);
917 if (log < lastRemovedLog ||
918 (log == lastRemovedLog && seg <= lastRemovedSeg))
920 char filename[MAXFNAMELEN];
922 XLogFileName(filename, ThisTimeLineID, log, seg);
924 (errcode_for_file_access(),
925 errmsg("requested WAL segment %s has already been removed",
931 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
932 * but not yet sent to the client, and send it.
934 * msgbuf is a work area in which the output message is constructed. It's
935 * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
936 * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
938 * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
939 * *caughtup is set to false.
941 * Returns true if OK, false if trouble.
944 XLogSend(char *msgbuf, bool *caughtup)
946 XLogRecPtr SendRqstPtr;
950 WalDataMessageHeader msghdr;
953 * Attempt to send all data that's already been written out and fsync'd to
954 * disk. We cannot go further than what's been written out given the
955 * current implementation of XLogRead(). And in any case it's unsafe to
956 * send WAL that is not securely down to disk on the master: if the master
957 * subsequently crashes and restarts, slaves must not have applied any WAL
958 * that gets lost on the master.
960 SendRqstPtr = GetFlushRecPtr();
962 /* Quick exit if nothing to do */
963 if (XLByteLE(SendRqstPtr, sentPtr))
970 * Figure out how much to send in one message. If there's no more than
971 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
972 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
974 * The rounding is not only for performance reasons. Walreceiver relies on
975 * the fact that we never split a WAL record across two messages. Since a
976 * long WAL record is split at page boundary into continuation records,
977 * page boundary is always a safe cut-off point. We also assume that
978 * SendRqstPtr never points to the middle of a WAL record.
981 if (startptr.xrecoff >= XLogFileSize)
984 * crossing a logid boundary, skip the non-existent last log segment
985 * in previous logical log file.
987 startptr.xlogid += 1;
988 startptr.xrecoff = 0;
992 XLByteAdvance(endptr, MAX_SEND_SIZE);
993 if (endptr.xlogid != startptr.xlogid)
995 /* Don't cross a logfile boundary within one message */
996 Assert(endptr.xlogid == startptr.xlogid + 1);
997 endptr.xlogid = startptr.xlogid;
998 endptr.xrecoff = XLogFileSize;
1001 /* if we went beyond SendRqstPtr, back off */
1002 if (XLByteLE(SendRqstPtr, endptr))
1004 endptr = SendRqstPtr;
1009 /* round down to page boundary. */
1010 endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
1014 nbytes = endptr.xrecoff - startptr.xrecoff;
1015 Assert(nbytes <= MAX_SEND_SIZE);
1018 * OK to read and send the slice.
1023 * Read the log directly into the output buffer to avoid extra memcpy
1026 XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
1029 * We fill the message header last so that the send timestamp is taken as
1032 msghdr.dataStart = startptr;
1033 msghdr.walEnd = SendRqstPtr;
1034 msghdr.sendTime = GetCurrentTimestamp();
1036 memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
1038 pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
1040 /* Flush pending output to the client */
1046 /* Update shared memory status */
1048 /* use volatile pointer to prevent code rearrangement */
1049 volatile WalSnd *walsnd = MyWalSnd;
1051 SpinLockAcquire(&walsnd->mutex);
1052 walsnd->sentPtr = sentPtr;
1053 SpinLockRelease(&walsnd->mutex);
1056 /* Report progress of XLOG streaming in PS display */
1057 if (update_process_title)
1059 char activitymsg[50];
1061 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1062 sentPtr.xlogid, sentPtr.xrecoff);
1063 set_ps_display(activitymsg, false);
1069 /* SIGHUP: set flag to re-read config file at next convenient time */
1071 WalSndSigHupHandler(SIGNAL_ARGS)
1075 SetLatch(&MyWalSnd->latch);
1078 /* SIGTERM: set flag to shut down */
1080 WalSndShutdownHandler(SIGNAL_ARGS)
1082 walsender_shutdown_requested = true;
1084 SetLatch(&MyWalSnd->latch);
1088 * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
1090 * Some backend has bought the farm,
1091 * so we need to stop what we're doing and exit.
1094 WalSndQuickDieHandler(SIGNAL_ARGS)
1096 PG_SETMASK(&BlockSig);
1099 * We DO NOT want to run proc_exit() callbacks -- we're here because
1100 * shared memory may be corrupted, so we don't want to try to clean up our
1101 * transaction. Just nail the windows shut and get out of town. Now that
1102 * there's an atexit callback to prevent third-party code from breaking
1103 * things by calling exit() directly, we have to reset the callbacks
1104 * explicitly to make this work as intended.
1109 * Note we do exit(2) not exit(0). This is to force the postmaster into a
1110 * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
1111 * backend. This is necessary precisely because we don't clean up our
1112 * shared memory state. (The "dead man switch" mechanism in pmsignal.c
1113 * should ensure the postmaster sees this as a crash, too, but no harm in
1114 * being doubly sure.)
1119 /* SIGUSR1: set flag to send WAL records */
1121 WalSndXLogSendHandler(SIGNAL_ARGS)
1123 latch_sigusr1_handler();
1126 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
1128 WalSndLastCycleHandler(SIGNAL_ARGS)
1130 walsender_ready_to_stop = true;
1132 SetLatch(&MyWalSnd->latch);
1135 /* Set up signal handlers */
1139 /* Set up signal handlers */
1140 pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
1142 pqsignal(SIGINT, SIG_IGN); /* not used */
1143 pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */
1144 pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
1145 pqsignal(SIGALRM, SIG_IGN);
1146 pqsignal(SIGPIPE, SIG_IGN);
1147 pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
1148 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
1151 /* Reset some signals that are accepted by postmaster but not here */
1152 pqsignal(SIGCHLD, SIG_DFL);
1153 pqsignal(SIGTTIN, SIG_DFL);
1154 pqsignal(SIGTTOU, SIG_DFL);
1155 pqsignal(SIGCONT, SIG_DFL);
1156 pqsignal(SIGWINCH, SIG_DFL);
1159 /* Report shared-memory space needed by WalSndShmemInit */
1161 WalSndShmemSize(void)
1165 size = offsetof(WalSndCtlData, walsnds);
1166 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
1171 /* Allocate and initialize walsender-related shared memory */
1173 WalSndShmemInit(void)
1178 WalSndCtl = (WalSndCtlData *)
1179 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
1183 /* First time through, so initialize */
1184 MemSet(WalSndCtl, 0, WalSndShmemSize());
1186 for (i = 0; i < max_wal_senders; i++)
1188 WalSnd *walsnd = &WalSndCtl->walsnds[i];
1190 SpinLockInit(&walsnd->mutex);
1191 InitSharedLatch(&walsnd->latch);
1196 /* Wake up all walsenders */
1202 for (i = 0; i < max_wal_senders; i++)
1203 SetLatch(&WalSndCtl->walsnds[i].latch);
1206 /* Set state for current walsender (only called in walsender) */
1208 WalSndSetState(WalSndState state)
1210 /* use volatile pointer to prevent code rearrangement */
1211 volatile WalSnd *walsnd = MyWalSnd;
1213 Assert(am_walsender);
1215 if (walsnd->state == state)
1218 SpinLockAcquire(&walsnd->mutex);
1219 walsnd->state = state;
1220 SpinLockRelease(&walsnd->mutex);
1224 * Return a string constant representing the state. This is used
1225 * in system views, and should *not* be translated.
1228 WalSndGetStateString(WalSndState state)
1232 case WALSNDSTATE_STARTUP:
1234 case WALSNDSTATE_BACKUP:
1236 case WALSNDSTATE_CATCHUP:
1238 case WALSNDSTATE_STREAMING:
1246 * Returns activity of walsenders, including pids and xlog locations sent to
1250 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
1252 #define PG_STAT_GET_WAL_SENDERS_COLS 6
1253 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1255 Tuplestorestate *tupstore;
1256 MemoryContext per_query_ctx;
1257 MemoryContext oldcontext;
1260 /* check to see if caller supports us returning a tuplestore */
1261 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1263 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1264 errmsg("set-valued function called in context that cannot accept a set")));
1265 if (!(rsinfo->allowedModes & SFRM_Materialize))
1267 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1268 errmsg("materialize mode required, but it is not " \
1269 "allowed in this context")));
1271 /* Build a tuple descriptor for our result type */
1272 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1273 elog(ERROR, "return type must be a row type");
1275 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1276 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1278 tupstore = tuplestore_begin_heap(true, false, work_mem);
1279 rsinfo->returnMode = SFRM_Materialize;
1280 rsinfo->setResult = tupstore;
1281 rsinfo->setDesc = tupdesc;
1283 MemoryContextSwitchTo(oldcontext);
1285 for (i = 0; i < max_wal_senders; i++)
1287 /* use volatile pointer to prevent code rearrangement */
1288 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1289 char location[MAXFNAMELEN];
1295 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
1296 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
1298 if (walsnd->pid == 0)
1301 SpinLockAcquire(&walsnd->mutex);
1302 sentPtr = walsnd->sentPtr;
1303 state = walsnd->state;
1304 write = walsnd->write;
1305 flush = walsnd->flush;
1306 apply = walsnd->apply;
1307 SpinLockRelease(&walsnd->mutex);
1309 memset(nulls, 0, sizeof(nulls));
1310 values[0] = Int32GetDatum(walsnd->pid);
1315 * Only superusers can see details. Other users only get
1316 * the pid value to know it's a walsender, but no details.
1326 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
1328 snprintf(location, sizeof(location), "%X/%X",
1329 sentPtr.xlogid, sentPtr.xrecoff);
1330 values[2] = CStringGetTextDatum(location);
1332 if (write.xlogid == 0 && write.xrecoff == 0)
1334 snprintf(location, sizeof(location), "%X/%X",
1335 write.xlogid, write.xrecoff);
1336 values[3] = CStringGetTextDatum(location);
1338 if (flush.xlogid == 0 && flush.xrecoff == 0)
1340 snprintf(location, sizeof(location), "%X/%X",
1341 flush.xlogid, flush.xrecoff);
1342 values[4] = CStringGetTextDatum(location);
1344 if (apply.xlogid == 0 && apply.xrecoff == 0)
1346 snprintf(location, sizeof(location), "%X/%X",
1347 apply.xlogid, apply.xrecoff);
1348 values[5] = CStringGetTextDatum(location);
1351 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1354 /* clean up and return the tuplestore */
1355 tuplestore_donestoring(tupstore);
1361 * This isn't currently used for anything. Monitoring tools might be
1362 * interested in the future, and we'll need something like this in the
1363 * future for synchronous replication.
1367 * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
1371 GetOldestWALSendPointer(void)
1373 XLogRecPtr oldest = {0, 0};
1377 for (i = 0; i < max_wal_senders; i++)
1379 /* use volatile pointer to prevent code rearrangement */
1380 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
1383 if (walsnd->pid == 0)
1386 SpinLockAcquire(&walsnd->mutex);
1387 recptr = walsnd->sentPtr;
1388 SpinLockRelease(&walsnd->mutex);
1390 if (recptr.xlogid == 0 && recptr.xrecoff == 0)
1393 if (!found || XLByteLT(recptr, oldest))