From: Andres Freund Date: Sat, 3 Jan 2015 19:51:52 +0000 (+0100) Subject: Prevent WAL files created by pg_basebackup -x/X from being archived again. X-Git-Tag: REL9_5_ALPHA1~972 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=2c0a485896;p=postgresql Prevent WAL files created by pg_basebackup -x/X from being archived again. WAL (and timeline history) files created by pg_basebackup did not maintain the new base backup's archive status. That's currently not a problem if the new node is used as a standby - but if that node is promoted all still existing files can get archived again. With a high wal_keep_segment settings that can happen a significant time later - which is quite confusing. Change both the backend (for the -x/-X fetch case) and pg_basebackup (for -X stream) itself to always mark WAL/timeline files included in the base backup as .done. That's in line with walreceiver.c doing so. The verbosity of the pg_basebackup changes show pretty clearly that it needs some refactoring, but that'd result in not be backpatchable changes. Backpatch to 9.1 where pg_basebackup was introduced. Discussion: 20141205002854.GE21964@awork2.anarazel.de --- diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index fbcecbb972..24c3d8d314 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -471,6 +471,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) errmsg("unexpected WAL file size \"%s\"", walFiles[i]))); } + /* send the WAL file itself */ _tarWriteHeader(pathbuf, NULL, &statbuf); while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0) @@ -497,7 +498,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) } /* XLogSegSize is a multiple of 512, so no need for padding */ + FreeFile(fp); + + /* + * Mark file as archived, otherwise files can get archived again + * after promotion of a new node. This is in line with + * walreceiver.c always doing a XLogArchiveForceDone() after a + * complete segment. + */ + StatusFilePath(pathbuf, walFiles[i], ".done"); + sendFileWithContent(pathbuf, ""); } /* @@ -521,6 +532,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) errmsg("could not stat file \"%s\": %m", pathbuf))); sendFile(pathbuf, pathbuf, &statbuf, false); + + /* unconditionally mark file as archived */ + StatusFilePath(pathbuf, fname, ".done"); + sendFileWithContent(pathbuf, ""); } /* Send CopyDone message for the last tar file */ @@ -1021,6 +1036,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces) _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); } size += 512; /* Size of the header just added */ + + /* + * Also send archive_status directory (by hackishly reusing + * statbuf from above ...). + */ + if (!sizeonly) + _tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf); + size += 512; /* Size of the header just added */ + continue; /* don't recurse into pg_xlog */ } diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 0470401aea..ed2f3021d0 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -25,6 +25,7 @@ #include #endif +#include "common/string.h" #include "getopt_long.h" #include "libpq-fe.h" #include "pqexpbuffer.h" @@ -370,7 +371,7 @@ LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, - NULL, false)) + NULL, false, true)) /* * Any errors will already have been reported in the function process, @@ -394,6 +395,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) logstreamer_param *param; uint32 hi, lo; + char statusdir[MAXPGPATH]; param = pg_malloc0(sizeof(logstreamer_param)); param->timeline = timeline; @@ -428,13 +430,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) /* Error message already written in GetConnection() */ exit(1); + snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir); + /* - * Always in plain format, so we can write to basedir/pg_xlog. But the - * directory entry in the tar file may arrive later, so make sure it's - * created before we start. + * Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to + * basedir/pg_xlog as the directory entry in the tar file may arrive + * later. */ - snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir); - verify_dir_is_empty_or_create(param->xlogdir); + snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status", + basedir); + + if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST) + { + fprintf(stderr, + _("%s: could not create directory \"%s\": %s\n"), + progname, statusdir, strerror(errno)); + disconnect_and_exit(1); + } /* * Start a child process and tell it to start streaming. On Unix, this is @@ -1236,11 +1248,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) * by the wal receiver process. Also, when transaction * log directory location was specified, pg_xlog has * already been created as a symbolic link before - * starting the actual backup. So just ignore failure - * on them. + * starting the actual backup. So just ignore creation + * failures on related directories. */ - if ((!streamwal && (strcmp(xlog_dir, "") == 0)) - || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0) + if (!((pg_str_endswith(filename, "/pg_xlog") || + pg_str_endswith(filename, "/archive_status")) && + errno == EEXIST)) { fprintf(stderr, _("%s: could not create directory \"%s\": %s\n"), diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 4658f080f3..b10da73dc5 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -342,7 +342,7 @@ StreamLog(void) ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, stop_streaming, standby_message_timeout, ".partial", - synchronous); + synchronous, false); PQfinish(conn); conn = NULL; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index f0f8760e2d..123f44526c 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -37,7 +37,7 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos, - bool synchronous); + bool synchronous, bool mark_done); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, @@ -45,20 +45,50 @@ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr *blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, - char *partial_suffix); + char *partial_suffix, bool mark_done); static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, XLogRecPtr blockpos, char *basedir, char *partial_suffix, - XLogRecPtr *stoppos); + XLogRecPtr *stoppos, bool mark_done); static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, XLogRecPtr *stoppos); + char *partial_suffix, XLogRecPtr *stoppos, + bool mark_done); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, int64 last_status); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); +static bool +mark_file_as_archived(const char *basedir, const char *fname) +{ + int fd; + static char tmppath[MAXPGPATH]; + + snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done", + basedir, fname); + + fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"), + progname, tmppath, strerror(errno)); + return false; + } + + if (fsync(fd) != 0) + { + fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), + progname, tmppath, strerror(errno)); + return false; + } + + close(fd); + + return true; +} + /* * Open a new WAL file in the specified directory. * @@ -152,7 +182,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, * and returns false, otherwise returns true. */ static bool -close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) +close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done) { off_t currpos; @@ -206,6 +236,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) _("%s: not renaming \"%s%s\", segment is not complete\n"), progname, current_walfile_name, partial_suffix); + /* + * Mark file as archived if requested by the caller - pg_basebackup needs + * to do so as files can otherwise get archived again after promotion of a + * new node. This is in line with walreceiver.c always doing a + * XLogArchiveForceDone() after a complete segment. + */ + if (currpos == XLOG_SEG_SIZE && mark_done) + { + /* writes error message if failed */ + if (!mark_file_as_archived(basedir, current_walfile_name)) + return false; + } + lastFlushPosition = pos; return true; } @@ -248,7 +291,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) } static bool -writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content) +writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, + char *content, bool mark_done) { int size = strlen(content); char path[MAXPGPATH]; @@ -327,6 +371,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co return false; } + /* Maintain archive_status, check close_walfile() for details. */ + if (mark_done) + { + /* writes error message if failed */ + if (!mark_file_as_archived(basedir, histfname)) + return false; + } + return true; } @@ -447,7 +499,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, - bool synchronous) + bool synchronous, bool mark_done) { char query[128]; char slotcmd[128]; @@ -562,7 +614,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Write the history file to disk */ writeTimeLineHistoryFile(basedir, timeline, PQgetvalue(res, 0, 0), - PQgetvalue(res, 0, 1)); + PQgetvalue(res, 0, 1), + mark_done); PQclear(res); } @@ -592,7 +645,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, - &stoppos, synchronous); + &stoppos, synchronous, mark_done); if (res == NULL) goto error; @@ -757,7 +810,7 @@ static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, - XLogRecPtr *stoppos, bool synchronous) + XLogRecPtr *stoppos, bool synchronous, bool mark_done) { char *copybuf = NULL; int64 last_status = -1; @@ -775,7 +828,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Check if we should continue streaming, or abort at this point. */ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, - stream_stop, partial_suffix, stoppos)) + stream_stop, partial_suffix, stoppos, + mark_done)) goto error; now = feGetCurrentTimestamp(); @@ -830,7 +884,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (r == -2) { PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, - basedir, partial_suffix, stoppos); + basedir, partial_suffix, + stoppos, mark_done); if (res == NULL) goto error; else @@ -847,14 +902,16 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, else if (copybuf[0] == 'w') { if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, - timeline, basedir, stream_stop, partial_suffix)) + timeline, basedir, stream_stop, + partial_suffix, true)) goto error; /* * Check if we should continue streaming, or abort at this point. */ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, - stream_stop, partial_suffix, stoppos)) + stream_stop, partial_suffix, stoppos, + mark_done)) goto error; } else @@ -1055,7 +1112,7 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr *blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, - char *partial_suffix) + char *partial_suffix, bool mark_done) { int xlogoff; int bytes_left; @@ -1163,7 +1220,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, /* Did we reach the end of a WAL segment? */ if (*blockpos % XLOG_SEG_SIZE == 0) { - if (!close_walfile(basedir, partial_suffix, *blockpos)) + if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done)) /* Error message written in close_walfile() */ return false; @@ -1193,7 +1250,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, static PGresult * HandleEndOfCopyStream(PGconn *conn, char *copybuf, XLogRecPtr blockpos, char *basedir, char *partial_suffix, - XLogRecPtr *stoppos) + XLogRecPtr *stoppos, bool mark_done) { PGresult *res = PQgetResult(conn); @@ -1204,7 +1261,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, */ if (still_sending) { - if (!close_walfile(basedir, partial_suffix, blockpos)) + if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) { /* Error message written in close_walfile() */ PQclear(res); @@ -1236,11 +1293,11 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, XLogRecPtr *stoppos) + char *partial_suffix, XLogRecPtr *stoppos, bool mark_done) { if (still_sending && stream_stop(blockpos, timeline, false)) { - if (!close_walfile(basedir, partial_suffix, blockpos)) + if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) { /* Potential error message is written by close_walfile */ return false; diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 9dd7005167..1f64a740cc 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -31,6 +31,7 @@ extern bool ReceiveXlogStream(PGconn *conn, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, - bool synchronous); + bool synchronous, + bool mark_done); #endif /* RECEIVELOG_H */