1 /*-------------------------------------------------------------------------
3 * receivelog.c - receive transaction log files using the streaming
4 * replication protocol.
6 * Author: Magnus Hagander <magnus@hagander.net>
8 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
11 * src/bin/pg_basebackup/receivelog.c
12 *-------------------------------------------------------------------------
15 #include "postgres_fe.h"
21 #include "receivelog.h"
22 #include "streamutil.h"
25 #include "access/xlog_internal.h"
/*
 * NOTE(review): this copy of receivelog.c appears to be an extracted listing:
 * every line is prefixed with its original line number, and many lines
 * (blank lines, braces, local declarations, return statements) are missing.
 * It cannot be compiled as-is; restore it from the upstream source first.
 */
28 /* fd and filename for currently open WAL file */
/* walfile is -1 whenever no WAL segment file is open. */
29 static int walfile = -1;
30 static char current_walfile_name[MAXPGPATH] = "";
/*
 * Set by ReceiveXlogStream(): true only when a replication slot is in use.
 * When false, feedback messages report InvalidXLogRecPtr as the flush location.
 */
31 static bool reportFlushPosition = false;
/*
 * Latest WAL position known to be flushed to disk. Initialized to the start
 * position and advanced after fsync()/close of the WAL file.
 */
32 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
34 static bool still_sending = true; /* feedback still needs to be sent? */
/*
 * Forward declarations of the file-local helpers used by ReceiveXlogStream().
 * NOTE(review): in this copy several prototypes are truncated - their final
 * parameter lines (original lines 40, 51, 57-58 and 60-61) are missing.
 */
36 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
37 uint32 timeline, char *basedir,
38 stream_stop_callback stream_stop, int standby_message_timeout,
39 char *partial_suffix, XLogRecPtr *stoppos,
41 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
42 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
43 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
44 XLogRecPtr blockpos, int64 *last_status);
45 static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
46 XLogRecPtr *blockpos, uint32 timeline,
47 char *basedir, stream_stop_callback stream_stop,
48 char *partial_suffix);
49 static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
50 XLogRecPtr blockpos, char *basedir, char *partial_suffix,
52 static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
53 uint32 timeline, char *basedir,
54 stream_stop_callback stream_stop,
55 char *partial_suffix, XLogRecPtr *stoppos);
56 static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
59 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
/*
 * open_walfile: open (creating if needed) the WAL segment containing
 * startpoint on the given timeline. An existing file must already be exactly
 * XLogSegSize bytes; a new, empty file is zero-padded to XLogSegSize and the
 * position is rewound to the start before use. Errors are printed to stderr.
 */
63 * Open a new WAL file in the specified directory.
65 * The file will be padded to 16Mb with zeroes. The base filename (without
66 * partial_suffix) is stored in current_walfile_name.
69 open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
79 XLByteToSeg(startpoint, segno);
80 XLogFileName(current_walfile_name, timeline, segno);
82 snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
83 partial_suffix ? partial_suffix : "");
84 f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
88 _("%s: could not open transaction log file \"%s\": %s\n"),
89 progname, fn, strerror(errno));
94 * Verify that the file is either empty (just created), or a complete
95 * XLogSegSize segment. Anything in between indicates a corrupt file.
97 if (fstat(f, &statbuf) != 0)
100 _("%s: could not stat transaction log file \"%s\": %s\n"),
101 progname, fn, strerror(errno));
105 if (statbuf.st_size == XLogSegSize)
107 /* File is open and ready to use */
111 if (statbuf.st_size != 0)
114 _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
115 progname, fn, (int) statbuf.st_size, XLogSegSize);
120 /* New, empty, file. So pad it to 16Mb with zeroes */
121 zerobuf = pg_malloc0(XLOG_BLCKSZ);
122 for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
124 if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
127 _("%s: could not pad transaction log file \"%s\": %s\n"),
128 progname, fn, strerror(errno));
/*
 * Rewind to offset 0 so received WAL is written from the segment start.
 * lseek() takes (fd, offset, whence); the arguments were previously swapped
 * (lseek(f, SEEK_SET, 0)), which only behaved correctly where SEEK_SET == 0.
 */
137 if (lseek(f, 0, SEEK_SET) != 0)
140 _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
141 progname, fn, strerror(errno));
/*
 * close_walfile: fsync and close the current WAL file. If it was written up
 * to XLOG_SEG_SIZE and a partial_suffix is in use, strip the suffix by
 * renaming. pos is recorded as the new flushed position.
 */
150 * Close the current WAL file (if open), and rename it to the correct
151 * filename if it's complete. On failure, prints an error message to stderr
152 * and returns false, otherwise returns true.
155 close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
/* The current offset tells us how much of the segment was written. */
162 currpos = lseek(walfile, 0, SEEK_CUR);
166 _("%s: could not determine seek position in file \"%s\": %s\n"),
167 progname, current_walfile_name, strerror(errno));
171 if (fsync(walfile) != 0)
173 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
174 progname, current_walfile_name, strerror(errno));
178 if (close(walfile) != 0)
180 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
181 progname, current_walfile_name, strerror(errno));
188 * If we finished writing a .partial file, rename it into place.
190 if (currpos == XLOG_SEG_SIZE && partial_suffix)
192 char oldfn[MAXPGPATH];
193 char newfn[MAXPGPATH];
195 snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
196 snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
197 if (rename(oldfn, newfn) != 0)
199 fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
200 progname, current_walfile_name, strerror(errno));
/* An incomplete segment keeps its suffix so it can be told apart later. */
204 else if (partial_suffix)
206 _("%s: not renaming \"%s%s\", segment is not complete\n"),
207 progname, current_walfile_name, partial_suffix);
/*
 * NOTE(review): the early returns for the error paths above are missing from
 * this copy; presumably this is reached only after fsync/close succeeded.
 */
209 lastFlushPosition = pos;
/*
 * existsTimeLineHistoryFile: report whether basedir already holds the history
 * file for timeline tli. Existence is tested by opening the file read-only.
 */
215 * Check if a timeline history file exists.
218 existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
220 char path[MAXPGPATH];
221 char histfname[MAXFNAMELEN];
225 * Timeline 1 never has a history file. We treat that as if it existed,
226 * since we never need to stream it.
231 TLHistoryFileName(histfname, tli);
233 snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
235 fd = open(path, O_RDONLY | PG_BINARY, 0);
/*
 * NOTE(review): the condition guarding this message is missing from this
 * copy; presumably a plain "file does not exist" is not reported - confirm.
 */
239 fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
240 progname, path, strerror(errno));
/*
 * writeTimeLineHistoryFile: store the server-supplied history file content for
 * timeline tli in basedir. The file name the server reported must match the
 * locally computed one. Content is written to "<path>.tmp", fsync'd and closed,
 * then renamed to its final name, so that name only ever refers to a complete
 * file. Errors are printed to stderr.
 */
251 writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
253 int size = strlen(content);
254 char path[MAXPGPATH];
255 char tmppath[MAXPGPATH];
256 char histfname[MAXFNAMELEN];
260 * Check that the server's idea of how timeline history files should be
261 * named matches ours.
263 TLHistoryFileName(histfname, tli);
264 if (strcmp(histfname, filename) != 0)
266 fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
267 progname, tli, filename);
271 snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
274 * Write into a temp file name.
276 snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
280 fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
283 fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
284 progname, tmppath, strerror(errno));
289 if ((int) write(fd, content, size) != size)
/*
 * errno is saved because the cleanup below may overwrite it.
 * NOTE(review): the lines restoring errno from save_errno before the message
 * that uses strerror(errno) are missing from this copy - confirm upstream.
 */
291 int save_errno = errno;
294 * If we fail to make the file, delete it to release disk space
300 fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
301 progname, tmppath, strerror(errno));
308 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
309 progname, tmppath, strerror(errno));
315 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
316 progname, tmppath, strerror(errno));
321 * Now move the completed history file into place with its final name.
323 if (rename(tmppath, path) < 0)
325 fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
326 progname, tmppath, path, strerror(errno));
/*
 * sendFeedback: build and send a standby status update over the COPY stream.
 * The buffer holds 1 + 8 + 8 + 8 + 8 + 1 bytes:
 * - a message-type byte (its assignment is not visible in this copy);
 * - the write, flush and apply positions and the send time, as 8-byte
 *   integers;
 * - a one-byte replyRequested flag.
 * The flush position is reported only when reportFlushPosition is set.
 * The apply position is always InvalidXLogRecPtr.
 */
334 * Send a Standby Status Update message to server.
337 sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
339 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
344 fe_sendint64(blockpos, &replybuf[len]); /* write */
346 if (reportFlushPosition)
347 fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
349 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
351 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
353 fe_sendint64(now, &replybuf[len]); /* sendTime */
355 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
/* Queue the message and push it out; either step failing is an error. */
358 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
360 fprintf(stderr, _("%s: could not send feedback packet: %s"),
361 progname, PQerrorMessage(conn));
/*
 * CheckServerVersionForStreaming: accept servers whose major version lies
 * between 9.3 and the client's own major version. Major versions are compared
 * as version number / 100 (e.g. 90300 -> 903). The server_version parameter
 * is shown in the error message, or 'unknown' when it is not available.
 */
369 * Check that the server version we're connected to is supported by
370 * ReceiveXlogStream().
372 * If it's not, an error message is printed to stderr, and false is returned.
375 CheckServerVersionForStreaming(PGconn *conn)
382 * The message format used in streaming replication changed in 9.3, so we
383 * cannot stream from older servers. And we don't support servers newer
384 * than the client; it might work, but we don't know, so err on the safe
387 minServerMajor = 903;
388 maxServerMajor = PG_VERSION_NUM / 100;
389 serverMajor = PQserverVersion(conn) / 100;
390 if (serverMajor < minServerMajor)
392 const char *serverver = PQparameterStatus(conn, "server_version");
394 fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
396 serverver ? serverver : "'unknown'",
400 else if (serverMajor > maxServerMajor)
402 const char *serverver = PQparameterStatus(conn, "server_version");
404 fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
406 serverver ? serverver : "'unknown'",
/*
 * ReceiveXlogStream: top-level driver for receiving WAL into basedir.
 *
 * The first group of steps runs once:
 * - check the server version;
 * - optionally validate the system identifier and timeline;
 * - fetch a missing timeline history file.
 *
 * Then, per timeline, it issues START_REPLICATION and hands the COPY stream
 * to HandleCopyStream(). At end-of-timeline it follows the server to the next
 * timeline; on controlled shutdown it asks stream_stop whether stopping is OK.
 */
414 * Receive a log stream starting at the specified position.
416 * If sysidentifier is specified, validate that both the system
417 * identifier and the timeline matches the specified ones
418 * (by sending an extra IDENTIFY_SYSTEM command)
420 * All received segments will be written to the directory
421 * specified by basedir. This will also fetch any missing timeline history
424 * The stream_stop callback will be called every time data
425 * is received, and whenever a segment is completed. If it returns
426 * true, the streaming will stop and the function
427 * return. As long as it returns false, streaming will continue
430 * standby_message_timeout controls how often we send a message
431 * back to the master letting it know our progress, in milliseconds.
432 * This message will only contain the write location, and never
435 * If 'partial_suffix' is not NULL, files are initially created with the
436 * given suffix, and the suffix is removed once the file is finished. That
437 * allows you to tell the difference between partial and completed files,
438 * so that you can continue later where you left.
440 * If 'synchronous' is true, the received WAL is flushed as soon as written,
441 * otherwise only when the WAL file is closed.
443 * Note: The log position *must* be at a log segment start!
446 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
447 char *sysidentifier, char *basedir,
448 stream_stop_callback stream_stop,
449 int standby_message_timeout, char *partial_suffix,
458 * The caller should've checked the server version already, but doesn't do
459 * any harm to check it here too.
461 if (!CheckServerVersionForStreaming(conn))
464 if (replication_slot != NULL)
467 * Report the flush position, so the primary can know what WAL we'll
468 * possibly re-request, and remove older WAL safely.
470 * We only report it when a slot has explicitly been used, because
471 * reporting the flush position makes one eligible as a synchronous
472 * replica. People shouldn't include generic names in
473 * synchronous_standby_names, but we've protected them against it so
474 * far, so let's continue to do so in the situations when possible. If
475 * they've got a slot, though, we need to report the flush position,
476 * so that the master can remove WAL.
478 reportFlushPosition = true;
/*
 * NOTE(review): unbounded sprintf of a user-supplied slot name. slotcmd's
 * declaration is not visible in this copy; if it is a fixed-size array,
 * snprintf(slotcmd, sizeof(slotcmd), ...) would prevent an overflow.
 */
479 sprintf(slotcmd, "SLOT \"%s\" ", replication_slot);
483 reportFlushPosition = false;
487 if (sysidentifier != NULL)
489 /* Validate system identifier hasn't changed */
490 res = PQexec(conn, "IDENTIFY_SYSTEM");
491 if (PQresultStatus(res) != PGRES_TUPLES_OK)
494 _("%s: could not send replication command \"%s\": %s"),
495 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
499 if (PQntuples(res) != 1 || PQnfields(res) < 3)
502 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
503 progname, PQntuples(res), PQnfields(res), 1, 3);
507 if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
510 _("%s: system identifier does not match between base backup and streaming connection\n"),
/* Column 1 of IDENTIFY_SYSTEM is the server's current timeline. */
515 if (timeline > atoi(PQgetvalue(res, 0, 1)))
518 _("%s: starting timeline %u is not present in the server\n"),
527 * initialize flush position to starting point, it's the caller's
528 * responsibility that that's sane.
530 lastFlushPosition = startpos;
535 * Fetch the timeline history file for this timeline, if we don't have
538 if (!existsTimeLineHistoryFile(basedir, timeline))
540 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
541 res = PQexec(conn, query);
542 if (PQresultStatus(res) != PGRES_TUPLES_OK)
544 /* FIXME: we might send it ok, but get an error */
545 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
546 progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
552 * The response to TIMELINE_HISTORY is a single row result set
553 * with two fields: filename and content
555 if (PQnfields(res) != 2 || PQntuples(res) != 1)
558 _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
559 progname, PQntuples(res), PQnfields(res), 1, 2);
562 /* Write the history file to disk */
/*
 * NOTE(review): the result of writeTimeLineHistoryFile() is not checked here,
 * although that function reports several failure cases - confirm whether a
 * failure should abort streaming.
 */
563 writeTimeLineHistoryFile(basedir, timeline,
564 PQgetvalue(res, 0, 0),
565 PQgetvalue(res, 0, 1));
571 * Before we start streaming from the requested location, check if the
572 * callback tells us to stop here.
574 if (stream_stop(startpos, timeline, false))
577 /* Initiate the replication stream at specified location */
578 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
580 (uint32) (startpos >> 32), (uint32) startpos,
582 res = PQexec(conn, query);
583 if (PQresultStatus(res) != PGRES_COPY_BOTH)
585 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
586 progname, "START_REPLICATION", PQresultErrorMessage(res));
/* Run the COPY loop; stoppos receives the last position written. */
593 res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
594 standby_message_timeout, partial_suffix,
595 &stoppos, synchronous);
600 * Streaming finished.
602 * There are two possible reasons for that: a controlled shutdown, or
603 * we reached the end of the current timeline. In case of
604 * end-of-timeline, the server sends a result set after Copy has
605 * finished, containing information about the next timeline. Read
606 * that, and restart streaming from the next timeline. In case of
607 * controlled shutdown, stop here.
609 if (PQresultStatus(res) == PGRES_TUPLES_OK)
612 * End-of-timeline. Read the next timeline's ID and starting
613 * position. Usually, the starting position will match the end of
614 * the previous timeline, but there are corner cases like if the
615 * server had sent us half of a WAL record, when it was promoted.
616 * The new timeline will begin at the end of the last complete
617 * record in that case, overlapping the partial WAL record on the
623 parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
628 /* Sanity check the values the server gave us */
629 if (newtimeline <= timeline)
632 _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
633 progname, newtimeline, timeline);
636 if (startpos > stoppos)
639 _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
641 timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
642 newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
646 /* Read the final result, which should be CommandComplete. */
647 res = PQgetResult(conn);
648 if (PQresultStatus(res) != PGRES_COMMAND_OK)
651 _("%s: unexpected termination of replication stream: %s"),
652 progname, PQresultErrorMessage(res));
659 * Loop back to start streaming from the new timeline. Always
660 * start streaming at the beginning of a segment.
662 timeline = newtimeline;
/* Round the new start position down to its segment boundary. */
663 startpos = startpos - (startpos % XLOG_SEG_SIZE);
666 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
671 * End of replication (ie. controlled shut down of the server).
673 * Check if the callback thinks it's OK to stop here. If not,
676 if (stream_stop(stoppos, timeline, false))
680 fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
687 /* Server returned an error. */
689 _("%s: unexpected termination of replication stream: %s"),
690 progname, PQresultErrorMessage(res));
/* Close any WAL file that is still open (walfile == -1 means none). */
697 if (walfile != -1 && close(walfile) != 0)
698 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
699 progname, current_walfile_name, strerror(errno));
/*
 * ReadEndOfStreamingResult: extract the next timeline ID and its start
 * position from the end-of-timeline result set. The position text "HI/LO" is
 * two hexadecimal halves, combined into one 64-bit XLogRecPtr.
 */
705 * Helper function to parse the result set returned by server after streaming
706 * has finished. On failure, prints an error to stderr and returns false.
709 ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
711 uint32 startpos_xlogid,
715 * The result set consists of one row and two columns, e.g:
717 * next_tli | next_tli_startpos
718 * ----------+-------------------
721 * next_tli is the timeline ID of the next timeline after the one that
722 * just finished streaming. next_tli_startpos is the XLOG position where
723 * the server switched to it.
726 if (PQnfields(res) < 2 || PQntuples(res) != 1)
729 _("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
730 progname, PQntuples(res), PQnfields(res), 1, 2);
/*
 * atoi() does not validate its input; a non-numeric value becomes 0. The
 * caller's "newtimeline <= timeline" sanity check would reject that.
 */
734 *timeline = atoi(PQgetvalue(res, 0, 0));
735 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
736 &startpos_xrecoff) != 2)
739 _("%s: could not parse next timeline's starting point \"%s\"\n"),
740 progname, PQgetvalue(res, 0, 1));
743 *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
/*
 * HandleCopyStream: main receive loop. On each iteration it:
 * - checks the stop callback;
 * - optionally fsyncs (synchronous mode) and sends feedback;
 * - waits up to the computed sleep time for CopyData;
 * - then drains every message available without blocking.
 *
 * 'k' messages are keepalives and 'w' messages carry WAL data. Any other
 * header byte is reported as an error.
 */
749 * The main loop of ReceiveXlogStream. Handles the COPY stream after
750 * initiating streaming with the START_REPLICATION command.
752 * If the COPY ends (not necessarily successfully) due a message from the
753 * server, returns a PGresult and sets *stoppos to the last byte written.
754 * On any other sort of error, returns NULL.
757 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
758 char *basedir, stream_stop_callback stream_stop,
759 int standby_message_timeout, char *partial_suffix,
760 XLogRecPtr *stoppos, bool synchronous)
762 char *copybuf = NULL;
763 int64 last_status = -1;
764 XLogRecPtr blockpos = startpos;
766 still_sending = true;
775 * Check if we should continue streaming, or abort at this point.
777 if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
778 stream_stop, partial_suffix, stoppos))
781 now = feGetCurrentTimestamp();
784 * If synchronous option is true, issue sync command as soon as
785 * there are WAL data which has not been flushed yet.
787 if (synchronous && lastFlushPosition < blockpos && walfile != -1)
789 if (fsync(walfile) != 0)
791 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
792 progname, current_walfile_name, strerror(errno));
795 lastFlushPosition = blockpos;
798 * Send feedback so that the server sees the latest WAL locations
801 if (!sendFeedback(conn, blockpos, now, false))
807 * Potentially send a status message to the master
809 if (still_sending && standby_message_timeout > 0 &&
810 feTimestampDifferenceExceeds(last_status, now,
811 standby_message_timeout))
813 /* Time to send feedback! */
814 if (!sendFeedback(conn, blockpos, now, false))
820 * Calculate how long send/receive loops should sleep
822 sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
/*
 * CopyStreamReceive() returns the message through its char ** argument, so
 * it must be given the address of copybuf (this line previously read
 * "©buf" - an HTML "&copy" entity-decoding artifact).
 */
825 r = CopyStreamReceive(conn, sleeptime, &copybuf);
832 PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
833 basedir, partial_suffix, stoppos);
840 /* Check the message type. */
841 if (copybuf[0] == 'k')
843 if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
847 else if (copybuf[0] == 'w')
849 if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
850 timeline, basedir, stream_stop, partial_suffix))
854 * Check if we should continue streaming, or abort at this point.
856 if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
857 stream_stop, partial_suffix, stoppos))
862 fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
863 progname, copybuf[0]);
868 * Process the received data, and any subsequent data we
869 * can read without blocking.
/* Zero timeout: only take what is already available, never wait. */
871 r = CopyStreamReceive(conn, 0, &copybuf);
/*
 * CopyStreamPoll: select() on the connection's socket for readability, with
 * timeout_ms converted to a struct timeval. A timeout or an EINTR interruption
 * both yield 0; any other select() failure is reported and yields -1.
 * NOTE(review): the handling of negative timeout_ms (original lines 903-907)
 * is missing from this copy.
 */
882 * Wait until we can read CopyData message, or timeout.
884 * Returns 1 if data has become available for reading, 0 if timed out
885 * or interrupted by signal, and -1 on an error.
888 CopyStreamPoll(PGconn *conn, long timeout_ms)
892 struct timeval timeout;
893 struct timeval *timeoutptr;
895 if (PQsocket(conn) < 0)
/* Terminate the line like every other self-contained error message. */
897 fprintf(stderr, _("%s: socket not open\n"), progname);
901 FD_ZERO(&input_mask);
902 FD_SET(PQsocket(conn), &input_mask);
908 timeout.tv_sec = timeout_ms / 1000L;
909 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
910 timeoutptr = &timeout;
913 ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
914 if (ret == 0 || (ret < 0 && errno == EINTR))
915 return 0; /* Got a timeout or signal */
918 fprintf(stderr, _("%s: select() failed: %s\n"),
919 progname, strerror(errno));
/*
 * CopyStreamReceive: fetch the next CopyData message. It first tries a
 * non-blocking read. If nothing is buffered it waits via CopyStreamPoll(),
 * consumes input, and tries once more. A result of -1 from PQgetCopyData()
 * marks end-of-streaming or an error.
 */
927 * Receive CopyData message available from XLOG stream, blocking for
928 * maximum of 'timeout' ms.
930 * If data was received, returns the length of the data. *buffer is set to
931 * point to a buffer holding the received message. The buffer is only valid
932 * until the next CopyStreamReceive call.
934 * 0 if no data was available within timeout, or wait was interrupted
935 * by signal. -1 on error. -2 if the server ended the COPY.
938 CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
940 char *copybuf = NULL;
947 /* Try to receive a CopyData message */
/*
 * PQgetCopyData() stores the row pointer through its char ** argument, so pass
 * &copybuf (previously garbled to "©buf" by HTML entity decoding).
 */
948 rawlen = PQgetCopyData(conn, &copybuf, 1);
952 * No data available. Wait for some to appear, but not longer than
953 * the specified timeout, so that we can ping the server.
959 ret = CopyStreamPoll(conn, timeout);
964 /* Else there is actually data on the socket */
965 if (PQconsumeInput(conn) == 0)
968 _("%s: could not receive data from WAL stream: %s"),
969 progname, PQerrorMessage(conn));
973 /* Now that we've consumed some input, try again */
974 rawlen = PQgetCopyData(conn, &copybuf, 1);
978 if (rawlen == -1) /* end-of-streaming or error */
982 fprintf(stderr, _("%s: could not read COPY data: %s"),
983 progname, PQerrorMessage(conn));
987 /* Return received messages to caller */
/*
 * ProcessKeepaliveMsg: handle a 'k' keepalive. Only the trailing
 * replyRequested byte is examined; walEnd and sendTime (8 bytes each) are
 * skipped. If a reply is requested while still sending:
 * - the WAL file is first fsync'd when a flush position is being reported
 *   and is behind blockpos;
 * - then feedback is sent.
 */
993 * Process the keepalive message.
996 ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
997 XLogRecPtr blockpos, int64 *last_status)
1000 bool replyRequested;
1004 * Parse the keepalive message, enclosed in the CopyData message.
1005 * We just check if the server requested a reply, and ignore the
1008 pos = 1; /* skip msgtype 'k' */
1009 pos += 8; /* skip walEnd */
1010 pos += 8; /* skip sendTime */
/* NOTE(review): the length check guarding this message is missing here. */
1014 fprintf(stderr, _("%s: streaming header too small: %d\n"),
1018 replyRequested = copybuf[pos];
1020 /* If the server requested an immediate reply, send one. */
1021 if (replyRequested && still_sending)
1023 if (reportFlushPosition && lastFlushPosition < blockpos &&
1027 * If a valid flush location needs to be reported,
1028 * flush the current WAL file so that the latest flush
1029 * location is sent back to the server. This is necessary to
1030 * see whether the last WAL data has been successfully
1031 * replicated or not, at the normal shutdown of the server.
1033 if (fsync(walfile) != 0)
1035 fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
1036 progname, current_walfile_name, strerror(errno));
1039 lastFlushPosition = blockpos;
1042 now = feGetCurrentTimestamp();
1043 if (!sendFeedback(conn, blockpos, now, false))
/*
 * ProcessXLogDataMsg: write the payload of a 'w' message to WAL segment files.
 *
 * The 25-byte header is 1 type byte followed by dataStart, walEnd and sendTime
 * (8 bytes each); only dataStart is used. It sets *blockpos. The payload must
 * continue exactly where the open file's position is.
 *
 * Data is written segment by segment:
 * - a segment file is opened as needed;
 * - a segment that reaches XLOG_SEG_SIZE is closed;
 * - stream_stop is consulted after every completed segment.
 */
1052 * Process XLogData message.
1055 ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
1056 XLogRecPtr *blockpos, uint32 timeline,
1057 char *basedir, stream_stop_callback stream_stop,
1058 char *partial_suffix)
1066 * Once we've decided we don't want to receive any more, just
1067 * ignore any subsequent XLogData messages.
1069 if (!(still_sending))
1073 * Read the header of the XLogData message, enclosed in the
1074 * CopyData message. We only need the WAL location field
1075 * (dataStart), the rest of the header is ignored.
1077 hdr_len = 1; /* msgtype 'w' */
1078 hdr_len += 8; /* dataStart */
1079 hdr_len += 8; /* walEnd */
1080 hdr_len += 8; /* sendTime */
1083 fprintf(stderr, _("%s: streaming header too small: %d\n"),
/*
 * dataStart immediately follows the 1-byte message type. The argument is the
 * address of that byte (previously garbled to "©buf[1]" by HTML entity
 * decoding of "&copy").
 */
1087 *blockpos = fe_recvint64(&copybuf[1]);
1089 /* Extract WAL location for this block */
1090 xlogoff = *blockpos % XLOG_SEG_SIZE;
1093 * Verify that the initial location in the stream matches where we
1098 /* No file open yet */
1102 _("%s: received transaction log record for offset %u with no file open\n"),
1109 /* More data in existing segment */
1110 /* XXX: store seek value don't reseek all the time */
1111 if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
1114 _("%s: got WAL data offset %08x, expected %08x\n"),
1115 progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
1120 bytes_left = len - hdr_len;
1128 * If crossing a WAL boundary, only write up until we reach
1131 if (xlogoff + bytes_left > XLOG_SEG_SIZE)
1132 bytes_to_write = XLOG_SEG_SIZE - xlogoff;
1134 bytes_to_write = bytes_left;
1138 if (!open_walfile(*blockpos, timeline,
1139 basedir, partial_suffix))
1141 /* Error logged by open_walfile */
1147 copybuf + hdr_len + bytes_written,
1148 bytes_to_write) != bytes_to_write)
1151 _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
1152 progname, bytes_to_write, current_walfile_name,
1157 /* Write was successful, advance our position */
1158 bytes_written += bytes_to_write;
1159 bytes_left -= bytes_to_write;
1160 *blockpos += bytes_to_write;
1161 xlogoff += bytes_to_write;
1163 /* Did we reach the end of a WAL segment? */
1164 if (*blockpos % XLOG_SEG_SIZE == 0)
1166 if (!close_walfile(basedir, partial_suffix, *blockpos))
1167 /* Error message written in close_walfile() */
1172 if (still_sending && stream_stop(*blockpos, timeline, true))
1174 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1176 fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1177 progname, PQerrorMessage(conn));
1180 still_sending = false;
1181 return true; /* ignore the rest of this XLogData packet */
1185 /* No more data left to write, receive next copy packet */
/*
 * HandleEndOfCopyStream: called once the server has ended its side of the
 * COPY. It:
 * - closes the current WAL file;
 * - completes our side of the COPY if the result is still PGRES_COPY_IN;
 * - stops further feedback;
 * - records blockpos in *stoppos.
 * The returned result is left to the caller to inspect.
 */
1191 * Handle end of the copy stream.
1194 HandleEndOfCopyStream(PGconn *conn, char *copybuf,
1195 XLogRecPtr blockpos, char *basedir, char *partial_suffix,
1196 XLogRecPtr *stoppos)
1198 PGresult *res = PQgetResult(conn);
1201 * The server closed its end of the copy stream. If we haven't
1202 * closed ours already, we need to do so now, unless the server
1203 * threw an error, in which case we don't.
1207 if (!close_walfile(basedir, partial_suffix, blockpos))
1209 /* Error message written in close_walfile() */
/* PGRES_COPY_IN means our direction of the COPY is still open: end it. */
1213 if (PQresultStatus(res) == PGRES_COPY_IN)
1215 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1218 _("%s: could not send copy-end packet: %s"),
1219 progname, PQerrorMessage(conn));
1223 res = PQgetResult(conn);
1225 still_sending = false;
/* NOTE(review): the body of this check (presumably freeing copybuf) is missing here. */
1227 if (copybuf != NULL)
1229 *stoppos = blockpos;
/*
 * CheckCopyStreamStop: if still sending and stream_stop asks to stop at
 * blockpos, close the current WAL file and send CopyDone to the server. After
 * that, no more feedback or WAL data is processed (still_sending = false).
 * NOTE(review): stoppos is not referenced in the lines visible in this copy.
 */
1234 * Check if we should continue streaming, or abort at this point.
1237 CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
1238 char *basedir, stream_stop_callback stream_stop,
1239 char *partial_suffix, XLogRecPtr *stoppos)
1241 if (still_sending && stream_stop(blockpos, timeline, false))
1243 if (!close_walfile(basedir, partial_suffix, blockpos))
1245 /* Potential error message is written by close_walfile */
1248 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1250 fprintf(stderr, _("%s: could not send copy-end packet: %s"),
1251 progname, PQerrorMessage(conn));
1254 still_sending = false;
1261 * Calculate how long send/receive loops should sleep
1264 CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
1267 int64 status_targettime = 0;
1270 if (standby_message_timeout && still_sending)
1271 status_targettime = last_status +
1272 (standby_message_timeout - 1) * ((int64) 1000);
1274 if (status_targettime > 0)
1279 feTimestampDifference(now,
1283 /* Always sleep at least 1 sec */
1290 sleeptime = secs * 1000 + usecs / 1000;