From 3dad73e71f08abd86564d5090a58ca71740e07e0 Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Fri, 8 Aug 2014 16:50:54 +0900 Subject: [PATCH] Add -F option to pg_receivexlog, for specifying fsync interval. This allows us to specify the maximum time to issue fsync to ensure the received WAL file is safely flushed to disk. Without this, pg_receivexlog always flushes WAL file only when it's closed, which can cause WAL data to be lost in the event of a crash. Furuya Osamu, heavily modified by me. --- doc/src/sgml/ref/pg_receivexlog.sgml | 15 ++ src/bin/pg_basebackup/pg_basebackup.c | 2 +- src/bin/pg_basebackup/pg_receivexlog.c | 18 +- src/bin/pg_basebackup/receivelog.c | 227 ++++++++++++++++++------- src/bin/pg_basebackup/receivelog.h | 3 +- 5 files changed, 195 insertions(+), 70 deletions(-) diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml index 7c50b01a57..c15776fc58 100644 --- a/doc/src/sgml/ref/pg_receivexlog.sgml +++ b/doc/src/sgml/ref/pg_receivexlog.sgml @@ -105,6 +105,21 @@ PostgreSQL documentation + + + + + + Specifies the maximum time to issue sync commands to ensure the + received WAL file is safely flushed to disk, in seconds. The default + value is zero, which disables issuing fsyncs except when WAL file is + closed. If -1 is specified, WAL file is flushed as + soon as possible, that is, as soon as there are WAL data which has + not been flushed yet. 
+ + + + diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 5df2eb8c0d..0b02c4c401 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -371,7 +371,7 @@ LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, - NULL)) + NULL, 0)) /* * Any errors will already have been reported in the function process, diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 9640838906..0b7af54a7b 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -36,6 +36,7 @@ static char *basedir = NULL; static int verbose = 0; static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ +static int fsync_interval = 0; /* 0 = default */ static volatile bool time_to_abort = false; @@ -62,6 +63,8 @@ usage(void) printf(_("\nOptions:\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); + printf(_(" -F --fsync-interval=INTERVAL\n" + " frequency of syncs to transaction log files (in seconds)\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); @@ -330,7 +333,8 @@ StreamLog(void) starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, - stop_streaming, standby_message_timeout, ".partial"); + stop_streaming, standby_message_timeout, ".partial", + fsync_interval); PQfinish(conn); } @@ -360,6 +364,7 @@ main(int argc, char **argv) {"port", required_argument, NULL, 'p'}, {"username", required_argument, NULL, 'U'}, {"no-loop", no_argument, NULL, 'n'}, + {"fsync-interval", required_argument, NULL, 'F'}, 
{"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, @@ -389,7 +394,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv", + while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv", long_options, &option_index)) != -1) { switch (c) @@ -436,6 +441,15 @@ main(int argc, char **argv) case 'n': noloop = 1; break; + case 'F': + fsync_interval = atoi(optarg) * 1000; + if (fsync_interval < -1000) + { + fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; case 'v': verbose++; break; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index d28e13b4d8..89b22f20e2 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -31,12 +31,14 @@ static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; +static int64 last_fsync = -1; /* timestamp of last WAL file flush */ static bool still_sending = true; /* feedback still needs to be sent? 
*/ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, - char *partial_suffix, XLogRecPtr *stoppos); + char *partial_suffix, XLogRecPtr *stoppos, + int fsync_interval); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, @@ -48,6 +50,13 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, XLogRecPtr blockpos, char *basedir, char *partial_suffix, XLogRecPtr *stoppos); +static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, + uint32 timeline, char *basedir, + stream_stop_callback stream_stop, + char *partial_suffix, XLogRecPtr *stoppos); +static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, + int64 last_status, int fsync_interval, + XLogRecPtr blockpos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); @@ -200,6 +209,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) progname, current_walfile_name, partial_suffix); lastFlushPosition = pos; + last_fsync = feGetCurrentTimestamp(); return true; } @@ -430,13 +440,17 @@ CheckServerVersionForStreaming(PGconn *conn) * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * + * fsync_interval controls how often we flush to the received WAL file, + * in milliseconds. + * * Note: The log position *must* be at a log segment start! 
*/ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, - int standby_message_timeout, char *partial_suffix) + int standby_message_timeout, char *partial_suffix, + int fsync_interval) { char query[128]; char slotcmd[128]; @@ -581,7 +595,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, - &stoppos); + &stoppos, fsync_interval); if (res == NULL) goto error; @@ -746,7 +760,7 @@ static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, - XLogRecPtr *stoppos) + XLogRecPtr *stoppos, int fsync_interval) { char *copybuf = NULL; int64 last_status = -1; @@ -763,26 +777,36 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Check if we should continue streaming, or abort at this point. */ - if (still_sending && stream_stop(blockpos, timeline, false)) + if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, + stream_stop, partial_suffix, stoppos)) + goto error; + + now = feGetCurrentTimestamp(); + + /* + * If fsync_interval has elapsed since last WAL flush and we've written + * some WAL data, flush them to disk. 
+ */ + if (lastFlushPosition < blockpos && + walfile != -1 && + ((fsync_interval > 0 && + feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) || + fsync_interval < 0)) { - if (!close_walfile(basedir, partial_suffix, blockpos)) - { - /* Potential error message is written by close_walfile */ - goto error; - } - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + if (fsync(walfile) != 0) { - fprintf(stderr, _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); + fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), + progname, current_walfile_name, strerror(errno)); goto error; } - still_sending = false; + + lastFlushPosition = blockpos; + last_fsync = now; } /* * Potentially send a status message to the master */ - now = feGetCurrentTimestamp(); if (still_sending && standby_message_timeout > 0 && feTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) @@ -794,64 +818,58 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } /* - * Compute how long send/receive loops should sleep + * Calculate how long send/receive loops should sleep */ - if (standby_message_timeout && still_sending) - { - int64 targettime; - long secs; - int usecs; - - targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); - feTimestampDifference(now, - targettime, - &secs, - &usecs); - /* Always sleep at least 1 sec */ - if (secs <= 0) - { - secs = 1; - usecs = 0; - } - - sleeptime = secs * 1000 + usecs / 1000; - } - else - sleeptime = -1; + sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, + last_status, fsync_interval, blockpos); r = CopyStreamReceive(conn, sleeptime, ©buf); - if (r == 0) - continue; - if (r == -1) - goto error; - if (r == -2) + while (r != 0) { - PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, - basedir, partial_suffix, stoppos); - if (res == NULL) + if (r == -1) goto error; - else - return res; - } + if (r == -2) + { + PGresult *res = 
HandleEndOfCopyStream(conn, copybuf, blockpos, + basedir, partial_suffix, stoppos); + if (res == NULL) + goto error; + else + return res; + } - /* Check the message type. */ - if (copybuf[0] == 'k') - { - if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos, - &last_status)) - goto error; - } - else if (copybuf[0] == 'w') - { - if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, - timeline, basedir, stream_stop, partial_suffix)) + /* Check the message type. */ + if (copybuf[0] == 'k') + { + if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos, + &last_status)) + goto error; + } + else if (copybuf[0] == 'w') + { + if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, + timeline, basedir, stream_stop, partial_suffix)) + goto error; + + /* + * Check if we should continue streaming, or abort at this point. + */ + if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, + stream_stop, partial_suffix, stoppos)) + goto error; + } + else + { + fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), + progname, copybuf[0]); goto error; - } - else - { - fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), - progname, copybuf[0]); - goto error; + } + + /* + * Process the received data, and any subsequent data we + * can read without blocking. + */ + r = CopyStreamReceive(conn, 0, ©buf); } } @@ -1193,3 +1211,80 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, *stoppos = blockpos; return res; } + +/* + * Check if we should continue streaming, or abort at this point. 
+ */ +static bool +CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + char *partial_suffix, XLogRecPtr *stoppos) +{ + if (still_sending && stream_stop(blockpos, timeline, false)) + { + if (!close_walfile(basedir, partial_suffix, blockpos)) + { + /* Potential error message is written by close_walfile */ + return false; + } + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + return false; + } + still_sending = false; + } + + return true; +} + +/* + * Calculate how long send/receive loops should sleep + */ +static long +CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, + int64 last_status, int fsync_interval, XLogRecPtr blockpos) +{ + int64 targettime = 0; + int64 status_targettime = 0; + int64 fsync_targettime = 0; + long sleeptime; + + if (standby_message_timeout && still_sending) + status_targettime = last_status + + (standby_message_timeout - 1) * ((int64) 1000); + + if (fsync_interval > 0 && lastFlushPosition < blockpos) + fsync_targettime = last_fsync + + (fsync_interval - 1) * ((int64) 1000); + + if ((status_targettime < fsync_targettime && status_targettime > 0) || + fsync_targettime == 0) + targettime = status_targettime; + else + targettime = fsync_targettime; + + if (targettime > 0) + { + long secs; + int usecs; + + feTimestampDifference(now, + targettime, + &secs, + &usecs); + /* Always sleep at least 1 sec */ + if (secs <= 0) + { + secs = 1; + usecs = 0; + } + + sleeptime = secs * 1000 + usecs / 1000; + } + else + sleeptime = -1; + + return sleeptime; +} diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index f4789a580a..72f8245373 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn, char *basedir, stream_stop_callback 
stream_stop, int standby_message_timeout, - char *partial_suffix); + char *partial_suffix, + int fsync_interval); -- 2.40.0