]> granicus.if.org Git - postgresql/blob - src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
Update copyright via script for 2017
[postgresql] / src / backend / replication / libpqwalreceiver / libpqwalreceiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *        src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18
19 #include <unistd.h>
20 #include <sys/time.h>
21
22 #include "libpq-fe.h"
23 #include "pqexpbuffer.h"
24 #include "access/xlog.h"
25 #include "miscadmin.h"
26 #include "pgstat.h"
27 #include "replication/walreceiver.h"
28 #include "storage/proc.h"
29 #include "utils/builtins.h"
30
/* Magic block required so this file can be loaded as a dynamic module. */
31 PG_MODULE_MAGIC;
32
/* Module initialization entry point; installs PQWalReceiverFunctions. */
33 void            _PG_init(void);
34
/*
 * Per-connection state of the libpq walreceiver.  Allocated (zeroed) by
 * libpqrcv_connect() and released by libpqrcv_disconnect().
 */
35 struct WalReceiverConn
36 {
37         /* Current connection to the primary, if any */
38         PGconn *streamConn;
39         /* Used to remember if the connection is logical or physical */
40         bool    logical;
41         /* Buffer for currently read records */
/*
 * Filled by PQgetCopyData() in libpqrcv_receive(); freed with PQfreemem()
 * at the start of the next receive call or in libpqrcv_disconnect().
 */
42         char   *recvBuf;
43 };
44
45 /* Prototypes for interface functions */
46 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
47                                                                                  bool logical, const char *appname);
48 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
49 static char *libpqrcv_identify_system(WalReceiverConn *conn,
50                                                                           TimeLineID *primary_tli);
51 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
52                                                                  TimeLineID tli, char **filename,
53                                                                  char **content, int *len);
54 static bool libpqrcv_startstreaming(WalReceiverConn *conn,
55                                                 TimeLineID tli, XLogRecPtr startpoint,
56                                                 const char *slotname);
57 static void libpqrcv_endstreaming(WalReceiverConn *conn,
58                                                                   TimeLineID *next_tli);
59 static int      libpqrcv_receive(WalReceiverConn *conn, char **buffer,
60                                                          pgsocket *wait_fd);
61 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
62                                                   int nbytes);
63 static void libpqrcv_disconnect(WalReceiverConn *conn);
64
/*
 * Callback table published to the server by _PG_init().  The initializers
 * are positional, so their order must match the member order of
 * WalReceiverFunctionsType (declared outside this file -- confirm there
 * before adding or reordering entries).
 */
65 static WalReceiverFunctionsType PQWalReceiverFunctions = {
66         libpqrcv_connect,
67         libpqrcv_get_conninfo,
68         libpqrcv_identify_system,
69         libpqrcv_readtimelinehistoryfile,
70         libpqrcv_startstreaming,
71         libpqrcv_endstreaming,
72         libpqrcv_receive,
73         libpqrcv_send,
74         libpqrcv_disconnect
75 };
76
77 /* Prototypes for private functions */
/* Interruptible, latch-aware replacement for libpq's PQexec(). */
78 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
79
80 /*
81  * Module initialization function
82  */
83 void
84 _PG_init(void)
85 {
86         if (WalReceiverFunctions != NULL)
87                 elog(ERROR, "libpqwalreceiver already loaded");
88         WalReceiverFunctions = &PQWalReceiverFunctions;
89 }
90
91 /*
92  * Establish the connection to the primary server for XLOG streaming
93  */
94 static WalReceiverConn *
95 libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
96 {
97         WalReceiverConn *conn;
98         const char *keys[5];
99         const char *vals[5];
100         int                     i = 0;
101
102         /*
103          * We use the expand_dbname parameter to process the connection string (or
104          * URI), and pass some extra options. The deliberately undocumented
105          * parameter "replication=true" makes it a replication connection. The
106          * database name is ignored by the server in replication mode, but specify
107          * "replication" for .pgpass lookup.
108          */
109         keys[i] = "dbname";
110         vals[i] = conninfo;
111         keys[++i] = "replication";
112         vals[i] = logical ? "database" : "true";
113         if (!logical)
114         {
115                 keys[++i] = "dbname";
116                 vals[i] = "replication";
117         }
118         keys[++i] = "fallback_application_name";
119         vals[i] = appname;
120         keys[++i] = NULL;
121         vals[i] = NULL;
122
123         conn = palloc0(sizeof(WalReceiverConn));
124         conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
125         if (PQstatus(conn->streamConn) != CONNECTION_OK)
126                 ereport(ERROR,
127                                 (errmsg("could not connect to the primary server: %s",
128                                                 PQerrorMessage(conn->streamConn))));
129         conn->logical = logical;
130
131         return conn;
132 }
133
134 /*
135  * Return a user-displayable conninfo string.  Any security-sensitive fields
136  * are obfuscated.
137  */
138 static char *
139 libpqrcv_get_conninfo(WalReceiverConn *conn)
140 {
141         PQconninfoOption *conn_opts;
142         PQconninfoOption *conn_opt;
143         PQExpBufferData buf;
144         char       *retval;
145
146         Assert(conn->streamConn != NULL);
147
148         initPQExpBuffer(&buf);
149         conn_opts = PQconninfo(conn->streamConn);
150
151         if (conn_opts == NULL)
152                 ereport(ERROR,
153                                 (errmsg("could not parse connection string: %s",
154                                                 _("out of memory"))));
155
156         /* build a clean connection string from pieces */
157         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
158         {
159                 bool            obfuscate;
160
161                 /* Skip debug and empty options */
162                 if (strchr(conn_opt->dispchar, 'D') ||
163                         conn_opt->val == NULL ||
164                         conn_opt->val[0] == '\0')
165                         continue;
166
167                 /* Obfuscate security-sensitive options */
168                 obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
169
170                 appendPQExpBuffer(&buf, "%s%s=%s",
171                                                   buf.len == 0 ? "" : " ",
172                                                   conn_opt->keyword,
173                                                   obfuscate ? "********" : conn_opt->val);
174         }
175
176         PQconninfoFree(conn_opts);
177
178         retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
179         termPQExpBuffer(&buf);
180         return retval;
181 }
182
183 /*
184  * Check that primary's system identifier matches ours, and fetch the current
185  * timeline ID of the primary.
186  */
187 static char *
188 libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
189 {
190         PGresult   *res;
191         char       *primary_sysid;
192
193         /*
194          * Get the system identifier and timeline ID as a DataRow message from the
195          * primary server.
196          */
197         res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
198         if (PQresultStatus(res) != PGRES_TUPLES_OK)
199         {
200                 PQclear(res);
201                 ereport(ERROR,
202                                 (errmsg("could not receive database system identifier and timeline ID from "
203                                                 "the primary server: %s",
204                                                 PQerrorMessage(conn->streamConn))));
205         }
206         if (PQnfields(res) < 3 || PQntuples(res) != 1)
207         {
208                 int                     ntuples = PQntuples(res);
209                 int                     nfields = PQnfields(res);
210
211                 PQclear(res);
212                 ereport(ERROR,
213                                 (errmsg("invalid response from primary server"),
214                                  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
215                                                    ntuples, nfields, 3, 1)));
216         }
217         primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
218         *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
219         PQclear(res);
220
221         return primary_sysid;
222 }
223
224 /*
225  * Start streaming WAL data from given startpoint and timeline.
226  *
227  * Returns true if we switched successfully to copy-both mode. False
228  * means the server received the command and executed it successfully, but
229  * didn't switch to copy-mode.  That means that there was no WAL on the
230  * requested timeline and starting point, because the server switched to
231  * another timeline at or before the requested starting point. On failure,
232  * throws an ERROR.
233  */
234 static bool
235 libpqrcv_startstreaming(WalReceiverConn *conn,
236                                                 TimeLineID tli, XLogRecPtr startpoint,
237                                                 const char *slotname)
238 {
239         StringInfoData cmd;
240         PGresult   *res;
241
242         Assert(!conn->logical);
243
244         initStringInfo(&cmd);
245
246         /* Start streaming from the point requested by startup process */
247         if (slotname != NULL)
248                 appendStringInfo(&cmd,
249                                                  "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
250                                                  slotname,
251                                                  (uint32) (startpoint >> 32), (uint32) startpoint,
252                                                  tli);
253         else
254                 appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u",
255                                                  (uint32) (startpoint >> 32), (uint32) startpoint,
256                                                  tli);
257         res = libpqrcv_PQexec(conn->streamConn, cmd.data);
258         pfree(cmd.data);
259
260         if (PQresultStatus(res) == PGRES_COMMAND_OK)
261         {
262                 PQclear(res);
263                 return false;
264         }
265         else if (PQresultStatus(res) != PGRES_COPY_BOTH)
266         {
267                 PQclear(res);
268                 ereport(ERROR,
269                                 (errmsg("could not start WAL streaming: %s",
270                                                 PQerrorMessage(conn->streamConn))));
271         }
272         PQclear(res);
273         return true;
274 }
275
276 /*
277  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
278  * reported by the server, or 0 if it did not report it.
279  */
280 static void
281 libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
282 {
283         PGresult   *res;
284
285         if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
286                 PQflush(conn->streamConn))
287                 ereport(ERROR,
288                         (errmsg("could not send end-of-streaming message to primary: %s",
289                                         PQerrorMessage(conn->streamConn))));
290
291         *next_tli = 0;
292
293         /*
294          * After COPY is finished, we should receive a result set indicating the
295          * next timeline's ID, or just CommandComplete if the server was shut
296          * down.
297          *
298          * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
299          * would also be possible. However, at the moment this function is only
300          * called after receiving CopyDone from the backend - the walreceiver
301          * never terminates replication on its own initiative.
302          */
303         res = PQgetResult(conn->streamConn);
304         if (PQresultStatus(res) == PGRES_TUPLES_OK)
305         {
306                 /*
307                  * Read the next timeline's ID. The server also sends the timeline's
308                  * starting point, but it is ignored.
309                  */
310                 if (PQnfields(res) < 2 || PQntuples(res) != 1)
311                         ereport(ERROR,
312                                         (errmsg("unexpected result set after end-of-streaming")));
313                 *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
314                 PQclear(res);
315
316                 /* the result set should be followed by CommandComplete */
317                 res = PQgetResult(conn->streamConn);
318         }
319         else if (PQresultStatus(res) == PGRES_COPY_OUT)
320         {
321                 PQclear(res);
322
323                 /* End the copy */
324                 PQendcopy(conn->streamConn);
325
326                 /* CommandComplete should follow */
327                 res = PQgetResult(conn->streamConn);
328         }
329
330         if (PQresultStatus(res) != PGRES_COMMAND_OK)
331                 ereport(ERROR,
332                                 (errmsg("error reading result of streaming command: %s",
333                                                 PQerrorMessage(conn->streamConn))));
334         PQclear(res);
335
336         /* Verify that there are no more results */
337         res = PQgetResult(conn->streamConn);
338         if (res != NULL)
339                 ereport(ERROR,
340                                 (errmsg("unexpected result after CommandComplete: %s",
341                                                 PQerrorMessage(conn->streamConn))));
342 }
343
344 /*
345  * Fetch the timeline history file for 'tli' from primary.
346  */
347 static void
348 libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
349                                                                  TimeLineID tli, char **filename,
350                                                                  char **content, int *len)
351 {
352         PGresult   *res;
353         char            cmd[64];
354
355         Assert(!conn->logical);
356
357         /*
358          * Request the primary to send over the history file for given timeline.
359          */
360         snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
361         res = libpqrcv_PQexec(conn->streamConn, cmd);
362         if (PQresultStatus(res) != PGRES_TUPLES_OK)
363         {
364                 PQclear(res);
365                 ereport(ERROR,
366                                 (errmsg("could not receive timeline history file from "
367                                                 "the primary server: %s",
368                                                 PQerrorMessage(conn->streamConn))));
369         }
370         if (PQnfields(res) != 2 || PQntuples(res) != 1)
371         {
372                 int                     ntuples = PQntuples(res);
373                 int                     nfields = PQnfields(res);
374
375                 PQclear(res);
376                 ereport(ERROR,
377                                 (errmsg("invalid response from primary server"),
378                                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
379                                                    ntuples, nfields)));
380         }
381         *filename = pstrdup(PQgetvalue(res, 0, 0));
382
383         *len = PQgetlength(res, 0, 1);
384         *content = palloc(*len);
385         memcpy(*content, PQgetvalue(res, 0, 1), *len);
386         PQclear(res);
387 }
388
389 /*
390  * Send a query and wait for the results by using the asynchronous libpq
391  * functions and socket readiness events.
392  *
393  * We must not use the regular blocking libpq functions like PQexec()
394  * since they are uninterruptible by signals on some platforms, such as
395  * Windows.
396  *
397  * The function is modeled on PQexec() in libpq, but only implements
398  * those parts that are in use in the walreceiver.
399  *
400  * Queries are always executed on the connection in streamConn.
401  */
402 static PGresult *
403 libpqrcv_PQexec(PGconn *streamConn, const char *query)
404 {
405         PGresult   *result = NULL;
406         PGresult   *lastResult = NULL;
407
408         /*
409          * PQexec() silently discards any prior query results on the connection.
410          * This is not required for walreceiver since it's expected that walsender
411          * won't generate any such junk results.
412          */
413
414         /*
415          * Submit a query. Since we don't use non-blocking mode, this also can
416          * block. But its risk is relatively small, so we ignore that for now.
417          */
418         if (!PQsendQuery(streamConn, query))
419                 return NULL;
420
421         for (;;)
422         {
423                 /*
424                  * Receive data until PQgetResult is ready to get the result without
425                  * blocking.
426                  */
427                 while (PQisBusy(streamConn))
428                 {
429                         int                     rc;
430
431                         /*
432                          * We don't need to break down the sleep into smaller increments,
433                          * since we'll get interrupted by signals and can either handle
434                          * interrupts here or elog(FATAL) within SIGTERM signal handler if
435                          * the signal arrives in the middle of establishment of
436                          * replication connection.
437                          */
438                         ResetLatch(&MyProc->procLatch);
439                         rc = WaitLatchOrSocket(&MyProc->procLatch,
440                                                                    WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
441                                                                    WL_LATCH_SET,
442                                                                    PQsocket(streamConn),
443                                                                    0,
444                                                                    WAIT_EVENT_LIBPQWALRECEIVER_READ);
445                         if (rc & WL_POSTMASTER_DEATH)
446                                 exit(1);
447
448                         /* interrupted */
449                         if (rc & WL_LATCH_SET)
450                         {
451                                 CHECK_FOR_INTERRUPTS();
452                                 continue;
453                         }
454                         if (PQconsumeInput(streamConn) == 0)
455                                 return NULL;    /* trouble */
456                 }
457
458                 /*
459                  * Emulate the PQexec()'s behavior of returning the last result when
460                  * there are many. Since walsender will never generate multiple
461                  * results, we skip the concatenation of error messages.
462                  */
463                 result = PQgetResult(streamConn);
464                 if (result == NULL)
465                         break;                          /* query is complete */
466
467                 PQclear(lastResult);
468                 lastResult = result;
469
470                 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
471                         PQresultStatus(lastResult) == PGRES_COPY_OUT ||
472                         PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
473                         PQstatus(streamConn) == CONNECTION_BAD)
474                         break;
475         }
476
477         return lastResult;
478 }
479
480 /*
481  * Disconnect connection to primary, if any.
482  */
483 static void
484 libpqrcv_disconnect(WalReceiverConn *conn)
485 {
486         PQfinish(conn->streamConn);
487         if (conn->recvBuf != NULL)
488                 PQfreemem(conn->recvBuf);
489         pfree(conn);
490 }
491
492 /*
493  * Receive a message available from XLOG stream.
494  *
495  * Returns:
496  *
497  *       If data was received, returns the length of the data. *buffer is set to
498  *       point to a buffer holding the received message. The buffer is only valid
499  *       until the next libpqrcv_* call.
500  *
501  *       If no data was available immediately, returns 0, and *wait_fd is set to a
502  *       socket descriptor which can be waited on before trying again.
503  *
504  *       -1 if the server ended the COPY.
505  *
506  * ereports on error.
507  */
508 static int
509 libpqrcv_receive(WalReceiverConn *conn, char **buffer,
510                                  pgsocket *wait_fd)
511 {
512         int                     rawlen;
513
514         if (conn->recvBuf != NULL)
515                 PQfreemem(conn->recvBuf);
516         conn->recvBuf = NULL;
517
518         /* Try to receive a CopyData message */
519         rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
520         if (rawlen == 0)
521         {
522                 /* Try consuming some data. */
523                 if (PQconsumeInput(conn->streamConn) == 0)
524                         ereport(ERROR,
525                                         (errmsg("could not receive data from WAL stream: %s",
526                                                         PQerrorMessage(conn->streamConn))));
527
528                 /* Now that we've consumed some input, try again */
529                 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
530                 if (rawlen == 0)
531                 {
532                         /* Tell caller to try again when our socket is ready. */
533                         *wait_fd = PQsocket(conn->streamConn);
534                         return 0;
535                 }
536         }
537         if (rawlen == -1)                       /* end-of-streaming or error */
538         {
539                 PGresult   *res;
540
541                 res = PQgetResult(conn->streamConn);
542                 if (PQresultStatus(res) == PGRES_COMMAND_OK ||
543                         PQresultStatus(res) == PGRES_COPY_IN)
544                 {
545                         PQclear(res);
546                         return -1;
547                 }
548                 else
549                 {
550                         PQclear(res);
551                         ereport(ERROR,
552                                         (errmsg("could not receive data from WAL stream: %s",
553                                                         PQerrorMessage(conn->streamConn))));
554                 }
555         }
556         if (rawlen < -1)
557                 ereport(ERROR,
558                                 (errmsg("could not receive data from WAL stream: %s",
559                                                 PQerrorMessage(conn->streamConn))));
560
561         /* Return received messages to caller */
562         *buffer = conn->recvBuf;
563         return rawlen;
564 }
565
566 /*
567  * Send a message to XLOG stream.
568  *
569  * ereports on error.
570  */
571 static void
572 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
573 {
574         if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
575                 PQflush(conn->streamConn))
576                 ereport(ERROR,
577                                 (errmsg("could not send data to WAL stream: %s",
578                                                 PQerrorMessage(conn->streamConn))));
579 }