]> granicus.if.org Git - postgresql/commitdiff
pg_basebackup: Add option to create replication slot
authorPeter Eisentraut <peter_e@gmx.net>
Tue, 26 Sep 2017 20:07:52 +0000 (16:07 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Wed, 27 Sep 2017 12:49:47 +0000 (08:49 -0400)
When requesting a particular replication slot, the new pg_basebackup
option -C/--create-slot creates it before starting to replicate from it.

Further refactor the slot creation logic to include the temporary slot
creation logic into the same function.  Add new arguments is_temporary
and preserve_wal to CreateReplicationSlot().  Print in --verbose mode
that a slot has been created.

Author: Michael Banck <michael.banck@credativ.de>

doc/src/sgml/ref/pg_basebackup.sgml
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivewal.c
src/bin/pg_basebackup/pg_recvlogical.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h
src/bin/pg_basebackup/t/010_pg_basebackup.pl

index b7aa128f7f7c036be3b82b2dd7dac7a858387628..f790c5600345791a818fd5f8b575efd424f3d4dc 100644 (file)
@@ -382,6 +382,18 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-C</option></term>
+      <term><option>--create-slot</option></term>
+      <listitem>
+       <para>
+        This option causes the replication slot specified by the
+        option <literal>--slot</literal> to be created before starting the
+        backup.  In this case, an error is raised if the slot already exists.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-l <replaceable class="parameter">label</replaceable></option></term>
       <term><option>--label=<replaceable class="parameter">label</replaceable></option></term>
@@ -462,6 +474,10 @@ PostgreSQL documentation
         the server does not remove any necessary WAL data in the time between
         the end of the base backup and the start of streaming replication.
        </para>
+       <para>
+        The specified replication slot has to exist unless the
+        option <option>-C</option> is also used.
+       </para>
        <para>
         If this option is not specified and the server supports temporary
         replication slots (version 10 and later), then a temporary replication
index 537978090e008666b22e5f7b40b9899779f46cec..dac7299ff48ff01178b7a0de1d38c7ae4bfa0446 100644 (file)
@@ -93,6 +93,8 @@ static pg_time_t last_progress_report = 0;
 static int32 maxrate = 0;              /* no limit by default */
 static char *replication_slot = NULL;
 static bool temp_replication_slot = true;
+static bool create_slot = false;
+static bool no_slot = false;
 
 static bool success = false;
 static bool made_new_pgdata = false;
@@ -346,6 +348,7 @@ usage(void)
        printf(_("\nGeneral options:\n"));
        printf(_("  -c, --checkpoint=fast|spread\n"
                         "                         set fast or spread checkpointing\n"));
+       printf(_("  -C, --create-slot      create replication slot\n"));
        printf(_("  -l, --label=LABEL      set backup label\n"));
        printf(_("  -n, --no-clean         do not clean up after errors\n"));
        printf(_("  -N, --no-sync          do not wait for changes to be written safely to disk\n"));
@@ -466,7 +469,6 @@ typedef struct
        char            xlog[MAXPGPATH];        /* directory or tarfile depending on mode */
        char       *sysidentifier;
        int                     timeline;
-       bool            temp_slot;
 } logstreamer_param;
 
 static int
@@ -492,9 +494,6 @@ LogStreamerMain(logstreamer_param *param)
        stream.mark_done = true;
        stream.partial_suffix = NULL;
        stream.replication_slot = replication_slot;
-       stream.temp_slot = param->temp_slot;
-       if (stream.temp_slot && !stream.replication_slot)
-               stream.replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
 
        if (format == 'p')
                stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
@@ -583,9 +582,29 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 
        /* Temporary replication slots are only supported in 10 and newer */
        if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
-               param->temp_slot = false;
-       else
-               param->temp_slot = temp_replication_slot;
+               temp_replication_slot = false;
+
+       /*
+        * Create replication slot if requested
+        */
+       if (temp_replication_slot && !replication_slot)
+               replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
+       if (temp_replication_slot || create_slot)
+       {
+               if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
+                                                                  temp_replication_slot, true, true, false))
+                       disconnect_and_exit(1);
+
+               if (verbose)
+               {
+                       if (temp_replication_slot)
+                               fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
+                                               progname, replication_slot);
+                       else
+                               fprintf(stderr, _("%s: created replication slot \"%s\"\n"),
+                                               progname, replication_slot);
+               }
+       }
 
        if (format == 'p')
        {
@@ -2079,6 +2098,7 @@ main(int argc, char **argv)
                {"pgdata", required_argument, NULL, 'D'},
                {"format", required_argument, NULL, 'F'},
                {"checkpoint", required_argument, NULL, 'c'},
+               {"create-slot", no_argument, NULL, 'C'},
                {"max-rate", required_argument, NULL, 'r'},
                {"write-recovery-conf", no_argument, NULL, 'R'},
                {"slot", required_argument, NULL, 'S'},
@@ -2105,7 +2125,6 @@ main(int argc, char **argv)
        int                     c;
 
        int                     option_index;
-       bool            no_slot = false;
 
        progname = get_progname(argv[0]);
        set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
@@ -2127,11 +2146,14 @@ main(int argc, char **argv)
 
        atexit(cleanup_directories_atexit);
 
-       while ((c = getopt_long(argc, argv, "D:F:r:RT:X:l:nNzZ:d:c:h:p:U:s:S:wWvP",
+       while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWvP",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
                {
+                       case 'C':
+                               create_slot = true;
+                               break;
                        case 'D':
                                basedir = pg_strdup(optarg);
                                break;
@@ -2348,6 +2370,29 @@ main(int argc, char **argv)
                temp_replication_slot = false;
        }
 
+       if (create_slot)
+       {
+               if (!replication_slot)
+               {
+                       fprintf(stderr,
+                                       _("%s: --create-slot needs a slot to be specified using --slot\n"),
+                                       progname);
+                       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                                       progname);
+                       exit(1);
+               }
+
+               if (no_slot)
+               {
+                       fprintf(stderr,
+                                       _("%s: --create-slot and --no-slot are incompatible options\n"),
+                                       progname);
+                       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                                       progname);
+                       exit(1);
+               }
+       }
+
        if (xlog_dir)
        {
                if (format != 'p')
index fbac0df93d858a87d4acedeb5707963a7e9d2c40..888ae6c571902be4c9f91fec1587aee8acafc87c 100644 (file)
@@ -431,7 +431,6 @@ StreamLog(void)
                                                                                                stream.do_sync);
        stream.partial_suffix = ".partial";
        stream.replication_slot = replication_slot;
-       stream.temp_slot = false;
 
        ReceiveXlogStream(conn, &stream);
 
@@ -728,7 +727,7 @@ main(int argc, char **argv)
                                        _("%s: creating replication slot \"%s\"\n"),
                                        progname, replication_slot);
 
-               if (!CreateReplicationSlot(conn, replication_slot, NULL, true,
+               if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
                                                                   slot_exists_ok))
                        disconnect_and_exit(1);
                disconnect_and_exit(0);
index 6811a55e7646ea5c0270fe5ee6957115d210146e..3109d0f99fb071a48f84d7d38dc5a45111ff9cf9 100644 (file)
@@ -979,8 +979,8 @@ main(int argc, char **argv)
                                        _("%s: creating replication slot \"%s\"\n"),
                                        progname, replication_slot);
 
-               if (!CreateReplicationSlot(conn, replication_slot, plugin,
-                                                                  false, slot_exists_ok))
+               if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
+                                                                  false, false, slot_exists_ok))
                        disconnect_and_exit(1);
                startpos = InvalidXLogRecPtr;
        }
index 65931f6454187717afa8f43caadace440b871b6e..07509cb82594cba735c1068eff1d988243913075 100644 (file)
@@ -522,24 +522,6 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
                PQclear(res);
        }
 
-       /*
-        * Create temporary replication slot if one is needed
-        */
-       if (stream->temp_slot)
-       {
-               snprintf(query, sizeof(query),
-                                "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
-                                stream->replication_slot);
-               res = PQexec(conn, query);
-               if (PQresultStatus(res) != PGRES_TUPLES_OK)
-               {
-                       fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
-                                       progname, stream->replication_slot, PQerrorMessage(conn));
-                       PQclear(res);
-                       return false;
-               }
-       }
-
        /*
         * initialize flush position to starting point, it's the caller's
         * responsibility that that's sane.
index bb786ce289e1db63c4b8bd5196d0753754d84b85..5b8c33fc26c360ef0677c20b7509a424f76a9bd2 100644 (file)
@@ -47,7 +47,6 @@ typedef struct StreamCtl
        WalWriteMethod *walmethod;      /* How to write the WAL */
        char       *partial_suffix; /* Suffix appended to partially received files */
        char       *replication_slot;   /* Replication slot to use, or NULL */
-       bool            temp_slot;              /* Create temporary replication slot */
 } StreamCtl;
 
 
index df17f60596a4b74d98810fdd876beec9050cb234..81fef8cd516c6e836c5ab85ade3c40b70cd51353 100644 (file)
@@ -398,7 +398,8 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
  */
 bool
 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
-                                         bool is_physical, bool slot_exists_ok)
+                                         bool is_temporary, bool is_physical, bool reserve_wal,
+                                         bool slot_exists_ok)
 {
        PQExpBuffer query;
        PGresult   *res;
@@ -410,13 +411,18 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
        Assert(slot_name != NULL);
 
        /* Build query */
+       appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
+       if (is_temporary)
+               appendPQExpBuffer(query, " TEMPORARY");
        if (is_physical)
-               appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
-                                                 slot_name);
+       {
+               appendPQExpBuffer(query, " PHYSICAL");
+               if (reserve_wal)
+                       appendPQExpBuffer(query, " RESERVE_WAL");
+       }
        else
        {
-               appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
-                                                 slot_name, plugin);
+               appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
                if (PQserverVersion(conn) >= 100000)
                        /* pg_recvlogical doesn't use an exported snapshot, so suppress */
                        appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
index ec227712d5622b8b2c0e83974f11a8ebebf159e7..908fd68c2bf1771ea4337cb48aaa12dfbf60c673 100644 (file)
@@ -33,8 +33,9 @@ extern PGconn *GetConnection(void);
 
 /* Replication commands */
 extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
-                                         const char *plugin, bool is_physical,
-                                         bool slot_exists_ok);
+                                                                 const char *plugin, bool is_temporary,
+                                                                 bool is_physical, bool reserve_wal,
+                                                                 bool slot_exists_ok);
 extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
 extern bool RunIdentifySystem(PGconn *conn, char **sysid,
                                  TimeLineID *starttli,
index cce14b83e10697f66c173e9ac6a6b1723f4fa707..6a8be09f4cd9a6e208d28a724239776d8abcb3ce 100644 (file)
@@ -4,7 +4,7 @@ use Cwd;
 use Config;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 72;
+use Test::More tests => 78;
 
 program_help_ok('pg_basebackup');
 program_version_ok('pg_basebackup');
@@ -259,9 +259,32 @@ $node->command_fails(
        [   'pg_basebackup',             '-D',
                "$tempdir/backupxs_sl_fail", '-X',
                'stream',                    '-S',
-               'slot1' ],
+               'slot0' ],
        'pg_basebackup fails with nonexistent replication slot');
 
+$node->command_fails(
+       [   'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' ],
+       'pg_basebackup -C fails without slot name');
+
+$node->command_fails(
+       [   'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0', '--no-slot' ],
+       'pg_basebackup fails with -C -S --no-slot');
+
+$node->command_ok(
+       [   'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0' ],
+       'pg_basebackup -C runs');
+
+is($node->safe_psql('postgres', q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'slot0'}),
+   'slot0',
+   'replication slot was created');
+isnt($node->safe_psql('postgres', q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot0'}),
+   '',
+   'restart LSN of new slot is not null');
+
+$node->command_fails(
+       [   'pg_basebackup', '-D', "$tempdir/backupxs_slot1", '-C', '-S', 'slot0' ],
+       'pg_basebackup fails with -C -S and a previously existing slot');
+
 $node->safe_psql('postgres',
        q{SELECT * FROM pg_create_physical_replication_slot('slot1')});
 my $lsn = $node->safe_psql('postgres',