From 31d965819bdaa971ae007a67611e78ec1d185f14 Mon Sep 17 00:00:00 2001 From: Magnus Hagander Date: Fri, 25 May 2012 11:36:22 +0200 Subject: [PATCH] Fix base backup streaming xlog from standby When backing up from a standby server, the backup process will not automatically switch xlog segment. So we must accept a partially transferred xlog file in this case, but rename it into position anyway. In passing, merge the two callbacks for segment end and stop stream into a single callback, since their implementations were close to identical, and rename this callback to reflect that it stops streaming rather than continues it. Patch by Magnus Hagander, review by Fujii Masao --- src/bin/pg_basebackup/pg_basebackup.c | 9 ++++--- src/bin/pg_basebackup/pg_receivexlog.c | 19 ++++----------- src/bin/pg_basebackup/receivelog.c | 33 +++++++++++++++----------- src/bin/pg_basebackup/receivelog.h | 18 +++++--------- 4 files changed, 34 insertions(+), 45 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 0289c4bc4f..6a2e557809 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -78,7 +78,7 @@ static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); -static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); +static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); #ifdef HAVE_LIBZ static const char * @@ -129,8 +129,7 @@ usage(void) /* - * Called in the background process whenever a complete segment of WAL - * has been received. + * Called in the background process every time data is received. * On Unix, we check to see if there is any data on our pipe * (which would mean we have a stop position), and if it is, check if * it is time to stop. @@ -138,7 +137,7 @@ usage(void) * time to stop. */ static bool -segment_callback(XLogRecPtr segendpos, uint32 timeline) +reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) { if (!has_xlogendptr) { @@ -231,7 +230,7 @@ LogStreamerMain(logstreamer_param * param) { if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, - segment_callback, NULL, standby_message_timeout)) + reached_end_position, standby_message_timeout, true)) /* * Any errors will already have been reported in the function process, diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 2134c8729c..01f20f372a 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -43,7 +43,7 @@ volatile bool time_to_abort = false; static void usage(void); static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline); static void StreamLog(); -static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); +static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); static void usage(void) @@ -69,21 +69,12 @@ usage(void) } static bool -segment_callback(XLogRecPtr segendpos, uint32 timeline) +stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) { - if (verbose) + if (verbose && segment_finished) fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), progname, segendpos.xlogid, segendpos.xrecoff, timeline); - /* - * Never abort from this - we handle all aborting in continue_streaming() - */ - return false; -} - -static bool -continue_streaming(void) -{ if (time_to_abort) { fprintf(stderr, _("%s: received interrupt signal, exiting.\n"), @@ -268,8 +259,8 @@ StreamLog(void) progname, startpos.xlogid, startpos.xrecoff, timeline); ReceiveXlogStream(conn, startpos, timeline, NULL, basedir, - segment_callback, continue_streaming, - standby_message_timeout); + stop_streaming, + standby_message_timeout, false); PQfinish(conn); } diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index b0cf836968..efbc4ca653 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -113,8 +113,14 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu return f; } +/* + * Close the current WAL file, and rename it to the correct filename if it's complete. + * + * If segment_complete is true, rename the current WAL file even if we've not + * completed writing the whole segment. + */ static bool -close_walfile(int walfile, char *basedir, char *walname) +close_walfile(int walfile, char *basedir, char *walname, bool segment_complete) { off_t currpos = lseek(walfile, 0, SEEK_CUR); @@ -141,9 +147,9 @@ close_walfile(int walfile, char *basedir, char *walname) /* * Rename the .partial file only if we've completed writing the - * whole segment. + * whole segment or segment_complete is true. */ - if (currpos == XLOG_SEG_SIZE) + if (currpos == XLOG_SEG_SIZE || segment_complete) { char oldfn[MAXPGPATH]; char newfn[MAXPGPATH]; @@ -199,11 +205,10 @@ localGetCurrentTimestamp(void) * All received segments will be written to the directory * specified by basedir. * - * The segment_finish callback will be called after each segment - * has been finished, and the stream_continue callback will be - * called every time data is received. If either of these callbacks - * return true, the streaming will stop and the function - * return. As long as they return false, streaming will continue + * The stream_stop callback will be called every time data + * is received, and whenever a segment is completed. If it returns + * true, the streaming will stop and the function + * return. As long as it returns false, streaming will continue * indefinitely. * * standby_message_timeout controls how often we send a message @@ -214,7 +219,7 @@ localGetCurrentTimestamp(void) * Note: The log position *must* be at a log segment start! */ bool -ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout) +ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, bool rename_partial) { char query[128]; char current_walfile_name[MAXPGPATH]; @@ -288,11 +293,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi /* * Check if we should continue streaming, or abort at this point. */ - if (stream_continue && stream_continue()) + if (stream_stop && stream_stop(blockpos, timeline, false)) { if (walfile != -1) /* Potential error message is written by close_walfile */ - return close_walfile(walfile, basedir, current_walfile_name); + return close_walfile(walfile, basedir, current_walfile_name, rename_partial); return true; } @@ -486,20 +491,20 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi /* Did we reach the end of a WAL segment? */ if (blockpos.xrecoff % XLOG_SEG_SIZE == 0) { - if (!close_walfile(walfile, basedir, current_walfile_name)) + if (!close_walfile(walfile, basedir, current_walfile_name, false)) /* Error message written in close_walfile() */ return false; walfile = -1; xlogoff = 0; - if (segment_finish != NULL) + if (stream_stop != NULL) { /* * Callback when the segment finished, and return if it * told us to. */ - if (segment_finish(blockpos, timeline)) + if (stream_stop(blockpos, timeline, true)) return true; } } diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 1c61ea8ac1..0a803ee4ac 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -1,22 +1,16 @@ #include "access/xlogdefs.h" /* - * Called whenever a segment is finished, return true to stop - * the streaming at this point. + * Called before trying to read more data or when a segment is + * finished. Return true to stop streaming. */ -typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline); - -/* - * Called before trying to read more data. Return true to stop - * the streaming at this point. - */ -typedef bool (*stream_continue_callback)(void); +typedef bool (*stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); extern bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, - segment_finish_callback segment_finish, - stream_continue_callback stream_continue, - int standby_message_timeout); + stream_stop_callback stream_stop, + int standby_message_timeout, + bool rename_partial); -- 2.40.0