granicus.if.org Git - postgresql/commitdiff
Add -F option to pg_receivexlog, for specifying fsync interval.
author: Fujii Masao <fujii@postgresql.org>
Fri, 8 Aug 2014 07:50:54 +0000 (16:50 +0900)
committer: Fujii Masao <fujii@postgresql.org>
Fri, 8 Aug 2014 07:50:54 +0000 (16:50 +0900)
This allows us to specify the maximum interval between fsync calls that
ensure the received WAL file is safely flushed to disk. Without this,
pg_receivexlog flushes a WAL file only when it is closed, which can
cause WAL data to be lost in the event of a crash.

Furuya Osamu, heavily modified by me.

doc/src/sgml/ref/pg_receivexlog.sgml
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h

index 7c50b01a57b982a28136fca8fa853fa054c36e19..c15776fc5852824d3550c79e7dd5e8fec9c9d104 100644 (file)
@@ -105,6 +105,21 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+       <term><option>-F <replaceable class="parameter">interval</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term>
+       <listitem>
+        <para>
+        Specifies the maximum time, in seconds, between sync commands issued
+        to ensure the received WAL file is safely flushed to disk. The default
+        value is zero, which disables issuing fsyncs except when a WAL file is
+        closed. If <literal>-1</literal> is specified, the WAL file is flushed
+        as soon as possible, that is, as soon as there is WAL data that has
+        not been flushed yet.
+        </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry>
       <term><option>-v</option></term>
       <term><option>--verbose</option></term>
index 5df2eb8c0db7c01827bbdb405eb82e8978d3c1c3..0b02c4c4014b59a6e09fcd6c121caea7934a64de 100644 (file)
@@ -371,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
        if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                                                   param->sysidentifier, param->xlogdir,
                                                   reached_end_position, standby_message_timeout,
-                                                  NULL))
+                                                  NULL, 0))
 
                /*
                 * Any errors will already have been reported in the function process,
index 96408389062cd3d3846d841a0d30bf709cf34cd7..0b7af54a7baff7fa935ac1e0c03d3a8946d59a46 100644 (file)
@@ -36,6 +36,7 @@ static char *basedir = NULL;
 static int     verbose = 0;
 static int     noloop = 0;
 static int     standby_message_timeout = 10 * 1000;            /* 10 sec = default */
+static int     fsync_interval = 0; /* 0 = default */
 static volatile bool time_to_abort = false;
 
 
@@ -62,6 +63,8 @@ usage(void)
        printf(_("\nOptions:\n"));
        printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
        printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+       printf(_("  -F  --fsync-interval=INTERVAL\n"
+                        "                         frequency of syncs to transaction log files (in seconds)\n"));
        printf(_("  -v, --verbose          output verbose messages\n"));
        printf(_("  -V, --version          output version information, then exit\n"));
        printf(_("  -?, --help             show this help, then exit\n"));
@@ -330,7 +333,8 @@ StreamLog(void)
                                starttli);
 
        ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
-                                         stop_streaming, standby_message_timeout, ".partial");
+                                         stop_streaming, standby_message_timeout, ".partial",
+                                         fsync_interval);
 
        PQfinish(conn);
 }
@@ -360,6 +364,7 @@ main(int argc, char **argv)
                {"port", required_argument, NULL, 'p'},
                {"username", required_argument, NULL, 'U'},
                {"no-loop", no_argument, NULL, 'n'},
+               {"fsync-interval", required_argument, NULL, 'F'},
                {"no-password", no_argument, NULL, 'w'},
                {"password", no_argument, NULL, 'W'},
                {"status-interval", required_argument, NULL, 's'},
@@ -389,7 +394,7 @@ main(int argc, char **argv)
                }
        }
 
-       while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
+       while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
@@ -436,6 +441,15 @@ main(int argc, char **argv)
                        case 'n':
                                noloop = 1;
                                break;
+               case 'F':
+                       fsync_interval = atoi(optarg) * 1000;
+                       if (fsync_interval < -1000)
+                       {
+                               fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+                                               progname, optarg);
+                               exit(1);
+                       }
+                       break;
                        case 'v':
                                verbose++;
                                break;
index d28e13b4d8c9f3b9433aee886e2c8596a12ffa29..89b22f20e2a3b373adeb24b26f32a60876125bc4 100644 (file)
@@ -31,12 +31,14 @@ static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
+static int64 last_fsync = -1;          /* timestamp of last WAL file flush */
 static bool still_sending = true;              /* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                                 uint32 timeline, char *basedir,
                           stream_stop_callback stream_stop, int standby_message_timeout,
-                                char *partial_suffix, XLogRecPtr *stoppos);
+                                 char *partial_suffix, XLogRecPtr *stoppos,
+                                 int fsync_interval);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
@@ -48,6 +50,13 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
 static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
                                                                           XLogRecPtr blockpos, char *basedir, char *partial_suffix,
                                                                           XLogRecPtr *stoppos);
+static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
+                                                               uint32 timeline, char *basedir,
+                                                               stream_stop_callback stream_stop,
+                                                               char *partial_suffix, XLogRecPtr *stoppos);
+static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+                                                                                int64 last_status, int fsync_interval,
+                                                                                XLogRecPtr blockpos);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                                                 uint32 *timeline);
@@ -200,6 +209,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
                                progname, current_walfile_name, partial_suffix);
 
        lastFlushPosition = pos;
+       last_fsync = feGetCurrentTimestamp();
        return true;
 }
 
@@ -430,13 +440,17 @@ CheckServerVersionForStreaming(PGconn *conn)
  * allows you to tell the difference between partial and completed files,
  * so that you can continue later where you left.
  *
+ * fsync_interval controls how often we flush the received WAL file to disk,
+ * in milliseconds.
+ *
  * Note: The log position *must* be at a log segment start!
  */
 bool
 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                  char *sysidentifier, char *basedir,
                                  stream_stop_callback stream_stop,
-                                 int standby_message_timeout, char *partial_suffix)
+                                 int standby_message_timeout, char *partial_suffix,
+                                 int fsync_interval)
 {
        char            query[128];
        char            slotcmd[128];
@@ -581,7 +595,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /* Stream the WAL */
                res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
                                                           standby_message_timeout, partial_suffix,
-                                                          &stoppos);
+                                                          &stoppos, fsync_interval);
                if (res == NULL)
                        goto error;
 
@@ -746,7 +760,7 @@ static PGresult *
 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                 char *basedir, stream_stop_callback stream_stop,
                                 int standby_message_timeout, char *partial_suffix,
-                                XLogRecPtr *stoppos)
+                                XLogRecPtr *stoppos, int fsync_interval)
 {
        char       *copybuf = NULL;
        int64           last_status = -1;
@@ -763,26 +777,36 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /*
                 * Check if we should continue streaming, or abort at this point.
                 */
-               if (still_sending && stream_stop(blockpos, timeline, false))
+               if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+                                                               stream_stop, partial_suffix, stoppos))
+                       goto error;
+
+               now = feGetCurrentTimestamp();
+
+               /*
+                * If fsync_interval has elapsed since last WAL flush and we've written
+                * some WAL data, flush them to disk.
+                */
+               if (lastFlushPosition < blockpos &&
+                       walfile != -1 &&
+                       ((fsync_interval > 0 &&
+                         feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
+                        fsync_interval < 0))
                {
-                       if (!close_walfile(basedir, partial_suffix, blockpos))
-                       {
-                               /* Potential error message is written by close_walfile */
-                               goto error;
-                       }
-                       if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+                       if (fsync(walfile) != 0)
                        {
-                               fprintf(stderr, _("%s: could not send copy-end packet: %s"),
-                                               progname, PQerrorMessage(conn));
+                               fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+                                               progname, current_walfile_name, strerror(errno));
                                goto error;
                        }
-                       still_sending = false;
+
+                       lastFlushPosition = blockpos;
+                       last_fsync = now;
                }
 
                /*
                 * Potentially send a status message to the master
                 */
-               now = feGetCurrentTimestamp();
                if (still_sending && standby_message_timeout > 0 &&
                        feTimestampDifferenceExceeds(last_status, now,
                                                                                 standby_message_timeout))
@@ -794,64 +818,58 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                }
 
                /*
-                * Compute how long send/receive loops should sleep
+                * Calculate how long send/receive loops should sleep
                 */
-               if (standby_message_timeout && still_sending)
-               {
-                       int64           targettime;
-                       long            secs;
-                       int                     usecs;
-
-                       targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-                       feTimestampDifference(now,
-                                                                 targettime,
-                                                                 &secs,
-                                                                 &usecs);
-                       /* Always sleep at least 1 sec */
-                       if (secs <= 0)
-                       {
-                               secs = 1;
-                               usecs = 0;
-                       }
-
-                       sleeptime = secs * 1000 + usecs / 1000;
-               }
-               else
-                       sleeptime = -1;
+               sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
+                                                                                                last_status, fsync_interval, blockpos);
 
                r = CopyStreamReceive(conn, sleeptime, &copybuf);
-               if (r == 0)
-                       continue;
-               if (r == -1)
-                       goto error;
-               if (r == -2)
+               while (r != 0)
                {
-                       PGresult        *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
-                                                                                                        basedir, partial_suffix, stoppos);
-                       if (res == NULL)
+                       if (r == -1)
                                goto error;
-                       else
-                               return res;
-               }
+                       if (r == -2)
+                       {
+                               PGresult        *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+                                                                                                                basedir, partial_suffix, stoppos);
+                               if (res == NULL)
+                                       goto error;
+                               else
+                                       return res;
+                       }
 
-               /* Check the message type. */
-               if (copybuf[0] == 'k')
-               {
-                       if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
-                                                                        &last_status))
-                               goto error;
-               }
-               else if (copybuf[0] == 'w')
-               {
-                       if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
-                                                                       timeline, basedir, stream_stop, partial_suffix))
+                       /* Check the message type. */
+                       if (copybuf[0] == 'k')
+                       {
+                               if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+                                                                                &last_status))
+                                       goto error;
+                       }
+                       else if (copybuf[0] == 'w')
+                       {
+                               if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+                                                                               timeline, basedir, stream_stop, partial_suffix))
+                                       goto error;
+
+                               /*
+                                * Check if we should continue streaming, or abort at this point.
+                                */
+                               if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+                                                                                stream_stop, partial_suffix, stoppos))
+                                       goto error;
+                       }
+                       else
+                       {
+                               fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+                                               progname, copybuf[0]);
                                goto error;
-               }
-               else
-               {
-                       fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-                                       progname, copybuf[0]);
-                       goto error;
+                       }
+
+                       /*
+                        * Process the received data, and any subsequent data we
+                        * can read without blocking.
+                        */
+                       r = CopyStreamReceive(conn, 0, &copybuf);
                }
        }
 
@@ -1193,3 +1211,80 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
        *stoppos = blockpos;
        return res;
 }
+
+/*
+ * Check if we should continue streaming, or abort at this point.
+ */
+static bool
+CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
+                                       char *basedir, stream_stop_callback stream_stop,
+                                       char *partial_suffix, XLogRecPtr *stoppos)
+{
+       if (still_sending && stream_stop(blockpos, timeline, false))
+       {
+               if (!close_walfile(basedir, partial_suffix, blockpos))
+               {
+                       /* Potential error message is written by close_walfile */
+                       return false;
+               }
+               if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+               {
+                       fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                                       progname, PQerrorMessage(conn));
+                       return false;
+               }
+               still_sending = false;
+       }
+
+       return true;
+}
+
+/*
+ * Calculate how long send/receive loops should sleep
+ */
+static long
+CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+                                                        int64 last_status, int fsync_interval, XLogRecPtr blockpos)
+{
+       int64           targettime = 0;
+       int64           status_targettime = 0;
+       int64           fsync_targettime = 0;
+       long            sleeptime;
+
+       if (standby_message_timeout && still_sending)
+               status_targettime = last_status +
+                       (standby_message_timeout - 1) * ((int64) 1000);
+
+       if (fsync_interval > 0 && lastFlushPosition < blockpos)
+               fsync_targettime = last_fsync +
+                       (fsync_interval - 1) * ((int64) 1000);
+
+       if ((status_targettime < fsync_targettime && status_targettime > 0) ||
+               fsync_targettime == 0)
+               targettime = status_targettime;
+       else
+               targettime = fsync_targettime;
+
+       if (targettime > 0)
+       {
+               long            secs;
+               int                     usecs;
+
+               feTimestampDifference(now,
+                                                         targettime,
+                                                         &secs,
+                                                         &usecs);
+               /* Always sleep at least 1 sec */
+               if (secs <= 0)
+               {
+                       secs = 1;
+                       usecs = 0;
+               }
+
+               sleeptime = secs * 1000 + usecs / 1000;
+       }
+       else
+               sleeptime = -1;
+
+       return sleeptime;
+}
index f4789a580ae75df4b97fe40be89ec3fe4817ba70..72f82453733463400e9d0e6364ea8d9e3815c850 100644 (file)
@@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
                                  char *basedir,
                                  stream_stop_callback stream_stop,
                                  int standby_message_timeout,
-                                 char *partial_suffix);
+                                 char *partial_suffix,
+                                 int fsync_interval);