1 /*-------------------------------------------------------------------------
3 * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
6 * Author: Magnus Hagander <magnus@hagander.net>
8 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11 * src/bin/pg_basebackup/streamutil.c
12 *-------------------------------------------------------------------------
15 #include "postgres_fe.h"
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
25 #include "receivelog.h"
26 #include "streamutil.h"
28 #include "access/xlog_internal.h"
29 #include "pqexpbuffer.h"
30 #include "common/fe_memutils.h"
31 #include "datatype/timestamp.h"
/*
 * SQLSTATE value that CreateReplicationSlot compares against the server
 * error when slot_exists_ok is set.
 */
33 #define ERRCODE_DUPLICATE_OBJECT "42710"
37 /* SHOW command for replication connection was introduced in version 10 */
38 #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
/* Connection string; when non-NULL it is parsed with PQconninfoParse. */
41 char *connection_string = NULL;
46 int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
/* Set to true right after simple_prompt has filled the password buffer. */
47 static bool have_password = false;
/*
 * Fixed-size buffer handed to simple_prompt together with its size and
 * later used as the value of the password connection keyword.
 */
48 static char password[100];
52 * Connect to the server. Returns a valid PGconn pointer if connected,
53 * or NULL on non-permanent error. On permanent error, the function will
54 * call exit(1) directly.
60 int argcount = 7; /* dbname, replication, fallback_app_name,
61 * host, user, port, password */
63 const char **keywords;
67 PQconninfoOption *conn_opts = NULL;
68 PQconninfoOption *conn_opt;
/*
 * Overview of the visible flow: parallel keywords and values arrays are
 * filled from the parsed connection_string (when set), then with the fixed
 * dbname, replication and fallback_application_name entries and the host,
 * user, port and password keywords.  PQconnectdbParams is then called in a
 * loop that repeats while need_password is true.  Only part of this
 * function is visible in this listing, so statements between the shown
 * lines are not described.
 */
71 /* pg_recvlogical uses dbname only; others use connection_string only. */
72 Assert(dbname == NULL || connection_string == NULL);
75 * Merge the connection info inputs given in form of connection string,
76 * options and default values (dbname=replication, replication=true, etc.)
77 * Explicitly discard any dbname value in the connection string;
78 * otherwise, PQconnectdbParams() would interpret that value as being
79 * itself a connection string.
82 if (connection_string)
84 conn_opts = PQconninfoParse(connection_string, &err_msg);
85 if (conn_opts == NULL)
87 fprintf(stderr, "%s: %s", progname, err_msg);
/*
 * NOTE(review): the body of this first pass is not visible here; presumably
 * it counts the non-empty options other than dbname so that the arrays
 * allocated below are large enough -- confirm.
 */
91 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
93 if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
94 strcmp(conn_opt->keyword, "dbname") != 0)
/*
 * argcount + 1 entries are allocated.  PQconnectdbParams expects a
 * NULL-terminated keyword array; presumably pg_malloc0 zero-fills the
 * memory so the spare slot can act as that terminator -- confirm.
 */
98 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
99 values = pg_malloc0((argcount + 1) * sizeof(*values));
/* Second pass: copy every non-empty option other than dbname. */
101 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
103 if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
104 strcmp(conn_opt->keyword, "dbname") != 0)
106 keywords[i] = conn_opt->keyword;
107 values[i] = conn_opt->val;
/*
 * NOTE(review): second allocation site.  The branch structure is not
 * visible; presumably this is the path taken when no connection string was
 * given -- confirm.
 */
114 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
115 values = pg_malloc0((argcount + 1) * sizeof(*values));
/*
 * Fixed entries.  With dbname unset the connection uses dbname replication
 * and replication true; with dbname set it uses that name and replication
 * database.
 */
118 keywords[i] = "dbname";
119 values[i] = dbname == NULL ? "replication" : dbname;
121 keywords[i] = "replication";
122 values[i] = dbname == NULL ? "true" : "database";
124 keywords[i] = "fallback_application_name";
125 values[i] = progname;
130 keywords[i] = "host";
136 keywords[i] = "user";
142 keywords[i] = "port";
147 /* If -W was given, force prompt for password, but only the first time */
148 need_password = (dbgetpassword == 1 && !have_password);
152 /* Get a new password if appropriate */
155 simple_prompt("Password: ", password, sizeof(password), false);
156 have_password = true;
157 need_password = false;
160 /* Use (or reuse, on a subsequent connection) password if we have it */
163 keywords[i] = "password";
164 values[i] = password;
/* The third argument of PQconnectdbParams (expand_dbname) is passed as true. */
172 tmpconn = PQconnectdbParams(keywords, values, true);
175 * If there is too little memory even to allocate the PGconn object
176 * and PQconnectdbParams returns NULL, we call exit(1) directly.
180 fprintf(stderr, _("%s: could not connect to server\n"),
185 /* If we need a password and -w wasn't given, loop back and get one */
186 if (PQstatus(tmpconn) == CONNECTION_BAD &&
187 PQconnectionNeedsPassword(tmpconn) &&
191 need_password = true;
194 while (need_password);
196 if (PQstatus(tmpconn) != CONNECTION_OK)
198 fprintf(stderr, _("%s: could not connect to server: %s"),
199 progname, PQerrorMessage(tmpconn));
/*
 * NOTE(review): conn_opts is freed at two visible sites (the next two
 * lines).  Presumably they sit on mutually exclusive exit paths -- confirm.
 */
204 PQconninfoFree(conn_opts);
212 PQconninfoFree(conn_opts);
215 * Ensure we have the same value of integer_datetimes (now always "on") as
216 * the server we are connecting to.
218 tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
222 _("%s: could not determine server setting for integer_datetimes\n"),
228 if (strcmp(tmpparam, "on") != 0)
231 _("%s: integer_datetimes compile flag does not match server\n"),
241 * From version 10, explicitly set wal segment size using SHOW wal_segment_size
242 * since ControlFile is not accessible here.
245 RetrieveWalSegSize(PGconn *conn)
/*
 * Determine WalSegSz for the server behind conn.
 *
 * conn: connection to query; asserted to be non-NULL.
 *
 * Servers reporting a version below MINIMUM_VERSION_FOR_SHOW_CMD get
 * DEFAULT_XLOG_SEG_SIZE.  Otherwise SHOW wal_segment_size is executed, the
 * result must hold exactly one row with at least one field, and the first
 * value is parsed as an integer followed by a unit string.  The product of
 * the integer and the unit multiplier is stored in WalSegSz and checked
 * with IsValidWalSegSize.  The local declarations and the return
 * statements are not visible in this listing.
 */
252 /* check connection existence */
253 Assert(conn != NULL);
255 /* for previous versions set the default xlog seg size */
256 if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
258 WalSegSz = DEFAULT_XLOG_SEG_SIZE;
262 res = PQexec(conn, "SHOW wal_segment_size");
263 if (PQresultStatus(res) != PGRES_TUPLES_OK)
/*
 * NOTE(review): this format string ends with a newline after the
 * PQerrorMessage text, while the otherwise identical message in
 * RunIdentifySystem does not -- confirm which form is intended.
 */
265 fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
266 progname, "SHOW wal_segment_size", PQerrorMessage(conn));
271 if (PQntuples(res) != 1 || PQnfields(res) < 1)
274 _("%s: could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
275 progname, PQntuples(res), PQnfields(res), 1, 1);
/*
 * NOTE(review): the %s conversion below carries no maximum field width, so
 * the number of bytes written into xlog_unit is bounded only by the server
 * reply.  The declaration of xlog_unit is not visible here -- confirm its
 * size and consider adding a width to the conversion.
 */
281 /* fetch xlog value and unit from the result */
282 if (sscanf(PQgetvalue(res, 0, 0), "%d%s", &xlog_val, xlog_unit) != 2)
284 fprintf(stderr, _("%s: WAL segment size could not be parsed\n"),
/*
 * NOTE(review): only the MB and GB units assign multiplier.  Its initial
 * value is not visible here -- confirm it is initialized for other units.
 */
289 /* set the multiplier based on unit to convert xlog_val to bytes */
290 if (strcmp(xlog_unit, "MB") == 0)
291 multiplier = 1024 * 1024;
292 else if (strcmp(xlog_unit, "GB") == 0)
293 multiplier = 1024 * 1024 * 1024;
295 /* convert and set WalSegSz */
296 WalSegSz = xlog_val * multiplier;
298 if (!IsValidWalSegSize(WalSegSz))
301 _("%s: WAL segment size must be a power of two between 1MB and 1GB, but the remote server reported a value of %d bytes\n"),
311 * Run IDENTIFY_SYSTEM through a given connection and give back to caller
312 * some result information if requested:
313 * - System identifier
314 * - Current timeline ID
315 * - Start LSN position
316 * - Database name (NULL in servers prior to 9.4)
319 RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
320 XLogRecPtr *startpos, char **db_name)
/*
 * Executes IDENTIFY_SYSTEM on conn and copies columns of the single result
 * row to the caller:
 *   sysid    - column 0, duplicated with pg_strdup
 *   starttli - column 1 converted with atoi; skipped when NULL
 *   startpos - column 2 parsed as two hexadecimal halves separated by a
 *              slash and combined into one 64-bit value; skipped when NULL
 *   db_name  - column 3, duplicated with pg_strdup when the field is not
 *              null; only read when the server version is at least 90400
 * The result must have exactly one row and at least three fields (four
 * when the database name column is read).  The guards around sysid and
 * db_name and the return statements are not visible in this listing.
 * Strings produced by pg_strdup are presumably owned by the caller --
 * confirm against the callers.
 */
326 /* Check connection existence */
327 Assert(conn != NULL);
329 res = PQexec(conn, "IDENTIFY_SYSTEM");
330 if (PQresultStatus(res) != PGRES_TUPLES_OK)
332 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
333 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
338 if (PQntuples(res) != 1 || PQnfields(res) < 3)
341 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
342 progname, PQntuples(res), PQnfields(res), 1, 3);
348 /* Get system identifier */
350 *sysid = pg_strdup(PQgetvalue(res, 0, 0));
/*
 * NOTE(review): atoi offers no error detection, so a malformed timeline
 * value in column 1 would go unnoticed.
 */
352 /* Get timeline ID to start streaming from */
353 if (starttli != NULL)
354 *starttli = atoi(PQgetvalue(res, 0, 1));
356 /* Get LSN start position if necessary */
357 if (startpos != NULL)
359 if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
362 _("%s: could not parse write-ahead log location \"%s\"\n"),
363 progname, PQgetvalue(res, 0, 2));
/* hi supplies the upper 32 bits of the position, lo the lower bits. */
368 *startpos = ((uint64) hi) << 32 | lo;
371 /* Get database name, only available in 9.4 and newer versions */
375 if (PQserverVersion(conn) >= 90400)
377 if (PQnfields(res) < 4)
380 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
381 progname, PQntuples(res), PQnfields(res), 1, 4);
386 if (!PQgetisnull(res, 0, 3))
387 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
396 * Create a replication slot for the given connection. This function
397 * returns true in case of success.
400 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
401 bool is_physical, bool slot_exists_ok)
/*
 * Sends a CREATE_REPLICATION_SLOT command for slot_name on conn.
 *   plugin         - asserted to be NULL for a physical slot and non-NULL
 *                    for a logical one
 *   is_physical    - selects the PHYSICAL or the LOGICAL command form
 *   slot_exists_ok - when set, a failure whose SQLSTATE equals
 *                    ERRCODE_DUPLICATE_OBJECT takes a separate path from
 *                    the generic error report; the value returned on that
 *                    path is not visible here (presumably success)
 * For servers at version 100000 or later NOEXPORT_SNAPSHOT is appended; the
 * enclosing branch is not fully visible, but the adjacent comment ties it
 * to pg_recvlogical.  A successful reply must have exactly one row and
 * four fields.  The query buffer is destroyed on every visible exit path.
 */
406 query = createPQExpBuffer();
408 Assert((is_physical && plugin == NULL) ||
409 (!is_physical && plugin != NULL));
410 Assert(slot_name != NULL);
/*
 * NOTE(review): the names are placed between double quotes through a plain
 * %s conversion.  The argument lines are not visible here -- confirm that a
 * name containing a double quote cannot reach this point.
 */
414 appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
418 appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
420 if (PQserverVersion(conn) >= 100000)
421 /* pg_recvlogical doesn't use an exported snapshot, so suppress */
422 appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
425 res = PQexec(conn, query->data);
426 if (PQresultStatus(res) != PGRES_TUPLES_OK)
428 const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
430 if (slot_exists_ok &&
432 strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
434 destroyPQExpBuffer(query);
440 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
441 progname, query->data, PQerrorMessage(conn));
443 destroyPQExpBuffer(query);
449 if (PQntuples(res) != 1 || PQnfields(res) != 4)
452 _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
454 PQntuples(res), PQnfields(res), 1, 4);
456 destroyPQExpBuffer(query);
461 destroyPQExpBuffer(query);
467 * Drop a replication slot for the given connection. This function
468 * returns true in case of success.
471 DropReplicationSlot(PGconn *conn, const char *slot_name)
/*
 * Sends a DROP_REPLICATION_SLOT command for slot_name on conn.  The command
 * must finish with PGRES_COMMAND_OK and a result of zero rows and zero
 * fields; otherwise an error is written to stderr.  The query buffer is
 * destroyed on each visible exit path.  The return statements are not
 * visible in this listing.
 */
476 Assert(slot_name != NULL);
478 query = createPQExpBuffer();
481 appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
483 res = PQexec(conn, query->data);
484 if (PQresultStatus(res) != PGRES_COMMAND_OK)
486 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
487 progname, query->data, PQerrorMessage(conn));
489 destroyPQExpBuffer(query);
/* A successful drop is expected to carry no result set at all. */
494 if (PQntuples(res) != 0 || PQnfields(res) != 0)
497 _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
499 PQntuples(res), PQnfields(res), 0, 0);
501 destroyPQExpBuffer(query);
506 destroyPQExpBuffer(query);
513 * Frontend version of GetCurrentTimestamp(), since we are not linked with
517 feGetCurrentTimestamp(void)
/*
 * Reads the wall clock with gettimeofday (its return value is not checked),
 * shifts the seconds from the Unix epoch to the epoch named by
 * POSTGRES_EPOCH_JDATE by subtracting the day distance between the two
 * epochs times SECS_PER_DAY, then scales by USECS_PER_SEC and adds tv_usec.
 * The declarations and the return statement are not visible in this
 * listing.
 */
522 gettimeofday(&tp, NULL);
524 result = (TimestampTz) tp.tv_sec -
525 ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
526 result = (result * USECS_PER_SEC) + tp.tv_usec;
532 * Frontend version of TimestampDifference(), since we are not linked with
536 feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
537 long *secs, int *microsecs)
/*
 * Splits stop_time - start_time into whole seconds (stored in *secs) and
 * the remainder (stored in *microsecs), using USECS_PER_SEC as divisor.
 * The lines between the subtraction and the two stores are not visible;
 * presumably they handle a non-positive difference -- confirm.
 */
539 TimestampTz diff = stop_time - start_time;
548 *secs = (long) (diff / USECS_PER_SEC);
549 *microsecs = (int) (diff % USECS_PER_SEC);
554 * Frontend version of TimestampDifferenceExceeds(), since we are not
555 * linked with backend code.
558 feTimestampDifferenceExceeds(TimestampTz start_time,
559 TimestampTz stop_time,
562 TimestampTz diff = stop_time - start_time;
/*
 * Returns whether diff is at least msec * 1000.  Going by its name, msec is
 * presumably a millisecond count and the timestamps are in microseconds --
 * confirm.  INT64CONST makes the constant 64-bit, presumably so that the
 * product is not computed in a narrower type.  The declaration of the msec
 * parameter is not visible in this listing.
 */
564 return (diff >= msec * INT64CONST(1000));
568 * Converts an int64 to network byte order.
571 fe_sendint64(int64 i, char *buf)
/*
 * Stores the 64-bit value i into buf as two 32-bit halves: the upper 32
 * bits are copied to buf[0..3] and the other half to buf[4..7], so buf must
 * have room for 8 bytes.  memcpy is used for the stores, which places no
 * alignment requirement on buf.  The statements that convert each half to
 * network byte order and that load the lower half into n32 are not visible
 * in this listing.
 */
575 /* High order half first, since we're doing MSB-first */
576 n32 = (uint32) (i >> 32);
578 memcpy(&buf[0], &n32, 4);
580 /* Now the low order half */
583 memcpy(&buf[4], &n32, 4);
587 * Converts an int64 from network byte order to native format.
590 fe_recvint64(char *buf)
596 memcpy(&h32, buf, 4);
597 memcpy(&l32, buf + 4, 4);