]> granicus.if.org Git - postgresql/commitdiff
Add support for managing physical replication slots to pg_receivexlog.
authorAndres Freund <andres@anarazel.de>
Mon, 6 Oct 2014 10:51:37 +0000 (12:51 +0200)
committerAndres Freund <andres@anarazel.de>
Mon, 6 Oct 2014 10:51:37 +0000 (12:51 +0200)
pg_receivexlog already has the capability to use a replication slot to
reserve WAL on the upstream node. But the used slot currently has to
be created via SQL.

To allow using slots directly, without involving SQL, add
--create-slot and --drop-slot actions, analogous to the logical slot
manipulation support in pg_recvlogical.

Author: Michael Paquier
Discussion: CABUevEx+zrOHZOQg+dPapNPFRJdsk59b=TSVf30Z71GnFXhQaw@mail.gmail.com

doc/src/sgml/ref/pg_receivexlog.sgml
src/bin/pg_basebackup/pg_receivexlog.c

index 5916b8f40dacf291e7c4848eb1853a1464c8a535..c1af12abf7552ecdbef41865897ea50b42f06049 100644 (file)
@@ -255,13 +255,42 @@ PostgreSQL documentation
          to make sure that <application>pg_receivexlog</> cannot become the
          synchronous standby through an incautious setting of
          <xref linkend="guc-synchronous-standby-names">; it does not flush
-         data frequently enough for this to work correctly.
+         data frequently enough for this to work correctly. In
+         <option>--create-slot</option> mode, create the slot with this name.
+         In <option>--drop-slot</option> mode, delete the slot with this name.
         </para>
       </listitem>
      </varlistentry>
     </variablelist>
    </para>
 
+   <para>
+    <application>pg_receivexlog</application> can perform one of the two
+    following actions in order to control physical replication slots:
+
+    <variablelist>
+     <varlistentry>
+      <term><option>--create-slot</option></term>
+      <listitem>
+       <para>
+        Create a new physical replication slot with the name specified in
+        <option>--slot</option>, then start to stream WAL.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--drop-slot</option></term>
+      <listitem>
+       <para>
+        Drop the replication slot with the name specified in
+        <option>--slot</option>, then exit.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+
    <para>
     Other options are also available:
 
index 171cf431f57a65ddb9e35d527ce345d19f451da2..e6f69e4edd1aab7e78a99468fee52a277f8cc447 100644 (file)
@@ -38,11 +38,15 @@ static int  noloop = 0;
 static int     standby_message_timeout = 10 * 1000;            /* 10 sec = default */
 static int     fsync_interval = 0; /* 0 = default */
 static volatile bool time_to_abort = false;
+static bool do_create_slot = false;
+static bool do_drop_slot = false;
 
 
 static void usage(void);
+static DIR* get_destination_dir(char *dest_folder);
+static void close_destination_dir(DIR *dest_dir, char *dest_folder);
 static XLogRecPtr FindStreamingStart(uint32 *tli);
-static void StreamLog();
+static void StreamLog(void);
 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
                           bool segment_finished);
 
@@ -78,6 +82,9 @@ usage(void)
        printf(_("  -w, --no-password      never prompt for password\n"));
        printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
        printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
+       printf(_("\nOptional actions:\n"));
+       printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
+       printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
        printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
 }
 
@@ -118,6 +125,44 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
        return false;
 }
 
+
+/*
+ * Get destination directory.
+ */
+static DIR*
+get_destination_dir(char *dest_folder)
+{
+       DIR *dir;
+
+       Assert(dest_folder != NULL);
+       dir = opendir(dest_folder);
+       if (dir == NULL)
+       {
+               fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+                               progname, basedir, strerror(errno));
+               disconnect_and_exit(1);
+       }
+
+       return dir;
+}
+
+
+/*
+ * Close existing directory.
+ */
+static void
+close_destination_dir(DIR *dest_dir, char *dest_folder)
+{
+       Assert(dest_dir != NULL && dest_folder != NULL);
+       if (closedir(dest_dir))
+       {
+               fprintf(stderr, _("%s: could not close directory \"%s\": %s\n"),
+                               progname, dest_folder, strerror(errno));
+               disconnect_and_exit(1);
+       }
+}
+
+
 /*
  * Determine starting location for streaming, based on any existing xlog
  * segments in the directory. We start at the end of the last one that is
@@ -134,13 +179,7 @@ FindStreamingStart(uint32 *tli)
        uint32          high_tli = 0;
        bool            high_ispartial = false;
 
-       dir = opendir(basedir);
-       if (dir == NULL)
-       {
-               fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
-                               progname, basedir, strerror(errno));
-               disconnect_and_exit(1);
-       }
+       dir = get_destination_dir(basedir);
 
        while (errno = 0, (dirent = readdir(dir)) != NULL)
        {
@@ -219,12 +258,7 @@ FindStreamingStart(uint32 *tli)
                disconnect_and_exit(1);
        }
 
-       if (closedir(dir))
-       {
-               fprintf(stderr, _("%s: could not close directory \"%s\": %s\n"),
-                               progname, basedir, strerror(errno));
-               disconnect_and_exit(1);
-       }
+       close_destination_dir(dir, basedir);
 
        if (high_segno > 0)
        {
@@ -344,11 +378,15 @@ main(int argc, char **argv)
                {"status-interval", required_argument, NULL, 's'},
                {"slot", required_argument, NULL, 'S'},
                {"verbose", no_argument, NULL, 'v'},
+/* action */
+               {"create-slot", no_argument, NULL, 1},
+               {"drop-slot", no_argument, NULL, 2},
                {NULL, 0, NULL, 0}
        };
 
        int                     c;
        int                     option_index;
+       char       *db_name;
 
        progname = get_progname(argv[0]);
        set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
@@ -427,6 +465,13 @@ main(int argc, char **argv)
                        case 'v':
                                verbose++;
                                break;
+/* action */
+                       case 1:
+                               do_create_slot = true;
+                               break;
+                       case 2:
+                               do_drop_slot = true;
+                               break;
                        default:
 
                                /*
@@ -451,10 +496,26 @@ main(int argc, char **argv)
                exit(1);
        }
 
+       if (replication_slot == NULL && (do_drop_slot || do_create_slot))
+       {
+               fprintf(stderr, _("%s: --create-slot and --drop-slot need a slot to be specified using --slot\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
+       if (do_drop_slot && do_create_slot)
+       {
+               fprintf(stderr, _("%s: cannot use --create-slot together with --drop-slot\n"), progname);
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
+               exit(1);
+       }
+
        /*
         * Required arguments
         */
-       if (basedir == NULL)
+       if (basedir == NULL && !do_drop_slot)
        {
                fprintf(stderr, _("%s: no target directory specified\n"), progname);
                fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
@@ -462,10 +523,74 @@ main(int argc, char **argv)
                exit(1);
        }
 
+       /*
+        * Check existence of destination folder.
+        */
+       if (!do_drop_slot)
+       {
+               DIR *dir = get_destination_dir(basedir);
+               close_destination_dir(dir, basedir);
+       }
+
 #ifndef WIN32
        pqsignal(SIGINT, sigint_handler);
 #endif
 
+       /*
+        * Obtain a connection before doing anything.
+        */
+       conn = GetConnection();
+       if (!conn)
+               /* error message already written in GetConnection() */
+               exit(1);
+
+       /*
+        * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
+        * replication connection and haven't connected using a database specific
+        * connection.
+        */
+       if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
+               disconnect_and_exit(1);
+
+       /*
+        * Check that there is a database associated with connection, none
+        * should be defined in this context.
+        */
+       if (db_name)
+       {
+               fprintf(stderr,
+                               _("%s: replication connection using slot \"%s\" is unexpectedly database specific\n"),
+                               progname, replication_slot);
+               disconnect_and_exit(1);
+       }
+
+       /*
+        * Drop a replication slot.
+        */
+       if (do_drop_slot)
+       {
+               if (verbose)
+                       fprintf(stderr,
+                                       _("%s: dropping replication slot \"%s\"\n"),
+                                       progname, replication_slot);
+
+               if (!DropReplicationSlot(conn, replication_slot))
+                       disconnect_and_exit(1);
+               disconnect_and_exit(0);
+       }
+
+       /* Create a replication slot */
+       if (do_create_slot)
+       {
+               if (verbose)
+                       fprintf(stderr,
+                                       _("%s: creating replication slot \"%s\"\n"),
+                                       progname, replication_slot);
+
+               if (!CreateReplicationSlot(conn, replication_slot, NULL, NULL, true))
+                       disconnect_and_exit(1);
+       }
+
        while (true)
        {
                StreamLog();