]> granicus.if.org Git - postgresql/commitdiff
Allow BASE_BACKUP to be throttled
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 27 Feb 2014 21:55:57 +0000 (18:55 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 27 Feb 2014 21:55:57 +0000 (18:55 -0300)
A new MAX_RATE option allows imposing a limit on the network transfer
rate from the server side.  This is useful to limit the stress that
taking a base backup has on the server.

pg_basebackup is now able to specify a value to the server, too.

Author: Antonin Houska

Patch reviewed by Stefan Radomski, Andres Freund, Zoltán Böszörményi,
Fujii Masao, and Álvaro Herrera.

doc/src/sgml/protocol.sgml
doc/src/sgml/ref/pg_basebackup.sgml
src/backend/replication/basebackup.c
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/bin/pg_basebackup/pg_basebackup.c
src/include/replication/basebackup.h

index 832524e95e45582059ef4b459aa24569c47411e5..d36f2f3af1f291af8f9fd9052167485e16996b87 100644 (file)
@@ -1772,7 +1772,7 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-    <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
+    <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>] [<literal>MAX_RATE</literal> <replaceable>rate</replaceable>]</term>
     <listitem>
      <para>
       Instructs the server to start streaming a base backup.
@@ -1840,7 +1840,21 @@ The commands accepted in walsender mode are:
           the waiting and the warning, leaving the client responsible for
           ensuring the required log is available.
          </para>
-         </listitem>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>MAX_RATE</literal> <replaceable>rate</></term>
+        <listitem>
+         <para>
+          Limit (throttle) the maximum amount of data transferred from server
+          to client per unit of time.  The expected unit is kilobytes per second.
+          If this option is specified, the value must fall within the range
+          from 32 kB through 1 GB (inclusive); values outside that range,
+          including zero, are rejected.  If the option is not specified, no
+          restriction is imposed on the transfer.
+         </para>
+        </listitem>
        </varlistentry>
       </variablelist>
      </para>
index 84b45ae7a00d591ff40feb327d73087d37f8acee..ede68db9388f316e044ae64cdf450c3abe58eb7b 100644 (file)
@@ -188,6 +188,27 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-r <replaceable class="parameter">rate</replaceable></option></term>
+      <term><option>--max-rate=<replaceable class="parameter">rate</replaceable></option></term>
+      <listitem>
+       <para>
+        The maximum rate at which data is transferred from the server.  Values are
+        in kilobytes per second.  Use a suffix of <literal>M</> to indicate megabytes
+        per second.  A suffix of <literal>k</> is also accepted, and has no effect.
+        Valid values are between 32 kilobytes per second and 1024 megabytes per second.
+       </para>
+       <para>
+        The purpose is to limit the impact of <application>pg_basebackup</application>
+        on the running server.
+       </para>
+       <para>
+        This option always affects transfer of the data directory. Transfer of
+        WAL files is only affected if the collection method is <literal>fetch</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-R</option></term>
       <term><option>--write-recovery-conf</option></term>
index 2bbe384e351891fceda1cab90189e09c55c56ee1..d68a1533602bb5620fd6c58ac8e8a25556a29d1f 100644 (file)
@@ -25,6 +25,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "nodes/pg_list.h"
+#include "pgtar.h"
 #include "pgstat.h"
 #include "replication/basebackup.h"
 #include "replication/walsender.h"
@@ -34,7 +35,8 @@
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
-#include "pgtar.h"
+#include "utils/timestamp.h"
+
 
 typedef struct
 {
@@ -43,6 +45,7 @@ typedef struct
        bool            fastcheckpoint;
        bool            nowait;
        bool            includewal;
+       uint32          maxrate;
 } basebackup_options;
 
 
@@ -60,6 +63,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int     compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);
 
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
@@ -72,6 +76,23 @@ static char *statrelpath = NULL;
  */
 #define TAR_SEND_SIZE 32768
 
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ */
+#define THROTTLING_FREQUENCY   8
+
+/* The number of bytes whose transfer may trigger a sleep. */
+static uint64 throttling_sample;
+
+/* Amount of data already transferred but not yet throttled.  */
+static int64 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
 typedef struct
 {
        char       *oid;
@@ -203,6 +224,29 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                /* Send tablespace header */
                SendBackupHeader(tablespaces);
 
+               /* Setup and activate network throttling, if client requested it */
+               if (opt->maxrate > 0)
+               {
+                       throttling_sample = opt->maxrate * 1024 / THROTTLING_FREQUENCY;
+
+                       /*
+                        * The minimum amount of time for throttling_sample
+                        * bytes to be transferred.
+                        */
+                       elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+                       /* Enable throttling. */
+                       throttling_counter = 0;
+
+                       /* The 'real data' starts now (header was ignored). */
+                       throttled_last = GetCurrentIntegerTimestamp();
+               }
+               else
+               {
+                       /* Disable throttling. */
+                       throttling_counter = -1;
+               }
+
                /* Send off our tablespaces one by one */
                foreach(lc, tablespaces)
                {
@@ -430,6 +474,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                                                        (errmsg("base backup could not send data, aborting backup")));
 
                                len += cnt;
+                               throttle(cnt);
+
                                if (len == XLogSegSize)
                                        break;
                        }
@@ -500,6 +546,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
        bool            o_fast = false;
        bool            o_nowait = false;
        bool            o_wal = false;
+       bool            o_maxrate = false;
 
        MemSet(opt, 0, sizeof(*opt));
        foreach(lopt, options)
@@ -551,6 +598,25 @@ parse_basebackup_options(List *options, basebackup_options *opt)
                        opt->includewal = true;
                        o_wal = true;
                }
+               else if (strcmp(defel->defname, "max_rate") == 0)
+               {
+                       long            maxrate;
+
+                       if (o_maxrate)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("duplicate option \"%s\"", defel->defname)));
+
+                       maxrate = intVal(defel->arg);
+                       if (maxrate < MAX_RATE_LOWER || maxrate > MAX_RATE_UPPER)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+                                                errmsg("%d is outside the valid range for parameter \"%s\" (%d .. %d)",
+                                                               (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER)));
+
+                       opt->maxrate = (uint32) maxrate;
+                       o_maxrate = true;
+               }
                else
                        elog(ERROR, "option \"%s\" not recognized",
                                 defel->defname);
@@ -1112,6 +1178,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
                           (errmsg("base backup could not send data, aborting backup")));
 
                len += cnt;
+               throttle(cnt);
 
                if (len >= statbuf->st_size)
                {
@@ -1133,10 +1200,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
                        cnt = Min(sizeof(buf), statbuf->st_size - len);
                        pq_putmessage('d', buf, cnt);
                        len += cnt;
+                       throttle(cnt);
                }
        }
 
-       /* Pad to 512 byte boundary, per tar format requirements */
+       /*
+        * Pad to 512 byte boundary, per tar format requirements. (This small
+        * piece of data is probably not worth throttling.)
+        */
        pad = ((len + 511) & ~511) - len;
        if (pad > 0)
        {
@@ -1162,3 +1233,65 @@ _tarWriteHeader(const char *filename, const char *linktarget,
 
        pq_putmessage('d', h, 512);
 }
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ */
+static void
+throttle(size_t increment)
+{
+       int64           elapsed,
+                               elapsed_min,
+                               sleep;
+       int                     wait_result;
+
+       if (throttling_counter < 0)
+               return;
+
+       throttling_counter += increment;
+       if (throttling_counter < throttling_sample)
+               return;
+
+       /* Time elapsed since the last measurement (and possible wake up). */
+       elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+       /* How much should have elapsed at minimum? */
+       elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+       sleep = elapsed_min - elapsed;
+       /* Only sleep if the transfer is faster than it should be. */
+       if (sleep > 0)
+       {
+               ResetLatch(&MyWalSnd->latch);
+
+               /*
+                * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+                * the maximum time to sleep. Thus the cast to long is safe.
+                */
+               wait_result = WaitLatch(&MyWalSnd->latch,
+                                                               WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                                               (long) (sleep / 1000));
+       }
+       else
+       {
+               /*
+                * The actual transfer rate is below the limit.  A negative value would
+                * distort the adjustment of throttled_last.
+                */
+               wait_result = 0;
+               sleep = 0;
+       }
+
+       /*
+        * Only a whole multiple of throttling_sample was processed. The rest will
+        * be done during the next call of this function.
+        */
+       throttling_counter %= throttling_sample;
+
+       /* Once the (possible) sleep has ended, a new period starts. */
+       if (wait_result & WL_TIMEOUT)
+               throttled_last += elapsed + sleep;
+       else if (sleep > 0)
+               /* Sleep was necessary but might have been interrupted. */
+               throttled_last = GetCurrentIntegerTimestamp();
+}
index c3f4a24a8ffbecfd9478c345000909dcf1024a0b..308889b5c9a299a9bd8609793182241f397e5522 100644 (file)
@@ -69,6 +69,7 @@ Node *replication_parse_result;
 %token K_PROGRESS
 %token K_FAST
 %token K_NOWAIT
+%token K_MAX_RATE
 %token K_WAL
 %token K_TIMELINE
 %token K_PHYSICAL
@@ -113,7 +114,7 @@ identify_system:
                        ;
 
 /*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d]
  */
 base_backup:
                        K_BASE_BACKUP base_backup_opt_list
@@ -157,6 +158,11 @@ base_backup_opt:
                                  $$ = makeDefElem("nowait",
                                                                   (Node *)makeInteger(TRUE));
                                }
+                       | K_MAX_RATE UCONST
+                               {
+                                 $$ = makeDefElem("max_rate",
+                                                                  (Node *)makeInteger($2));
+                               }
                        ;
 
 /* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */
index 24195a5971979420dbc5274a2eac1f509b45a0a7..ca32aa67ff16b7ea8b9e31e0aee4642f3efbbacb 100644 (file)
@@ -86,6 +86,7 @@ IDENTIFY_SYSTEM               { return K_IDENTIFY_SYSTEM; }
 LABEL                  { return K_LABEL; }
 NOWAIT                 { return K_NOWAIT; }
 PROGRESS                       { return K_PROGRESS; }
+MAX_RATE               { return K_MAX_RATE; }
 WAL                    { return K_WAL; }
 TIMELINE                       { return K_TIMELINE; }
 START_REPLICATION      { return K_START_REPLICATION; }
index 9d7a1e38add4dfb6a5c8d475babcb2d8c3176cf0..919805f5cfa383b50ff4f8af7aa53a17ae44afed 100644 (file)
  */
 
 #include "postgres_fe.h"
-#include "libpq-fe.h"
-#include "pqexpbuffer.h"
-#include "pgtar.h"
-#include "pgtime.h"
 
 #include <unistd.h>
 #include <dirent.h>
 #endif
 
 #include "getopt_long.h"
-
+#include "libpq-fe.h"
+#include "pqexpbuffer.h"
+#include "pgtar.h"
+#include "pgtime.h"
 #include "receivelog.h"
+#include "replication/basebackup.h"
 #include "streamutil.h"
 
 
@@ -65,6 +65,8 @@ static bool   fastcheckpoint = false;
 static bool    writerecoveryconf = false;
 static int     standby_message_timeout = 10 * 1000;            /* 10 sec = default */
 static pg_time_t last_progress_report = 0;
+static int32 maxrate = 0;              /* no limit by default */
+
 
 /* Progress counters */
 static uint64 totalsize;
@@ -226,6 +228,7 @@ usage(void)
        printf(_("\nOptions controlling the output:\n"));
        printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
        printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+       printf(_("  -r, --max-rate=RATE    maximum transfer rate to transfer data directory\n"));
        printf(_("  -R, --write-recovery-conf\n"
                         "                         write recovery.conf after backup\n"));
        printf(_("  -T, --tablespace-mapping=OLDDIR=NEWDIR\n"
@@ -606,6 +609,97 @@ progress_report(int tablespacenum, const char *filename, bool force)
        fprintf(stderr, "\r");
 }
 
+static int32
+parse_max_rate(char *src)
+{
+       double          result;
+       char       *after_num;
+       char    *suffix = NULL;
+
+       errno = 0;
+       result = strtod(src, &after_num);
+       if (src == after_num)
+       {
+               fprintf(stderr,
+                               _("%s: transfer rate \"%s\" is not a valid value\n"),
+                               progname, src);
+               exit(1);
+       }
+       if (errno != 0)
+       {
+               fprintf(stderr,
+                               _("%s: invalid transfer rate \"%s\": %s\n"),
+                               progname, src, strerror(errno));
+               exit(1);
+       }
+
+       if (result <= 0)
+       {
+               /*
+                * Reject obviously wrong values here.
+                */
+               fprintf(stderr, _("%s: transfer rate must be greater than zero\n"),
+                               progname);
+               exit(1);
+       }
+
+       /*
+        * Evaluate suffix, after skipping over possible whitespace.
+        * Lack of suffix means kilobytes.
+        */
+       while (*after_num != '\0' && isspace((unsigned char) *after_num))
+               after_num++;
+
+       if (*after_num != '\0')
+       {
+               suffix = after_num;
+               if (*after_num == 'k')
+               {
+                       /* kilobyte is the expected unit. */
+                       after_num++;
+               }
+               else if (*after_num == 'M')
+               {
+                       after_num++;
+                       result *= 1024.0;
+               }
+       }
+
+       /* The rest can only consist of white space. */
+       while (*after_num != '\0' && isspace((unsigned char) *after_num))
+               after_num++;
+
+       if (*after_num != '\0')
+       {
+               fprintf(stderr,
+                               _("%s: invalid --max-rate units: \"%s\"\n"),
+                               progname, suffix);
+               exit(1);
+       }
+
+       /* Valid integer? */
+       if ((uint64) result != (uint64) ((uint32) result))
+       {
+               fprintf(stderr,
+                       _("%s: transfer rate \"%s\" exceeds integer range\n"),
+                       progname, src);
+               exit(1);
+       }
+
+       /*
+        * The range is checked on the server side too, but avoid the server
+        * connection if a nonsensical value was passed.
+        */
+       if (result < MAX_RATE_LOWER || result > MAX_RATE_UPPER)
+       {
+               fprintf(stderr,
+                               _("%s: transfer rate \"%s\" is out of range\n"),
+                               progname, src);
+               exit(1);
+       }
+
+       return (int32) result;
+}
 
 /*
  * Write a piece of tar data
@@ -1485,8 +1579,9 @@ BaseBackup(void)
        char       *sysidentifier;
        uint32          latesttli;
        uint32          starttli;
-       char            current_path[MAXPGPATH];
+       char       *basebkp;
        char            escaped_label[MAXPGPATH];
+       char       *maxrate_clause = NULL;
        int                     i;
        char            xlogstart[64];
        char            xlogend[64];
@@ -1559,15 +1654,20 @@ BaseBackup(void)
         * Start the actual backup
         */
        PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
-       snprintf(current_path, sizeof(current_path),
-                        "BASE_BACKUP LABEL '%s' %s %s %s %s",
-                        escaped_label,
-                        showprogress ? "PROGRESS" : "",
-                        includewal && !streamwal ? "WAL" : "",
-                        fastcheckpoint ? "FAST" : "",
-                        includewal ? "NOWAIT" : "");
 
-       if (PQsendQuery(conn, current_path) == 0)
+       if (maxrate > 0)
+               maxrate_clause = psprintf("MAX_RATE %u", maxrate);
+
+       basebkp =
+               psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s",
+                                escaped_label,
+                                showprogress ? "PROGRESS" : "",
+                                includewal && !streamwal ? "WAL" : "",
+                                fastcheckpoint ? "FAST" : "",
+                                includewal ? "NOWAIT" : "",
+                                maxrate_clause ? maxrate_clause : "");
+
+       if (PQsendQuery(conn, basebkp) == 0)
        {
                fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                                progname, "BASE_BACKUP", PQerrorMessage(conn));
@@ -1847,6 +1947,7 @@ main(int argc, char **argv)
                {"pgdata", required_argument, NULL, 'D'},
                {"format", required_argument, NULL, 'F'},
                {"checkpoint", required_argument, NULL, 'c'},
+               {"max-rate", required_argument, NULL, 'r'},
                {"write-recovery-conf", no_argument, NULL, 'R'},
                {"tablespace-mapping", required_argument, NULL, 'T'},
                {"xlog", no_argument, NULL, 'x'},
@@ -1888,7 +1989,7 @@ main(int argc, char **argv)
                }
        }
 
-       while ((c = getopt_long(argc, argv, "D:F:RT:xX:l:zZ:d:c:h:p:U:s:wWvP",
+       while ((c = getopt_long(argc, argv, "D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:wWvP",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
@@ -1909,6 +2010,9 @@ main(int argc, char **argv)
                                        exit(1);
                                }
                                break;
+                       case 'r':
+                               maxrate = parse_max_rate(optarg);
+                               break;
                        case 'R':
                                writerecoveryconf = true;
                                break;
index 4e3e5f35c689d1e9b72b215fbe5f9ba41eccf722..3dbc4bc9ef85d162e7ff14f9d42e3e0965cce13b 100644 (file)
 
 #include "nodes/replnodes.h"
 
+/*
+ * Minimum and maximum values of MAX_RATE option in BASE_BACKUP command.
+ */
+#define MAX_RATE_LOWER  32
+#define MAX_RATE_UPPER  1048576
+
+
 extern void SendBaseBackup(BaseBackupCmd *cmd);
 
 #endif   /* _BASEBACKUP_H */