]> granicus.if.org Git - postgresql/commitdiff
Refactor receivelog.c parameters
authorMagnus Hagander <magnus@hagander.net>
Fri, 11 Mar 2016 10:08:01 +0000 (11:08 +0100)
committerMagnus Hagander <magnus@hagander.net>
Fri, 11 Mar 2016 10:15:12 +0000 (11:15 +0100)
Much cruft had accumulated over time with a large number of parameters
passed down between functions very deep. With this refactoring, instead
introduce a StreamCtl structure that holds the parameters, and pass around
a pointer to this structure instead. This makes it much easier to add or
remove fields that are needed deeper down in the implementation without
having to modify every function header in the file.

Patch by me after much nagging from Andres
Reviewed by Craig Ringer and Daniel Gustafsson

src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h

index ab9692a6d567c9b4beb9e733a86be40085d85a06..94852877a2d40e4d168a11bb09e6ffd30b4e586c 100644 (file)
@@ -372,10 +372,20 @@ typedef struct
 static int
 LogStreamerMain(logstreamer_param *param)
 {
-       if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
-                                                  param->sysidentifier, param->xlogdir,
-                                                  reached_end_position, standby_message_timeout,
-                                                  NULL, false, true))
+       StreamCtl       stream;
+
+       MemSet(&stream, sizeof(stream), 0);
+       stream.startpos = param->startptr;
+       stream.timeline = param->timeline;
+       stream.sysidentifier = param->sysidentifier;
+       stream.stream_stop = reached_end_position;
+       stream.standby_message_timeout = standby_message_timeout;
+       stream.synchronous = false;
+       stream.mark_done = true;
+       stream.basedir = param->xlogdir;
+       stream.partial_suffix = NULL;
+
+       if (!ReceiveXlogStream(param->bgconn, &stream))
 
                /*
                 * Any errors will already have been reported in the function process,
index f96b547b0f3e0225bba4bf52af52277dab5621f0..7f7ee9dc9baeb6e0986e3ee06f5146bb195b04d1 100644 (file)
@@ -276,10 +276,11 @@ FindStreamingStart(uint32 *tli)
 static void
 StreamLog(void)
 {
-       XLogRecPtr      startpos,
-                               serverpos;
-       TimeLineID      starttli,
-                               servertli;
+       XLogRecPtr      serverpos;
+       TimeLineID      servertli;
+       StreamCtl       stream;
+
+       MemSet(&stream, 0, sizeof(stream));
 
        /*
         * Connect in replication mode to the server
@@ -311,17 +312,17 @@ StreamLog(void)
        /*
         * Figure out where to start streaming.
         */
-       startpos = FindStreamingStart(&starttli);
-       if (startpos == InvalidXLogRecPtr)
+       stream.startpos = FindStreamingStart(&stream.timeline);
+       if (stream.startpos == InvalidXLogRecPtr)
        {
-               startpos = serverpos;
-               starttli = servertli;
+               stream.startpos = serverpos;
+               stream.timeline = servertli;
        }
 
        /*
         * Always start streaming at the beginning of a segment
         */
-       startpos -= startpos % XLOG_SEG_SIZE;
+       stream.startpos -= stream.startpos % XLOG_SEG_SIZE;
 
        /*
         * Start the replication
@@ -329,12 +330,17 @@ StreamLog(void)
        if (verbose)
                fprintf(stderr,
                                _("%s: starting log streaming at %X/%X (timeline %u)\n"),
-                               progname, (uint32) (startpos >> 32), (uint32) startpos,
-                               starttli);
+               progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
+                               stream.timeline);
+
+       stream.stream_stop = stop_streaming;
+       stream.standby_message_timeout = standby_message_timeout;
+       stream.synchronous = synchronous;
+       stream.mark_done = false;
+       stream.basedir = basedir;
+       stream.partial_suffix = ".partial";
 
-       ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
-                                         stop_streaming, standby_message_timeout, ".partial",
-                                         synchronous, false);
+       ReceiveXlogStream(conn, &stream);
 
        PQfinish(conn);
        conn = NULL;
index 01c42fc063914cae520fb3ca72f06d7522cee669..595213f0420f524d8600b376b53faa8a085246c5 100644 (file)
@@ -33,27 +33,18 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
 static bool still_sending = true;              /* feedback still needs to be sent? */
 
-static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
-                                uint32 timeline, char *basedir,
-                          stream_stop_callback stream_stop, int standby_message_timeout,
-                                char *partial_suffix, XLogRecPtr *stoppos,
-                                bool synchronous, bool mark_done);
+static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
+                                XLogRecPtr *stoppos);
 static int     CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int     CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
                                        XLogRecPtr blockpos, int64 *last_status);
-static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
-                                  XLogRecPtr *blockpos, uint32 timeline,
-                                  char *basedir, stream_stop_callback stream_stop,
-                                  char *partial_suffix, bool mark_done);
-static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
-                                       XLogRecPtr blockpos, char *basedir, char *partial_suffix,
-                                         XLogRecPtr *stoppos, bool mark_done);
-static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
-                                       uint32 timeline, char *basedir,
-                                       stream_stop_callback stream_stop,
-                                       char *partial_suffix, XLogRecPtr *stoppos,
-                                       bool mark_done);
+static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
+                                  XLogRecPtr *blockpos);
+static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
+                                         XLogRecPtr blockpos, XLogRecPtr *stoppos);
+static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
+                                       XLogRecPtr *stoppos);
 static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
                                                         int64 last_status);
 
@@ -99,8 +90,7 @@ mark_file_as_archived(const char *basedir, const char *fname)
  * partial_suffix) is stored in current_walfile_name.
  */
 static bool
-open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
-                        char *partial_suffix)
+open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
 {
        int                     f;
        char            fn[MAXPGPATH];
@@ -110,10 +100,10 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
        XLogSegNo       segno;
 
        XLByteToSeg(startpoint, segno);
-       XLogFileName(current_walfile_name, timeline, segno);
+       XLogFileName(current_walfile_name, stream->timeline, segno);
 
-       snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
-                        partial_suffix ? partial_suffix : "");
+       snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
+                        stream->partial_suffix ? stream->partial_suffix : "");
        f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
        if (f == -1)
        {
@@ -185,7 +175,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
  * and returns false, otherwise returns true.
  */
 static bool
-close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
+close_walfile(StreamCtl *stream, XLogRecPtr pos)
 {
        off_t           currpos;
 
@@ -220,13 +210,13 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
        /*
         * If we finished writing a .partial file, rename it into place.
         */
-       if (currpos == XLOG_SEG_SIZE && partial_suffix)
+       if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
        {
                char            oldfn[MAXPGPATH];
                char            newfn[MAXPGPATH];
 
-               snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
-               snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
+               snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
+               snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
                if (rename(oldfn, newfn) != 0)
                {
                        fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
@@ -234,10 +224,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
                        return false;
                }
        }
-       else if (partial_suffix)
+       else if (stream->partial_suffix)
                fprintf(stderr,
                                _("%s: not renaming \"%s%s\", segment is not complete\n"),
-                               progname, current_walfile_name, partial_suffix);
+                               progname, current_walfile_name, stream->partial_suffix);
 
        /*
         * Mark file as archived if requested by the caller - pg_basebackup needs
@@ -245,10 +235,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
         * new node. This is in line with walreceiver.c always doing a
         * XLogArchiveForceDone() after a complete segment.
         */
-       if (currpos == XLOG_SEG_SIZE && mark_done)
+       if (currpos == XLOG_SEG_SIZE && stream->mark_done)
        {
                /* writes error message if failed */
-               if (!mark_file_as_archived(basedir, current_walfile_name))
+               if (!mark_file_as_archived(stream->basedir, current_walfile_name))
                        return false;
        }
 
@@ -261,7 +251,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
  * Check if a timeline history file exists.
  */
 static bool
-existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
+existsTimeLineHistoryFile(StreamCtl *stream)
 {
        char            path[MAXPGPATH];
        char            histfname[MAXFNAMELEN];
@@ -271,12 +261,12 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
         * Timeline 1 never has a history file. We treat that as if it existed,
         * since we never need to stream it.
         */
-       if (tli == 1)
+       if (stream->timeline == 1)
                return true;
 
-       TLHistoryFileName(histfname, tli);
+       TLHistoryFileName(histfname, stream->timeline);
 
-       snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
+       snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
 
        fd = open(path, O_RDONLY | PG_BINARY, 0);
        if (fd < 0)
@@ -294,8 +284,7 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
 }
 
 static bool
-writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
-                                                char *content, bool mark_done)
+writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 {
        int                     size = strlen(content);
        char            path[MAXPGPATH];
@@ -307,15 +296,15 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
         * Check that the server's idea of how timeline history files should be
         * named matches ours.
         */
-       TLHistoryFileName(histfname, tli);
+       TLHistoryFileName(histfname, stream->timeline);
        if (strcmp(histfname, filename) != 0)
        {
                fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
-                               progname, tli, filename);
+                               progname, stream->timeline, filename);
                return false;
        }
 
-       snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
+       snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
 
        /*
         * Write into a temp file name.
@@ -375,10 +364,10 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
        }
 
        /* Maintain archive_status, check close_walfile() for details. */
-       if (mark_done)
+       if (stream->mark_done)
        {
                /* writes error message if failed */
-               if (!mark_file_as_archived(basedir, histfname))
+               if (!mark_file_as_archived(stream->basedir, histfname))
                        return false;
        }
 
@@ -468,6 +457,8 @@ CheckServerVersionForStreaming(PGconn *conn)
 /*
  * Receive a log stream starting at the specified position.
  *
+ * Individual parameters are passed through the StreamCtl structure.
+ *
  * If sysidentifier is specified, validate that both the system
  * identifier and the timeline matches the specified ones
  * (by sending an extra IDENTIFY_SYSTEM command)
@@ -498,11 +489,7 @@ CheckServerVersionForStreaming(PGconn *conn)
  * Note: The log position *must* be at a log segment start!
  */
 bool
-ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
-                                 char *sysidentifier, char *basedir,
-                                 stream_stop_callback stream_stop,
-                                 int standby_message_timeout, char *partial_suffix,
-                                 bool synchronous, bool mark_done)
+ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 {
        char            query[128];
        char            slotcmd[128];
@@ -539,7 +526,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                slotcmd[0] = 0;
        }
 
-       if (sysidentifier != NULL)
+       if (stream->sysidentifier != NULL)
        {
                /* Validate system identifier hasn't changed */
                res = PQexec(conn, "IDENTIFY_SYSTEM");
@@ -559,7 +546,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        PQclear(res);
                        return false;
                }
-               if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
+               if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
                {
                        fprintf(stderr,
                                        _("%s: system identifier does not match between base backup and streaming connection\n"),
@@ -567,11 +554,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        PQclear(res);
                        return false;
                }
-               if (timeline > atoi(PQgetvalue(res, 0, 1)))
+               if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
                {
                        fprintf(stderr,
                                _("%s: starting timeline %u is not present in the server\n"),
-                                       progname, timeline);
+                                       progname, stream->timeline);
                        PQclear(res);
                        return false;
                }
@@ -582,7 +569,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
         * initialize flush position to starting point, it's the caller's
         * responsibility that that's sane.
         */
-       lastFlushPosition = startpos;
+       lastFlushPosition = stream->startpos;
 
        while (1)
        {
@@ -590,9 +577,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 * Fetch the timeline history file for this timeline, if we don't have
                 * it already.
                 */
-               if (!existsTimeLineHistoryFile(basedir, timeline))
+               if (!existsTimeLineHistoryFile(stream))
                {
-                       snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
+                       snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
                        res = PQexec(conn, query);
                        if (PQresultStatus(res) != PGRES_TUPLES_OK)
                        {
@@ -615,10 +602,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        }
 
                        /* Write the history file to disk */
-                       writeTimeLineHistoryFile(basedir, timeline,
+                       writeTimeLineHistoryFile(stream,
                                                                         PQgetvalue(res, 0, 0),
-                                                                        PQgetvalue(res, 0, 1),
-                                                                        mark_done);
+                                                                        PQgetvalue(res, 0, 1));
 
                        PQclear(res);
                }
@@ -627,14 +613,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 * Before we start streaming from the requested location, check if the
                 * callback tells us to stop here.
                 */
-               if (stream_stop(startpos, timeline, false))
+               if (stream->stream_stop(stream->startpos, stream->timeline, false))
                        return true;
 
                /* Initiate the replication stream at specified location */
                snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
                                 slotcmd,
-                                (uint32) (startpos >> 32), (uint32) startpos,
-                                timeline);
+                                (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
+                                stream->timeline);
                res = PQexec(conn, query);
                if (PQresultStatus(res) != PGRES_COPY_BOTH)
                {
@@ -646,9 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                PQclear(res);
 
                /* Stream the WAL */
-               res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
-                                                          standby_message_timeout, partial_suffix,
-                                                          &stoppos, synchronous, mark_done);
+               res = HandleCopyStream(conn, stream, &stoppos);
                if (res == NULL)
                        goto error;
 
@@ -676,26 +660,26 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        uint32          newtimeline;
                        bool            parsed;
 
-                       parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
+                       parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
                        PQclear(res);
                        if (!parsed)
                                goto error;
 
                        /* Sanity check the values the server gave us */
-                       if (newtimeline <= timeline)
+                       if (newtimeline <= stream->timeline)
                        {
                                fprintf(stderr,
                                                _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
-                                               progname, newtimeline, timeline);
+                                               progname, newtimeline, stream->timeline);
                                goto error;
                        }
-                       if (startpos > stoppos)
+                       if (stream->startpos > stoppos)
                        {
                                fprintf(stderr,
                                                _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
                                                progname,
-                                               timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
-                                 newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
+                               stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
+                                               newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
                                goto error;
                        }
 
@@ -715,8 +699,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                         * Loop back to start streaming from the new timeline. Always
                         * start streaming at the beginning of a segment.
                         */
-                       timeline = newtimeline;
-                       startpos = startpos - (startpos % XLOG_SEG_SIZE);
+                       stream->timeline = newtimeline;
+                       stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
                        continue;
                }
                else if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -729,7 +713,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                         * Check if the callback thinks it's OK to stop here. If not,
                         * complain.
                         */
-                       if (stream_stop(stoppos, timeline, false))
+                       if (stream->stream_stop(stoppos, stream->timeline, false))
                                return true;
                        else
                        {
@@ -810,14 +794,12 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
  * On any other sort of error, returns NULL.
  */
 static PGresult *
-HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
-                                char *basedir, stream_stop_callback stream_stop,
-                                int standby_message_timeout, char *partial_suffix,
-                                XLogRecPtr *stoppos, bool synchronous, bool mark_done)
+HandleCopyStream(PGconn *conn, StreamCtl *stream,
+                                XLogRecPtr *stoppos)
 {
        char       *copybuf = NULL;
        int64           last_status = -1;
-       XLogRecPtr      blockpos = startpos;
+       XLogRecPtr      blockpos = stream->startpos;
 
        still_sending = true;
 
@@ -830,9 +812,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /*
                 * Check if we should continue streaming, or abort at this point.
                 */
-               if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
-                                                                stream_stop, partial_suffix, stoppos,
-                                                                mark_done))
+               if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
                        goto error;
 
                now = feGetCurrentTimestamp();
@@ -841,7 +821,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 * If synchronous option is true, issue sync command as soon as there
                 * are WAL data which has not been flushed yet.
                 */
-               if (synchronous && lastFlushPosition < blockpos && walfile != -1)
+               if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
                {
                        if (fsync(walfile) != 0)
                        {
@@ -863,9 +843,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /*
                 * Potentially send a status message to the master
                 */
-               if (still_sending && standby_message_timeout > 0 &&
+               if (still_sending && stream->standby_message_timeout > 0 &&
                        feTimestampDifferenceExceeds(last_status, now,
-                                                                                standby_message_timeout))
+                                                                                stream->standby_message_timeout))
                {
                        /* Time to send feedback! */
                        if (!sendFeedback(conn, blockpos, now, false))
@@ -876,7 +856,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /*
                 * Calculate how long send/receive loops should sleep
                 */
-               sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
+               sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                                                                 last_status);
 
                r = CopyStreamReceive(conn, sleeptime, &copybuf);
@@ -886,9 +866,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                goto error;
                        if (r == -2)
                        {
-                               PGresult   *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
-                                                                                                        basedir, partial_suffix,
-                                                                                                               stoppos, mark_done);
+                               PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
 
                                if (res == NULL)
                                        goto error;
@@ -905,18 +883,14 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        }
                        else if (copybuf[0] == 'w')
                        {
-                               if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
-                                                                               timeline, basedir, stream_stop,
-                                                                               partial_suffix, mark_done))
+                               if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
                                        goto error;
 
                                /*
                                 * Check if we should continue streaming, or abort at this
                                 * point.
                                 */
-                               if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
-                                                                                stream_stop, partial_suffix, stoppos,
-                                                                                mark_done))
+                               if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
                                        goto error;
                        }
                        else
@@ -1114,10 +1088,8 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
  * Process XLogData message.
  */
 static bool
-ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
-                                  XLogRecPtr *blockpos, uint32 timeline,
-                                  char *basedir, stream_stop_callback stream_stop,
-                                  char *partial_suffix, bool mark_done)
+ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
+                                  XLogRecPtr *blockpos)
 {
        int                     xlogoff;
        int                     bytes_left;
@@ -1197,8 +1169,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
 
                if (walfile == -1)
                {
-                       if (!open_walfile(*blockpos, timeline,
-                                                         basedir, partial_suffix))
+                       if (!open_walfile(stream, *blockpos))
                        {
                                /* Error logged by open_walfile */
                                return false;
@@ -1225,13 +1196,13 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
                /* Did we reach the end of a WAL segment? */
                if (*blockpos % XLOG_SEG_SIZE == 0)
                {
-                       if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
+                       if (!close_walfile(stream, *blockpos))
                                /* Error message written in close_walfile() */
                                return false;
 
                        xlogoff = 0;
 
-                       if (still_sending && stream_stop(*blockpos, timeline, true))
+                       if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
                        {
                                if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
                                {
@@ -1253,9 +1224,8 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
  * Handle end of the copy stream.
  */
 static PGresult *
-HandleEndOfCopyStream(PGconn *conn, char *copybuf,
-                                       XLogRecPtr blockpos, char *basedir, char *partial_suffix,
-                                         XLogRecPtr *stoppos, bool mark_done)
+HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
+                                         XLogRecPtr blockpos, XLogRecPtr *stoppos)
 {
        PGresult   *res = PQgetResult(conn);
 
@@ -1266,7 +1236,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
         */
        if (still_sending)
        {
-               if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
+               if (!close_walfile(stream, blockpos))
                {
                        /* Error message written in close_walfile() */
                        PQclear(res);
@@ -1296,13 +1266,12 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
  * Check if we should continue streaming, or abort at this point.
  */
 static bool
-CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
-                                       char *basedir, stream_stop_callback stream_stop,
-                                       char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
+CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
+                                       XLogRecPtr *stoppos)
 {
-       if (still_sending && stream_stop(blockpos, timeline, false))
+       if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
        {
-               if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
+               if (!close_walfile(stream, blockpos))
                {
                        /* Potential error message is written by close_walfile */
                        return false;
index 8d4dbf285b47bff19db15e14288bf67e61e6fc05..554ff8b5b28610df1385f3d2342b57f9c485a91d 100644 (file)
  */
 typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
 
+/*
+ * Global parameters when receiving xlog stream. For details about the individual fields,
+ * see the function comment for ReceiveXlogStream().
+ */
+typedef struct StreamCtl
+{
+       XLogRecPtr      startpos;               /* Start position for streaming */
+       TimeLineID      timeline;               /* Timeline to stream data from */
+       char       *sysidentifier;      /* Validate this system identifier and
+                                                                * timeline */
+       int                     standby_message_timeout;                /* Send status messages this
+                                                                                                * often */
+       bool            synchronous;    /* Flush data on write */
+       bool            mark_done;              /* Mark segment as done in generated archive */
+
+       stream_stop_callback stream_stop;       /* Stop streaming when returns true */
+
+       char       *basedir;            /* Received segments written to this dir */
+       char       *partial_suffix; /* Suffix appended to partially received files */
+} StreamCtl;
+
+
+
 extern bool CheckServerVersionForStreaming(PGconn *conn);
 extern bool ReceiveXlogStream(PGconn *conn,
-                                 XLogRecPtr startpos,
-                                 uint32 timeline,
-                                 char *sysidentifier,
-                                 char *basedir,
-                                 stream_stop_callback stream_stop,
-                                 int standby_message_timeout,
-                                 char *partial_suffix,
-                                 bool synchronous,
-                                 bool mark_done);
+                                 StreamCtl *stream);
 
 #endif   /* RECEIVELOG_H */