]> granicus.if.org Git - postgresql/blobdiff - src/bin/pg_basebackup/streamutil.c
Empty search_path in Autovacuum and non-psql/pgbench clients.
[postgresql] / src / bin / pg_basebackup / streamutil.c
index ac84e6d360e410e79d9b2d0410fcddfed547f56b..296b1888aad729993f77d12778748e82c4b8d268 100644 (file)
@@ -1,10 +1,11 @@
 /*-------------------------------------------------------------------------
  *
- * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
+ *                                     pg_recvlogical
  *
  * Author: Magnus Hagander <magnus@hagander.net>
  *
- * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *               src/bin/pg_basebackup/streamutil.c
 
 #include "postgres_fe.h"
 
-#include <stdio.h>
-#include <string.h>
 #include <sys/time.h>
-#include <sys/types.h>
 #include <unistd.h>
 
-/* for ntohl/htonl */
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
 /* local includes */
 #include "receivelog.h"
 #include "streamutil.h"
 
-#include "pqexpbuffer.h"
+#include "access/xlog_internal.h"
 #include "common/fe_memutils.h"
 #include "datatype/timestamp.h"
+#include "fe_utils/connect.h"
+#include "port/pg_bswap.h"
+#include "pqexpbuffer.h"
+
+#define ERRCODE_DUPLICATE_OBJECT  "42710"
+
+uint32         WalSegSz;
+
+/* SHOW command for replication connection was introduced in version 10 */
+#define MINIMUM_VERSION_FOR_SHOW_CMD 100000
 
 const char *progname;
 char      *connection_string = NULL;
 char      *dbhost = NULL;
 char      *dbuser = NULL;
 char      *dbport = NULL;
-char      *replication_slot = NULL;
 char      *dbname = NULL;
 int                    dbgetpassword = 0;      /* 0=auto, -1=never, 1=always */
-static char *dbpassword = NULL;
+static bool have_password = false;
+static char password[100];
 PGconn    *conn = NULL;
 
 /*
@@ -62,9 +66,15 @@ GetConnection(void)
        PQconninfoOption *conn_opt;
        char       *err_msg = NULL;
 
+       /* pg_recvlogical uses dbname only; others use connection_string only. */
+       Assert(dbname == NULL || connection_string == NULL);
+
        /*
         * Merge the connection info inputs given in form of connection string,
         * options and default values (dbname=replication, replication=true, etc.)
+        * Explicitly discard any dbname value in the connection string;
+        * otherwise, PQconnectdbParams() would interpret that value as being
+        * itself a connection string.
         */
        i = 0;
        if (connection_string)
@@ -78,7 +88,8 @@ GetConnection(void)
 
                for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
                {
-                       if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+                       if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
+                               strcmp(conn_opt->keyword, "dbname") != 0)
                                argcount++;
                }
 
@@ -87,7 +98,8 @@ GetConnection(void)
 
                for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
                {
-                       if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+                       if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
+                               strcmp(conn_opt->keyword, "dbname") != 0)
                        {
                                keywords[i] = conn_opt->keyword;
                                values[i] = conn_opt->val;
@@ -131,24 +143,23 @@ GetConnection(void)
        }
 
        /* If -W was given, force prompt for password, but only the first time */
-       need_password = (dbgetpassword == 1 && dbpassword == NULL);
+       need_password = (dbgetpassword == 1 && !have_password);
 
        do
        {
                /* Get a new password if appropriate */
                if (need_password)
                {
-                       if (dbpassword)
-                               free(dbpassword);
-                       dbpassword = simple_prompt(_("Password: "), 100, false);
+                       simple_prompt("Password: ", password, sizeof(password), false);
+                       have_password = true;
                        need_password = false;
                }
 
                /* Use (or reuse, on a subsequent connection) password if we have it */
-               if (dbpassword)
+               if (have_password)
                {
                        keywords[i] = "password";
-                       values[i] = dbpassword;
+                       values[i] = password;
                }
                else
                {
@@ -182,7 +193,7 @@ GetConnection(void)
 
        if (PQstatus(tmpconn) != CONNECTION_OK)
        {
-               fprintf(stderr, _("%s: could not connect to server: %s\n"),
+               fprintf(stderr, _("%s: could not connect to server: %s"),
                                progname, PQerrorMessage(tmpconn));
                PQfinish(tmpconn);
                free(values);
@@ -198,28 +209,41 @@ GetConnection(void)
        if (conn_opts)
                PQconninfoFree(conn_opts);
 
+       /* Set always-secure search path, so malicious users can't get control. */
+       if (dbname != NULL)
+       {
+               PGresult   *res;
+
+               res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
+               if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               {
+                       fprintf(stderr, _("%s: could not clear search_path: %s\n"),
+                                       progname, PQerrorMessage(tmpconn));
+                       PQclear(res);
+                       PQfinish(tmpconn);
+                       exit(1);
+               }
+               PQclear(res);
+       }
+
        /*
-        * Ensure we have the same value of integer timestamps as the server we
-        * are connecting to.
+        * Ensure we have the same value of integer_datetimes (now always "on") as
+        * the server we are connecting to.
         */
        tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
        if (!tmpparam)
        {
                fprintf(stderr,
-                _("%s: could not determine server setting for integer_datetimes\n"),
+                               _("%s: could not determine server setting for integer_datetimes\n"),
                                progname);
                PQfinish(tmpconn);
                exit(1);
        }
 
-#ifdef HAVE_INT64_TIMESTAMP
        if (strcmp(tmpparam, "on") != 0)
-#else
-       if (strcmp(tmpparam, "off") != 0)
-#endif
        {
                fprintf(stderr,
-                        _("%s: integer_datetimes compile flag does not match server\n"),
+                               _("%s: integer_datetimes compile flag does not match server\n"),
                                progname);
                PQfinish(tmpconn);
                exit(1);
@@ -228,13 +252,83 @@ GetConnection(void)
        return tmpconn;
 }
 
+/*
+ * From version 10, explicitly set wal segment size using SHOW wal_segment_size
+ * since ControlFile is not accessible here.
+ */
+bool
+RetrieveWalSegSize(PGconn *conn)
+{
+       PGresult   *res;
+       char            xlog_unit[3];
+       int                     xlog_val,
+                               multiplier = 1;
+
+       /* check connection existence */
+       Assert(conn != NULL);
+
+       /* for previous versions set the default xlog seg size */
+       if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
+       {
+               WalSegSz = DEFAULT_XLOG_SEG_SIZE;
+               return true;
+       }
+
+       res = PQexec(conn, "SHOW wal_segment_size");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
+                               progname, "SHOW wal_segment_size", PQerrorMessage(conn));
+
+               PQclear(res);
+               return false;
+       }
+       if (PQntuples(res) != 1 || PQnfields(res) < 1)
+       {
+               fprintf(stderr,
+                               _("%s: could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+                               progname, PQntuples(res), PQnfields(res), 1, 1);
+
+               PQclear(res);
+               return false;
+       }
+
+       /* fetch xlog value and unit from the result */
+       if (sscanf(PQgetvalue(res, 0, 0), "%d%s", &xlog_val, xlog_unit) != 2)
+       {
+               fprintf(stderr, _("%s: WAL segment size could not be parsed\n"),
+                               progname);
+               return false;
+       }
+
+       /* set the multiplier based on unit to convert xlog_val to bytes */
+       if (strcmp(xlog_unit, "MB") == 0)
+               multiplier = 1024 * 1024;
+       else if (strcmp(xlog_unit, "GB") == 0)
+               multiplier = 1024 * 1024 * 1024;
+
+       /* convert and set WalSegSz */
+       WalSegSz = xlog_val * multiplier;
+
+       if (!IsValidWalSegSize(WalSegSz))
+       {
+               fprintf(stderr,
+                               _("%s: WAL segment size must be a power of two between 1MB and 1GB, but the remote server reported a value of %d bytes\n"),
+                               progname, WalSegSz);
+               return false;
+       }
+
+       PQclear(res);
+       return true;
+}
+
 /*
  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
  * some result information if requested:
- * - Start LSN position
- * - Current timeline ID
  * - System identifier
- * - Plugin name
+ * - Current timeline ID
+ * - Start LSN position
+ * - Database name (NULL in servers prior to 9.4)
  */
 bool
 RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
@@ -280,7 +374,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
                if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
                {
                        fprintf(stderr,
-                                 _("%s: could not parse transaction log location \"%s\"\n"),
+                                       _("%s: could not parse write-ahead log location \"%s\"\n"),
                                        progname, PQgetvalue(res, 0, 2));
 
                        PQclear(res);
@@ -292,15 +386,21 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
        /* Get database name, only available in 9.4 and newer versions */
        if (db_name != NULL)
        {
-               if (PQnfields(res) < 4)
-                       fprintf(stderr,
-                                       _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-                                       progname, PQntuples(res), PQnfields(res), 1, 4);
+               *db_name = NULL;
+               if (PQserverVersion(conn) >= 90400)
+               {
+                       if (PQnfields(res) < 4)
+                       {
+                               fprintf(stderr,
+                                               _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+                                               progname, PQntuples(res), PQnfields(res), 1, 4);
 
-               if (PQgetisnull(res, 0, 3))
-                       *db_name = NULL;
-               else
-                       *db_name = pg_strdup(PQgetvalue(res, 0, 3));
+                               PQclear(res);
+                               return false;
+                       }
+                       if (!PQgetisnull(res, 0, 3))
+                               *db_name = pg_strdup(PQgetvalue(res, 0, 3));
+               }
        }
 
        PQclear(res);
@@ -309,12 +409,12 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 
 /*
  * Create a replication slot for the given connection. This function
- * returns true in case of success as well as the start position
- * obtained after the slot creation.
+ * returns true in case of success.
  */
 bool
 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
-                                         XLogRecPtr *startpos, bool is_physical)
+                                         bool is_temporary, bool is_physical, bool reserve_wal,
+                                         bool slot_exists_ok)
 {
        PQExpBuffer query;
        PGresult   *res;
@@ -326,22 +426,45 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
        Assert(slot_name != NULL);
 
        /* Build query */
+       appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
+       if (is_temporary)
+               appendPQExpBuffer(query, " TEMPORARY");
        if (is_physical)
-               appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
-                                                 slot_name);
+       {
+               appendPQExpBuffer(query, " PHYSICAL");
+               if (reserve_wal)
+                       appendPQExpBuffer(query, " RESERVE_WAL");
+       }
        else
-               appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
-                                                 slot_name, plugin);
+       {
+               appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+               if (PQserverVersion(conn) >= 100000)
+                       /* pg_recvlogical doesn't use an exported snapshot, so suppress */
+                       appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
+       }
 
        res = PQexec(conn, query->data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-                               progname, query->data, PQerrorMessage(conn));
+               const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
 
-               destroyPQExpBuffer(query);
-               PQclear(res);
-               return false;
+               if (slot_exists_ok &&
+                       sqlstate &&
+                       strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
+               {
+                       destroyPQExpBuffer(query);
+                       PQclear(res);
+                       return true;
+               }
+               else
+               {
+                       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                                       progname, query->data, PQerrorMessage(conn));
+
+                       destroyPQExpBuffer(query);
+                       PQclear(res);
+                       return false;
+               }
        }
 
        if (PQntuples(res) != 1 || PQnfields(res) != 4)
@@ -356,25 +479,6 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
                return false;
        }
 
-       /* Get LSN start position if necessary */
-       if (startpos != NULL)
-       {
-               uint32          hi,
-                                       lo;
-
-               if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
-               {
-                       fprintf(stderr,
-                                 _("%s: could not parse transaction log location \"%s\"\n"),
-                                       progname, PQgetvalue(res, 0, 1));
-
-                       destroyPQExpBuffer(query);
-                       PQclear(res);
-                       return false;
-               }
-               *startpos = ((uint64) hi) << 32 | lo;
-       }
-
        destroyPQExpBuffer(query);
        PQclear(res);
        return true;
@@ -420,6 +524,7 @@ DropReplicationSlot(PGconn *conn, const char *slot_name)
                return false;
        }
 
+       destroyPQExpBuffer(query);
        PQclear(res);
        return true;
 }
@@ -427,20 +532,18 @@ DropReplicationSlot(PGconn *conn, const char *slot_name)
 
 /*
  * Frontend version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The replication protocol always uses integer timestamps,
- * regardless of the server setting.
+ * backend code.
  */
-int64
+TimestampTz
 feGetCurrentTimestamp(void)
 {
-       int64           result;
+       TimestampTz result;
        struct timeval tp;
 
        gettimeofday(&tp, NULL);
 
-       result = (int64) tp.tv_sec -
+       result = (TimestampTz) tp.tv_sec -
                ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-
        result = (result * USECS_PER_SEC) + tp.tv_usec;
 
        return result;
@@ -451,10 +554,10 @@ feGetCurrentTimestamp(void)
  * backend code.
  */
 void
-feTimestampDifference(int64 start_time, int64 stop_time,
+feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
                                          long *secs, int *microsecs)
 {
-       int64           diff = stop_time - start_time;
+       TimestampTz diff = stop_time - start_time;
 
        if (diff <= 0)
        {
@@ -473,11 +576,11 @@ feTimestampDifference(int64 start_time, int64 stop_time,
  * linked with backend code.
  */
 bool
-feTimestampDifferenceExceeds(int64 start_time,
-                                                        int64 stop_time,
+feTimestampDifferenceExceeds(TimestampTz start_time,
+                                                        TimestampTz stop_time,
                                                         int msec)
 {
-       int64           diff = stop_time - start_time;
+       TimestampTz diff = stop_time - start_time;
 
        return (diff >= msec * INT64CONST(1000));
 }
@@ -488,17 +591,9 @@ feTimestampDifferenceExceeds(int64 start_time,
 void
 fe_sendint64(int64 i, char *buf)
 {
-       uint32          n32;
+       uint64          n64 = pg_hton64(i);
 
-       /* High order half first, since we're doing MSB-first */
-       n32 = (uint32) (i >> 32);
-       n32 = htonl(n32);
-       memcpy(&buf[0], &n32, 4);
-
-       /* Now the low order half */
-       n32 = (uint32) i;
-       n32 = htonl(n32);
-       memcpy(&buf[4], &n32, 4);
+       memcpy(buf, &n64, sizeof(n64));
 }
 
 /*
@@ -507,18 +602,9 @@ fe_sendint64(int64 i, char *buf)
 int64
 fe_recvint64(char *buf)
 {
-       int64           result;
-       uint32          h32;
-       uint32          l32;
-
-       memcpy(&h32, buf, 4);
-       memcpy(&l32, buf + 4, 4);
-       h32 = ntohl(h32);
-       l32 = ntohl(l32);
+       uint64          n64;
 
-       result = h32;
-       result <<= 32;
-       result |= l32;
+       memcpy(&n64, buf, sizeof(n64));
 
-       return result;
+       return pg_ntoh64(n64);
 }