/*-------------------------------------------------------------------------
*
- * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
+ * pg_recvlogical
*
* Author: Magnus Hagander <magnus@hagander.net>
*
- * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/streamutil.c
#include "postgres_fe.h"
-#include <stdio.h>
-#include <string.h>
#include <sys/time.h>
-#include <sys/types.h>
#include <unistd.h>
-/* for ntohl/htonl */
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
/* local includes */
#include "receivelog.h"
#include "streamutil.h"
-#include "pqexpbuffer.h"
+#include "access/xlog_internal.h"
#include "common/fe_memutils.h"
#include "datatype/timestamp.h"
+#include "fe_utils/connect.h"
+#include "port/pg_bswap.h"
+#include "pqexpbuffer.h"
+
+#define ERRCODE_DUPLICATE_OBJECT "42710"
+
+uint32 WalSegSz;
+
+/* SHOW command for replication connection was introduced in version 10 */
+#define MINIMUM_VERSION_FOR_SHOW_CMD 100000
const char *progname;
char *connection_string = NULL;
char *dbhost = NULL;
char *dbuser = NULL;
char *dbport = NULL;
-char *replication_slot = NULL;
char *dbname = NULL;
int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
-static char *dbpassword = NULL;
+static bool have_password = false;
+static char password[100];
PGconn *conn = NULL;
/*
PQconninfoOption *conn_opt;
char *err_msg = NULL;
+ /* pg_recvlogical uses dbname only; others use connection_string only. */
+ Assert(dbname == NULL || connection_string == NULL);
+
/*
* Merge the connection info inputs given in form of connection string,
* options and default values (dbname=replication, replication=true, etc.)
+ * Explicitly discard any dbname value in the connection string;
+ * otherwise, PQconnectdbParams() would interpret that value as being
+ * itself a connection string.
*/
i = 0;
if (connection_string)
for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
{
- if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+ if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
+ strcmp(conn_opt->keyword, "dbname") != 0)
argcount++;
}
for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
{
- if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+ if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
+ strcmp(conn_opt->keyword, "dbname") != 0)
{
keywords[i] = conn_opt->keyword;
values[i] = conn_opt->val;
}
/* If -W was given, force prompt for password, but only the first time */
- need_password = (dbgetpassword == 1 && dbpassword == NULL);
+ need_password = (dbgetpassword == 1 && !have_password);
do
{
/* Get a new password if appropriate */
if (need_password)
{
- if (dbpassword)
- free(dbpassword);
- dbpassword = simple_prompt(_("Password: "), 100, false);
+ simple_prompt("Password: ", password, sizeof(password), false);
+ have_password = true;
need_password = false;
}
/* Use (or reuse, on a subsequent connection) password if we have it */
- if (dbpassword)
+ if (have_password)
{
keywords[i] = "password";
- values[i] = dbpassword;
+ values[i] = password;
}
else
{
if (PQstatus(tmpconn) != CONNECTION_OK)
{
- fprintf(stderr, _("%s: could not connect to server: %s\n"),
+ fprintf(stderr, _("%s: could not connect to server: %s"),
progname, PQerrorMessage(tmpconn));
PQfinish(tmpconn);
free(values);
if (conn_opts)
PQconninfoFree(conn_opts);
+ /* Set always-secure search path, so malicious users can't get control. */
+ if (dbname != NULL)
+ {
+ PGresult *res;
+
+ res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not clear search_path: %s\n"),
+ progname, PQerrorMessage(tmpconn));
+ PQclear(res);
+ PQfinish(tmpconn);
+ exit(1);
+ }
+ PQclear(res);
+ }
+
/*
- * Ensure we have the same value of integer timestamps as the server we
- * are connecting to.
+ * Ensure we have the same value of integer_datetimes (now always "on") as
+ * the server we are connecting to.
*/
tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
if (!tmpparam)
{
fprintf(stderr,
- _("%s: could not determine server setting for integer_datetimes\n"),
+ _("%s: could not determine server setting for integer_datetimes\n"),
progname);
PQfinish(tmpconn);
exit(1);
}
-#ifdef HAVE_INT64_TIMESTAMP
if (strcmp(tmpparam, "on") != 0)
-#else
- if (strcmp(tmpparam, "off") != 0)
-#endif
{
fprintf(stderr,
- _("%s: integer_datetimes compile flag does not match server\n"),
+ _("%s: integer_datetimes compile flag does not match server\n"),
progname);
PQfinish(tmpconn);
exit(1);
return tmpconn;
}
+/*
+ * From version 10, explicitly set wal segment size using SHOW wal_segment_size
+ * since ControlFile is not accessible here.
+ */
+bool
+RetrieveWalSegSize(PGconn *conn)
+{
+ PGresult *res;
+ char xlog_unit[3];
+ int xlog_val,
+ multiplier = 1;
+
+ /* check connection existence */
+ Assert(conn != NULL);
+
+ /* for previous versions set the default xlog seg size */
+ if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
+ {
+ WalSegSz = DEFAULT_XLOG_SEG_SIZE;
+ return true;
+ }
+
+ res = PQexec(conn, "SHOW wal_segment_size");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
+ progname, "SHOW wal_segment_size", PQerrorMessage(conn));
+
+ PQclear(res);
+ return false;
+ }
+ if (PQntuples(res) != 1 || PQnfields(res) < 1)
+ {
+ fprintf(stderr,
+ _("%s: could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+ progname, PQntuples(res), PQnfields(res), 1, 1);
+
+ PQclear(res);
+ return false;
+ }
+
+ /* fetch xlog value and unit from the result */
+ if (sscanf(PQgetvalue(res, 0, 0), "%d%s", &xlog_val, xlog_unit) != 2)
+ {
+ fprintf(stderr, _("%s: WAL segment size could not be parsed\n"),
+ progname);
+ return false;
+ }
+
+ /* set the multiplier based on unit to convert xlog_val to bytes */
+ if (strcmp(xlog_unit, "MB") == 0)
+ multiplier = 1024 * 1024;
+ else if (strcmp(xlog_unit, "GB") == 0)
+ multiplier = 1024 * 1024 * 1024;
+
+ /* convert and set WalSegSz */
+ WalSegSz = xlog_val * multiplier;
+
+ if (!IsValidWalSegSize(WalSegSz))
+ {
+ fprintf(stderr,
+ _("%s: WAL segment size must be a power of two between 1MB and 1GB, but the remote server reported a value of %d bytes\n"),
+ progname, WalSegSz);
+ return false;
+ }
+
+ PQclear(res);
+ return true;
+}
+
/*
* Run IDENTIFY_SYSTEM through a given connection and give back to caller
* some result information if requested:
- * - Start LSN position
- * - Current timeline ID
* - System identifier
- * - Plugin name
+ * - Current timeline ID
+ * - Start LSN position
+ * - Database name (NULL in servers prior to 9.4)
*/
bool
RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
- _("%s: could not parse transaction log location \"%s\"\n"),
+ _("%s: could not parse write-ahead log location \"%s\"\n"),
progname, PQgetvalue(res, 0, 2));
PQclear(res);
/* Get database name, only available in 9.4 and newer versions */
if (db_name != NULL)
{
- if (PQnfields(res) < 4)
- fprintf(stderr,
- _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 4);
+ *db_name = NULL;
+ if (PQserverVersion(conn) >= 90400)
+ {
+ if (PQnfields(res) < 4)
+ {
+ fprintf(stderr,
+ _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+ progname, PQntuples(res), PQnfields(res), 1, 4);
- if (PQgetisnull(res, 0, 3))
- *db_name = NULL;
- else
- *db_name = pg_strdup(PQgetvalue(res, 0, 3));
+ PQclear(res);
+ return false;
+ }
+ if (!PQgetisnull(res, 0, 3))
+ *db_name = pg_strdup(PQgetvalue(res, 0, 3));
+ }
}
PQclear(res);
/*
* Create a replication slot for the given connection. This function
- * returns true in case of success as well as the start position
- * obtained after the slot creation.
+ * returns true in case of success.
*/
bool
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
- XLogRecPtr *startpos, bool is_physical)
+ bool is_temporary, bool is_physical, bool reserve_wal,
+ bool slot_exists_ok)
{
PQExpBuffer query;
PGresult *res;
Assert(slot_name != NULL);
/* Build query */
+ appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
+ if (is_temporary)
+ appendPQExpBuffer(query, " TEMPORARY");
if (is_physical)
- appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
- slot_name);
+ {
+ appendPQExpBuffer(query, " PHYSICAL");
+ if (reserve_wal)
+ appendPQExpBuffer(query, " RESERVE_WAL");
+ }
else
- appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
- slot_name, plugin);
+ {
+ appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+ if (PQserverVersion(conn) >= 100000)
+ /* pg_recvlogical doesn't use an exported snapshot, so suppress */
+ appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
+ }
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, query->data, PQerrorMessage(conn));
+ const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
- destroyPQExpBuffer(query);
- PQclear(res);
- return false;
+ if (slot_exists_ok &&
+ sqlstate &&
+ strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
+ {
+ destroyPQExpBuffer(query);
+ PQclear(res);
+ return true;
+ }
+ else
+ {
+ fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+ progname, query->data, PQerrorMessage(conn));
+
+ destroyPQExpBuffer(query);
+ PQclear(res);
+ return false;
+ }
}
if (PQntuples(res) != 1 || PQnfields(res) != 4)
return false;
}
- /* Get LSN start position if necessary */
- if (startpos != NULL)
- {
- uint32 hi,
- lo;
-
- if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
- {
- fprintf(stderr,
- _("%s: could not parse transaction log location \"%s\"\n"),
- progname, PQgetvalue(res, 0, 1));
-
- destroyPQExpBuffer(query);
- PQclear(res);
- return false;
- }
- *startpos = ((uint64) hi) << 32 | lo;
- }
-
destroyPQExpBuffer(query);
PQclear(res);
return true;
return false;
}
+ destroyPQExpBuffer(query);
PQclear(res);
return true;
}
/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The replication protocol always uses integer timestamps,
- * regardless of the server setting.
+ * backend code.
*/
-int64
+TimestampTz
feGetCurrentTimestamp(void)
{
- int64 result;
+ TimestampTz result;
struct timeval tp;
gettimeofday(&tp, NULL);
- result = (int64) tp.tv_sec -
+ result = (TimestampTz) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-
result = (result * USECS_PER_SEC) + tp.tv_usec;
return result;
* backend code.
*/
void
-feTimestampDifference(int64 start_time, int64 stop_time,
+feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
long *secs, int *microsecs)
{
- int64 diff = stop_time - start_time;
+ TimestampTz diff = stop_time - start_time;
if (diff <= 0)
{
* linked with backend code.
*/
bool
-feTimestampDifferenceExceeds(int64 start_time,
- int64 stop_time,
+feTimestampDifferenceExceeds(TimestampTz start_time,
+ TimestampTz stop_time,
int msec)
{
- int64 diff = stop_time - start_time;
+ TimestampTz diff = stop_time - start_time;
return (diff >= msec * INT64CONST(1000));
}
void
fe_sendint64(int64 i, char *buf)
{
- uint32 n32;
+ uint64 n64 = pg_hton64(i);
- /* High order half first, since we're doing MSB-first */
- n32 = (uint32) (i >> 32);
- n32 = htonl(n32);
- memcpy(&buf[0], &n32, 4);
-
- /* Now the low order half */
- n32 = (uint32) i;
- n32 = htonl(n32);
- memcpy(&buf[4], &n32, 4);
+ memcpy(buf, &n64, sizeof(n64));
}
/*
int64
fe_recvint64(char *buf)
{
- int64 result;
- uint32 h32;
- uint32 l32;
-
- memcpy(&h32, buf, 4);
- memcpy(&l32, buf + 4, 4);
- h32 = ntohl(h32);
- l32 = ntohl(l32);
+ uint64 n64;
- result = h32;
- result <<= 32;
- result |= l32;
+ memcpy(&n64, buf, sizeof(n64));
- return result;
+ return pg_ntoh64(n64);
}