</varlistentry>
<varlistentry>
- <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
+ <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>] [<literal>MAX_RATE</literal> <replaceable>rate</replaceable>]</term>
<listitem>
<para>
Instructs the server to start streaming a base backup.
the waiting and the warning, leaving the client responsible for
ensuring the required log is available.
</para>
- </listitem>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>MAX_RATE</literal> <replaceable>rate</></term>
+ <listitem>
+ <para>
+ Limit (throttle) the maximum amount of data transferred from server
+ to client per unit of time. The expected unit is kilobytes per second.
+ If this option is specified, the value must either be equal to zero
+ or it must fall within the range from 32 kB through 1 GB (inclusive).
+ If zero is passed or the option is not specified, no restriction is
+ imposed on the transfer.
+ </para>
+ </listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>-r <replaceable class="parameter">rate</replaceable></option></term>
+ <term><option>--max-rate=<replaceable class="parameter">rate</replaceable></option></term>
+ <listitem>
+ <para>
+ The maximum transfer rate of data transferred from the server. Values are
+ in kilobytes per second. Use a suffix of <literal>M</> to indicate megabytes
+ per second. A suffix of <literal>k</> is also accepted, and has no effect.
+ Valid values are between 32 kilobytes per second and 1024 megabytes per second.
+ </para>
+ <para>
+ The purpose is to limit the impact of <application>pg_basebackup</application>
+ on the running server.
+ </para>
+ <para>
+ This option always affects transfer of the data directory. Transfer of
+ WAL files is only affected if the collection method is <literal>fetch</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>-R</option></term>
<term><option>--write-recovery-conf</option></term>
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
+#include "pgtar.h"
#include "pgstat.h"
#include "replication/basebackup.h"
#include "replication/walsender.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/ps_status.h"
-#include "pgtar.h"
+#include "utils/timestamp.h"
+
typedef struct
{
bool fastcheckpoint;
bool nowait;
bool includewal;
+ uint32 maxrate;
} basebackup_options;
static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);
/* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false;
*/
#define TAR_SEND_SIZE 32768
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ */
+#define THROTTLING_FREQUENCY 8
+
+/* The actual number of bytes, transfer of which may cause sleep. */
+static uint64 throttling_sample;
+
+/* Amount of data already transfered but not yet throttled. */
+static int64 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
typedef struct
{
char *oid;
/* Send tablespace header */
SendBackupHeader(tablespaces);
+ /* Setup and activate network throttling, if client requested it */
+ if (opt->maxrate > 0)
+ {
+ throttling_sample = opt->maxrate * 1024 / THROTTLING_FREQUENCY;
+
+ /*
+ * The minimum amount of time for throttling_sample
+ * bytes to be transfered.
+ */
+ elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+ /* Enable throttling. */
+ throttling_counter = 0;
+
+ /* The 'real data' starts now (header was ignored). */
+ throttled_last = GetCurrentIntegerTimestamp();
+ }
+ else
+ {
+ /* Disable throttling. */
+ throttling_counter = -1;
+ }
+
/* Send off our tablespaces one by one */
foreach(lc, tablespaces)
{
(errmsg("base backup could not send data, aborting backup")));
len += cnt;
+ throttle(cnt);
+
if (len == XLogSegSize)
break;
}
bool o_fast = false;
bool o_nowait = false;
bool o_wal = false;
+ bool o_maxrate = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
opt->includewal = true;
o_wal = true;
}
+ else if (strcmp(defel->defname, "max_rate") == 0)
+ {
+ long maxrate;
+
+ if (o_maxrate)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ maxrate = intVal(defel->arg);
+ if (maxrate < MAX_RATE_LOWER || maxrate > MAX_RATE_UPPER)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("%d is outside the valid range for parameter \"%s\" (%d .. %d)",
+ (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER)));
+
+ opt->maxrate = (uint32) maxrate;
+ o_maxrate = true;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
(errmsg("base backup could not send data, aborting backup")));
len += cnt;
+ throttle(cnt);
if (len >= statbuf->st_size)
{
cnt = Min(sizeof(buf), statbuf->st_size - len);
pq_putmessage('d', buf, cnt);
len += cnt;
+ throttle(cnt);
}
}
- /* Pad to 512 byte boundary, per tar format requirements */
+ /*
+ * Pad to 512 byte boundary, per tar format requirements. (This small
+ * piece of data is probably not worth throttling.)
+ */
pad = ((len + 511) & ~511) - len;
if (pad > 0)
{
pq_putmessage('d', h, 512);
}
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ */
+static void
+throttle(size_t increment)
+{
+ int64 elapsed,
+ elapsed_min,
+ sleep;
+ int wait_result;
+
+ if (throttling_counter < 0)
+ return;
+
+ throttling_counter += increment;
+ if (throttling_counter < throttling_sample)
+ return;
+
+ /* Time elapsed since the last measurement (and possible wake up). */
+ elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+ /* How much should have elapsed at minimum? */
+ elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+ sleep = elapsed_min - elapsed;
+ /* Only sleep if the transfer is faster than it should be. */
+ if (sleep > 0)
+ {
+ ResetLatch(&MyWalSnd->latch);
+
+ /*
+ * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+ * the maximum time to sleep. Thus the cast to long is safe.
+ */
+ wait_result = WaitLatch(&MyWalSnd->latch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ (long) (sleep / 1000));
+ }
+ else
+ {
+ /*
+ * The actual transfer rate is below the limit. A negative value would
+ * distort the adjustment of throttled_last.
+ */
+ wait_result = 0;
+ sleep = 0;
+ }
+
+ /*
+ * Only a whole multiple of throttling_sample was processed. The rest will
+ * be done during the next call of this function.
+ */
+ throttling_counter %= throttling_sample;
+
+ /* Once the (possible) sleep has ended, new period starts. */
+ if (wait_result & WL_TIMEOUT)
+ throttled_last += elapsed + sleep;
+ else if (sleep > 0)
+ /* Sleep was necessary but might have been interrupted. */
+ throttled_last = GetCurrentIntegerTimestamp();
+}
%token K_PROGRESS
%token K_FAST
%token K_NOWAIT
+%token K_MAX_RATE
%token K_WAL
%token K_TIMELINE
%token K_PHYSICAL
;
/*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d]
*/
base_backup:
K_BASE_BACKUP base_backup_opt_list
$$ = makeDefElem("nowait",
(Node *)makeInteger(TRUE));
}
+ | K_MAX_RATE UCONST
+ {
+ $$ = makeDefElem("max_rate",
+ (Node *)makeInteger($2));
+ }
;
/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */
LABEL { return K_LABEL; }
NOWAIT { return K_NOWAIT; }
PROGRESS { return K_PROGRESS; }
+MAX_RATE { return K_MAX_RATE; }
WAL { return K_WAL; }
TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
*/
#include "postgres_fe.h"
-#include "libpq-fe.h"
-#include "pqexpbuffer.h"
-#include "pgtar.h"
-#include "pgtime.h"
#include <unistd.h>
#include <dirent.h>
#endif
#include "getopt_long.h"
-
+#include "libpq-fe.h"
+#include "pqexpbuffer.h"
+#include "pgtar.h"
+#include "pgtime.h"
#include "receivelog.h"
+#include "replication/basebackup.h"
#include "streamutil.h"
static bool writerecoveryconf = false;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static pg_time_t last_progress_report = 0;
+static int32 maxrate = 0; /* no limit by default */
+
/* Progress counters */
static uint64 totalsize;
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain (default), tar)\n"));
+ printf(_(" -r, --max-rate=RATE maximum transfer rate to transfer data directory\n"));
printf(_(" -R, --write-recovery-conf\n"
" write recovery.conf after backup\n"));
printf(_(" -T, --tablespace-mapping=OLDDIR=NEWDIR\n"
fprintf(stderr, "\r");
}
+static int32
+parse_max_rate(char *src)
+{
+ double result;
+ char *after_num;
+ char *suffix = NULL;
+
+ errno = 0;
+ result = strtod(src, &after_num);
+ if (src == after_num)
+ {
+ fprintf(stderr,
+ _("%s: transfer rate \"%s\" is not a valid value\n"),
+ progname, src);
+ exit(1);
+ }
+ if (errno != 0)
+ {
+ fprintf(stderr,
+ _("%s: invalid transfer rate \"%s\": %s\n"),
+ progname, src, strerror(errno));
+ exit(1);
+ }
+
+ if (result <= 0)
+ {
+ /*
+ * Reject obviously wrong values here.
+ */
+ fprintf(stderr, _("%s: transfer rate must be greater than zero\n"),
+ progname);
+ exit(1);
+ }
+
+ /*
+ * Evaluate suffix, after skipping over possible whitespace.
+ * Lack of suffix means kilobytes.
+ */
+ while (*after_num != '\0' && isspace((unsigned char) *after_num))
+ after_num++;
+
+ if (*after_num != '\0')
+ {
+ suffix = after_num;
+ if (*after_num == 'k')
+ {
+ /* kilobyte is the expected unit. */
+ after_num++;
+ }
+ else if (*after_num == 'M')
+ {
+ after_num++;
+ result *= 1024.0;
+ }
+ }
+
+ /* The rest can only consist of white space. */
+ while (*after_num != '\0' && isspace((unsigned char) *after_num))
+ after_num++;
+
+ if (*after_num != '\0')
+ {
+ fprintf(stderr,
+ _("%s: invalid --max-rate units: \"%s\"\n"),
+ progname, suffix);
+ exit(1);
+ }
+
+ /* Valid integer? */
+ if ((uint64) result != (uint64) ((uint32) result))
+ {
+ fprintf(stderr,
+ _("%s: transfer rate \"%s\" exceeds integer range\n"),
+ progname, src);
+ exit(1);
+ }
+
+ /*
+ * The range is checked on the server side too, but avoid the server
+ * connection if a nonsensical value was passed.
+ */
+ if (result < MAX_RATE_LOWER || result > MAX_RATE_UPPER)
+ {
+ fprintf(stderr,
+ _("%s: transfer rate \"%s\" is out of range\n"),
+ progname, src);
+ exit(1);
+ }
+
+ return (int32) result;
+}
/*
* Write a piece of tar data
char *sysidentifier;
uint32 latesttli;
uint32 starttli;
- char current_path[MAXPGPATH];
+ char *basebkp;
char escaped_label[MAXPGPATH];
+ char *maxrate_clause = NULL;
int i;
char xlogstart[64];
char xlogend[64];
* Start the actual backup
*/
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
- snprintf(current_path, sizeof(current_path),
- "BASE_BACKUP LABEL '%s' %s %s %s %s",
- escaped_label,
- showprogress ? "PROGRESS" : "",
- includewal && !streamwal ? "WAL" : "",
- fastcheckpoint ? "FAST" : "",
- includewal ? "NOWAIT" : "");
- if (PQsendQuery(conn, current_path) == 0)
+ if (maxrate > 0)
+ maxrate_clause = psprintf("MAX_RATE %u", maxrate);
+
+ basebkp =
+ psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s",
+ escaped_label,
+ showprogress ? "PROGRESS" : "",
+ includewal && !streamwal ? "WAL" : "",
+ fastcheckpoint ? "FAST" : "",
+ includewal ? "NOWAIT" : "",
+ maxrate_clause ? maxrate_clause : "");
+
+ if (PQsendQuery(conn, basebkp) == 0)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "BASE_BACKUP", PQerrorMessage(conn));
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
+ {"max-rate", required_argument, NULL, 'r'},
{"write-recovery-conf", no_argument, NULL, 'R'},
{"tablespace-mapping", required_argument, NULL, 'T'},
{"xlog", no_argument, NULL, 'x'},
}
}
- while ((c = getopt_long(argc, argv, "D:F:RT:xX:l:zZ:d:c:h:p:U:s:wWvP",
+ while ((c = getopt_long(argc, argv, "D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
exit(1);
}
break;
+ case 'r':
+ maxrate = parse_max_rate(optarg);
+ break;
case 'R':
writerecoveryconf = true;
break;
#include "nodes/replnodes.h"
+/*
+ * Minimum and maximum values of MAX_RATE option in BASE_BACKUP command.
+ */
+#define MAX_RATE_LOWER 32
+#define MAX_RATE_UPPER 1048576
+
+
extern void SendBaseBackup(BaseBackupCmd *cmd);
#endif /* _BASEBACKUP_H */