From: Magnus Hagander Date: Fri, 11 Mar 2016 10:08:01 +0000 (+0100) Subject: Refactor receivelog.c parameters X-Git-Tag: REL9_6_BETA1~522 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=38c83c9b7569378d864d8915e291716b8bec15f2;p=postgresql Refactor receivelog.c parameters Much cruft had accumulated over time with a large number of parameters passed down between functions very deep. With this refactoring, instead introduce a StreamCtl structure that holds the parameters, and pass around a pointer to this structure instead. This makes it much easier to add or remove fields that are needed deeper down in the implementation without having to modify every function header in the file. Patch by me after much nagging from Andres Reviewed by Craig Ringer and Daniel Gustafsson --- diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index ab9692a6d5..94852877a2 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -372,10 +372,20 @@ typedef struct static int LogStreamerMain(logstreamer_param *param) { - if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, - param->sysidentifier, param->xlogdir, - reached_end_position, standby_message_timeout, - NULL, false, true)) + StreamCtl stream; + + MemSet(&stream, sizeof(stream), 0); + stream.startpos = param->startptr; + stream.timeline = param->timeline; + stream.sysidentifier = param->sysidentifier; + stream.stream_stop = reached_end_position; + stream.standby_message_timeout = standby_message_timeout; + stream.synchronous = false; + stream.mark_done = true; + stream.basedir = param->xlogdir; + stream.partial_suffix = NULL; + + if (!ReceiveXlogStream(param->bgconn, &stream)) /* * Any errors will already have been reported in the function process, diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index f96b547b0f..7f7ee9dc9b 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -276,10 +276,11 @@ FindStreamingStart(uint32 *tli) static void StreamLog(void) { - XLogRecPtr startpos, - serverpos; - TimeLineID starttli, - servertli; + XLogRecPtr serverpos; + TimeLineID servertli; + StreamCtl stream; + + MemSet(&stream, 0, sizeof(stream)); /* * Connect in replication mode to the server @@ -311,17 +312,17 @@ StreamLog(void) /* * Figure out where to start streaming. */ - startpos = FindStreamingStart(&starttli); - if (startpos == InvalidXLogRecPtr) + stream.startpos = FindStreamingStart(&stream.timeline); + if (stream.startpos == InvalidXLogRecPtr) { - startpos = serverpos; - starttli = servertli; + stream.startpos = serverpos; + stream.timeline = servertli; } /* * Always start streaming at the beginning of a segment */ - startpos -= startpos % XLOG_SEG_SIZE; + stream.startpos -= stream.startpos % XLOG_SEG_SIZE; /* * Start the replication @@ -329,12 +330,17 @@ StreamLog(void) if (verbose) fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), - progname, (uint32) (startpos >> 32), (uint32) startpos, - starttli); + progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos, + stream.timeline); + + stream.stream_stop = stop_streaming; + stream.standby_message_timeout = standby_message_timeout; + stream.synchronous = synchronous; + stream.mark_done = false; + stream.basedir = basedir; + stream.partial_suffix = ".partial"; - ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, - stop_streaming, standby_message_timeout, ".partial", - synchronous, false); + ReceiveXlogStream(conn, &stream); PQfinish(conn); conn = NULL; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 01c42fc063..595213f042 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -33,27 +33,18 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static bool still_sending = true; /* feedback still needs to be sent? */ -static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, - uint32 timeline, char *basedir, - stream_stop_callback stream_stop, int standby_message_timeout, - char *partial_suffix, XLogRecPtr *stoppos, - bool synchronous, bool mark_done); +static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, + XLogRecPtr *stoppos); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr blockpos, int64 *last_status); -static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, - XLogRecPtr *blockpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, bool mark_done); -static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, - XLogRecPtr blockpos, char *basedir, char *partial_suffix, - XLogRecPtr *stoppos, bool mark_done); -static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, - uint32 timeline, char *basedir, - stream_stop_callback stream_stop, - char *partial_suffix, XLogRecPtr *stoppos, - bool mark_done); +static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, + XLogRecPtr *blockpos); +static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, + XLogRecPtr blockpos, XLogRecPtr *stoppos); +static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, + XLogRecPtr *stoppos); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, int64 last_status); @@ -99,8 +90,7 @@ mark_file_as_archived(const char *basedir, const char *fname) * partial_suffix) is stored in current_walfile_name. */ static bool -open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, - char *partial_suffix) +open_walfile(StreamCtl *stream, XLogRecPtr startpoint) { int f; char fn[MAXPGPATH]; @@ -110,10 +100,10 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, XLogSegNo segno; XLByteToSeg(startpoint, segno); - XLogFileName(current_walfile_name, timeline, segno); + XLogFileName(current_walfile_name, stream->timeline, segno); - snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name, - partial_suffix ? partial_suffix : ""); + snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name, + stream->partial_suffix ? stream->partial_suffix : ""); f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) { @@ -185,7 +175,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, * and returns false, otherwise returns true. */ static bool -close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done) +close_walfile(StreamCtl *stream, XLogRecPtr pos) { off_t currpos; @@ -220,13 +210,13 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don /* * If we finished writing a .partial file, rename it into place. */ - if (currpos == XLOG_SEG_SIZE && partial_suffix) + if (currpos == XLOG_SEG_SIZE && stream->partial_suffix) { char oldfn[MAXPGPATH]; char newfn[MAXPGPATH]; - snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix); - snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name); + snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix); + snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name); if (rename(oldfn, newfn) != 0) { fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"), @@ -234,10 +224,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don return false; } } - else if (partial_suffix) + else if (stream->partial_suffix) fprintf(stderr, _("%s: not renaming \"%s%s\", segment is not complete\n"), - progname, current_walfile_name, partial_suffix); + progname, current_walfile_name, stream->partial_suffix); /* * Mark file as archived if requested by the caller - pg_basebackup needs @@ -245,10 +235,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don * new node. This is in line with walreceiver.c always doing a * XLogArchiveForceDone() after a complete segment. */ - if (currpos == XLOG_SEG_SIZE && mark_done) + if (currpos == XLOG_SEG_SIZE && stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(basedir, current_walfile_name)) + if (!mark_file_as_archived(stream->basedir, current_walfile_name)) return false; } @@ -261,7 +251,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don * Check if a timeline history file exists. */ static bool -existsTimeLineHistoryFile(char *basedir, TimeLineID tli) +existsTimeLineHistoryFile(StreamCtl *stream) { char path[MAXPGPATH]; char histfname[MAXFNAMELEN]; @@ -271,12 +261,12 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) * Timeline 1 never has a history file. We treat that as if it existed, * since we never need to stream it. */ - if (tli == 1) + if (stream->timeline == 1) return true; - TLHistoryFileName(histfname, tli); + TLHistoryFileName(histfname, stream->timeline); - snprintf(path, sizeof(path), "%s/%s", basedir, histfname); + snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); fd = open(path, O_RDONLY | PG_BINARY, 0); if (fd < 0) @@ -294,8 +284,7 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) } static bool -writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, - char *content, bool mark_done) +writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) { int size = strlen(content); char path[MAXPGPATH]; @@ -307,15 +296,15 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, * Check that the server's idea of how timeline history files should be * named matches ours. */ - TLHistoryFileName(histfname, tli); + TLHistoryFileName(histfname, stream->timeline); if (strcmp(histfname, filename) != 0) { fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"), - progname, tli, filename); + progname, stream->timeline, filename); return false; } - snprintf(path, sizeof(path), "%s/%s", basedir, histfname); + snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); /* * Write into a temp file name. @@ -375,10 +364,10 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, } /* Maintain archive_status, check close_walfile() for details. */ - if (mark_done) + if (stream->mark_done) { /* writes error message if failed */ - if (!mark_file_as_archived(basedir, histfname)) + if (!mark_file_as_archived(stream->basedir, histfname)) return false; } @@ -468,6 +457,8 @@ CheckServerVersionForStreaming(PGconn *conn) /* * Receive a log stream starting at the specified position. * + * Individual parameters are passed through the StreamCtl structure. + * * If sysidentifier is specified, validate that both the system * identifier and the timeline matches the specified ones * (by sending an extra IDENTIFY_SYSTEM command) @@ -498,11 +489,7 @@ CheckServerVersionForStreaming(PGconn *conn) * Note: The log position *must* be at a log segment start! */ bool -ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, - char *sysidentifier, char *basedir, - stream_stop_callback stream_stop, - int standby_message_timeout, char *partial_suffix, - bool synchronous, bool mark_done) +ReceiveXlogStream(PGconn *conn, StreamCtl *stream) { char query[128]; char slotcmd[128]; @@ -539,7 +526,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, slotcmd[0] = 0; } - if (sysidentifier != NULL) + if (stream->sysidentifier != NULL) { /* Validate system identifier hasn't changed */ res = PQexec(conn, "IDENTIFY_SYSTEM"); @@ -559,7 +546,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0) + if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0) { fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), @@ -567,11 +554,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } - if (timeline > atoi(PQgetvalue(res, 0, 1))) + if (stream->timeline > atoi(PQgetvalue(res, 0, 1))) { fprintf(stderr, _("%s: starting timeline %u is not present in the server\n"), - progname, timeline); + progname, stream->timeline); PQclear(res); return false; } @@ -582,7 +569,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * initialize flush position to starting point, it's the caller's * responsibility that that's sane. */ - lastFlushPosition = startpos; + lastFlushPosition = stream->startpos; while (1) { @@ -590,9 +577,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Fetch the timeline history file for this timeline, if we don't have * it already. */ - if (!existsTimeLineHistoryFile(basedir, timeline)) + if (!existsTimeLineHistoryFile(stream)) { - snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline); + snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -615,10 +602,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } /* Write the history file to disk */ - writeTimeLineHistoryFile(basedir, timeline, + writeTimeLineHistoryFile(stream, PQgetvalue(res, 0, 0), - PQgetvalue(res, 0, 1), - mark_done); + PQgetvalue(res, 0, 1)); PQclear(res); } @@ -627,14 +613,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Before we start streaming from the requested location, check if the * callback tells us to stop here. */ - if (stream_stop(startpos, timeline, false)) + if (stream->stream_stop(stream->startpos, stream->timeline, false)) return true; /* Initiate the replication stream at specified location */ snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u", slotcmd, - (uint32) (startpos >> 32), (uint32) startpos, - timeline); + (uint32) (stream->startpos >> 32), (uint32) stream->startpos, + stream->timeline); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_COPY_BOTH) { @@ -646,9 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); /* Stream the WAL */ - res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, - standby_message_timeout, partial_suffix, - &stoppos, synchronous, mark_done); + res = HandleCopyStream(conn, stream, &stoppos); if (res == NULL) goto error; @@ -676,26 +660,26 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, uint32 newtimeline; bool parsed; - parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline); + parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline); PQclear(res); if (!parsed) goto error; /* Sanity check the values the server gave us */ - if (newtimeline <= timeline) + if (newtimeline <= stream->timeline) { fprintf(stderr, _("%s: server reported unexpected next timeline %u, following timeline %u\n"), - progname, newtimeline, timeline); + progname, newtimeline, stream->timeline); goto error; } - if (startpos > stoppos) + if (stream->startpos > stoppos) { fprintf(stderr, _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"), progname, - timeline, (uint32) (stoppos >> 32), (uint32) stoppos, - newtimeline, (uint32) (startpos >> 32), (uint32) startpos); + stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos, + newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos); goto error; } @@ -715,8 +699,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Loop back to start streaming from the new timeline. Always * start streaming at the beginning of a segment. */ - timeline = newtimeline; - startpos = startpos - (startpos % XLOG_SEG_SIZE); + stream->timeline = newtimeline; + stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE); continue; } else if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -729,7 +713,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Check if the callback thinks it's OK to stop here. If not, * complain. */ - if (stream_stop(stoppos, timeline, false)) + if (stream->stream_stop(stoppos, stream->timeline, false)) return true; else { @@ -810,14 +794,12 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) * On any other sort of error, returns NULL. */ static PGresult * -HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - int standby_message_timeout, char *partial_suffix, - XLogRecPtr *stoppos, bool synchronous, bool mark_done) +HandleCopyStream(PGconn *conn, StreamCtl *stream, + XLogRecPtr *stoppos) { char *copybuf = NULL; int64 last_status = -1; - XLogRecPtr blockpos = startpos; + XLogRecPtr blockpos = stream->startpos; still_sending = true; @@ -830,9 +812,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Check if we should continue streaming, or abort at this point. */ - if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, - stream_stop, partial_suffix, stoppos, - mark_done)) + if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos)) goto error; now = feGetCurrentTimestamp(); @@ -841,7 +821,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * If synchronous option is true, issue sync command as soon as there * are WAL data which has not been flushed yet. */ - if (synchronous && lastFlushPosition < blockpos && walfile != -1) + if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1) { if (fsync(walfile) != 0) { @@ -863,9 +843,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Potentially send a status message to the master */ - if (still_sending && standby_message_timeout > 0 && + if (still_sending && stream->standby_message_timeout > 0 && feTimestampDifferenceExceeds(last_status, now, - standby_message_timeout)) + stream->standby_message_timeout)) { /* Time to send feedback! */ if (!sendFeedback(conn, blockpos, now, false)) @@ -876,7 +856,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Calculate how long send/receive loops should sleep */ - sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, + sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout, last_status); r = CopyStreamReceive(conn, sleeptime, ©buf); @@ -886,9 +866,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, goto error; if (r == -2) { - PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, - basedir, partial_suffix, - stoppos, mark_done); + PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos); if (res == NULL) goto error; @@ -905,18 +883,14 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } else if (copybuf[0] == 'w') { - if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, - timeline, basedir, stream_stop, - partial_suffix, mark_done)) + if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos)) goto error; /* * Check if we should continue streaming, or abort at this * point. */ - if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, - stream_stop, partial_suffix, stoppos, - mark_done)) + if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos)) goto error; } else @@ -1114,10 +1088,8 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, * Process XLogData message. */ static bool -ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, - XLogRecPtr *blockpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, bool mark_done) +ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, + XLogRecPtr *blockpos) { int xlogoff; int bytes_left; @@ -1197,8 +1169,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, if (walfile == -1) { - if (!open_walfile(*blockpos, timeline, - basedir, partial_suffix)) + if (!open_walfile(stream, *blockpos)) { /* Error logged by open_walfile */ return false; @@ -1225,13 +1196,13 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, /* Did we reach the end of a WAL segment? */ if (*blockpos % XLOG_SEG_SIZE == 0) { - if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done)) + if (!close_walfile(stream, *blockpos)) /* Error message written in close_walfile() */ return false; xlogoff = 0; - if (still_sending && stream_stop(*blockpos, timeline, true)) + if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true)) { if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { @@ -1253,9 +1224,8 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, * Handle end of the copy stream. */ static PGresult * -HandleEndOfCopyStream(PGconn *conn, char *copybuf, - XLogRecPtr blockpos, char *basedir, char *partial_suffix, - XLogRecPtr *stoppos, bool mark_done) +HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, + XLogRecPtr blockpos, XLogRecPtr *stoppos) { PGresult *res = PQgetResult(conn); @@ -1266,7 +1236,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, */ if (still_sending) { - if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) + if (!close_walfile(stream, blockpos)) { /* Error message written in close_walfile() */ PQclear(res); @@ -1296,13 +1266,12 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, * Check if we should continue streaming, or abort at this point. */ static bool -CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, XLogRecPtr *stoppos, bool mark_done) +CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, + XLogRecPtr *stoppos) { - if (still_sending && stream_stop(blockpos, timeline, false)) + if (still_sending && stream->stream_stop(blockpos, stream->timeline, false)) { - if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) + if (!close_walfile(stream, blockpos)) { /* Potential error message is written by close_walfile */ return false; diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 8d4dbf285b..554ff8b5b2 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -22,16 +22,31 @@ */ typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished); +/* + * Global parameters when receiving xlog stream. For details about the individual fields, + * see the function comment for ReceiveXlogStream(). + */ +typedef struct StreamCtl +{ + XLogRecPtr startpos; /* Start position for streaming */ + TimeLineID timeline; /* Timeline to stream data from */ + char *sysidentifier; /* Validate this system identifier and + * timeline */ + int standby_message_timeout; /* Send status messages this + * often */ + bool synchronous; /* Flush data on write */ + bool mark_done; /* Mark segment as done in generated archive */ + + stream_stop_callback stream_stop; /* Stop streaming when returns true */ + + char *basedir; /* Received segments written to this dir */ + char *partial_suffix; /* Suffix appended to partially received files */ +} StreamCtl; + + + extern bool CheckServerVersionForStreaming(PGconn *conn); extern bool ReceiveXlogStream(PGconn *conn, - XLogRecPtr startpos, - uint32 timeline, - char *sysidentifier, - char *basedir, - stream_stop_callback stream_stop, - int standby_message_timeout, - char *partial_suffix, - bool synchronous, - bool mark_done); + StreamCtl *stream); #endif /* RECEIVELOG_H */