]> granicus.if.org Git - postgresql/blob - src/bin/pg_basebackup/streamutil.c
df17f60596a4b74d98810fdd876beec9050cb234
[postgresql] / src / bin / pg_basebackup / streamutil.c
1 /*-------------------------------------------------------------------------
2  *
3  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
4  *                                      pg_recvlogical
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *                src/bin/pg_basebackup/streamutil.c
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres_fe.h"
16
17 #include <sys/time.h>
18 #include <unistd.h>
19
20 /* for ntohl/htonl */
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23
24 /* local includes */
25 #include "receivelog.h"
26 #include "streamutil.h"
27
28 #include "access/xlog_internal.h"
29 #include "pqexpbuffer.h"
30 #include "common/fe_memutils.h"
31 #include "datatype/timestamp.h"
32
33 #define ERRCODE_DUPLICATE_OBJECT  "42710"
34
35 uint32          WalSegSz;
36
37 /* SHOW command for replication connection was introduced in version 10 */
38 #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
39
40 const char *progname;
41 char       *connection_string = NULL;
42 char       *dbhost = NULL;
43 char       *dbuser = NULL;
44 char       *dbport = NULL;
45 char       *dbname = NULL;
46 int                     dbgetpassword = 0;      /* 0=auto, -1=never, 1=always */
47 static bool have_password = false;
48 static char password[100];
49 PGconn     *conn = NULL;
50
51 /*
52  * Connect to the server. Returns a valid PGconn pointer if connected,
53  * or NULL on non-permanent error. On permanent error, the function will
54  * call exit(1) directly.
55  */
56 PGconn *
57 GetConnection(void)
58 {
59         PGconn     *tmpconn;
60         int                     argcount = 7;   /* dbname, replication, fallback_app_name,
61                                                                  * host, user, port, password */
62         int                     i;
63         const char **keywords;
64         const char **values;
65         const char *tmpparam;
66         bool            need_password;
67         PQconninfoOption *conn_opts = NULL;
68         PQconninfoOption *conn_opt;
69         char       *err_msg = NULL;
70
71         /* pg_recvlogical uses dbname only; others use connection_string only. */
72         Assert(dbname == NULL || connection_string == NULL);
73
74         /*
75          * Merge the connection info inputs given in form of connection string,
76          * options and default values (dbname=replication, replication=true, etc.)
77          * Explicitly discard any dbname value in the connection string;
78          * otherwise, PQconnectdbParams() would interpret that value as being
79          * itself a connection string.
80          */
81         i = 0;
82         if (connection_string)
83         {
84                 conn_opts = PQconninfoParse(connection_string, &err_msg);
85                 if (conn_opts == NULL)
86                 {
87                         fprintf(stderr, "%s: %s", progname, err_msg);
88                         exit(1);
89                 }
90
91                 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
92                 {
93                         if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
94                                 strcmp(conn_opt->keyword, "dbname") != 0)
95                                 argcount++;
96                 }
97
98                 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
99                 values = pg_malloc0((argcount + 1) * sizeof(*values));
100
101                 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
102                 {
103                         if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
104                                 strcmp(conn_opt->keyword, "dbname") != 0)
105                         {
106                                 keywords[i] = conn_opt->keyword;
107                                 values[i] = conn_opt->val;
108                                 i++;
109                         }
110                 }
111         }
112         else
113         {
114                 keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
115                 values = pg_malloc0((argcount + 1) * sizeof(*values));
116         }
117
118         keywords[i] = "dbname";
119         values[i] = dbname == NULL ? "replication" : dbname;
120         i++;
121         keywords[i] = "replication";
122         values[i] = dbname == NULL ? "true" : "database";
123         i++;
124         keywords[i] = "fallback_application_name";
125         values[i] = progname;
126         i++;
127
128         if (dbhost)
129         {
130                 keywords[i] = "host";
131                 values[i] = dbhost;
132                 i++;
133         }
134         if (dbuser)
135         {
136                 keywords[i] = "user";
137                 values[i] = dbuser;
138                 i++;
139         }
140         if (dbport)
141         {
142                 keywords[i] = "port";
143                 values[i] = dbport;
144                 i++;
145         }
146
147         /* If -W was given, force prompt for password, but only the first time */
148         need_password = (dbgetpassword == 1 && !have_password);
149
150         do
151         {
152                 /* Get a new password if appropriate */
153                 if (need_password)
154                 {
155                         simple_prompt("Password: ", password, sizeof(password), false);
156                         have_password = true;
157                         need_password = false;
158                 }
159
160                 /* Use (or reuse, on a subsequent connection) password if we have it */
161                 if (have_password)
162                 {
163                         keywords[i] = "password";
164                         values[i] = password;
165                 }
166                 else
167                 {
168                         keywords[i] = NULL;
169                         values[i] = NULL;
170                 }
171
172                 tmpconn = PQconnectdbParams(keywords, values, true);
173
174                 /*
175                  * If there is too little memory even to allocate the PGconn object
176                  * and PQconnectdbParams returns NULL, we call exit(1) directly.
177                  */
178                 if (!tmpconn)
179                 {
180                         fprintf(stderr, _("%s: could not connect to server\n"),
181                                         progname);
182                         exit(1);
183                 }
184
185                 /* If we need a password and -w wasn't given, loop back and get one */
186                 if (PQstatus(tmpconn) == CONNECTION_BAD &&
187                         PQconnectionNeedsPassword(tmpconn) &&
188                         dbgetpassword != -1)
189                 {
190                         PQfinish(tmpconn);
191                         need_password = true;
192                 }
193         }
194         while (need_password);
195
196         if (PQstatus(tmpconn) != CONNECTION_OK)
197         {
198                 fprintf(stderr, _("%s: could not connect to server: %s"),
199                                 progname, PQerrorMessage(tmpconn));
200                 PQfinish(tmpconn);
201                 free(values);
202                 free(keywords);
203                 if (conn_opts)
204                         PQconninfoFree(conn_opts);
205                 return NULL;
206         }
207
208         /* Connection ok! */
209         free(values);
210         free(keywords);
211         if (conn_opts)
212                 PQconninfoFree(conn_opts);
213
214         /*
215          * Ensure we have the same value of integer_datetimes (now always "on") as
216          * the server we are connecting to.
217          */
218         tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
219         if (!tmpparam)
220         {
221                 fprintf(stderr,
222                                 _("%s: could not determine server setting for integer_datetimes\n"),
223                                 progname);
224                 PQfinish(tmpconn);
225                 exit(1);
226         }
227
228         if (strcmp(tmpparam, "on") != 0)
229         {
230                 fprintf(stderr,
231                                 _("%s: integer_datetimes compile flag does not match server\n"),
232                                 progname);
233                 PQfinish(tmpconn);
234                 exit(1);
235         }
236
237         return tmpconn;
238 }
239
240 /*
241  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
242  * since ControlFile is not accessible here.
243  */
244 bool
245 RetrieveWalSegSize(PGconn *conn)
246 {
247         PGresult   *res;
248         char            xlog_unit[3];
249         int                     xlog_val,
250                                 multiplier = 1;
251
252         /* check connection existence */
253         Assert(conn != NULL);
254
255         /* for previous versions set the default xlog seg size */
256         if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
257         {
258                 WalSegSz = DEFAULT_XLOG_SEG_SIZE;
259                 return true;
260         }
261
262         res = PQexec(conn, "SHOW wal_segment_size");
263         if (PQresultStatus(res) != PGRES_TUPLES_OK)
264         {
265                 fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
266                                 progname, "SHOW wal_segment_size", PQerrorMessage(conn));
267
268                 PQclear(res);
269                 return false;
270         }
271         if (PQntuples(res) != 1 || PQnfields(res) < 1)
272         {
273                 fprintf(stderr,
274                                 _("%s: could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
275                                 progname, PQntuples(res), PQnfields(res), 1, 1);
276
277                 PQclear(res);
278                 return false;
279         }
280
281         /* fetch xlog value and unit from the result */
282         if (sscanf(PQgetvalue(res, 0, 0), "%d%s", &xlog_val, xlog_unit) != 2)
283         {
284                 fprintf(stderr, _("%s: WAL segment size could not be parsed\n"),
285                                 progname);
286                 return false;
287         }
288
289         /* set the multiplier based on unit to convert xlog_val to bytes */
290         if (strcmp(xlog_unit, "MB") == 0)
291                 multiplier = 1024 * 1024;
292         else if (strcmp(xlog_unit, "GB") == 0)
293                 multiplier = 1024 * 1024 * 1024;
294
295         /* convert and set WalSegSz */
296         WalSegSz = xlog_val * multiplier;
297
298         if (!IsValidWalSegSize(WalSegSz))
299         {
300                 fprintf(stderr,
301                                 _("%s: WAL segment size must be a power of two between 1MB and 1GB, but the remote server reported a value of %d bytes\n"),
302                                 progname, WalSegSz);
303                 return false;
304         }
305
306         PQclear(res);
307         return true;
308 }
309
310 /*
311  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
312  * some result information if requested:
313  * - System identifier
314  * - Current timeline ID
315  * - Start LSN position
316  * - Database name (NULL in servers prior to 9.4)
317  */
318 bool
319 RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
320                                   XLogRecPtr *startpos, char **db_name)
321 {
322         PGresult   *res;
323         uint32          hi,
324                                 lo;
325
326         /* Check connection existence */
327         Assert(conn != NULL);
328
329         res = PQexec(conn, "IDENTIFY_SYSTEM");
330         if (PQresultStatus(res) != PGRES_TUPLES_OK)
331         {
332                 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
333                                 progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
334
335                 PQclear(res);
336                 return false;
337         }
338         if (PQntuples(res) != 1 || PQnfields(res) < 3)
339         {
340                 fprintf(stderr,
341                                 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
342                                 progname, PQntuples(res), PQnfields(res), 1, 3);
343
344                 PQclear(res);
345                 return false;
346         }
347
348         /* Get system identifier */
349         if (sysid != NULL)
350                 *sysid = pg_strdup(PQgetvalue(res, 0, 0));
351
352         /* Get timeline ID to start streaming from */
353         if (starttli != NULL)
354                 *starttli = atoi(PQgetvalue(res, 0, 1));
355
356         /* Get LSN start position if necessary */
357         if (startpos != NULL)
358         {
359                 if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
360                 {
361                         fprintf(stderr,
362                                         _("%s: could not parse write-ahead log location \"%s\"\n"),
363                                         progname, PQgetvalue(res, 0, 2));
364
365                         PQclear(res);
366                         return false;
367                 }
368                 *startpos = ((uint64) hi) << 32 | lo;
369         }
370
371         /* Get database name, only available in 9.4 and newer versions */
372         if (db_name != NULL)
373         {
374                 *db_name = NULL;
375                 if (PQserverVersion(conn) >= 90400)
376                 {
377                         if (PQnfields(res) < 4)
378                         {
379                                 fprintf(stderr,
380                                                 _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
381                                                 progname, PQntuples(res), PQnfields(res), 1, 4);
382
383                                 PQclear(res);
384                                 return false;
385                         }
386                         if (!PQgetisnull(res, 0, 3))
387                                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
388                 }
389         }
390
391         PQclear(res);
392         return true;
393 }
394
395 /*
396  * Create a replication slot for the given connection. This function
397  * returns true in case of success.
398  */
399 bool
400 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
401                                           bool is_physical, bool slot_exists_ok)
402 {
403         PQExpBuffer query;
404         PGresult   *res;
405
406         query = createPQExpBuffer();
407
408         Assert((is_physical && plugin == NULL) ||
409                    (!is_physical && plugin != NULL));
410         Assert(slot_name != NULL);
411
412         /* Build query */
413         if (is_physical)
414                 appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
415                                                   slot_name);
416         else
417         {
418                 appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
419                                                   slot_name, plugin);
420                 if (PQserverVersion(conn) >= 100000)
421                         /* pg_recvlogical doesn't use an exported snapshot, so suppress */
422                         appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
423         }
424
425         res = PQexec(conn, query->data);
426         if (PQresultStatus(res) != PGRES_TUPLES_OK)
427         {
428                 const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
429
430                 if (slot_exists_ok &&
431                         sqlstate &&
432                         strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
433                 {
434                         destroyPQExpBuffer(query);
435                         PQclear(res);
436                         return true;
437                 }
438                 else
439                 {
440                         fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
441                                         progname, query->data, PQerrorMessage(conn));
442
443                         destroyPQExpBuffer(query);
444                         PQclear(res);
445                         return false;
446                 }
447         }
448
449         if (PQntuples(res) != 1 || PQnfields(res) != 4)
450         {
451                 fprintf(stderr,
452                                 _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
453                                 progname, slot_name,
454                                 PQntuples(res), PQnfields(res), 1, 4);
455
456                 destroyPQExpBuffer(query);
457                 PQclear(res);
458                 return false;
459         }
460
461         destroyPQExpBuffer(query);
462         PQclear(res);
463         return true;
464 }
465
466 /*
467  * Drop a replication slot for the given connection. This function
468  * returns true in case of success.
469  */
470 bool
471 DropReplicationSlot(PGconn *conn, const char *slot_name)
472 {
473         PQExpBuffer query;
474         PGresult   *res;
475
476         Assert(slot_name != NULL);
477
478         query = createPQExpBuffer();
479
480         /* Build query */
481         appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
482                                           slot_name);
483         res = PQexec(conn, query->data);
484         if (PQresultStatus(res) != PGRES_COMMAND_OK)
485         {
486                 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
487                                 progname, query->data, PQerrorMessage(conn));
488
489                 destroyPQExpBuffer(query);
490                 PQclear(res);
491                 return false;
492         }
493
494         if (PQntuples(res) != 0 || PQnfields(res) != 0)
495         {
496                 fprintf(stderr,
497                                 _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
498                                 progname, slot_name,
499                                 PQntuples(res), PQnfields(res), 0, 0);
500
501                 destroyPQExpBuffer(query);
502                 PQclear(res);
503                 return false;
504         }
505
506         destroyPQExpBuffer(query);
507         PQclear(res);
508         return true;
509 }
510
511
512 /*
513  * Frontend version of GetCurrentTimestamp(), since we are not linked with
514  * backend code.
515  */
516 TimestampTz
517 feGetCurrentTimestamp(void)
518 {
519         TimestampTz result;
520         struct timeval tp;
521
522         gettimeofday(&tp, NULL);
523
524         result = (TimestampTz) tp.tv_sec -
525                 ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
526         result = (result * USECS_PER_SEC) + tp.tv_usec;
527
528         return result;
529 }
530
531 /*
532  * Frontend version of TimestampDifference(), since we are not linked with
533  * backend code.
534  */
535 void
536 feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
537                                           long *secs, int *microsecs)
538 {
539         TimestampTz diff = stop_time - start_time;
540
541         if (diff <= 0)
542         {
543                 *secs = 0;
544                 *microsecs = 0;
545         }
546         else
547         {
548                 *secs = (long) (diff / USECS_PER_SEC);
549                 *microsecs = (int) (diff % USECS_PER_SEC);
550         }
551 }
552
553 /*
554  * Frontend version of TimestampDifferenceExceeds(), since we are not
555  * linked with backend code.
556  */
557 bool
558 feTimestampDifferenceExceeds(TimestampTz start_time,
559                                                          TimestampTz stop_time,
560                                                          int msec)
561 {
562         TimestampTz diff = stop_time - start_time;
563
564         return (diff >= msec * INT64CONST(1000));
565 }
566
567 /*
568  * Converts an int64 to network byte order.
569  */
570 void
571 fe_sendint64(int64 i, char *buf)
572 {
573         uint32          n32;
574
575         /* High order half first, since we're doing MSB-first */
576         n32 = (uint32) (i >> 32);
577         n32 = htonl(n32);
578         memcpy(&buf[0], &n32, 4);
579
580         /* Now the low order half */
581         n32 = (uint32) i;
582         n32 = htonl(n32);
583         memcpy(&buf[4], &n32, 4);
584 }
585
586 /*
587  * Converts an int64 from network byte order to native format.
588  */
589 int64
590 fe_recvint64(char *buf)
591 {
592         int64           result;
593         uint32          h32;
594         uint32          l32;
595
596         memcpy(&h32, buf, 4);
597         memcpy(&l32, buf + 4, 4);
598         h32 = ntohl(h32);
599         l32 = ntohl(l32);
600
601         result = h32;
602         result <<= 32;
603         result |= l32;
604
605         return result;
606 }