]> granicus.if.org Git - postgresql/commitdiff
Add --synchronous option to pg_receivexlog, for more reliable WAL writing.
authorFujii Masao <fujii@postgresql.org>
Mon, 17 Nov 2014 17:32:48 +0000 (02:32 +0900)
committerFujii Masao <fujii@postgresql.org>
Mon, 17 Nov 2014 17:32:48 +0000 (02:32 +0900)
Previously pg_receivexlog flushed WAL data only when WAL file was switched.
Then 3dad73e added -F option to pg_receivexlog so that users could control
how frequently sync commands were issued to WAL files. It also allowed users
to make pg_receivexlog flush WAL data immediately after writing by
specifying 0 in -F option. However feedback messages were not sent back
immediately even after a flush location was updated. So even if WAL data
was flushed in real time, the server could not see that for a while.

This commit removes -F option from and adds --synchronous to pg_receivexlog.
If --synchronous is specified, like the standby's wal receiver, pg_receivexlog
flushes WAL data as soon as there is WAL data which has not been flushed yet.
Then it sends back the feedback message identifying the latest flush location
to the server. This option is useful to make pg_receivexlog behave as sync
standby by using replication slot, for example.

Original patch by Furuya Osamu, heavily rewritten by me.
Reviewed by Heikki Linnakangas, Alvaro Herrera and Sawada Masahiko.

doc/src/sgml/ref/pg_receivexlog.sgml
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h

index 74ed45db97ca0c3cd8917f9cc6f5724e27ba3627..be321b56ce7152396f282958ae683cbfea746ce4 100644 (file)
@@ -48,6 +48,13 @@ PostgreSQL documentation
     <application>pg_receivexlog</application>.
   </para>
 
+  <para>
+   Unlike the standby's WAL receiver, <application>pg_receivexlog</>
+   flushes WAL data only when WAL file is closed, by default.
+   <literal>--synchronous</> option must be specified to flush WAL data
+   in real time and ensure it's safely flushed to disk.
+  </para>
+
   <para>
    The transaction log is streamed over a regular
    <productname>PostgreSQL</productname> connection, and uses the replication
@@ -85,21 +92,6 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
-     <varlistentry>
-       <term><option>-F <replaceable class="parameter">interval</replaceable></option></term>
-       <term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term>
-       <listitem>
-        <para>
-        Specifies the maximum time to issue sync commands to ensure the
-        received WAL file is safely flushed to disk, in seconds. The default
-        value is zero, which disables issuing fsyncs except when WAL file is
-        closed. If <literal>-1</literal> is specified, WAL file is flushed as
-        soon as possible, that is, as soon as there are WAL data which has
-        not been flushed yet.
-        </para>
-       </listitem>
-      </varlistentry>
-
      <varlistentry>
       <term><option>-n</option></term>
       <term><option>--no-loop</option></term>
@@ -135,15 +127,25 @@ PostgreSQL documentation
          When this option is used, <application>pg_receivexlog</> will report
          a flush position to the server, indicating when each segment has been
          synchronized to disk so that the server can remove that segment if it
-         is not otherwise needed.  When using this parameter, it is important
-         to make sure that <application>pg_receivexlog</> cannot become the
-         synchronous standby through an incautious setting of
-         <xref linkend="guc-synchronous-standby-names">; it does not flush
-         data frequently enough for this to work correctly.
+         is not otherwise needed. <literal>--synchronous</literal> option must
+         be specified when making <application>pg_receivexlog</> run as
+         synchronous standby by using replication slot. Otherwise WAL data
+         cannot be flushed frequently enough for this to work correctly.
         </para>
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--synchronous</option></term>
+      <listitem>
+       <para>
+        Issue sync commands as soon as there is WAL data which has not been
+        flushed yet. Also status packets are sent back to the server just after
+        WAL data is flushed whatever <literal>--status-interval</> is set to.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-v</option></term>
       <term><option>--verbose</option></term>
index 0ebda9ae9e04f44bd2f7c6bf22454ed2f7095499..e7c2939c52346c346deaf65ffe83f08c6243afec 100644 (file)
@@ -370,7 +370,7 @@ LogStreamerMain(logstreamer_param *param)
        if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                                                   param->sysidentifier, param->xlogdir,
                                                   reached_end_position, standby_message_timeout,
-                                                  NULL, 0))
+                                                  NULL, false))
 
                /*
                 * Any errors will already have been reported in the function process,
index bc0940aeafe7260296d1cda1a4823372078c2427..4658f080f34389d3412d1fac50a04175748eb34c 100644 (file)
@@ -36,10 +36,10 @@ static char *basedir = NULL;
 static int     verbose = 0;
 static int     noloop = 0;
 static int     standby_message_timeout = 10 * 1000;            /* 10 sec = default */
-static int     fsync_interval = 0; /* 0 = default */
 static volatile bool time_to_abort = false;
 static bool do_create_slot = false;
 static bool do_drop_slot = false;
+static bool synchronous = false;
 
 
 static void usage(void);
@@ -66,12 +66,11 @@ usage(void)
        printf(_("  %s [OPTION]...\n"), progname);
        printf(_("\nOptions:\n"));
        printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
-       printf(_("  -F  --fsync-interval=SECS\n"
-                        "                         time between fsyncs to transaction log files (default: %d)\n"), (fsync_interval / 1000));
        printf(_("  -n, --no-loop          do not loop on connection lost\n"));
        printf(_("  -s, --status-interval=SECS\n"
                         "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
        printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
+       printf(_("      --synchronous      flush transaction log immediately after writing\n"));
        printf(_("  -v, --verbose          output verbose messages\n"));
        printf(_("  -V, --version          output version information, then exit\n"));
        printf(_("  -?, --help             show this help, then exit\n"));
@@ -343,7 +342,7 @@ StreamLog(void)
 
        ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
                                          stop_streaming, standby_message_timeout, ".partial",
-                                         fsync_interval);
+                                         synchronous);
 
        PQfinish(conn);
        conn = NULL;
@@ -374,7 +373,6 @@ main(int argc, char **argv)
                {"port", required_argument, NULL, 'p'},
                {"username", required_argument, NULL, 'U'},
                {"no-loop", no_argument, NULL, 'n'},
-               {"fsync-interval", required_argument, NULL, 'F'},
                {"no-password", no_argument, NULL, 'w'},
                {"password", no_argument, NULL, 'W'},
                {"status-interval", required_argument, NULL, 's'},
@@ -383,6 +381,7 @@ main(int argc, char **argv)
 /* action */
                {"create-slot", no_argument, NULL, 1},
                {"drop-slot", no_argument, NULL, 2},
+               {"synchronous", no_argument, NULL, 3},
                {NULL, 0, NULL, 0}
        };
 
@@ -408,7 +407,7 @@ main(int argc, char **argv)
                }
        }
 
-       while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv",
+       while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
@@ -455,15 +454,6 @@ main(int argc, char **argv)
                        case 'n':
                                noloop = 1;
                                break;
-               case 'F':
-                       fsync_interval = atoi(optarg) * 1000;
-                       if (fsync_interval < -1000)
-                       {
-                               fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
-                                               progname, optarg);
-                               exit(1);
-                       }
-                       break;
                        case 'v':
                                verbose++;
                                break;
@@ -474,6 +464,9 @@ main(int argc, char **argv)
                        case 2:
                                do_drop_slot = true;
                                break;
+                       case 3:
+                               synchronous = true;
+                               break;
                        default:
 
                                /*
index c6c90fb2ea376f598be95e021bc0f764ee7c2a77..e51cac9b87b8a495432d826f7a68b47eb65cc2f2 100644 (file)
@@ -31,14 +31,13 @@ static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
-static int64 last_fsync = -1;          /* timestamp of last WAL file flush */
 static bool still_sending = true;              /* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                                 uint32 timeline, char *basedir,
                           stream_stop_callback stream_stop, int standby_message_timeout,
                                  char *partial_suffix, XLogRecPtr *stoppos,
-                                 int fsync_interval);
+                                 bool synchronous);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
@@ -55,8 +54,7 @@ static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
                                                                stream_stop_callback stream_stop,
                                                                char *partial_suffix, XLogRecPtr *stoppos);
 static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
-                                                                                int64 last_status, int fsync_interval,
-                                                                                XLogRecPtr blockpos);
+                                                                                int64 last_status);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                                                 uint32 *timeline);
@@ -209,7 +207,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
                                progname, current_walfile_name, partial_suffix);
 
        lastFlushPosition = pos;
-       last_fsync = feGetCurrentTimestamp();
        return true;
 }
 
@@ -440,8 +437,8 @@ CheckServerVersionForStreaming(PGconn *conn)
  * allows you to tell the difference between partial and completed files,
  * so that you can continue later where you left.
  *
- * fsync_interval controls how often we flush to the received WAL file,
- * in milliseconds.
+ * If 'synchronous' is true, the received WAL is flushed as soon as written,
+ * otherwise only when the WAL file is closed.
  *
  * Note: The log position *must* be at a log segment start!
  */
@@ -450,7 +447,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                  char *sysidentifier, char *basedir,
                                  stream_stop_callback stream_stop,
                                  int standby_message_timeout, char *partial_suffix,
-                                 int fsync_interval)
+                                 bool synchronous)
 {
        char            query[128];
        char            slotcmd[128];
@@ -595,7 +592,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /* Stream the WAL */
                res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
                                                           standby_message_timeout, partial_suffix,
-                                                          &stoppos, fsync_interval);
+                                                          &stoppos, synchronous);
                if (res == NULL)
                        goto error;
 
@@ -760,7 +757,7 @@ static PGresult *
 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                 char *basedir, stream_stop_callback stream_stop,
                                 int standby_message_timeout, char *partial_suffix,
-                                XLogRecPtr *stoppos, int fsync_interval)
+                                XLogRecPtr *stoppos, bool synchronous)
 {
        char       *copybuf = NULL;
        int64           last_status = -1;
@@ -784,14 +781,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                now = feGetCurrentTimestamp();
 
                /*
-                * If fsync_interval has elapsed since last WAL flush and we've written
-                * some WAL data, flush them to disk.
+                * If synchronous option is true, issue sync command as soon as
+                * there are WAL data which has not been flushed yet.
                 */
-               if (lastFlushPosition < blockpos &&
-                       walfile != -1 &&
-                       ((fsync_interval > 0 &&
-                         feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
-                        fsync_interval < 0))
+               if (synchronous && lastFlushPosition < blockpos && walfile != -1)
                {
                        if (fsync(walfile) != 0)
                        {
@@ -799,9 +792,15 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                                progname, current_walfile_name, strerror(errno));
                                goto error;
                        }
-
                        lastFlushPosition = blockpos;
-                       last_fsync = now;
+
+                       /*
+                        * Send feedback so that the server sees the latest WAL locations
+                        * immediately.
+                        */
+                       if (!sendFeedback(conn, blockpos, now, false))
+                               goto error;
+                       last_status = now;
                }
 
                /*
@@ -821,7 +820,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 * Calculate how long send/receive loops should sleep
                 */
                sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
-                                                                                                last_status, fsync_interval, blockpos);
+                                                                                                last_status);
 
                r = CopyStreamReceive(conn, sleeptime, &copybuf);
                while (r != 0)
@@ -1244,34 +1243,22 @@ CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
  */
 static long
 CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
-                                                        int64 last_status, int fsync_interval, XLogRecPtr blockpos)
+                                                        int64 last_status)
 {
-       int64           targettime = 0;
        int64           status_targettime = 0;
-       int64           fsync_targettime = 0;
        long            sleeptime;
 
        if (standby_message_timeout && still_sending)
                status_targettime = last_status +
                        (standby_message_timeout - 1) * ((int64) 1000);
 
-       if (fsync_interval > 0 && lastFlushPosition < blockpos)
-               fsync_targettime = last_fsync +
-                       (fsync_interval - 1) * ((int64) 1000);
-
-       if ((status_targettime < fsync_targettime && status_targettime > 0) ||
-               fsync_targettime == 0)
-               targettime = status_targettime;
-       else
-               targettime = fsync_targettime;
-
-       if (targettime > 0)
+       if (status_targettime > 0)
        {
                long            secs;
                int                     usecs;
 
                feTimestampDifference(now,
-                                                         targettime,
+                                                         status_targettime,
                                                          &secs,
                                                          &usecs);
                /* Always sleep at least 1 sec */
index 36eb1e128133f6083ad1b323ff6afbab0b1b7fa2..9dd7005167d7387d1486a8ecba8b04a378a51201 100644 (file)
@@ -31,6 +31,6 @@ extern bool ReceiveXlogStream(PGconn *conn,
                                  stream_stop_callback stream_stop,
                                  int standby_message_timeout,
                                  char *partial_suffix,
-                                 int fsync_interval);
+                                 bool synchronous);
 
 #endif /* RECEIVELOG_H */