]> granicus.if.org Git - postgresql/commitdiff
Refactor replication connection code of various pg_basebackup utilities.
authorAndres Freund <andres@anarazel.de>
Wed, 1 Oct 2014 15:22:21 +0000 (17:22 +0200)
committerAndres Freund <andres@anarazel.de>
Wed, 1 Oct 2014 15:35:56 +0000 (17:35 +0200)
Move some more code to manage replication connection command to
streamutil.c. A later patch will introduce replication slot via
pg_receivexlog and this avoid duplicating relevant code between
pg_receivexlog and pg_recvlogical.

Author: Michael Paquier, with some editing by me.

src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/pg_recvlogical.c
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h

index 8b9acea9f087516df589223fab4e351e4f12f354..0ebda9ae9e04f44bd2f7c6bf22454ed2f7095499 100644 (file)
@@ -1569,8 +1569,8 @@ BaseBackup(void)
 {
        PGresult   *res;
        char       *sysidentifier;
-       uint32          latesttli;
-       uint32          starttli;
+       TimeLineID      latesttli;
+       TimeLineID      starttli;
        char       *basebkp;
        char            escaped_label[MAXPGPATH];
        char       *maxrate_clause = NULL;
@@ -1624,23 +1624,8 @@ BaseBackup(void)
        /*
         * Run IDENTIFY_SYSTEM so we can get the timeline
         */
-       res = PQexec(conn, "IDENTIFY_SYSTEM");
-       if (PQresultStatus(res) != PGRES_TUPLES_OK)
-       {
-               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-                               progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+       if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
                disconnect_and_exit(1);
-       }
-       if (PQntuples(res) != 1 || PQnfields(res) < 3)
-       {
-               fprintf(stderr,
-                               _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-                               progname, PQntuples(res), PQnfields(res), 1, 3);
-               disconnect_and_exit(1);
-       }
-       sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
-       latesttli = atoi(PQgetvalue(res, 0, 1));
-       PQclear(res);
 
        /*
         * Start the actual backup
index a8b9ad3c05fe71935da0c3f1cd4c45c3dafbfeed..171cf431f57a65ddb9e35d527ce345d19f451da2 100644 (file)
@@ -253,13 +253,8 @@ FindStreamingStart(uint32 *tli)
 static void
 StreamLog(void)
 {
-       PGresult   *res;
-       XLogRecPtr      startpos;
-       uint32          starttli;
-       XLogRecPtr      serverpos;
-       uint32          servertli;
-       uint32          hi,
-                               lo;
+       XLogRecPtr      startpos, serverpos;
+       TimeLineID      starttli, servertli;
 
        /*
         * Connect in replication mode to the server
@@ -280,33 +275,12 @@ StreamLog(void)
        }
 
        /*
-        * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
-        * position.
+        * Identify server, obtaining start LSN position and current timeline ID
+        * at the same time, necessary if not valid data can be found in the
+        * existing output directory.
         */
-       res = PQexec(conn, "IDENTIFY_SYSTEM");
-       if (PQresultStatus(res) != PGRES_TUPLES_OK)
-       {
-               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-                               progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
-               disconnect_and_exit(1);
-       }
-       if (PQntuples(res) != 1 || PQnfields(res) < 3)
-       {
-               fprintf(stderr,
-                               _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-                               progname, PQntuples(res), PQnfields(res), 1, 3);
+       if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
                disconnect_and_exit(1);
-       }
-       servertli = atoi(PQgetvalue(res, 0, 1));
-       if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
-       {
-               fprintf(stderr,
-                               _("%s: could not parse transaction log location \"%s\"\n"),
-                               progname, PQgetvalue(res, 0, 2));
-               disconnect_and_exit(1);
-       }
-       serverpos = ((uint64) hi) << 32 | lo;
-       PQclear(res);
 
        /*
         * Figure out where to start streaming.
index a88ffacc06df62fb2d812c1da7e3449afad57937..c48ceccf90164597e235d64980382ef85b13baae 100644 (file)
@@ -596,7 +596,6 @@ sighup_handler(int signum)
 int
 main(int argc, char **argv)
 {
-       PGresult   *res;
        static struct option long_options[] = {
 /* general options */
                {"file", required_argument, NULL, 'f'},
@@ -628,6 +627,7 @@ main(int argc, char **argv)
        int                     option_index;
        uint32          hi,
                                lo;
+       char       *db_name;
 
        progname = get_progname(argv[0]);
        set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
@@ -834,124 +834,62 @@ main(int argc, char **argv)
 #endif
 
        /*
-        * don't really need this but it actually helps to get more precise error
-        * messages about authentication, required GUCs and such without starting
-        * to loop around connection attempts lateron.
+        * Obtain a connection to server. This is not really necessary but it
+        * helps to get more precise error messages about authentification,
+        * required GUC parameters and such.
         */
-       {
-               conn = GetConnection();
-               if (!conn)
-                       /* Error message already written in GetConnection() */
-                       exit(1);
+       conn = GetConnection();
+       if (!conn)
+               /* Error message already written in GetConnection() */
+               exit(1);
 
-               /*
-                * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
-                * position.
-                */
-               res = PQexec(conn, "IDENTIFY_SYSTEM");
-               if (PQresultStatus(res) != PGRES_TUPLES_OK)
-               {
-                       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-                                       progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
-                       disconnect_and_exit(1);
-               }
+       /*
+        * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
+        * replication connection.
+        */
+       if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
+               disconnect_and_exit(1);
 
-               if (PQntuples(res) != 1 || PQnfields(res) < 4)
-               {
-                       fprintf(stderr,
-                                       _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-                                       progname, PQntuples(res), PQnfields(res), 1, 4);
-                       disconnect_and_exit(1);
-               }
-               PQclear(res);
+       if (db_name == NULL)
+       {
+               fprintf(stderr,
+                               _("%s: failed to establish database specific replication connection\n"),
+                               progname);
+               disconnect_and_exit(1);
        }
 
-
-       /*
-        * drop a replication slot
-        */
+       /* Drop a replication slot. */
        if (do_drop_slot)
        {
-               char            query[256];
-
                if (verbose)
                        fprintf(stderr,
                                        _("%s: dropping replication slot \"%s\"\n"),
                                        progname, replication_slot);
 
-               snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
-                                replication_slot);
-               res = PQexec(conn, query);
-               if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               {
-                       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-                                       progname, query, PQerrorMessage(conn));
+               if (!DropReplicationSlot(conn, replication_slot))
                        disconnect_and_exit(1);
-               }
-
-               if (PQntuples(res) != 0 || PQnfields(res) != 0)
-               {
-                       fprintf(stderr,
-                                       _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
-                                       progname, replication_slot, PQntuples(res), PQnfields(res), 0, 0);
-                       disconnect_and_exit(1);
-               }
-
-               PQclear(res);
-               disconnect_and_exit(0);
        }
 
-       /*
-        * create a replication slot
-        */
+       /* Create a replication slot. */
        if (do_create_slot)
        {
-               char            query[256];
-
                if (verbose)
                        fprintf(stderr,
                                        _("%s: creating replication slot \"%s\"\n"),
                                        progname, replication_slot);
 
-               snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
-                                replication_slot, plugin);
-
-               res = PQexec(conn, query);
-               if (PQresultStatus(res) != PGRES_TUPLES_OK)
-               {
-                       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-                                       progname, query, PQerrorMessage(conn));
+               if (!CreateReplicationSlot(conn, replication_slot, plugin,
+                                                                  &startpos, false))
                        disconnect_and_exit(1);
-               }
-
-               if (PQntuples(res) != 1 || PQnfields(res) != 4)
-               {
-                       fprintf(stderr,
-                                       _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
-                                       progname, replication_slot, PQntuples(res), PQnfields(res), 1, 4);
-                       disconnect_and_exit(1);
-               }
-
-               if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
-               {
-                       fprintf(stderr,
-                                       _("%s: could not parse transaction log location \"%s\"\n"),
-                                       progname, PQgetvalue(res, 0, 1));
-                       disconnect_and_exit(1);
-               }
-               startpos = ((uint64) hi) << 32 | lo;
-
-               replication_slot = strdup(PQgetvalue(res, 0, 0));
-               PQclear(res);
        }
 
-
        if (!do_start_slot)
                disconnect_and_exit(0);
 
+       /* Stream loop */
        while (true)
        {
-               StreamLog();
+               StreamLogicalLog();
                if (time_to_abort)
                {
                        /*
index 1100260c05abd83bbdc432a24d4c1d8bfbbad119..2f4bac95508cdc66a7be6e6b6cff66baa9e10362 100644 (file)
@@ -27,6 +27,7 @@
 #include "receivelog.h"
 #include "streamutil.h"
 
+#include "pqexpbuffer.h"
 #include "common/fe_memutils.h"
 #include "datatype/timestamp.h"
 
@@ -227,11 +228,183 @@ GetConnection(void)
        return tmpconn;
 }
 
+/*
+ * Run IDENTIFY_SYSTEM through a given connection and give back to caller
+ * some result information if requested:
+ * - Start LSN position
+ * - Current timeline ID
+ * - System identifier
+ * - Plugin name
+ */
+bool
+RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
+                                 XLogRecPtr *startpos, char **db_name)
+{
+       PGresult   *res;
+       uint32          hi, lo;
+
+       /* Check connection existence */
+       Assert(conn != NULL);
+
+       res = PQexec(conn, "IDENTIFY_SYSTEM");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                               progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+               return false;
+       }
+       if (PQntuples(res) != 1 || PQnfields(res) < 3)
+       {
+               fprintf(stderr,
+                               _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+                               progname, PQntuples(res), PQnfields(res), 1, 3);
+               return false;
+       }
+
+       /* Get system identifier */
+       if (sysid != NULL)
+               *sysid = pg_strdup(PQgetvalue(res, 0, 0));
+
+       /* Get timeline ID to start streaming from */
+       if (starttli != NULL)
+               *starttli = atoi(PQgetvalue(res, 0, 1));
+
+       /* Get LSN start position if necessary */
+       if (startpos != NULL)
+       {
+               if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not parse transaction log location \"%s\"\n"),
+                                       progname, PQgetvalue(res, 0, 2));
+                       return false;
+               }
+               *startpos = ((uint64) hi) << 32 | lo;
+       }
+
+       /* Get database name, only available in 9.4 and newer versions */
+       if  (db_name != NULL)
+       {
+               if (PQnfields(res) < 4)
+                       fprintf(stderr,
+                                       _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+                                       progname, PQntuples(res), PQnfields(res), 1, 4);
+
+               if (PQgetisnull(res, 0, 3))
+                       *db_name =  NULL;
+               else
+                       *db_name = pg_strdup(PQgetvalue(res, 0, 3));
+       }
+
+       PQclear(res);
+       return true;
+}
+
+/*
+ * Create a replication slot for the given connection. This function
+ * returns true in case of success as well as the start position
+ * obtained after the slot creation.
+ */
+bool
+CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
+                                         XLogRecPtr *startpos, bool is_physical)
+{
+       PQExpBuffer query;
+       PGresult   *res;
+
+       query = createPQExpBuffer();
+
+       Assert((is_physical && plugin == NULL) ||
+                  (!is_physical && plugin != NULL));
+       Assert(slot_name != NULL);
+
+       /* Build query */
+       if (is_physical)
+               appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
+                                                 slot_name);
+       else
+               appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+                                                 slot_name, plugin);
+
+       res = PQexec(conn, query->data);
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                               progname, query->data, PQerrorMessage(conn));
+               return false;
+       }
+
+       if (PQntuples(res) != 1 || PQnfields(res) != 4)
+       {
+               fprintf(stderr,
+                               _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+                               progname, slot_name,
+                               PQntuples(res), PQnfields(res), 1, 4);
+               return false;
+       }
+
+       /* Get LSN start position if necessary */
+       if (startpos != NULL)
+       {
+               uint32          hi, lo;
+
+               if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+               {
+                       fprintf(stderr,
+                                       _("%s: could not parse transaction log location \"%s\"\n"),
+                                       progname, PQgetvalue(res, 0, 1));
+                       return false;
+               }
+               *startpos = ((uint64) hi) << 32 | lo;
+       }
+
+       PQclear(res);
+       return true;
+}
+
+/*
+ * Drop a replication slot for the given connection. This function
+ * returns true in case of success.
+ */
+bool
+DropReplicationSlot(PGconn *conn, const char *slot_name)
+{
+       PQExpBuffer query;
+       PGresult   *res;
+
+       Assert(slot_name != NULL);
+
+       query = createPQExpBuffer();
+
+       /* Build query */
+       appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
+                                         slot_name);
+       res = PQexec(conn, query->data);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                               progname, query->data, PQerrorMessage(conn));
+               return false;
+       }
+
+       if (PQntuples(res) != 0 || PQnfields(res) != 0)
+       {
+               fprintf(stderr,
+                               _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+                               progname, slot_name,
+                               PQntuples(res), PQnfields(res), 0, 0);
+               return false;
+       }
+
+       PQclear(res);
+       return true;
+}
+
 
 /*
  * Frontend version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The protocol always uses integer timestamps, regardless of
- * server setting.
+ * backend code. The replication protocol always uses integer timestamps,
+ * regardless of the server setting.
  */
 int64
 feGetCurrentTimestamp(void)
index 8c6691f9c8c6d318a17961484af9b7774427abf5..ac66145c359e91783060e8368b334a5e4e707096 100644 (file)
@@ -14,6 +14,8 @@
 
 #include "libpq-fe.h"
 
+#include "access/xlogdefs.h"
+
 extern const char *progname;
 extern char *connection_string;
 extern char *dbhost;
@@ -28,6 +30,15 @@ extern PGconn *conn;
 
 extern PGconn *GetConnection(void);
 
+/* Replication commands */
+extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
+                                                                 const char *plugin, XLogRecPtr *startpos,
+                                                                 bool is_physical);
+extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
+extern bool RunIdentifySystem(PGconn *conn, char **sysid,
+                                                         TimeLineID *starttli,
+                                                         XLogRecPtr *startpos,
+                                                         char **db_name);
 extern int64 feGetCurrentTimestamp(void);
 extern void feTimestampDifference(int64 start_time, int64 stop_time,
                                          long *secs, int *microsecs);