From: Heikki Linnakangas Date: Thu, 17 Jan 2013 18:23:00 +0000 (+0200) Subject: Make pg_receivexlog and pg_basebackup -X stream work across timeline switches. X-Git-Tag: REL9_3_BETA1~486 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=0b6329130e8e4576e97ff763f0e773347e1a88af;p=postgresql Make pg_receivexlog and pg_basebackup -X stream work across timeline switches. This mirrors the changes done earlier to the server in standby mode. When receivelog reaches the end of a timeline, as reported by the server, it fetches the timeline history file of the next timeline, and restarts streaming from the new timeline by issuing a new START_STREAMING command. When pg_receivexlog crosses a timeline, it leaves the .partial suffix on the last segment on the old timeline. This helps you to tell apart a partial segment left in the directory because of a timeline switch, and a completed segment. If you just follow a single server, it won't make a difference, but it can be significant in more complicated scenarios where new WAL is still generated on the old timeline. This includes two small changes to the streaming replication protocol: First, when you reach the end of timeline while streaming, the server now sends the TLI of the next timeline in the server's history to the client. pg_receivexlog uses that as the next timeline, so that it doesn't need to parse the timeline history file like a standby server does. Second, when BASE_BACKUP command sends the begin and end WAL positions, it now also sends the timeline IDs corresponding the positions. --- diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index e14627c201..baae59de6e 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -1418,8 +1418,10 @@ The commands accepted in walsender mode are: After streaming all the WAL on a timeline that is not the latest one, the server will end streaming by exiting the COPY mode. When the client - acknowledges this by also exiting COPY mode, the server responds with a - CommandComplete message, and is ready to accept a new command. + acknowledges this by also exiting COPY mode, the server sends a + single-row, single-column result set indicating the next timeline in + this server's history. That is followed by a CommandComplete message, + and the server is ready to accept a new command. @@ -1784,7 +1786,9 @@ The commands accepted in walsender mode are: The first ordinary result set contains the starting position of the - backup, given in XLogRecPtr format as a single column in a single row. + backup, in a single row with two columns. The first column contains + the start position given in XLogRecPtr format, and the second column + contains the corresponding timeline ID. The second ordinary result set has one row for each tablespace. @@ -1827,7 +1831,9 @@ The commands accepted in walsender mode are: ustar interchange format specified in the POSIX 1003.1-2008 standard) dump of the tablespace contents, except that the two trailing blocks of zeroes specified in the standard are omitted. - After the tar data is complete, a final ordinary result set will be sent. + After the tar data is complete, a final ordinary result set will be sent, + containing the WAL end position of the backup, in the same format as + the start position. diff --git a/src/backend/access/transam/timeline.c b/src/backend/access/transam/timeline.c index 46379bbff8..ad4f3162c5 100644 --- a/src/backend/access/transam/timeline.c +++ b/src/backend/access/transam/timeline.c @@ -545,22 +545,26 @@ tliOfPointInHistory(XLogRecPtr ptr, List *history) } /* - * Returns the point in history where we branched off the given timeline. - * Returns InvalidXLogRecPtr if the timeline is current (= we have not - * branched off from it), and throws an error if the timeline is not part of - * this server's history. + * Returns the point in history where we branched off the given timeline, + * and the timeline we branched to (*nextTLI). Returns InvalidXLogRecPtr if + * the timeline is current, ie. we have not branched off from it, and throws + * an error if the timeline is not part of this server's history. */ XLogRecPtr -tliSwitchPoint(TimeLineID tli, List *history) +tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI) { ListCell *cell; + if (nextTLI) + *nextTLI = 0; foreach (cell, history) { TimeLineHistoryEntry *tle = (TimeLineHistoryEntry *) lfirst(cell); if (tle->tli == tli) return tle->end; + if (nextTLI) + *nextTLI = tle->tli; } ereport(ERROR, diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ac2b26b498..90ba32ef0f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4930,7 +4930,7 @@ StartupXLOG(void) * tliSwitchPoint will throw an error if the checkpoint's timeline * is not in expectedTLEs at all. */ - switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs); + switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL); ereport(FATAL, (errmsg("requested timeline %u is not a child of this server's history", recoveryTargetTLI), @@ -7870,16 +7870,21 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno) * non-exclusive backups active at the same time, and they don't conflict * with an exclusive backup either. * + * Returns the minimum WAL position that must be present to restore from this + * backup, and the corresponding timeline ID in *starttli_p. + * * Every successfully started non-exclusive backup must be stopped by calling * do_pg_stop_backup() or do_pg_abort_backup(). */ XLogRecPtr -do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile) +do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, + char **labelfile) { bool exclusive = (labelfile == NULL); bool backup_started_in_recovery = false; XLogRecPtr checkpointloc; XLogRecPtr startpoint; + TimeLineID starttli; pg_time_t stamp_time; char strfbuf[128]; char xlogfilename[MAXFNAMELEN]; @@ -8021,6 +8026,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile) LWLockAcquire(ControlFileLock, LW_SHARED); checkpointloc = ControlFile->checkPoint; startpoint = ControlFile->checkPointCopy.redo; + starttli = ControlFile->checkPointCopy.ThisTimeLineID; checkpointfpw = ControlFile->checkPointCopy.fullPageWrites; LWLockRelease(ControlFileLock); @@ -8154,6 +8160,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile) /* * We're done. As a convenience, return the starting WAL location. */ + if (starttli_p) + *starttli_p = starttli; return startpoint; } @@ -8190,14 +8198,18 @@ pg_start_backup_callback(int code, Datum arg) * If labelfile is NULL, this stops an exclusive backup. Otherwise this stops * the non-exclusive backup specified by 'labelfile'. + * + * Returns the last WAL position that must be present to restore from this + * backup, and the corresponding timeline ID in *stoptli_p. */ XLogRecPtr -do_pg_stop_backup(char *labelfile, bool waitforarchive) +do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) { bool exclusive = (labelfile == NULL); bool backup_started_in_recovery = false; XLogRecPtr startpoint; XLogRecPtr stoppoint; + TimeLineID stoptli; XLogRecData rdata; pg_time_t stamp_time; char strfbuf[128]; @@ -8401,8 +8413,11 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive) LWLockAcquire(ControlFileLock, LW_SHARED); stoppoint = ControlFile->minRecoveryPoint; + stoptli = ControlFile->minRecoveryPointTLI; LWLockRelease(ControlFileLock); + if (stoptli_p) + *stoptli_p = stoptli; return stoppoint; } @@ -8414,6 +8429,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive) rdata.buffer = InvalidBuffer; rdata.next = NULL; stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata); + stoptli = ThisTimeLineID; /* * Force a switch to a new xlog segment file, so that the backup is valid @@ -8529,6 +8545,8 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive) /* * We're done. As a convenience, return the ending WAL location. */ + if (stoptli_p) + *stoptli_p = stoptli; return stoppoint; } diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index 96db5dbbf5..b6bb6773d6 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -56,7 +56,7 @@ pg_start_backup(PG_FUNCTION_ARGS) backupidstr = text_to_cstring(backupid); - startpoint = do_pg_start_backup(backupidstr, fast, NULL); + startpoint = do_pg_start_backup(backupidstr, fast, NULL, NULL); snprintf(startxlogstr, sizeof(startxlogstr), "%X/%X", (uint32) (startpoint >> 32), (uint32) startpoint); @@ -82,7 +82,7 @@ pg_stop_backup(PG_FUNCTION_ARGS) XLogRecPtr stoppoint; char stopxlogstr[MAXFNAMELEN]; - stoppoint = do_pg_stop_backup(NULL, true); + stoppoint = do_pg_stop_backup(NULL, true, NULL); snprintf(stopxlogstr, sizeof(stopxlogstr), "%X/%X", (uint32) (stoppoint >> 32), (uint32) stoppoint); diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 2330fcc23a..57946a9fa9 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -55,7 +55,7 @@ static void SendBackupHeader(List *tablespaces); static void base_backup_cleanup(int code, Datum arg); static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir); static void parse_basebackup_options(List *options, basebackup_options *opt); -static void SendXlogRecPtrResult(XLogRecPtr ptr); +static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static int compareWalFileNames(const void *a, const void *b); /* Was the backup currently in-progress initiated in recovery mode? */ @@ -94,13 +94,16 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir) { XLogRecPtr startptr; + TimeLineID starttli; XLogRecPtr endptr; + TimeLineID endtli; char *labelfile; backup_started_in_recovery = RecoveryInProgress(); - startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile); - SendXlogRecPtrResult(startptr); + startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli, + &labelfile); + SendXlogRecPtrResult(startptr, starttli); PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); { @@ -218,7 +221,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) } PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); - endptr = do_pg_stop_backup(labelfile, !opt->nowait); + endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli); if (opt->includewal) { @@ -426,7 +429,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) /* Send CopyDone message for the last tar file */ pq_putemptymessage('c'); } - SendXlogRecPtrResult(endptr); + SendXlogRecPtrResult(endptr, endtli); } /* @@ -635,17 +638,15 @@ SendBackupHeader(List *tablespaces) * XlogRecPtr record (in text format) */ static void -SendXlogRecPtrResult(XLogRecPtr ptr) +SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) { StringInfoData buf; char str[MAXFNAMELEN]; - snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr); - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint(&buf, 1, 2); /* 1 field */ + pq_sendint(&buf, 2, 2); /* 2 fields */ - /* Field header */ + /* Field headers */ pq_sendstring(&buf, "recptr"); pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ @@ -653,11 +654,29 @@ SendXlogRecPtrResult(XLogRecPtr ptr) pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 2); + + pq_sendstring(&buf, "tli"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + /* + * int8 may seem like a surprising data type for this, but in thory int4 + * would not be wide enough for this, as TimeLineID is unsigned. + */ + pq_sendint(&buf, INT8OID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); pq_endmessage(&buf); /* Data row */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 1, 2); /* number of columns */ + pq_sendint(&buf, 2, 2); /* number of columns */ + + snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr); + pq_sendint(&buf, strlen(str), 4); /* length */ + pq_sendbytes(&buf, str, strlen(str)); + + snprintf(str, sizeof(str), "%u", tli); pq_sendint(&buf, strlen(str), 4); /* length */ pq_sendbytes(&buf, str, strlen(str)); pq_endmessage(&buf); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ad7d1c911b..ba138e73da 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -117,6 +117,7 @@ static uint32 sendOff = 0; * history forked off from that timeline at sendTimeLineValidUpto. */ static TimeLineID sendTimeLine = 0; +static TimeLineID sendTimeLineNextTLI = 0; static bool sendTimeLineIsHistoric = false; static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; @@ -449,7 +450,8 @@ StartReplication(StartReplicationCmd *cmd) * requested start location is on that timeline. */ timeLineHistory = readTimeLineHistory(ThisTimeLineID); - switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory); + switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, + &sendTimeLineNextTLI); list_free_deep(timeLineHistory); /* @@ -496,8 +498,7 @@ StartReplication(StartReplicationCmd *cmd) streamingDoneSending = streamingDoneReceiving = false; /* If there is nothing to stream, don't even enter COPY mode */ - if (!sendTimeLineIsHistoric || - cmd->startpoint < sendTimeLineValidUpto) + if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) { /* * When we first start replication the standby will be behind the primary. @@ -554,10 +555,46 @@ StartReplication(StartReplicationCmd *cmd) if (walsender_ready_to_stop) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); + + Assert(streamingDoneSending && streamingDoneReceiving); + } + + /* + * Copy is finished now. Send a single-row result set indicating the next + * timeline. + */ + if (sendTimeLineIsHistoric) + { + char str[11]; + snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI); + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 1, 2); /* 1 field */ + + /* Field header */ + pq_sendstring(&buf, "next_tli"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + /* + * int8 may seem like a surprising data type for this, but in theory + * int4 would not be wide enough for this, as TimeLineID is unsigned. + */ + pq_sendint(&buf, INT8OID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + + /* Data row */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 1, 2); /* number of columns */ + pq_sendint(&buf, strlen(str), 4); /* length */ + pq_sendbytes(&buf, str, strlen(str)); + pq_endmessage(&buf); } - /* Get out of COPY mode (CommandComplete). */ - EndCommand("COPY 0", DestRemote); + /* Send CommandComplete message */ + pq_puttextmessage('C', "START_STREAMING"); } /* @@ -1377,8 +1414,9 @@ XLogSend(bool *caughtup) List *history; history = readTimeLineHistory(ThisTimeLineID); - sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); Assert(sentPtr <= sendTimeLineValidUpto); + Assert(sendTimeLine < sendTimeLineNextTLI); list_free_deep(history); /* the current send pointer should be <= the switchpoint */ diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index a684c0c6fc..b6f774469b 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -243,7 +243,7 @@ LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, - true)) + NULL)) /* * Any errors will already have been reported in the function process, @@ -1220,7 +1220,7 @@ BaseBackup(void) { PGresult *res; char *sysidentifier; - uint32 timeline; + uint32 starttli; char current_path[MAXPGPATH]; char escaped_label[MAXPGPATH]; int i; @@ -1259,7 +1259,6 @@ BaseBackup(void) disconnect_and_exit(1); } sysidentifier = pg_strdup(PQgetvalue(res, 0, 0)); - timeline = atoi(PQgetvalue(res, 0, 1)); PQclear(res); /* @@ -1291,18 +1290,24 @@ BaseBackup(void) progname, PQerrorMessage(conn)); disconnect_and_exit(1); } - if (PQntuples(res) != 1) + if (PQntuples(res) != 1 || PQnfields(res) < 2) { - fprintf(stderr, _("%s: no start point returned from server\n"), - progname); + fprintf(stderr, + _("%s: server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 2); disconnect_and_exit(1); } + strcpy(xlogstart, PQgetvalue(res, 0, 0)); - if (verbose && includewal) - fprintf(stderr, "transaction log start point: %s\n", xlogstart); + starttli = atoi(PQgetvalue(res, 0, 1)); + PQclear(res); MemSet(xlogend, 0, sizeof(xlogend)); + if (verbose && includewal) + fprintf(stderr, _("transaction log start point: %s on timeline %u\n"), + xlogstart, starttli); + /* * Get the header */ @@ -1358,7 +1363,7 @@ BaseBackup(void) if (verbose) fprintf(stderr, _("%s: starting background WAL receiver\n"), progname); - StartLogStreamer(xlogstart, timeline, sysidentifier); + StartLogStreamer(xlogstart, starttli, sysidentifier); } /* diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 7f2db1946e..33dbc50389 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -39,8 +39,7 @@ volatile bool time_to_abort = false; static void usage(void); -static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, - uint32 currenttimeline); +static XLogRecPtr FindStreamingStart(uint32 *tli); static void StreamLog(); static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); @@ -70,14 +69,31 @@ usage(void) } static bool -stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) +stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) { + static uint32 prevtimeline = 0; + static XLogRecPtr prevpos = InvalidXLogRecPtr; + + /* we assume that we get called once at the end of each segment */ if (verbose && segment_finished) fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), - progname, - (uint32) (segendpos >> 32), (uint32) segendpos, + progname, (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline); + /* + * Note that we report the previous, not current, position here. That's + * the exact location where the timeline switch happend. After the switch, + * we restart streaming from the beginning of the segment, so xlogpos can + * smaller than prevpos if we just switched to new timeline. + */ + if (prevtimeline != 0 && prevtimeline != timeline) + fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"), + progname, timeline, + (uint32) (prevpos >> 32), (uint32) prevpos); + + prevtimeline = timeline; + prevpos = xlogpos; + if (time_to_abort) { fprintf(stderr, _("%s: received interrupt signal, exiting\n"), @@ -88,20 +104,19 @@ stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) } /* - * Determine starting location for streaming, based on: - * 1. If there are existing xlog segments, start at the end of the last one - * that is complete (size matches XLogSegSize) - * 2. If no valid xlog exists, start from the beginning of the current - * WAL segment. + * Determine starting location for streaming, based on any existing xlog + * segments in the directory. We start at the end of the last one that is + * complete (size matches XLogSegSize), on the timeline with highest ID. + * + * If there are no WAL files in the directory, returns InvalidXLogRecPtr. */ static XLogRecPtr -FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) +FindStreamingStart(uint32 *tli) { DIR *dir; struct dirent *dirent; - int i; - bool b; XLogSegNo high_segno = 0; + uint32 high_tli = 0; dir = opendir(basedir); if (dir == NULL) @@ -120,26 +135,13 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) seg; XLogSegNo segno; - if (strcmp(dirent->d_name, ".") == 0 || - strcmp(dirent->d_name, "..") == 0) - continue; - - /* xlog files are always 24 characters */ - if (strlen(dirent->d_name) != 24) - continue; - - /* Filenames are always made out of 0-9 and A-F */ - b = false; - for (i = 0; i < 24; i++) - { - if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') && - !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F')) - { - b = true; - break; - } - } - if (b) + /* + * Check if the filename looks like an xlog file, or a .partial file. + * Xlog files are always 24 characters, and .partial files are 32 + * characters. + */ + if (strlen(dirent->d_name) != 24 || + !strspn(dirent->d_name, "0123456789ABCDEF") == 24) continue; /* @@ -154,10 +156,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) } segno = ((uint64) log) << 32 | seg; - /* Ignore any files that are for another timeline */ - if (tli != currenttimeline) - continue; - /* Check if this is a completed segment or not */ snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name); if (stat(fullpath, &statbuf) != 0) @@ -170,9 +168,10 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) if (statbuf.st_size == XLOG_SEG_SIZE) { /* Completed segment */ - if (segno > high_segno) + if (segno > high_segno || (segno == high_segno && tli > high_tli)) { high_segno = segno; + high_tli = tli; continue; } } @@ -199,10 +198,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr); + *tli = high_tli; return high_ptr; } else - return currentpos; + return InvalidXLogRecPtr; } /* @@ -212,8 +212,10 @@ static void StreamLog(void) { PGresult *res; - uint32 timeline; XLogRecPtr startpos; + uint32 starttli; + XLogRecPtr serverpos; + uint32 servertli; uint32 hi, lo; @@ -243,7 +245,7 @@ StreamLog(void) progname, PQntuples(res), PQnfields(res), 1, 3); disconnect_and_exit(1); } - timeline = atoi(PQgetvalue(res, 0, 1)); + servertli = atoi(PQgetvalue(res, 0, 1)); if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2) { fprintf(stderr, @@ -251,13 +253,18 @@ StreamLog(void) progname, PQgetvalue(res, 0, 2)); disconnect_and_exit(1); } - startpos = ((uint64) hi) << 32 | lo; + serverpos = ((uint64) hi) << 32 | lo; PQclear(res); /* * Figure out where to start streaming. */ - startpos = FindStreamingStart(startpos, timeline); + startpos = FindStreamingStart(&starttli); + if (startpos == InvalidXLogRecPtr) + { + startpos = serverpos; + starttli = servertli; + } /* * Always start streaming at the beginning of a segment @@ -271,10 +278,10 @@ StreamLog(void) fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), progname, (uint32) (startpos >> 32), (uint32) startpos, - timeline); + starttli); - ReceiveXlogStream(conn, startpos, timeline, NULL, basedir, - stop_streaming, standby_message_timeout, false); + ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, + stop_streaming, standby_message_timeout, ".partial"); PQfinish(conn); } diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 88d0c136b0..03e275cb5b 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -28,19 +28,24 @@ #include "streamutil.h" -/* fd for currently open WAL file */ +/* fd and filename for currently open WAL file */ static int walfile = -1; +static char current_walfile_name[MAXPGPATH] = ""; + +static bool HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + int standby_message_timeout, char *partial_suffix, + XLogRecPtr *stoppos); /* - * Open a new WAL file in the specified directory. Store the name - * (not including the full directory) in namebuf. Assumes there is - * enough room in this buffer... + * Open a new WAL file in the specified directory. * - * The file will be padded to 16Mb with zeroes. + * The file will be padded to 16Mb with zeroes. The base filename (without + * partial_suffix) is stored in current_walfile_name. */ -static int +static bool open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, - char *namebuf) + char *partial_suffix) { int f; char fn[MAXPGPATH]; @@ -50,16 +55,17 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, XLogSegNo segno; XLByteToSeg(startpoint, segno); - XLogFileName(namebuf, timeline, segno); + XLogFileName(current_walfile_name, timeline, segno); - snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf); + snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name, + partial_suffix ? partial_suffix : ""); f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) { fprintf(stderr, _("%s: could not open transaction log file \"%s\": %s\n"), progname, fn, strerror(errno)); - return -1; + return false; } /* @@ -72,17 +78,21 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, _("%s: could not stat transaction log file \"%s\": %s\n"), progname, fn, strerror(errno)); close(f); - return -1; + return false; } if (statbuf.st_size == XLogSegSize) - return f; /* File is open and ready to use */ + { + /* File is open and ready to use */ + walfile = f; + return true; + } if (statbuf.st_size != 0) { fprintf(stderr, _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"), progname, fn, (int) statbuf.st_size, XLogSegSize); close(f); - return -1; + return false; } /* New, empty, file. So pad it to 16Mb with zeroes */ @@ -97,7 +107,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, free(zerobuf); close(f); unlink(fn); - return -1; + return false; } } free(zerobuf); @@ -108,42 +118,45 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"), progname, fn, strerror(errno)); close(f); - return -1; + return false; } - return f; + walfile = f; + return true; } /* - * Close the current WAL file, and rename it to the correct filename if it's - * complete. - * - * If segment_complete is true, rename the current WAL file even if we've not - * completed writing the whole segment. + * Close the current WAL file (if open), and rename it to the correct + * filename if it's complete. On failure, prints an error message to stderr + * and returns false, otherwise returns true. */ static bool -close_walfile(char *basedir, char *walname, bool segment_complete) +close_walfile(char *basedir, char *partial_suffix) { - off_t currpos = lseek(walfile, 0, SEEK_CUR); + off_t currpos; + + if (walfile == -1) + return true; + currpos = lseek(walfile, 0, SEEK_CUR); if (currpos == -1) { fprintf(stderr, _("%s: could not determine seek position in file \"%s\": %s\n"), - progname, walname, strerror(errno)); + progname, current_walfile_name, strerror(errno)); return false; } if (fsync(walfile) != 0) { fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), - progname, walname, strerror(errno)); + progname, current_walfile_name, strerror(errno)); return false; } if (close(walfile) != 0) { fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, walname, strerror(errno)); + progname, current_walfile_name, strerror(errno)); walfile = -1; return false; } @@ -153,24 +166,24 @@ close_walfile(char *basedir, char *walname, bool segment_complete) * Rename the .partial file only if we've completed writing the whole * segment or segment_complete is true. */ - if (currpos == XLOG_SEG_SIZE || segment_complete) + if (currpos == XLOG_SEG_SIZE && partial_suffix) { char oldfn[MAXPGPATH]; char newfn[MAXPGPATH]; - snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname); - snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname); + snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix); + snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name); if (rename(oldfn, newfn) != 0) { fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"), - progname, walname, strerror(errno)); + progname, current_walfile_name, strerror(errno)); return false; } } - else + else if (partial_suffix) fprintf(stderr, - _("%s: not renaming \"%s\", segment is not complete\n"), - progname, walname); + _("%s: not renaming \"%s%s\", segment is not complete\n"), + progname, current_walfile_name, partial_suffix); return true; } @@ -233,6 +246,123 @@ localTimestampDifferenceExceeds(int64 start_time, return (diff >= msec * INT64CONST(1000)); } +/* + * Check if a timeline history file exists. + */ +static bool +existsTimeLineHistoryFile(char *basedir, TimeLineID tli) +{ + char path[MAXPGPATH]; + char histfname[MAXFNAMELEN]; + int fd; + + /* + * Timeline 1 never has a history file. We treat that as if it existed, + * since we never need to stream it. + */ + if (tli == 1) + return true; + + TLHistoryFileName(histfname, tli); + + snprintf(path, sizeof(path), "%s/%s", basedir, histfname); + + fd = open(path, O_RDONLY | PG_BINARY, 0); + if (fd < 0) + { + if (errno != ENOENT) + fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s"), + progname, path, strerror(errno)); + return false; + } + else + { + close(fd); + return true; + } +} + +static bool +writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content) +{ + int size = strlen(content); + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + char histfname[MAXFNAMELEN]; + int fd; + + /* + * Check that the server's idea of how timeline history files should be + * named matches ours. + */ + TLHistoryFileName(histfname, tli); + if (strcmp(histfname, filename) != 0) + { + fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s"), + progname, tli, filename); + return false; + } + + /* + * Write into a temp file name. + */ + snprintf(tmppath, MAXPGPATH, "%s.tmp", path); + + unlink(tmppath); + + fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s"), + progname, tmppath, strerror(errno)); + return false; + } + + errno = 0; + if ((int) write(fd, content, size) != size) + { + int save_errno = errno; + + /* + * If we fail to make the file, delete it to release disk space + */ + unlink(tmppath); + errno = save_errno; + + fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s"), + progname, tmppath, strerror(errno)); + return false; + } + + if (fsync(fd) != 0) + { + fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), + progname, tmppath, strerror(errno)); + return false; + } + + if (close(fd) != 0) + { + fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), + progname, tmppath, strerror(errno)); + return false; + } + + /* + * Now move the completed history file into place with its final name. + */ + + snprintf(path, sizeof(path), "%s/%s", basedir, histfname); + if (rename(tmppath, path) < 0) + { + fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"), + progname, tmppath, path, strerror(errno)); + return false; + } + + return true; +} + /* * Converts an int64 to network byte order. */ @@ -314,7 +444,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) * (by sending an extra IDENTIFY_SYSTEM command) * * All received segments will be written to the directory - * specified by basedir. + * specified by basedir. This will also fetch any missing timeline history + * files. * * The stream_stop callback will be called every time data * is received, and whenever a segment is completed. If it returns @@ -327,20 +458,22 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) * This message will only contain the write location, and never * flush or replay. * + * If 'partial_suffix' is not NULL, files are initially created with the + * given suffix, and the suffix is removed once the file is finished. That + * allows you to tell the difference between partial and completed files, + * so that you can continue later where you left. + * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, - int standby_message_timeout, bool rename_partial) + int standby_message_timeout, char *partial_suffix) { char query[128]; - char current_walfile_name[MAXPGPATH]; PGresult *res; - char *copybuf = NULL; - int64 last_status = -1; - XLogRecPtr blockpos = InvalidXLogRecPtr; + XLogRecPtr stoppos; /* * The message format used in streaming replication changed in 9.3, so we @@ -359,7 +492,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (sysidentifier != NULL) { - /* Validate system identifier and timeline hasn't changed */ + /* Validate system identifier hasn't changed */ res = PQexec(conn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -385,33 +518,184 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (timeline != atoi(PQgetvalue(res, 0, 1))) + if (timeline > atoi(PQgetvalue(res, 0, 1))) { fprintf(stderr, - _("%s: timeline does not match between base backup and streaming connection\n"), - progname); + _("%s: starting timeline %u is not present in the server\n"), + progname, timeline); PQclear(res); return false; } PQclear(res); } - /* Initiate the replication stream at specified location */ - snprintf(query, sizeof(query), "START_REPLICATION %X/%X", - (uint32) (startpos >> 32), (uint32) startpos); - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_COPY_BOTH) + while (1) { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "START_REPLICATION", PQresultErrorMessage(res)); + /* + * Fetch the timeline history file for this timeline, if we don't + * have it already. + */ + if (!existsTimeLineHistoryFile(basedir, timeline)) + { + snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline); + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + /* FIXME: we might send it ok, but get an error */ + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, "TIMELINE_HISTORY", PQresultErrorMessage(res)); + PQclear(res); + return false; + } + + /* + * The response to TIMELINE_HISTORY is a single row result set + * with two fields: filename and content + */ + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + fprintf(stderr, + _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 2); + } + + /* Write the history file to disk */ + writeTimeLineHistoryFile(basedir, timeline, + PQgetvalue(res, 0, 0), + PQgetvalue(res, 0, 1)); + + PQclear(res); + } + + /* + * Before we start streaming from the requested location, check + * if the callback tells us to stop here. + */ + if (stream_stop(startpos, timeline, false)) + return true; + + /* Initiate the replication stream at specified location */ + snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u", + (uint32) (startpos >> 32), (uint32) startpos, + timeline); + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_COPY_BOTH) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, "START_REPLICATION", PQresultErrorMessage(res)); + PQclear(res); + return false; + } PQclear(res); - return false; + + /* Stream the WAL */ + if (!HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, + standby_message_timeout, partial_suffix, + &stoppos)) + goto error; + + /* + * Streaming finished. + * + * There are two possible reasons for that: a controlled shutdown, + * or we reached the end of the current timeline. In case of + * end-of-timeline, the server sends a result set after Copy has + * finished, containing the next timeline's ID. Read that, and + * restart streaming from the next timeline. + */ + + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + /* + * End-of-timeline. Read the next timeline's ID. + */ + uint32 newtimeline; + + newtimeline = atoi(PQgetvalue(res, 0, 0)); + PQclear(res); + + if (newtimeline <= timeline) + { + /* shouldn't happen */ + fprintf(stderr, + "server reported unexpected next timeline %u, following timeline %u\n", + newtimeline, timeline); + goto error; + } + + /* Read the final result, which should be CommandComplete. */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, + _("%s: unexpected termination of replication stream: %s"), + progname, PQresultErrorMessage(res)); + goto error; + } + PQclear(res); + + /* + * Loop back to start streaming from the new timeline. + * Always start streaming at the beginning of a segment. + */ + timeline = newtimeline; + startpos = stoppos - (stoppos % XLOG_SEG_SIZE); + continue; + } + else if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* + * End of replication (ie. controlled shut down of the server). + * + * Check if the callback thinks it's OK to stop here. If not, + * complain. + */ + if (stream_stop(stoppos, timeline, false)) + return true; + else + { + fprintf(stderr, _("%s: replication stream was terminated before stop point\n"), + progname); + goto error; + } + } + else + { + /* Server returned an error. */ + fprintf(stderr, + _("%s: unexpected termination of replication stream: %s"), + progname, PQresultErrorMessage(res)); + goto error; + } } - PQclear(res); - /* - * Receive the actual xlog data - */ +error: + if (walfile != -1 && close(walfile) != 0) + fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), + progname, current_walfile_name, strerror(errno)); + walfile = -1; + return false; +} + +/* + * The main loop of ReceiveXLogStream. Handles the COPY stream after + * initiating streaming with the START_STREAMING command. + * + * If the COPY ends normally, returns true and sets *stoppos to the last + * byte written. On error, returns false. + */ +static bool +HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + int standby_message_timeout, char *partial_suffix, + XLogRecPtr *stoppos) +{ + char *copybuf = NULL; + int64 last_status = -1; + XLogRecPtr blockpos = startpos; + bool still_sending = true; + while (1) { int r; @@ -430,20 +714,27 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Check if we should continue streaming, or abort at this point. */ - if (stream_stop && stream_stop(blockpos, timeline, false)) + if (still_sending && stream_stop(blockpos, timeline, false)) { - if (walfile != -1 && !close_walfile(basedir, current_walfile_name, - rename_partial)) + if (!close_walfile(basedir, partial_suffix)) + { /* Potential error message is written by close_walfile */ goto error; - return true; + } + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + still_sending = false; } /* * Potentially send a status message to the master */ now = localGetCurrentTimestamp(); - if (standby_message_timeout > 0 && + if (still_sending && standby_message_timeout > 0 && localTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) { @@ -457,9 +748,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (r == 0) { /* - * In async mode, and no data available. We block on reading but - * not more than the specified timeout, so that we can send a - * response back to the client. + * No data available. Wait for some to appear, but not longer + * than the specified timeout, so that we can ping the server. */ fd_set input_mask; struct timeval timeout; @@ -467,7 +757,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, FD_ZERO(&input_mask); FD_SET(PQsocket(conn), &input_mask); - if (standby_message_timeout) + if (standby_message_timeout && still_sending) { int64 targettime; long secs; @@ -493,8 +783,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, { /* * Got a timeout or signal. Continue the loop and either - * deliver a status packet to the server or just go back into - * blocking. + * deliver a status packet to the server or just go back + * into blocking. */ continue; } @@ -515,8 +805,31 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, continue; } if (r == -1) - /* End of copy stream */ - break; + { + /* + * The server closed its end of the copy stream. Close ours + * if we haven't done so already, and exit. + */ + if (still_sending) + { + if (!close_walfile(basedir, partial_suffix)) + { + /* Error message written in close_walfile() */ + goto error; + } + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + still_sending = false; + } + if (copybuf != NULL) + PQfreemem(copybuf); + *stoppos = blockpos; + return true; + } if (r == -2) { fprintf(stderr, _("%s: could not read COPY data: %s"), @@ -548,174 +861,148 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, replyRequested = copybuf[pos]; /* If the server requested an immediate reply, send one. */ - if (replyRequested) + if (replyRequested && still_sending) { now = localGetCurrentTimestamp(); if (!sendFeedback(conn, blockpos, now, false)) goto error; last_status = now; } - continue; } - else if (copybuf[0] != 'w') - { - fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), - progname, copybuf[0]); - goto error; - } - - /* - * Read the header of the XLogData message, enclosed in the CopyData - * message. We only need the WAL location field (dataStart), the rest - * of the header is ignored. - */ - hdr_len = 1; /* msgtype 'w' */ - hdr_len += 8; /* dataStart */ - hdr_len += 8; /* walEnd */ - hdr_len += 8; /* sendTime */ - if (r < hdr_len + 1) + else if (copybuf[0] == 'w') { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); - goto error; - } - blockpos = recvint64(©buf[1]); - - /* Extract WAL location for this block */ - xlogoff = blockpos % XLOG_SEG_SIZE; + /* + * Once we've decided we don't want to receive any more, just + * ignore any subsequent XLogData messages. + */ + if (!still_sending) + continue; - /* - * Verify that the initial location in the stream matches where we - * think we are. - */ - if (walfile == -1) - { - /* No file open yet */ - if (xlogoff != 0) - { - fprintf(stderr, - _("%s: received transaction log record for offset %u with no file open\n"), - progname, xlogoff); - goto error; - } - } - else - { - /* More data in existing segment */ - /* XXX: store seek value don't reseek all the time */ - if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + /* + * Read the header of the XLogData message, enclosed in the + * CopyData message. We only need the WAL location field + * (dataStart), the rest of the header is ignored. + */ + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (r < hdr_len + 1) { - fprintf(stderr, - _("%s: got WAL data offset %08x, expected %08x\n"), - progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, r); goto error; } - } - - bytes_left = r - hdr_len; - bytes_written = 0; + blockpos = recvint64(©buf[1]); - while (bytes_left) - { - int bytes_to_write; + /* Extract WAL location for this block */ + xlogoff = blockpos % XLOG_SEG_SIZE; /* - * If crossing a WAL boundary, only write up until we reach - * XLOG_SEG_SIZE. + * Verify that the initial location in the stream matches where + * we think we are. */ - if (xlogoff + bytes_left > XLOG_SEG_SIZE) - bytes_to_write = XLOG_SEG_SIZE - xlogoff; - else - bytes_to_write = bytes_left; - if (walfile == -1) { - walfile = open_walfile(blockpos, timeline, - basedir, current_walfile_name); - if (walfile == -1) - /* Error logged by open_walfile */ + /* No file open yet */ + if (xlogoff != 0) + { + fprintf(stderr, + _("%s: received transaction log record for offset %u with no file open\n"), + progname, xlogoff); goto error; + } } - - if (write(walfile, - copybuf + hdr_len + bytes_written, - bytes_to_write) != bytes_to_write) + else { - fprintf(stderr, - _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), - progname, bytes_to_write, current_walfile_name, - strerror(errno)); - goto error; + /* More data in existing segment */ + /* XXX: store seek value don't reseek all the time */ + if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + { + fprintf(stderr, + _("%s: got WAL data offset %08x, expected %08x\n"), + progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + goto error; + } } - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - blockpos += bytes_to_write; - xlogoff += bytes_to_write; + bytes_left = r - hdr_len; + bytes_written = 0; - /* Did we reach the end of a WAL segment? */ - if (blockpos % XLOG_SEG_SIZE == 0) + while (bytes_left) { - if (!close_walfile(basedir, current_walfile_name, false)) - /* Error message written in close_walfile() */ - goto error; + int bytes_to_write; - xlogoff = 0; + /* + * If crossing a WAL boundary, only write up until we reach + * XLOG_SEG_SIZE. + */ + if (xlogoff + bytes_left > XLOG_SEG_SIZE) + bytes_to_write = XLOG_SEG_SIZE - xlogoff; + else + bytes_to_write = bytes_left; - if (stream_stop != NULL) + if (walfile == -1) { - /* - * Callback when the segment finished, and return if it - * told us to. - */ - if (stream_stop(blockpos, timeline, true)) - return true; + if (!open_walfile(blockpos, timeline, + basedir, partial_suffix)) + { + /* Error logged by open_walfile */ + goto error; + } } - } - } - /* No more data left to write, start receiving next copy packet */ - } - /* - * The only way to get out of the loop is if the server shut down the - * replication stream. If it's a controlled shutdown, the server will send - * a shutdown message, and we'll return the latest xlog location that has - * been streamed. - */ + if (write(walfile, + copybuf + hdr_len + bytes_written, + bytes_to_write) != bytes_to_write) + { + fprintf(stderr, + _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), + progname, bytes_to_write, current_walfile_name, + strerror(errno)); + goto error; + } - res = PQgetResult(conn); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - fprintf(stderr, - _("%s: unexpected termination of replication stream: %s"), - progname, PQresultErrorMessage(res)); - goto error; - } - PQclear(res); + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + blockpos += bytes_to_write; + xlogoff += bytes_to_write; - /* Complain if we've not reached stop point yet */ - if (stream_stop != NULL && !stream_stop(blockpos, timeline, false)) - { - fprintf(stderr, _("%s: replication stream was terminated before stop point\n"), - progname); - goto error; + /* Did we reach the end of a WAL segment? */ + if (blockpos % XLOG_SEG_SIZE == 0) + { + if (!close_walfile(basedir, partial_suffix)) + /* Error message written in close_walfile() */ + goto error; + + xlogoff = 0; + + if (still_sending && stream_stop(blockpos, timeline, false)) + { + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + still_sending = false; + break; /* ignore the rest of this XLogData packet */ + } + } + } + /* No more data left to write, receive next copy packet */ + } + else + { + fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), + progname, copybuf[0]); + goto error; + } } - if (copybuf != NULL) - PQfreemem(copybuf); - if (walfile != -1 && close(walfile) != 0) - fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - walfile = -1; - return true; - error: if (copybuf != NULL) PQfreemem(copybuf); - if (walfile != -1 && close(walfile) != 0) - fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), - progname, current_walfile_name, strerror(errno)); - walfile = -1; return false; } diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 7176a68bea..53f31a78ec 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -13,4 +13,4 @@ extern bool ReceiveXlogStream(PGconn *conn, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, - bool rename_partial); + char *partial_suffix); diff --git a/src/include/access/timeline.h b/src/include/access/timeline.h index dd16f97bd7..7d45fcad8a 100644 --- a/src/include/access/timeline.h +++ b/src/include/access/timeline.h @@ -37,6 +37,7 @@ extern void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, extern void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size); extern bool tliInHistory(TimeLineID tli, List *expectedTLIs); extern TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history); -extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history); +extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, + TimeLineID *nextTLI); #endif /* TIMELINE_H */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 885b5fc0ad..72e3242596 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -317,8 +317,10 @@ extern void SetWalWriterSleeping(bool sleeping); /* * Starting/stopping a base backup */ -extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile); -extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive); +extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast, + TimeLineID *starttli_p, char **labelfile); +extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive, + TimeLineID *stoptli_p); extern void do_pg_abort_backup(void); /* File path names (all relative to $PGDATA) */