static int verbose = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
+static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
+ printf(_(" -F --fsync-interval=INTERVAL\n"
+ " frequency of syncs to transaction log files (in seconds)\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
- stop_streaming, standby_message_timeout, ".partial");
+ stop_streaming, standby_message_timeout, ".partial",
+ fsync_interval);
PQfinish(conn);
}
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'},
+ {"fsync-interval", required_argument, NULL, 'F'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
}
}
- while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
+ while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
case 'n':
noloop = 1;
break;
+ case 'F':
+ fsync_interval = atoi(optarg) * 1000;
+ if (fsync_interval < -1000)
+ {
+ fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
case 'v':
verbose++;
break;
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+static int64 last_fsync = -1; /* timestamp of last WAL file flush */
static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
- char *partial_suffix, XLogRecPtr *stoppos);
+ char *partial_suffix, XLogRecPtr *stoppos,
+ int fsync_interval);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
XLogRecPtr *stoppos);
+static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
+ uint32 timeline, char *basedir,
+ stream_stop_callback stream_stop,
+ char *partial_suffix, XLogRecPtr *stoppos);
+static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+ int64 last_status, int fsync_interval,
+ XLogRecPtr blockpos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
progname, current_walfile_name, partial_suffix);
lastFlushPosition = pos;
+ last_fsync = feGetCurrentTimestamp();
return true;
}
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
+ * fsync_interval controls how often we flush to the received WAL file,
+ * in milliseconds.
+ *
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
- int standby_message_timeout, char *partial_suffix)
+ int standby_message_timeout, char *partial_suffix,
+ int fsync_interval)
{
char query[128];
char slotcmd[128];
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
- &stoppos);
+ &stoppos, fsync_interval);
if (res == NULL)
goto error;
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
- XLogRecPtr *stoppos)
+ XLogRecPtr *stoppos, int fsync_interval)
{
char *copybuf = NULL;
int64 last_status = -1;
/*
* Check if we should continue streaming, or abort at this point.
*/
- if (still_sending && stream_stop(blockpos, timeline, false))
+ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+ stream_stop, partial_suffix, stoppos))
+ goto error;
+
+ now = feGetCurrentTimestamp();
+
+ /*
+ * If fsync_interval has elapsed since last WAL flush and we've written
+ * some WAL data, flush them to disk.
+ */
+ if (lastFlushPosition < blockpos &&
+ walfile != -1 &&
+ ((fsync_interval > 0 &&
+ feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
+ fsync_interval < 0))
{
- if (!close_walfile(basedir, partial_suffix, blockpos))
- {
- /* Potential error message is written by close_walfile */
- goto error;
- }
- if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ if (fsync(walfile) != 0)
{
- fprintf(stderr, _("%s: could not send copy-end packet: %s"),
- progname, PQerrorMessage(conn));
+ fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+ progname, current_walfile_name, strerror(errno));
goto error;
}
- still_sending = false;
+
+ lastFlushPosition = blockpos;
+ last_fsync = now;
}
/*
* Potentially send a status message to the master
*/
- now = feGetCurrentTimestamp();
if (still_sending && standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
standby_message_timeout))
}
/*
- * Compute how long send/receive loops should sleep
+ * Calculate how long send/receive loops should sleep
*/
- if (standby_message_timeout && still_sending)
- {
- int64 targettime;
- long secs;
- int usecs;
-
- targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
- feTimestampDifference(now,
- targettime,
- &secs,
- &usecs);
- /* Always sleep at least 1 sec */
- if (secs <= 0)
- {
- secs = 1;
- usecs = 0;
- }
-
- sleeptime = secs * 1000 + usecs / 1000;
- }
- else
- sleeptime = -1;
+ sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
+ last_status, fsync_interval, blockpos);
r = CopyStreamReceive(conn, sleeptime, ©buf);
- if (r == 0)
- continue;
- if (r == -1)
- goto error;
- if (r == -2)
+ while (r != 0)
{
- PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
- basedir, partial_suffix, stoppos);
- if (res == NULL)
+ if (r == -1)
goto error;
- else
- return res;
- }
+ if (r == -2)
+ {
+ PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+ basedir, partial_suffix, stoppos);
+ if (res == NULL)
+ goto error;
+ else
+ return res;
+ }
- /* Check the message type. */
- if (copybuf[0] == 'k')
- {
- if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
- &last_status))
- goto error;
- }
- else if (copybuf[0] == 'w')
- {
- if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
- timeline, basedir, stream_stop, partial_suffix))
+ /* Check the message type. */
+ if (copybuf[0] == 'k')
+ {
+ if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+ &last_status))
+ goto error;
+ }
+ else if (copybuf[0] == 'w')
+ {
+ if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+ timeline, basedir, stream_stop, partial_suffix))
+ goto error;
+
+ /*
+ * Check if we should continue streaming, or abort at this point.
+ */
+ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+ stream_stop, partial_suffix, stoppos))
+ goto error;
+ }
+ else
+ {
+ fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ progname, copybuf[0]);
goto error;
- }
- else
- {
- fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
- progname, copybuf[0]);
- goto error;
+ }
+
+ /*
+ * Process the received data, and any subsequent data we
+ * can read without blocking.
+ */
+ r = CopyStreamReceive(conn, 0, ©buf);
}
}
*stoppos = blockpos;
return res;
}
+
+/*
+ * Check if we should continue streaming, or abort at this point.
+ */
+static bool
+CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
+ char *basedir, stream_stop_callback stream_stop,
+ char *partial_suffix, XLogRecPtr *stoppos)
+{
+ if (still_sending && stream_stop(blockpos, timeline, false))
+ {
+ if (!close_walfile(basedir, partial_suffix, blockpos))
+ {
+ /* Potential error message is written by close_walfile */
+ return false;
+ }
+ if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ still_sending = false;
+ }
+
+ return true;
+}
+
+/*
+ * Calculate how long send/receive loops should sleep
+ */
+static long
+CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+ int64 last_status, int fsync_interval, XLogRecPtr blockpos)
+{
+ int64 targettime = 0;
+ int64 status_targettime = 0;
+ int64 fsync_targettime = 0;
+ long sleeptime;
+
+ if (standby_message_timeout && still_sending)
+ status_targettime = last_status +
+ (standby_message_timeout - 1) * ((int64) 1000);
+
+ if (fsync_interval > 0 && lastFlushPosition < blockpos)
+ fsync_targettime = last_fsync +
+ (fsync_interval - 1) * ((int64) 1000);
+
+ if ((status_targettime < fsync_targettime && status_targettime > 0) ||
+ fsync_targettime == 0)
+ targettime = status_targettime;
+ else
+ targettime = fsync_targettime;
+
+ if (targettime > 0)
+ {
+ long secs;
+ int usecs;
+
+ feTimestampDifference(now,
+ targettime,
+ &secs,
+ &usecs);
+ /* Always sleep at least 1 sec */
+ if (secs <= 0)
+ {
+ secs = 1;
+ usecs = 0;
+ }
+
+ sleeptime = secs * 1000 + usecs / 1000;
+ }
+ else
+ sleeptime = -1;
+
+ return sleeptime;
+}