]> granicus.if.org Git - postgresql/commitdiff
Prevent WAL files created by pg_basebackup -x/X from being archived again.
authorAndres Freund <andres@anarazel.de>
Sat, 3 Jan 2015 19:51:52 +0000 (20:51 +0100)
committerAndres Freund <andres@anarazel.de>
Sat, 3 Jan 2015 19:54:13 +0000 (20:54 +0100)
WAL (and timeline history) files created by pg_basebackup did not
maintain the new base backup's archive status. That's currently not a
problem if the new node is used as a standby - but if that node is
promoted all still existing files can get archived again.  With a high
wal_keep_segment settings that can happen a significant time later -
which is quite confusing.

Change both the backend (for the -x/-X fetch case) and pg_basebackup
(for -X stream) itself to always mark WAL/timeline files included in
the base backup as .done. That's in line with walreceiver.c doing so.

The verbosity of the pg_basebackup changes show pretty clearly that it
needs some refactoring, but that'd result in not be backpatchable
changes.

Backpatch to 9.1 where pg_basebackup was introduced.

Discussion: 20141205002854.GE21964@awork2.anarazel.de

src/backend/replication/basebackup.c
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h

index b387612c090df232c140a31b26cbb3d19fb14763..bffed284431c7040daf6b5df77716f0f5c02e789 100644 (file)
@@ -471,6 +471,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                                        errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
                        }
 
+                       /* send the WAL file itself */
                        _tarWriteHeader(pathbuf, NULL, &statbuf);
 
                        while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
@@ -497,7 +498,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                        }
 
                        /* XLogSegSize is a multiple of 512, so no need for padding */
+
                        FreeFile(fp);
+
+                       /*
+                        * Mark file as archived, otherwise files can get archived again
+                        * after promotion of a new node. This is in line with
+                        * walreceiver.c always doing a XLogArchiveForceDone() after a
+                        * complete segment.
+                        */
+                       StatusFilePath(pathbuf, walFiles[i], ".done");
+                       sendFileWithContent(pathbuf, "");
                }
 
                /*
@@ -521,6 +532,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                                                 errmsg("could not stat file \"%s\": %m", pathbuf)));
 
                        sendFile(pathbuf, pathbuf, &statbuf, false);
+
+                       /* unconditionally mark file as archived */
+                       StatusFilePath(pathbuf, fname, ".done");
+                       sendFileWithContent(pathbuf, "");
                }
 
                /* Send CopyDone message for the last tar file */
@@ -1021,6 +1036,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
                                _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
                        }
                        size += 512;            /* Size of the header just added */
+
+                       /*
+                        * Also send archive_status directory (by hackishly reusing
+                        * statbuf from above ...).
+                        */
+                       if (!sizeonly)
+                               _tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
+                       size += 512;            /* Size of the header just added */
+
                        continue;                       /* don't recurse into pg_xlog */
                }
 
index 1ad2bd26cb47a4b9ef55e6ad44cb2359c2867fbc..1b44d4e5b8b52650e7fffe2cd2a5bfa96535a5fe 100644 (file)
@@ -25,6 +25,7 @@
 #include <zlib.h>
 #endif
 
+#include "common/string.h"
 #include "getopt_long.h"
 #include "libpq-fe.h"
 #include "pqexpbuffer.h"
@@ -370,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
        if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                                                   param->sysidentifier, param->xlogdir,
                                                   reached_end_position, standby_message_timeout,
-                                                  NULL))
+                                                  NULL, true))
 
                /*
                 * Any errors will already have been reported in the function process,
@@ -394,6 +395,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
        logstreamer_param *param;
        uint32          hi,
                                lo;
+       char            statusdir[MAXPGPATH];
 
        param = pg_malloc0(sizeof(logstreamer_param));
        param->timeline = timeline;
@@ -428,13 +430,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
                /* Error message already written in GetConnection() */
                exit(1);
 
+       snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+
        /*
-        * Always in plain format, so we can write to basedir/pg_xlog. But the
-        * directory entry in the tar file may arrive later, so make sure it's
-        * created before we start.
+        * Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
+        * basedir/pg_xlog as the directory entry in the tar file may arrive
+        * later.
         */
-       snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
-       verify_dir_is_empty_or_create(param->xlogdir);
+       snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
+                        basedir);
+
+       if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+       {
+               fprintf(stderr,
+                               _("%s: could not create directory \"%s\": %s\n"),
+                               progname, statusdir, strerror(errno));
+               disconnect_and_exit(1);
+       }
 
        /*
         * Start a child process and tell it to start streaming. On Unix, this is
@@ -1236,11 +1248,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
                                                 * by the wal receiver process. Also, when transaction
                                                 * log directory location was specified, pg_xlog has
                                                 * already been created as a symbolic link before
-                                                * starting the actual backup. So just ignore failure
-                                                * on them.
+                                                * starting the actual backup. So just ignore creation
+                                                * failures on related directories.
                                                 */
-                                               if ((!streamwal && (strcmp(xlog_dir, "") == 0))
-                                                       || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
+                                               if (!((pg_str_endswith(filename, "/pg_xlog") ||
+                                                          pg_str_endswith(filename, "/archive_status")) &&
+                                                         errno == EEXIST))
                                                {
                                                        fprintf(stderr,
                                                        _("%s: could not create directory \"%s\": %s\n"),
index a31d581231210c00ce57884a7b573688ed76b7be..469c7d8756735a3f2dc1c43ed1ba73871e1fcd32 100644 (file)
@@ -330,7 +330,8 @@ StreamLog(void)
                                starttli);
 
        ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
-                                         stop_streaming, standby_message_timeout, ".partial");
+                                         stop_streaming, standby_message_timeout, ".partial",
+                                         false);
 
        PQfinish(conn);
 }
index 0878809f7b7e98dd297ef49874271b27e3cc9102..b2c58cfdf3ff26ec3e6e8e1bf75915c3144b797d 100644 (file)
@@ -34,11 +34,41 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                                 uint32 timeline, char *basedir,
                           stream_stop_callback stream_stop, int standby_message_timeout,
-                                char *partial_suffix, XLogRecPtr *stoppos);
+                                char *partial_suffix, XLogRecPtr *stoppos,
+                                bool mark_done);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                                                 uint32 *timeline);
 
+static bool
+mark_file_as_archived(const char *basedir, const char *fname)
+{
+       int fd;
+       static char tmppath[MAXPGPATH];
+
+       snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
+                        basedir, fname);
+
+       fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+       if (fd < 0)
+       {
+               fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
+                               progname, tmppath, strerror(errno));
+               return false;
+       }
+
+       if (fsync(fd) != 0)
+       {
+               fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+                               progname, tmppath, strerror(errno));
+               return false;
+       }
+
+       close(fd);
+
+       return true;
+}
+
 /*
  * Open a new WAL file in the specified directory.
  *
@@ -132,7 +162,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
  * and returns false, otherwise returns true.
  */
 static bool
-close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
 {
        off_t           currpos;
 
@@ -186,6 +216,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
                                _("%s: not renaming \"%s%s\", segment is not complete\n"),
                                progname, current_walfile_name, partial_suffix);
 
+       /*
+        * Mark file as archived if requested by the caller - pg_basebackup needs
+        * to do so as files can otherwise get archived again after promotion of a
+        * new node. This is in line with walreceiver.c always doing a
+        * XLogArchiveForceDone() after a complete segment.
+        */
+       if (currpos == XLOG_SEG_SIZE && mark_done)
+       {
+               /* writes error message if failed */
+               if (!mark_file_as_archived(basedir, current_walfile_name))
+                       return false;
+       }
+
        lastFlushPosition = pos;
        return true;
 }
@@ -228,7 +271,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
 }
 
 static bool
-writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
+writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
+                                                char *content, bool mark_done)
 {
        int                     size = strlen(content);
        char            path[MAXPGPATH];
@@ -307,6 +351,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
                return false;
        }
 
+       /* Maintain archive_status, check close_walfile() for details. */
+       if (mark_done)
+       {
+               /* writes error message if failed */
+               if (!mark_file_as_archived(basedir, histfname))
+                       return false;
+       }
+
        return true;
 }
 
@@ -423,7 +475,8 @@ bool
 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                  char *sysidentifier, char *basedir,
                                  stream_stop_callback stream_stop,
-                                 int standby_message_timeout, char *partial_suffix)
+                                 int standby_message_timeout, char *partial_suffix,
+                                 bool mark_done)
 {
        char            query[128];
        char            slotcmd[128];
@@ -538,7 +591,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        /* Write the history file to disk */
                        writeTimeLineHistoryFile(basedir, timeline,
                                                                         PQgetvalue(res, 0, 0),
-                                                                        PQgetvalue(res, 0, 1));
+                                                                        PQgetvalue(res, 0, 1),
+                                                                        mark_done);
 
                        PQclear(res);
                }
@@ -568,7 +622,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /* Stream the WAL */
                res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
                                                           standby_message_timeout, partial_suffix,
-                                                          &stoppos);
+                                                          &stoppos, mark_done);
                if (res == NULL)
                        goto error;
 
@@ -733,7 +787,7 @@ static PGresult *
 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                 char *basedir, stream_stop_callback stream_stop,
                                 int standby_message_timeout, char *partial_suffix,
-                                XLogRecPtr *stoppos)
+                                XLogRecPtr *stoppos, bool mark_done)
 {
        char       *copybuf = NULL;
        int64           last_status = -1;
@@ -760,7 +814,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 */
                if (still_sending && stream_stop(blockpos, timeline, false))
                {
-                       if (!close_walfile(basedir, partial_suffix, blockpos))
+                       if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
                        {
                                /* Potential error message is written by close_walfile */
                                goto error;
@@ -859,7 +913,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                         */
                        if (still_sending)
                        {
-                               if (!close_walfile(basedir, partial_suffix, blockpos))
+                               if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
                                {
                                        /* Error message written in close_walfile() */
                                        PQclear(res);
@@ -1046,7 +1100,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                /* Did we reach the end of a WAL segment? */
                                if (blockpos % XLOG_SEG_SIZE == 0)
                                {
-                                       if (!close_walfile(basedir, partial_suffix, blockpos))
+                                       if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
                                                /* Error message written in close_walfile() */
                                                goto error;
 
index f4789a580ae75df4b97fe40be89ec3fe4817ba70..3a670d8998ff34a3959ac780f0fea9a9f4eaae93 100644 (file)
@@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
                                  char *basedir,
                                  stream_stop_callback stream_stop,
                                  int standby_message_timeout,
-                                 char *partial_suffix);
+                                 char *partial_suffix,
+                                 bool mark_done);