granicus.if.org Git - postgresql/commitdiff
Make pg_receivexlog and pg_basebackup -X stream work across timeline switches.
author Heikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 17 Jan 2013 18:23:00 +0000 (20:23 +0200)
committer Heikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 17 Jan 2013 18:23:00 +0000 (20:23 +0200)
This mirrors the changes done earlier to the server in standby mode. When
receivelog reaches the end of a timeline, as reported by the server, it
fetches the timeline history file of the next timeline, and restarts
streaming from the new timeline by issuing a new START_STREAMING command.

When pg_receivexlog crosses a timeline, it leaves the .partial suffix on the
last segment on the old timeline. This helps you to tell apart a partial
segment left in the directory because of a timeline switch, and a completed
segment. If you just follow a single server, it won't make a difference, but
it can be significant in more complicated scenarios where new WAL is still
generated on the old timeline.

This includes two small changes to the streaming replication protocol:
First, when you reach the end of timeline while streaming, the server now
sends the TLI of the next timeline in the server's history to the client.
pg_receivexlog uses that as the next timeline, so that it doesn't need to
parse the timeline history file like a standby server does. Second, when the
BASE_BACKUP command sends the begin and end WAL positions, it now also sends
the timeline IDs corresponding to the positions.

12 files changed:
doc/src/sgml/protocol.sgml
src/backend/access/transam/timeline.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogfuncs.c
src/backend/replication/basebackup.c
src/backend/replication/walsender.c
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h
src/include/access/timeline.h
src/include/access/xlog.h

index e14627c201ebc5ac63271afd3d79ddf900a60451..baae59de6e933c293b09db7c16288f375295cb2a 100644 (file)
@@ -1418,8 +1418,10 @@ The commands accepted in walsender mode are:
      <para>
       After streaming all the WAL on a timeline that is not the latest one,
       the server will end streaming by exiting the COPY mode. When the client
-      acknowledges this by also exiting COPY mode, the server responds with a
-      CommandComplete message, and is ready to accept a new command.
+      acknowledges this by also exiting COPY mode, the server sends a
+      single-row, single-column result set indicating the next timeline in
+      this server's history. That is followed by a CommandComplete message,
+      and the server is ready to accept a new command.
      </para>
 
      <para>
@@ -1784,7 +1786,9 @@ The commands accepted in walsender mode are:
      </para>
      <para>
       The first ordinary result set contains the starting position of the
-      backup, given in XLogRecPtr format as a single column in a single row.
+      backup, in a single row with two columns. The first column contains
+      the start position given in XLogRecPtr format, and the second column
+      contains the corresponding timeline ID.
      </para>
      <para>
       The second ordinary result set has one row for each tablespace.
@@ -1827,7 +1831,9 @@ The commands accepted in walsender mode are:
       <quote>ustar interchange format</> specified in the POSIX 1003.1-2008
       standard) dump of the tablespace contents, except that the two trailing
       blocks of zeroes specified in the standard are omitted.
-      After the tar data is complete, a final ordinary result set will be sent.
+      After the tar data is complete, a final ordinary result set will be sent,
+      containing the WAL end position of the backup, in the same format as
+      the start position.
      </para>
 
      <para>
index 46379bbff8880391824c1ad23bd4769f4f546eb2..ad4f3162c53852140fa1cb80902fcc4851eb206c 100644 (file)
@@ -545,22 +545,26 @@ tliOfPointInHistory(XLogRecPtr ptr, List *history)
 }
 
 /*
- * Returns the point in history where we branched off the given timeline.
- * Returns InvalidXLogRecPtr if the timeline is current (= we have not
- * branched off from it), and throws an error if the timeline is not part of
- * this server's history.
+ * Returns the point in history where we branched off the given timeline,
+ * and the timeline we branched to (*nextTLI). Returns InvalidXLogRecPtr if
+ * the timeline is current, ie. we have not branched off from it, and throws
+ * an error if the timeline is not part of this server's history.
  */
 XLogRecPtr
-tliSwitchPoint(TimeLineID tli, List *history)
+tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
 {
        ListCell   *cell;
 
+       if (nextTLI)
+               *nextTLI = 0;
        foreach (cell, history)
        {
                TimeLineHistoryEntry *tle = (TimeLineHistoryEntry *) lfirst(cell);
 
                if (tle->tli == tli)
                        return tle->end;
+               if (nextTLI)
+                       *nextTLI = tle->tli;
        }
 
        ereport(ERROR,
index ac2b26b49822561fe10da71b6a2e0daf1a7e803d..90ba32ef0f52f825fff375301e027506d785b89e 100644 (file)
@@ -4930,7 +4930,7 @@ StartupXLOG(void)
                 * tliSwitchPoint will throw an error if the checkpoint's timeline
                 * is not in expectedTLEs at all.
                 */
-               switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs);
+               switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL);
                ereport(FATAL,
                                (errmsg("requested timeline %u is not a child of this server's history",
                                                recoveryTargetTLI),
@@ -7870,16 +7870,21 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno)
  * non-exclusive backups active at the same time, and they don't conflict
  * with an exclusive backup either.
  *
+ * Returns the minimum WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *starttli_p.
+ *
  * Every successfully started non-exclusive backup must be stopped by calling
  * do_pg_stop_backup() or do_pg_abort_backup().
  */
 XLogRecPtr
-do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
+do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
+                                  char **labelfile)
 {
        bool            exclusive = (labelfile == NULL);
        bool            backup_started_in_recovery = false;
        XLogRecPtr      checkpointloc;
        XLogRecPtr      startpoint;
+       TimeLineID      starttli;
        pg_time_t       stamp_time;
        char            strfbuf[128];
        char            xlogfilename[MAXFNAMELEN];
@@ -8021,6 +8026,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
                        LWLockAcquire(ControlFileLock, LW_SHARED);
                        checkpointloc = ControlFile->checkPoint;
                        startpoint = ControlFile->checkPointCopy.redo;
+                       starttli = ControlFile->checkPointCopy.ThisTimeLineID;
                        checkpointfpw = ControlFile->checkPointCopy.fullPageWrites;
                        LWLockRelease(ControlFileLock);
 
@@ -8154,6 +8160,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
        /*
         * We're done.  As a convenience, return the starting WAL location.
         */
+       if (starttli_p)
+               *starttli_p = starttli;
        return startpoint;
 }
 
@@ -8190,14 +8198,18 @@ pg_start_backup_callback(int code, Datum arg)
 
  * If labelfile is NULL, this stops an exclusive backup. Otherwise this stops
  * the non-exclusive backup specified by 'labelfile'.
+ *
+ * Returns the last WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *stoptli_p.
  */
 XLogRecPtr
-do_pg_stop_backup(char *labelfile, bool waitforarchive)
+do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 {
        bool            exclusive = (labelfile == NULL);
        bool            backup_started_in_recovery = false;
        XLogRecPtr      startpoint;
        XLogRecPtr      stoppoint;
+       TimeLineID      stoptli;
        XLogRecData rdata;
        pg_time_t       stamp_time;
        char            strfbuf[128];
@@ -8401,8 +8413,11 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
 
                LWLockAcquire(ControlFileLock, LW_SHARED);
                stoppoint = ControlFile->minRecoveryPoint;
+               stoptli = ControlFile->minRecoveryPointTLI;
                LWLockRelease(ControlFileLock);
 
+               if (stoptli_p)
+                       *stoptli_p = stoptli;
                return stoppoint;
        }
 
@@ -8414,6 +8429,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
        rdata.buffer = InvalidBuffer;
        rdata.next = NULL;
        stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
+       stoptli = ThisTimeLineID;
 
        /*
         * Force a switch to a new xlog segment file, so that the backup is valid
@@ -8529,6 +8545,8 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
        /*
         * We're done.  As a convenience, return the ending WAL location.
         */
+       if (stoptli_p)
+               *stoptli_p = stoptli;
        return stoppoint;
 }
 
index 96db5dbbf52ca729e2246d102fc9757043763e31..b6bb6773d6b433452dffd0b083fc5d1fddeb879a 100644 (file)
@@ -56,7 +56,7 @@ pg_start_backup(PG_FUNCTION_ARGS)
 
        backupidstr = text_to_cstring(backupid);
 
-       startpoint = do_pg_start_backup(backupidstr, fast, NULL);
+       startpoint = do_pg_start_backup(backupidstr, fast, NULL, NULL);
 
        snprintf(startxlogstr, sizeof(startxlogstr), "%X/%X",
                         (uint32) (startpoint >> 32), (uint32) startpoint);
@@ -82,7 +82,7 @@ pg_stop_backup(PG_FUNCTION_ARGS)
        XLogRecPtr      stoppoint;
        char            stopxlogstr[MAXFNAMELEN];
 
-       stoppoint = do_pg_stop_backup(NULL, true);
+       stoppoint = do_pg_stop_backup(NULL, true, NULL);
 
        snprintf(stopxlogstr, sizeof(stopxlogstr), "%X/%X",
                         (uint32) (stoppoint >> 32), (uint32) stoppoint);
index 2330fcc23ad8e8c6130eba63495a27747135c4fb..57946a9fa978cf6a2e8e1e8b8aa0d62dcf03f025 100644 (file)
@@ -55,7 +55,7 @@ static void SendBackupHeader(List *tablespaces);
 static void base_backup_cleanup(int code, Datum arg);
 static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int compareWalFileNames(const void *a, const void *b);
 
 /* Was the backup currently in-progress initiated in recovery mode? */
@@ -94,13 +94,16 @@ static void
 perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 {
        XLogRecPtr      startptr;
+       TimeLineID      starttli;
        XLogRecPtr      endptr;
+       TimeLineID      endtli;
        char       *labelfile;
 
        backup_started_in_recovery = RecoveryInProgress();
 
-       startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
-       SendXlogRecPtrResult(startptr);
+       startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+                                                                 &labelfile);
+       SendXlogRecPtrResult(startptr, starttli);
 
        PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
        {
@@ -218,7 +221,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
        }
        PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
 
-       endptr = do_pg_stop_backup(labelfile, !opt->nowait);
+       endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
 
        if (opt->includewal)
        {
@@ -426,7 +429,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                /* Send CopyDone message for the last tar file */
                pq_putemptymessage('c');
        }
-       SendXlogRecPtrResult(endptr);
+       SendXlogRecPtrResult(endptr, endtli);
 }
 
 /*
@@ -635,17 +638,15 @@ SendBackupHeader(List *tablespaces)
  * XlogRecPtr record (in text format)
  */
 static void
-SendXlogRecPtrResult(XLogRecPtr ptr)
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
        StringInfoData buf;
        char            str[MAXFNAMELEN];
 
-       snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
-
        pq_beginmessage(&buf, 'T'); /* RowDescription */
-       pq_sendint(&buf, 1, 2);         /* 1 field */
+       pq_sendint(&buf, 2, 2);         /* 2 fields */
 
-       /* Field header */
+       /* Field headers */
        pq_sendstring(&buf, "recptr");
        pq_sendint(&buf, 0, 4);         /* table oid */
        pq_sendint(&buf, 0, 2);         /* attnum */
@@ -653,11 +654,29 @@ SendXlogRecPtrResult(XLogRecPtr ptr)
        pq_sendint(&buf, -1, 2);
        pq_sendint(&buf, 0, 4);
        pq_sendint(&buf, 0, 2);
+
+       pq_sendstring(&buf, "tli");
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       /*
+        * int8 may seem like a surprising data type for this, but in theory int4
+        * would not be wide enough for this, as TimeLineID is unsigned.
+        */
+       pq_sendint(&buf, INT8OID, 4);   /* type oid */
+       pq_sendint(&buf, -1, 2);
+       pq_sendint(&buf, 0, 4);
+       pq_sendint(&buf, 0, 2);
        pq_endmessage(&buf);
 
        /* Data row */
        pq_beginmessage(&buf, 'D');
-       pq_sendint(&buf, 1, 2);         /* number of columns */
+       pq_sendint(&buf, 2, 2);         /* number of columns */
+
+       snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
+       pq_sendint(&buf, strlen(str), 4);       /* length */
+       pq_sendbytes(&buf, str, strlen(str));
+
+       snprintf(str, sizeof(str), "%u", tli);
        pq_sendint(&buf, strlen(str), 4);       /* length */
        pq_sendbytes(&buf, str, strlen(str));
        pq_endmessage(&buf);
index ad7d1c911b377d94bb21fc20a72521b2bb72d3ac..ba138e73da387a6489ed8cbf26fe3950cce9aa60 100644 (file)
@@ -117,6 +117,7 @@ static uint32 sendOff = 0;
  * history forked off from that timeline at sendTimeLineValidUpto.
  */
 static TimeLineID      sendTimeLine = 0;
+static TimeLineID      sendTimeLineNextTLI = 0;
 static bool                    sendTimeLineIsHistoric = false;
 static XLogRecPtr      sendTimeLineValidUpto = InvalidXLogRecPtr;
 
@@ -449,7 +450,8 @@ StartReplication(StartReplicationCmd *cmd)
                         * requested start location is on that timeline.
                         */
                        timeLineHistory = readTimeLineHistory(ThisTimeLineID);
-                       switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory);
+                       switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
+                                                                                &sendTimeLineNextTLI);
                        list_free_deep(timeLineHistory);
 
                        /*
@@ -496,8 +498,7 @@ StartReplication(StartReplicationCmd *cmd)
        streamingDoneSending = streamingDoneReceiving = false;
 
        /* If there is nothing to stream, don't even enter COPY mode */
-       if (!sendTimeLineIsHistoric ||
-               cmd->startpoint < sendTimeLineValidUpto)
+       if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
        {
                /*
                 * When we first start replication the standby will be behind the primary.
@@ -554,10 +555,46 @@ StartReplication(StartReplicationCmd *cmd)
                if (walsender_ready_to_stop)
                        proc_exit(0);
                WalSndSetState(WALSNDSTATE_STARTUP);
+
+               Assert(streamingDoneSending && streamingDoneReceiving);
+       }
+
+       /*
+        * Copy is finished now. Send a single-row result set indicating the next
+        * timeline.
+        */
+       if (sendTimeLineIsHistoric)
+       {
+               char            str[11];
+               snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
+
+               pq_beginmessage(&buf, 'T'); /* RowDescription */
+               pq_sendint(&buf, 1, 2);         /* 1 field */
+
+               /* Field header */
+               pq_sendstring(&buf, "next_tli");
+               pq_sendint(&buf, 0, 4);         /* table oid */
+               pq_sendint(&buf, 0, 2);         /* attnum */
+               /*
+                * int8 may seem like a surprising data type for this, but in theory
+                * int4 would not be wide enough for this, as TimeLineID is unsigned.
+                */
+               pq_sendint(&buf, INT8OID, 4);   /* type oid */
+               pq_sendint(&buf, -1, 2);
+               pq_sendint(&buf, 0, 4);
+               pq_sendint(&buf, 0, 2);
+               pq_endmessage(&buf);
+
+               /* Data row */
+               pq_beginmessage(&buf, 'D');
+               pq_sendint(&buf, 1, 2);         /* number of columns */
+               pq_sendint(&buf, strlen(str), 4);       /* length */
+               pq_sendbytes(&buf, str, strlen(str));
+               pq_endmessage(&buf);
        }
 
-       /* Get out of COPY mode (CommandComplete). */
-       EndCommand("COPY 0", DestRemote);
+       /* Send CommandComplete message */
+       pq_puttextmessage('C', "START_STREAMING");
 }
 
 /*
@@ -1377,8 +1414,9 @@ XLogSend(bool *caughtup)
                        List       *history;
 
                        history = readTimeLineHistory(ThisTimeLineID);
-                       sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
+                       sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
                        Assert(sentPtr <= sendTimeLineValidUpto);
+                       Assert(sendTimeLine < sendTimeLineNextTLI);
                        list_free_deep(history);
 
                        /* the current send pointer should be <= the switchpoint */
index a684c0c6fcfe9613a7c4e4d02c118ed511b5641d..b6f774469b1b4ef42a5b2ee55073127fd948fa71 100644 (file)
@@ -243,7 +243,7 @@ LogStreamerMain(logstreamer_param *param)
        if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                                                   param->sysidentifier, param->xlogdir,
                                                   reached_end_position, standby_message_timeout,
-                                                  true))
+                                                  NULL))
 
                /*
                 * Any errors will already have been reported in the function process,
@@ -1220,7 +1220,7 @@ BaseBackup(void)
 {
        PGresult   *res;
        char       *sysidentifier;
-       uint32          timeline;
+       uint32          starttli;
        char            current_path[MAXPGPATH];
        char            escaped_label[MAXPGPATH];
        int                     i;
@@ -1259,7 +1259,6 @@ BaseBackup(void)
                disconnect_and_exit(1);
        }
        sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
-       timeline = atoi(PQgetvalue(res, 0, 1));
        PQclear(res);
 
        /*
@@ -1291,18 +1290,24 @@ BaseBackup(void)
                                progname, PQerrorMessage(conn));
                disconnect_and_exit(1);
        }
-       if (PQntuples(res) != 1)
+       if (PQntuples(res) != 1 || PQnfields(res) < 2)
        {
-               fprintf(stderr, _("%s: no start point returned from server\n"),
-                               progname);
+               fprintf(stderr,
+                               _("%s: server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields\n"),
+                               progname, PQntuples(res), PQnfields(res), 1, 2);
                disconnect_and_exit(1);
        }
+
        strcpy(xlogstart, PQgetvalue(res, 0, 0));
-       if (verbose && includewal)
-               fprintf(stderr, "transaction log start point: %s\n", xlogstart);
+       starttli = atoi(PQgetvalue(res, 0, 1));
+
        PQclear(res);
        MemSet(xlogend, 0, sizeof(xlogend));
 
+       if (verbose && includewal)
+               fprintf(stderr, _("transaction log start point: %s on timeline %u\n"),
+                               xlogstart, starttli);
+
        /*
         * Get the header
         */
@@ -1358,7 +1363,7 @@ BaseBackup(void)
                if (verbose)
                        fprintf(stderr, _("%s: starting background WAL receiver\n"),
                                        progname);
-               StartLogStreamer(xlogstart, timeline, sysidentifier);
+               StartLogStreamer(xlogstart, starttli, sysidentifier);
        }
 
        /*
index 7f2db1946e7d4efb608f018e5aea00f920647ca2..33dbc50389b86d3cb3e467bcda8cc458d7d06911 100644 (file)
@@ -39,8 +39,7 @@ volatile bool time_to_abort = false;
 
 
 static void usage(void);
-static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos,
-                                  uint32 currenttimeline);
+static XLogRecPtr FindStreamingStart(uint32 *tli);
 static void StreamLog();
 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
                           bool segment_finished);
@@ -70,14 +69,31 @@ usage(void)
 }
 
 static bool
-stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
+stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
 {
+       static uint32 prevtimeline = 0;
+       static XLogRecPtr prevpos = InvalidXLogRecPtr;
+
+       /* we assume that we get called once at the end of each segment */
        if (verbose && segment_finished)
                fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
-                               progname,
-                               (uint32) (segendpos >> 32), (uint32) segendpos,
+                               progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
                                timeline);
 
+       /*
+        * Note that we report the previous, not current, position here. That's
+        * the exact location where the timeline switch happened. After the switch,
+        * we restart streaming from the beginning of the segment, so xlogpos can
+        * be smaller than prevpos if we just switched to a new timeline.
+        */
+       if (prevtimeline != 0 && prevtimeline != timeline)
+               fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
+                               progname, timeline,
+                               (uint32) (prevpos >> 32), (uint32) prevpos);
+
+       prevtimeline = timeline;
+       prevpos = xlogpos;
+
        if (time_to_abort)
        {
                fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
@@ -88,20 +104,19 @@ stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 }
 
 /*
- * Determine starting location for streaming, based on:
- * 1. If there are existing xlog segments, start at the end of the last one
- *       that is complete (size matches XLogSegSize)
- * 2. If no valid xlog exists, start from the beginning of the current
- *       WAL segment.
+ * Determine starting location for streaming, based on any existing xlog
+ * segments in the directory. We start at the end of the last one that is
+ * complete (size matches XLogSegSize), on the timeline with highest ID.
+ *
+ * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
  */
 static XLogRecPtr
-FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+FindStreamingStart(uint32 *tli)
 {
        DIR                *dir;
        struct dirent *dirent;
-       int                     i;
-       bool            b;
        XLogSegNo       high_segno = 0;
+       uint32          high_tli = 0;
 
        dir = opendir(basedir);
        if (dir == NULL)
@@ -120,26 +135,13 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
                                        seg;
                XLogSegNo       segno;
 
-               if (strcmp(dirent->d_name, ".") == 0 ||
-                       strcmp(dirent->d_name, "..") == 0)
-                       continue;
-
-               /* xlog files are always 24 characters */
-               if (strlen(dirent->d_name) != 24)
-                       continue;
-
-               /* Filenames are always made out of 0-9 and A-F */
-               b = false;
-               for (i = 0; i < 24; i++)
-               {
-                       if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
-                               !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
-                       {
-                               b = true;
-                               break;
-                       }
-               }
-               if (b)
+               /*
+                * Check if the filename looks like an xlog file, or a .partial file.
+                * Xlog files are always 24 characters, and .partial files are 32
+                * characters.
+                */
+               if (strlen(dirent->d_name) != 24 ||
+                       !strspn(dirent->d_name, "0123456789ABCDEF") == 24)
                        continue;
 
                /*
@@ -154,10 +156,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
                }
                segno = ((uint64) log) << 32 | seg;
 
-               /* Ignore any files that are for another timeline */
-               if (tli != currenttimeline)
-                       continue;
-
                /* Check if this is a completed segment or not */
                snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
                if (stat(fullpath, &statbuf) != 0)
@@ -170,9 +168,10 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
                if (statbuf.st_size == XLOG_SEG_SIZE)
                {
                        /* Completed segment */
-                       if (segno > high_segno)
+                       if (segno > high_segno || (segno == high_segno && tli > high_tli))
                        {
                                high_segno = segno;
+                               high_tli = tli;
                                continue;
                        }
                }
@@ -199,10 +198,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
 
                XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr);
 
+               *tli = high_tli;
                return high_ptr;
        }
        else
-               return currentpos;
+               return InvalidXLogRecPtr;
 }
 
 /*
@@ -212,8 +212,10 @@ static void
 StreamLog(void)
 {
        PGresult   *res;
-       uint32          timeline;
        XLogRecPtr      startpos;
+       uint32          starttli;
+       XLogRecPtr      serverpos;
+       uint32          servertli;
        uint32          hi,
                                lo;
 
@@ -243,7 +245,7 @@ StreamLog(void)
                                progname, PQntuples(res), PQnfields(res), 1, 3);
                disconnect_and_exit(1);
        }
-       timeline = atoi(PQgetvalue(res, 0, 1));
+       servertli = atoi(PQgetvalue(res, 0, 1));
        if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
        {
                fprintf(stderr,
@@ -251,13 +253,18 @@ StreamLog(void)
                                progname, PQgetvalue(res, 0, 2));
                disconnect_and_exit(1);
        }
-       startpos = ((uint64) hi) << 32 | lo;
+       serverpos = ((uint64) hi) << 32 | lo;
        PQclear(res);
 
        /*
         * Figure out where to start streaming.
         */
-       startpos = FindStreamingStart(startpos, timeline);
+       startpos = FindStreamingStart(&starttli);
+       if (startpos == InvalidXLogRecPtr)
+       {
+               startpos = serverpos;
+               starttli = servertli;
+       }
 
        /*
         * Always start streaming at the beginning of a segment
@@ -271,10 +278,10 @@ StreamLog(void)
                fprintf(stderr,
                                _("%s: starting log streaming at %X/%X (timeline %u)\n"),
                                progname, (uint32) (startpos >> 32), (uint32) startpos,
-                               timeline);
+                               starttli);
 
-       ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
-                                         stop_streaming, standby_message_timeout, false);
+       ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
+                                         stop_streaming, standby_message_timeout, ".partial");
 
        PQfinish(conn);
 }
index 88d0c136b07cbe5e155ee8d402584e401c177bb3..03e275cb5b6b02482368d18daf0d23f1269dab5e 100644 (file)
 #include "streamutil.h"
 
 
-/* fd for currently open WAL file */
+/* fd and filename for currently open WAL file */
 static int     walfile = -1;
+static char    current_walfile_name[MAXPGPATH] = "";
+
+static bool HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
+                                char *basedir, stream_stop_callback stream_stop,
+                                int standby_message_timeout, char *partial_suffix,
+                                XLogRecPtr *stoppos);
 
 /*
- * Open a new WAL file in the specified directory. Store the name
- * (not including the full directory) in namebuf. Assumes there is
- * enough room in this buffer...
+ * Open a new WAL file in the specified directory.
  *
- * The file will be padded to 16Mb with zeroes.
+ * The file will be padded to 16Mb with zeroes. The base filename (without
+ * partial_suffix) is stored in current_walfile_name.
  */
-static int
+static bool
 open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
-                        char *namebuf)
+                        char *partial_suffix)
 {
        int                     f;
        char            fn[MAXPGPATH];
@@ -50,16 +55,17 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
        XLogSegNo       segno;
 
        XLByteToSeg(startpoint, segno);
-       XLogFileName(namebuf, timeline, segno);
+       XLogFileName(current_walfile_name, timeline, segno);
 
-       snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
+       snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
+                        partial_suffix ? partial_suffix : "");
        f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
        if (f == -1)
        {
                fprintf(stderr,
                                _("%s: could not open transaction log file \"%s\": %s\n"),
                                progname, fn, strerror(errno));
-               return -1;
+               return false;
        }
 
        /*
@@ -72,17 +78,21 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
                                _("%s: could not stat transaction log file \"%s\": %s\n"),
                                progname, fn, strerror(errno));
                close(f);
-               return -1;
+               return false;
        }
        if (statbuf.st_size == XLogSegSize)
-               return f;                               /* File is open and ready to use */
+       {
+               /* File is open and ready to use */
+               walfile = f;
+               return true;
+       }
        if (statbuf.st_size != 0)
        {
                fprintf(stderr,
                                _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
                                progname, fn, (int) statbuf.st_size, XLogSegSize);
                close(f);
-               return -1;
+               return false;
        }
 
        /* New, empty, file. So pad it to 16Mb with zeroes */
@@ -97,7 +107,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
                        free(zerobuf);
                        close(f);
                        unlink(fn);
-                       return -1;
+                       return false;
                }
        }
        free(zerobuf);
@@ -108,42 +118,45 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
                                _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
                                progname, fn, strerror(errno));
                close(f);
-               return -1;
+               return false;
        }
-       return f;
+       walfile = f;
+       return true;
 }
 
 /*
- * Close the current WAL file, and rename it to the correct filename if it's
- * complete.
- *
- * If segment_complete is true, rename the current WAL file even if we've not
- * completed writing the whole segment.
+ * Close the current WAL file (if open), and rename it to the correct
+ * filename if it's complete. On failure, prints an error message to stderr
+ * and returns false, otherwise returns true.
  */
 static bool
-close_walfile(char *basedir, char *walname, bool segment_complete)
+close_walfile(char *basedir, char *partial_suffix)
 {
-       off_t           currpos = lseek(walfile, 0, SEEK_CUR);
+       off_t           currpos;
+
+       if (walfile == -1)
+               return true;
 
+       currpos = lseek(walfile, 0, SEEK_CUR);
        if (currpos == -1)
        {
                fprintf(stderr,
                         _("%s: could not determine seek position in file \"%s\": %s\n"),
-                               progname, walname, strerror(errno));
+                               progname, current_walfile_name, strerror(errno));
                return false;
        }
 
        if (fsync(walfile) != 0)
        {
                fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-                               progname, walname, strerror(errno));
+                               progname, current_walfile_name, strerror(errno));
                return false;
        }
 
        if (close(walfile) != 0)
        {
                fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-                               progname, walname, strerror(errno));
+                               progname, current_walfile_name, strerror(errno));
                walfile = -1;
                return false;
        }
@@ -153,24 +166,24 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
         * Rename the .partial file only if we've completed writing the whole
         * segment or segment_complete is true.
         */
-       if (currpos == XLOG_SEG_SIZE || segment_complete)
+       if (currpos == XLOG_SEG_SIZE && partial_suffix)
        {
                char            oldfn[MAXPGPATH];
                char            newfn[MAXPGPATH];
 
-               snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
-               snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+               snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
+               snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
                if (rename(oldfn, newfn) != 0)
                {
                        fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
-                                       progname, walname, strerror(errno));
+                                       progname, current_walfile_name, strerror(errno));
                        return false;
                }
        }
-       else
+       else if (partial_suffix)
                fprintf(stderr,
-                               _("%s: not renaming \"%s\", segment is not complete\n"),
-                               progname, walname);
+                               _("%s: not renaming \"%s%s\", segment is not complete\n"),
+                               progname, current_walfile_name, partial_suffix);
 
        return true;
 }
@@ -233,6 +246,123 @@ localTimestampDifferenceExceeds(int64 start_time,
        return (diff >= msec * INT64CONST(1000));
 }
 
+/*
+ * Check if the history file of timeline 'tli' exists in 'basedir'.
+ *
+ * Returns true if the file can be opened (or tli is 1). Otherwise returns
+ * false, printing an error to stderr unless the failure was ENOENT.
+ */
+static bool
+existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
+{
+       char            path[MAXPGPATH];
+       char            histfname[MAXFNAMELEN];
+       int                     fd;
+
+       /*
+        * Timeline 1 never has a history file. We treat that as if it existed,
+        * since we never need to stream it.
+        */
+       if (tli == 1)
+               return true;
+
+       TLHistoryFileName(histfname, tli);
+       snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
+
+       fd = open(path, O_RDONLY | PG_BINARY, 0);
+       if (fd < 0)
+       {
+               if (errno != ENOENT)
+                       fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
+                                       progname, path, strerror(errno));
+               return false;
+       }
+
+       close(fd);
+       return true;
+}
+
+/*
+ * Write 'content' as the history file of timeline 'tli' in 'basedir'. The
+ * data goes to a temp file that is fsync'd and then renamed into place.
+ * Returns true on success; on failure prints to stderr and returns false.
+ */
+static bool
+writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
+{
+       int                     size = strlen(content);
+       char            path[MAXPGPATH];
+       char            tmppath[MAXPGPATH];
+       char            histfname[MAXFNAMELEN];
+       int                     fd;
+
+       /*
+        * Check that the server's idea of how timeline history files should be
+        * named matches ours.
+        */
+       TLHistoryFileName(histfname, tli);
+       if (strcmp(histfname, filename) != 0)
+       {
+               fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
+                               progname, tli, filename);
+               return false;
+       }
+
+       /* Build the final path first; the temp file name is derived from it */
+       snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
+       snprintf(tmppath, sizeof(tmppath), "%s.tmp", path);
+
+       unlink(tmppath);
+
+       fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+       if (fd < 0)
+       {
+               fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
+                               progname, tmppath, strerror(errno));
+               return false;
+       }
+
+       errno = 0;
+       if ((int) write(fd, content, size) != size)
+       {
+               int                     save_errno = errno;
+
+               /* Close and delete the failed temp file to release disk space */
+               close(fd);
+               unlink(tmppath);
+               errno = save_errno;
+
+               fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
+                               progname, tmppath, strerror(errno));
+               return false;
+       }
+
+       if (fsync(fd) != 0)
+       {
+               fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+                               progname, tmppath, strerror(errno));
+               close(fd);
+               return false;
+       }
+
+       if (close(fd) != 0)
+       {
+               fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+                               progname, tmppath, strerror(errno));
+               return false;
+       }
+
+       /* Now move the completed history file into place with its final name */
+       if (rename(tmppath, path) < 0)
+       {
+               fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
+                               progname, tmppath, path, strerror(errno));
+               return false;
+       }
+
+       return true;
+}
+
 /*
  * Converts an int64 to network byte order.
  */
@@ -314,7 +444,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
  * (by sending an extra IDENTIFY_SYSTEM command)
  *
  * All received segments will be written to the directory
- * specified by basedir.
+ * specified by basedir. This will also fetch any missing timeline history
+ * files.
  *
  * The stream_stop callback will be called every time data
  * is received, and whenever a segment is completed. If it returns
@@ -327,20 +458,22 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
  * This message will only contain the write location, and never
  * flush or replay.
  *
+ * If 'partial_suffix' is not NULL, files are initially created with the
+ * given suffix, and the suffix is removed once the file is finished. That
+ * allows you to tell the difference between partial and completed files,
+ * so that you can continue later where you left off.
+ *
  * Note: The log position *must* be at a log segment start!
  */
 bool
 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                  char *sysidentifier, char *basedir,
                                  stream_stop_callback stream_stop,
-                                 int standby_message_timeout, bool rename_partial)
+                                 int standby_message_timeout, char *partial_suffix)
 {
        char            query[128];
-       char            current_walfile_name[MAXPGPATH];
        PGresult   *res;
-       char       *copybuf = NULL;
-       int64           last_status = -1;
-       XLogRecPtr      blockpos = InvalidXLogRecPtr;
+       XLogRecPtr      stoppos;
 
        /*
         * The message format used in streaming replication changed in 9.3, so we
@@ -359,7 +492,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 
        if (sysidentifier != NULL)
        {
-               /* Validate system identifier and timeline hasn't changed */
+               /* Validate system identifier hasn't changed */
                res = PQexec(conn, "IDENTIFY_SYSTEM");
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
                {
@@ -385,33 +518,184 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        PQclear(res);
                        return false;
                }
-               if (timeline != atoi(PQgetvalue(res, 0, 1)))
+               if (timeline > atoi(PQgetvalue(res, 0, 1)))
                {
                        fprintf(stderr,
-                                       _("%s: timeline does not match between base backup and streaming connection\n"),
-                                       progname);
+                                       _("%s: starting timeline %u is not present in the server\n"),
+                                       progname, timeline);
                        PQclear(res);
                        return false;
                }
                PQclear(res);
        }
 
-       /* Initiate the replication stream at specified location */
-       snprintf(query, sizeof(query), "START_REPLICATION %X/%X",
-                        (uint32) (startpos >> 32), (uint32) startpos);
-       res = PQexec(conn, query);
-       if (PQresultStatus(res) != PGRES_COPY_BOTH)
+       while (1)
        {
-               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-                               progname, "START_REPLICATION", PQresultErrorMessage(res));
+               /*
+                * Fetch the timeline history file for this timeline, if we don't
+                * have it already.
+                */
+               if (!existsTimeLineHistoryFile(basedir, timeline))
+               {
+                       snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
+                       res = PQexec(conn, query);
+                       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+                       {
+                               /* FIXME: we might send it ok, but get an error */
+                               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                                               progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
+                               PQclear(res);
+                               return false;
+                       }
+
+                       /*
+                        * The response to TIMELINE_HISTORY is a single row result set
+                        * with two fields: filename and content
+                        */
+                       if (PQnfields(res) != 2 || PQntuples(res) != 1)
+                       {
+                               fprintf(stderr,
+                                               _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
+                                       progname, PQntuples(res), PQnfields(res), 1, 2);
+                       }
+
+                       /* Write the history file to disk */
+                       writeTimeLineHistoryFile(basedir, timeline,
+                                                                        PQgetvalue(res, 0, 0),
+                                                                        PQgetvalue(res, 0, 1));
+
+                       PQclear(res);
+               }
+
+               /*
+                * Before we start streaming from the requested location, check
+                * if the callback tells us to stop here.
+                */
+               if (stream_stop(startpos, timeline, false))
+                       return true;
+
+               /* Initiate the replication stream at specified location */
+               snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
+                                (uint32) (startpos >> 32), (uint32) startpos,
+                                timeline);
+               res = PQexec(conn, query);
+               if (PQresultStatus(res) != PGRES_COPY_BOTH)
+               {
+                       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                                       progname, "START_REPLICATION", PQresultErrorMessage(res));
+                       PQclear(res);
+                       return false;
+               }
                PQclear(res);
-               return false;
+
+               /* Stream the WAL */
+               if (!HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
+                                                         standby_message_timeout, partial_suffix,
+                                                         &stoppos))
+                       goto error;
+
+               /*
+                * Streaming finished.
+                *
+                * There are two possible reasons for that: a controlled shutdown,
+                * or we reached the end of the current timeline. In case of
+                * end-of-timeline, the server sends a result set after Copy has
+                * finished, containing the next timeline's ID. Read that, and
+                * restart streaming from the next timeline.
+                */
+
+               res = PQgetResult(conn);
+               if (PQresultStatus(res) == PGRES_TUPLES_OK)
+               {
+                       /*
+                        * End-of-timeline. Read the next timeline's ID.
+                        */
+                       uint32          newtimeline;
+
+                       newtimeline = atoi(PQgetvalue(res, 0, 0));
+                       PQclear(res);
+
+                       if (newtimeline <= timeline)
+                       {
+                               /* shouldn't happen */
+                               fprintf(stderr,
+                                               "server reported unexpected next timeline %u, following timeline %u\n",
+                                               newtimeline, timeline);
+                               goto error;
+                       }
+
+                       /* Read the final result, which should be CommandComplete. */
+                       res = PQgetResult(conn);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       {
+                               fprintf(stderr,
+                                               _("%s: unexpected termination of replication stream: %s"),
+                                               progname, PQresultErrorMessage(res));
+                               goto error;
+                       }
+                       PQclear(res);
+
+                       /*
+                        * Loop back to start streaming from the new timeline.
+                        * Always start streaming at the beginning of a segment.
+                        */
+                       timeline = newtimeline;
+                       startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
+                       continue;
+               }
+               else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+               {
+                       /*
+                        * End of replication (ie. controlled shut down of the server).
+                        *
+                        * Check if the callback thinks it's OK to stop here. If not,
+                        * complain.
+                        */
+                       if (stream_stop(stoppos, timeline, false))
+                               return true;
+                       else
+                       {
+                               fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
+                                               progname);
+                               goto error;
+                       }
+               }
+               else
+               {
+                       /* Server returned an error. */
+                       fprintf(stderr,
+                                       _("%s: unexpected termination of replication stream: %s"),
+                                       progname, PQresultErrorMessage(res));
+                       goto error;
+               }
        }
-       PQclear(res);
 
-       /*
-        * Receive the actual xlog data
-        */
+error:
+       if (walfile != -1 && close(walfile) != 0)
+               fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+                               progname, current_walfile_name, strerror(errno));
+       walfile = -1;
+       return false;
+}
+
+/*
+ * The main loop of ReceiveXlogStream. Handles the COPY stream after
+ * initiating streaming with the START_REPLICATION command.
+ *
+ * If the COPY ends normally, returns true and sets *stoppos to the last
+ * byte written. On error, returns false.
+ */
+static bool
+HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
+                                char *basedir, stream_stop_callback stream_stop,
+                                int standby_message_timeout, char *partial_suffix,
+                                XLogRecPtr *stoppos)
+{
+       char       *copybuf = NULL;
+       int64           last_status = -1;
+       XLogRecPtr      blockpos = startpos;
+       bool            still_sending = true;
+
        while (1)
        {
                int                     r;
@@ -430,20 +714,27 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /*
                 * Check if we should continue streaming, or abort at this point.
                 */
-               if (stream_stop && stream_stop(blockpos, timeline, false))
+               if (still_sending && stream_stop(blockpos, timeline, false))
                {
-                       if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
-                                                                                               rename_partial))
+                       if (!close_walfile(basedir, partial_suffix))
+                       {
                                /* Potential error message is written by close_walfile */
                                goto error;
-                       return true;
+                       }
+                       if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+                       {
+                               fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                                               progname, PQerrorMessage(conn));
+                               goto error;
+                       }
+                       still_sending = false;
                }
 
                /*
                 * Potentially send a status message to the master
                 */
                now = localGetCurrentTimestamp();
-               if (standby_message_timeout > 0 &&
+               if (still_sending && standby_message_timeout > 0 &&
                        localTimestampDifferenceExceeds(last_status, now,
                                                                                        standby_message_timeout))
                {
@@ -457,9 +748,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                if (r == 0)
                {
                        /*
-                        * In async mode, and no data available. We block on reading but
-                        * not more than the specified timeout, so that we can send a
-                        * response back to the client.
+                        * No data available. Wait for some to appear, but not longer
+                        * than the specified timeout, so that we can ping the server.
                         */
                        fd_set          input_mask;
                        struct timeval timeout;
@@ -467,7 +757,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 
                        FD_ZERO(&input_mask);
                        FD_SET(PQsocket(conn), &input_mask);
-                       if (standby_message_timeout)
+                       if (standby_message_timeout && still_sending)
                        {
                                int64           targettime;
                                long            secs;
@@ -493,8 +783,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        {
                                /*
                                 * Got a timeout or signal. Continue the loop and either
-                                * deliver a status packet to the server or just go back into
-                                * blocking.
+                                * deliver a status packet to the server or just go back
+                                * into blocking.
                                 */
                                continue;
                        }
@@ -515,8 +805,31 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        continue;
                }
                if (r == -1)
-                       /* End of copy stream */
-                       break;
+               {
+                       /*
+                        * The server closed its end of the copy stream. Close ours
+                        * if we haven't done so already, and exit.
+                        */
+                       if (still_sending)
+                       {
+                               if (!close_walfile(basedir, partial_suffix))
+                               {
+                                       /* Error message written in close_walfile() */
+                                       goto error;
+                               }
+                               if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+                               {
+                                       fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                                                       progname, PQerrorMessage(conn));
+                                       goto error;
+                               }
+                               still_sending = false;
+                       }
+                       if (copybuf != NULL)
+                               PQfreemem(copybuf);
+                       *stoppos = blockpos;
+                       return true;
+               }
                if (r == -2)
                {
                        fprintf(stderr, _("%s: could not read COPY data: %s"),
@@ -548,174 +861,148 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        replyRequested = copybuf[pos];
 
                        /* If the server requested an immediate reply, send one. */
-                       if (replyRequested)
+                       if (replyRequested && still_sending)
                        {
                                now = localGetCurrentTimestamp();
                                if (!sendFeedback(conn, blockpos, now, false))
                                        goto error;
                                last_status = now;
                        }
-                       continue;
                }
-               else if (copybuf[0] != 'w')
-               {
-                       fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-                                       progname, copybuf[0]);
-                       goto error;
-               }
-
-               /*
-                * Read the header of the XLogData message, enclosed in the CopyData
-                * message. We only need the WAL location field (dataStart), the rest
-                * of the header is ignored.
-                */
-               hdr_len = 1;    /* msgtype 'w' */
-               hdr_len += 8;   /* dataStart */
-               hdr_len += 8;   /* walEnd */
-               hdr_len += 8;   /* sendTime */
-               if (r < hdr_len + 1)
+               else if (copybuf[0] == 'w')
                {
-                       fprintf(stderr, _("%s: streaming header too small: %d\n"),
-                                       progname, r);
-                       goto error;
-               }
-               blockpos = recvint64(&copybuf[1]);
-
-               /* Extract WAL location for this block */
-               xlogoff = blockpos % XLOG_SEG_SIZE;
+                       /*
+                        * Once we've decided we don't want to receive any more, just
+                        * ignore any subsequent XLogData messages.
+                        */
+                       if (!still_sending)
+                               continue;
 
-               /*
-                * Verify that the initial location in the stream matches where we
-                * think we are.
-                */
-               if (walfile == -1)
-               {
-                       /* No file open yet */
-                       if (xlogoff != 0)
-                       {
-                               fprintf(stderr,
-                                               _("%s: received transaction log record for offset %u with no file open\n"),
-                                               progname, xlogoff);
-                               goto error;
-                       }
-               }
-               else
-               {
-                       /* More data in existing segment */
-                       /* XXX: store seek value don't reseek all the time */
-                       if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+                       /*
+                        * Read the header of the XLogData message, enclosed in the
+                        * CopyData message. We only need the WAL location field
+                        * (dataStart), the rest of the header is ignored.
+                        */
+                       hdr_len = 1;    /* msgtype 'w' */
+                       hdr_len += 8;   /* dataStart */
+                       hdr_len += 8;   /* walEnd */
+                       hdr_len += 8;   /* sendTime */
+                       if (r < hdr_len + 1)
                        {
-                               fprintf(stderr,
-                                               _("%s: got WAL data offset %08x, expected %08x\n"),
-                                               progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+                               fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                                               progname, r);
                                goto error;
                        }
-               }
-
-               bytes_left = r - hdr_len;
-               bytes_written = 0;
+                       blockpos = recvint64(&copybuf[1]);
 
-               while (bytes_left)
-               {
-                       int                     bytes_to_write;
+                       /* Extract WAL location for this block */
+                       xlogoff = blockpos % XLOG_SEG_SIZE;
 
                        /*
-                        * If crossing a WAL boundary, only write up until we reach
-                        * XLOG_SEG_SIZE.
+                        * Verify that the initial location in the stream matches where
+                        * we think we are.
                         */
-                       if (xlogoff + bytes_left > XLOG_SEG_SIZE)
-                               bytes_to_write = XLOG_SEG_SIZE - xlogoff;
-                       else
-                               bytes_to_write = bytes_left;
-
                        if (walfile == -1)
                        {
-                               walfile = open_walfile(blockpos, timeline,
-                                                                          basedir, current_walfile_name);
-                               if (walfile == -1)
-                                       /* Error logged by open_walfile */
+                               /* No file open yet */
+                               if (xlogoff != 0)
+                               {
+                                       fprintf(stderr,
+                                                       _("%s: received transaction log record for offset %u with no file open\n"),
+                                                       progname, xlogoff);
                                        goto error;
+                               }
                        }
-
-                       if (write(walfile,
-                                         copybuf + hdr_len + bytes_written,
-                                         bytes_to_write) != bytes_to_write)
+                       else
                        {
-                               fprintf(stderr,
-                                 _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
-                                               progname, bytes_to_write, current_walfile_name,
-                                               strerror(errno));
-                               goto error;
+                               /* More data in existing segment */
+                               /* XXX: store seek value don't reseek all the time */
+                               if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+                               {
+                                       fprintf(stderr,
+                                                       _("%s: got WAL data offset %08x, expected %08x\n"),
+                                                       progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+                                       goto error;
+                               }
                        }
 
-                       /* Write was successful, advance our position */
-                       bytes_written += bytes_to_write;
-                       bytes_left -= bytes_to_write;
-                       blockpos += bytes_to_write;
-                       xlogoff += bytes_to_write;
+                       bytes_left = r - hdr_len;
+                       bytes_written = 0;
 
-                       /* Did we reach the end of a WAL segment? */
-                       if (blockpos % XLOG_SEG_SIZE == 0)
+                       while (bytes_left)
                        {
-                               if (!close_walfile(basedir, current_walfile_name, false))
-                                       /* Error message written in close_walfile() */
-                                       goto error;
+                               int                     bytes_to_write;
 
-                               xlogoff = 0;
+                               /*
+                                * If crossing a WAL boundary, only write up until we reach
+                                * XLOG_SEG_SIZE.
+                                */
+                               if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+                                       bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+                               else
+                                       bytes_to_write = bytes_left;
 
-                               if (stream_stop != NULL)
+                               if (walfile == -1)
                                {
-                                       /*
-                                        * Callback when the segment finished, and return if it
-                                        * told us to.
-                                        */
-                                       if (stream_stop(blockpos, timeline, true))
-                                               return true;
+                                       if (!open_walfile(blockpos, timeline,
+                                                                         basedir, partial_suffix))
+                                       {
+                                               /* Error logged by open_walfile */
+                                               goto error;
+                                       }
                                }
-                       }
-               }
-               /* No more data left to write, start receiving next copy packet */
-       }
 
-       /*
-        * The only way to get out of the loop is if the server shut down the
-        * replication stream. If it's a controlled shutdown, the server will send
-        * a shutdown message, and we'll return the latest xlog location that has
-        * been streamed.
-        */
+                               if (write(walfile,
+                                                 copybuf + hdr_len + bytes_written,
+                                                 bytes_to_write) != bytes_to_write)
+                               {
+                                       fprintf(stderr,
+                                                       _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
+                                                       progname, bytes_to_write, current_walfile_name,
+                                                       strerror(errno));
+                                       goto error;
+                               }
 
-       res = PQgetResult(conn);
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-       {
-               fprintf(stderr,
-                               _("%s: unexpected termination of replication stream: %s"),
-                               progname, PQresultErrorMessage(res));
-               goto error;
-       }
-       PQclear(res);
+                               /* Write was successful, advance our position */
+                               bytes_written += bytes_to_write;
+                               bytes_left -= bytes_to_write;
+                               blockpos += bytes_to_write;
+                               xlogoff += bytes_to_write;
 
-       /* Complain if we've not reached stop point yet */
-       if (stream_stop != NULL && !stream_stop(blockpos, timeline, false))
-       {
-               fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
-                               progname);
-               goto error;
+                               /* Did we reach the end of a WAL segment? */
+                               if (blockpos % XLOG_SEG_SIZE == 0)
+                               {
+                                       if (!close_walfile(basedir, partial_suffix))
+                                               /* Error message written in close_walfile() */
+                                               goto error;
+
+                                       xlogoff = 0;
+
+                                       if (still_sending && stream_stop(blockpos, timeline, false))
+                                       {
+                                               if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+                                               {
+                                                       fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                                                                       progname, PQerrorMessage(conn));
+                                                       goto error;
+                                               }
+                                               still_sending = false;
+                                               break; /* ignore the rest of this XLogData packet */
+                                       }
+                               }
+                       }
+                       /* No more data left to write, receive next copy packet */
+               }
+               else
+               {
+                       fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+                                       progname, copybuf[0]);
+                       goto error;
+               }
        }
 
-       if (copybuf != NULL)
-               PQfreemem(copybuf);
-       if (walfile != -1 && close(walfile) != 0)
-               fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-                               progname, current_walfile_name, strerror(errno));
-       walfile = -1;
-       return true;
-
 error:
        if (copybuf != NULL)
                PQfreemem(copybuf);
-       if (walfile != -1 && close(walfile) != 0)
-               fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-                               progname, current_walfile_name, strerror(errno));
-       walfile = -1;
        return false;
 }
index 7176a68beaab6d9f1d39d19a7e8cd35037e80b0c..53f31a78ecab4e41600901a738d640475d7324ee 100644 (file)
@@ -13,4 +13,4 @@ extern bool ReceiveXlogStream(PGconn *conn,
                                  char *basedir,
                                  stream_stop_callback stream_stop,
                                  int standby_message_timeout,
-                                 bool rename_partial);
+                                 char *partial_suffix);
index dd16f97bd79aa423d47d68399140a733cccc8487..7d45fcad8a4379f101d98cc69f8348c85936dbb2 100644 (file)
@@ -37,6 +37,7 @@ extern void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 extern void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size);
 extern bool tliInHistory(TimeLineID tli, List *expectedTLIs);
 extern TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history);
-extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history);
+extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history,
+                          TimeLineID *nextTLI);
 
 #endif   /* TIMELINE_H */
index 885b5fc0ad74396321762d53e255f0faec0897fd..72e324259645774004478bdf8759cd8236fbee6e 100644 (file)
@@ -317,8 +317,10 @@ extern void SetWalWriterSleeping(bool sleeping);
 /*
  * Starting/stopping a base backup
  */
-extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile);
-extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive);
+extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
+                                  TimeLineID *starttli_p, char **labelfile);
+extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
+                                 TimeLineID *stoptli_p);
 extern void do_pg_abort_backup(void);
 
 /* File path names (all relative to $PGDATA) */