]> granicus.if.org Git - postgresql/blob - src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
Fix missing PQclear() in libpqrcv_endstreaming().
[postgresql] / src / backend / replication / libpqwalreceiver / libpqwalreceiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2015, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *        src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18
19 #include <unistd.h>
20 #include <sys/time.h>
21
22 #include "libpq-fe.h"
23 #include "access/xlog.h"
24 #include "miscadmin.h"
25 #include "replication/walreceiver.h"
26 #include "utils/builtins.h"
27
28 #ifdef HAVE_POLL_H
29 #include <poll.h>
30 #endif
31 #ifdef HAVE_SYS_POLL_H
32 #include <sys/poll.h>
33 #endif
34 #ifdef HAVE_SYS_SELECT_H
35 #include <sys/select.h>
36 #endif
37
38 PG_MODULE_MAGIC;
39
40 void            _PG_init(void);
41
42 /* Current connection to the primary, if any */
43 static PGconn *streamConn = NULL;
44
45 /* Buffer for currently read records */
46 static char *recvBuf = NULL;
47
48 /* Prototypes for interface functions */
49 static void libpqrcv_connect(char *conninfo);
50 static void libpqrcv_identify_system(TimeLineID *primary_tli);
51 static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
52 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
53                                                 char *slotname);
54 static void libpqrcv_endstreaming(TimeLineID *next_tli);
55 static int      libpqrcv_receive(int timeout, char **buffer);
56 static void libpqrcv_send(const char *buffer, int nbytes);
57 static void libpqrcv_disconnect(void);
58
59 /* Prototypes for private functions */
60 static bool libpq_select(int timeout_ms);
61 static PGresult *libpqrcv_PQexec(const char *query);
62
63 /*
64  * Module load callback
65  */
66 void
67 _PG_init(void)
68 {
69         /* Tell walreceiver how to reach us */
70         if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
71                 walrcv_readtimelinehistoryfile != NULL ||
72                 walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
73                 walrcv_receive != NULL || walrcv_send != NULL ||
74                 walrcv_disconnect != NULL)
75                 elog(ERROR, "libpqwalreceiver already loaded");
76         walrcv_connect = libpqrcv_connect;
77         walrcv_identify_system = libpqrcv_identify_system;
78         walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
79         walrcv_startstreaming = libpqrcv_startstreaming;
80         walrcv_endstreaming = libpqrcv_endstreaming;
81         walrcv_receive = libpqrcv_receive;
82         walrcv_send = libpqrcv_send;
83         walrcv_disconnect = libpqrcv_disconnect;
84 }
85
86 /*
87  * Establish the connection to the primary server for XLOG streaming
88  */
89 static void
90 libpqrcv_connect(char *conninfo)
91 {
92         const char      *keys[5];
93         const char      *vals[5];
94
95         /*
96          * We use the expand_dbname parameter to process the connection string
97          * (or URI), and pass some extra options. The deliberately undocumented
98          * parameter "replication=true" makes it a replication connection.
99          * The database name is ignored by the server in replication mode, but
100          * specify "replication" for .pgpass lookup.
101          */
102         keys[0] = "dbname";
103         vals[0] = conninfo;
104         keys[1] = "replication";
105         vals[1] = "true";
106         keys[2] = "dbname";
107         vals[2] = "replication";
108         keys[3] = "fallback_application_name";
109         vals[3] = "walreceiver";
110         keys[4] = NULL;
111         vals[4] = NULL;
112
113         streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
114         if (PQstatus(streamConn) != CONNECTION_OK)
115                 ereport(ERROR,
116                                 (errmsg("could not connect to the primary server: %s",
117                                                 PQerrorMessage(streamConn))));
118 }
119
120 /*
121  * Check that primary's system identifier matches ours, and fetch the current
122  * timeline ID of the primary.
123  */
124 static void
125 libpqrcv_identify_system(TimeLineID *primary_tli)
126 {
127         PGresult   *res;
128         char       *primary_sysid;
129         char            standby_sysid[32];
130
131         /*
132          * Get the system identifier and timeline ID as a DataRow message from the
133          * primary server.
134          */
135         res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
136         if (PQresultStatus(res) != PGRES_TUPLES_OK)
137         {
138                 PQclear(res);
139                 ereport(ERROR,
140                                 (errmsg("could not receive database system identifier and timeline ID from "
141                                                 "the primary server: %s",
142                                                 PQerrorMessage(streamConn))));
143         }
144         if (PQnfields(res) < 3 || PQntuples(res) != 1)
145         {
146                 int                     ntuples = PQntuples(res);
147                 int                     nfields = PQnfields(res);
148
149                 PQclear(res);
150                 ereport(ERROR,
151                                 (errmsg("invalid response from primary server"),
152                                  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
153                                                    ntuples, nfields, 3, 1)));
154         }
155         primary_sysid = PQgetvalue(res, 0, 0);
156         *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
157
158         /*
159          * Confirm that the system identifier of the primary is the same as ours.
160          */
161         snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
162                          GetSystemIdentifier());
163         if (strcmp(primary_sysid, standby_sysid) != 0)
164         {
165                 primary_sysid = pstrdup(primary_sysid);
166                 PQclear(res);
167                 ereport(ERROR,
168                                 (errmsg("database system identifier differs between the primary and standby"),
169                                  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
170                                                    primary_sysid, standby_sysid)));
171         }
172         PQclear(res);
173 }
174
175 /*
176  * Start streaming WAL data from given startpoint and timeline.
177  *
178  * Returns true if we switched successfully to copy-both mode. False
179  * means the server received the command and executed it successfully, but
180  * didn't switch to copy-mode.  That means that there was no WAL on the
181  * requested timeline and starting point, because the server switched to
182  * another timeline at or before the requested starting point. On failure,
183  * throws an ERROR.
184  */
185 static bool
186 libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
187 {
188         char            cmd[256];
189         PGresult   *res;
190
191         /* Start streaming from the point requested by startup process */
192         if (slotname != NULL)
193                 snprintf(cmd, sizeof(cmd),
194                                  "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
195                                  (uint32) (startpoint >> 32), (uint32) startpoint, tli);
196         else
197                 snprintf(cmd, sizeof(cmd),
198                                  "START_REPLICATION %X/%X TIMELINE %u",
199                                  (uint32) (startpoint >> 32), (uint32) startpoint, tli);
200         res = libpqrcv_PQexec(cmd);
201
202         if (PQresultStatus(res) == PGRES_COMMAND_OK)
203         {
204                 PQclear(res);
205                 return false;
206         }
207         else if (PQresultStatus(res) != PGRES_COPY_BOTH)
208         {
209                 PQclear(res);
210                 ereport(ERROR,
211                                 (errmsg("could not start WAL streaming: %s",
212                                                 PQerrorMessage(streamConn))));
213         }
214         PQclear(res);
215         return true;
216 }
217
218 /*
219  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
220  * reported by the server, or 0 if it did not report it.
221  */
222 static void
223 libpqrcv_endstreaming(TimeLineID *next_tli)
224 {
225         PGresult   *res;
226
227         if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
228                 ereport(ERROR,
229                         (errmsg("could not send end-of-streaming message to primary: %s",
230                                         PQerrorMessage(streamConn))));
231
232         /*
233          * After COPY is finished, we should receive a result set indicating the
234          * next timeline's ID, or just CommandComplete if the server was shut
235          * down.
236          *
237          * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
238          * would also be possible. However, at the moment this function is only
239          * called after receiving CopyDone from the backend - the walreceiver
240          * never terminates replication on its own initiative.
241          */
242         res = PQgetResult(streamConn);
243         if (PQresultStatus(res) == PGRES_TUPLES_OK)
244         {
245                 /*
246                  * Read the next timeline's ID. The server also sends the timeline's
247                  * starting point, but it is ignored.
248                  */
249                 if (PQnfields(res) < 2 || PQntuples(res) != 1)
250                         ereport(ERROR,
251                                         (errmsg("unexpected result set after end-of-streaming")));
252                 *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
253                 PQclear(res);
254
255                 /* the result set should be followed by CommandComplete */
256                 res = PQgetResult(streamConn);
257         }
258         else
259                 *next_tli = 0;
260
261         if (PQresultStatus(res) != PGRES_COMMAND_OK)
262                 ereport(ERROR,
263                                 (errmsg("error reading result of streaming command: %s",
264                                                 PQerrorMessage(streamConn))));
265         PQclear(res);
266
267         /* Verify that there are no more results */
268         res = PQgetResult(streamConn);
269         if (res != NULL)
270                 ereport(ERROR,
271                                 (errmsg("unexpected result after CommandComplete: %s",
272                                                 PQerrorMessage(streamConn))));
273 }
274
275 /*
276  * Fetch the timeline history file for 'tli' from primary.
277  */
278 static void
279 libpqrcv_readtimelinehistoryfile(TimeLineID tli,
280                                                                  char **filename, char **content, int *len)
281 {
282         PGresult   *res;
283         char            cmd[64];
284
285         /*
286          * Request the primary to send over the history file for given timeline.
287          */
288         snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
289         res = libpqrcv_PQexec(cmd);
290         if (PQresultStatus(res) != PGRES_TUPLES_OK)
291         {
292                 PQclear(res);
293                 ereport(ERROR,
294                                 (errmsg("could not receive timeline history file from "
295                                                 "the primary server: %s",
296                                                 PQerrorMessage(streamConn))));
297         }
298         if (PQnfields(res) != 2 || PQntuples(res) != 1)
299         {
300                 int                     ntuples = PQntuples(res);
301                 int                     nfields = PQnfields(res);
302
303                 PQclear(res);
304                 ereport(ERROR,
305                                 (errmsg("invalid response from primary server"),
306                                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
307                                                    ntuples, nfields)));
308         }
309         *filename = pstrdup(PQgetvalue(res, 0, 0));
310
311         *len = PQgetlength(res, 0, 1);
312         *content = palloc(*len);
313         memcpy(*content, PQgetvalue(res, 0, 1), *len);
314         PQclear(res);
315 }
316
317 /*
318  * Wait until we can read WAL stream, or timeout.
319  *
320  * Returns true if data has become available for reading, false if timed out
321  * or interrupted by signal.
322  *
323  * This is based on pqSocketCheck.
324  */
325 static bool
326 libpq_select(int timeout_ms)
327 {
328         int                     ret;
329
330         Assert(streamConn != NULL);
331         if (PQsocket(streamConn) < 0)
332                 ereport(ERROR,
333                                 (errcode_for_socket_access(),
334                                  errmsg("socket not open")));
335
336         /* We use poll(2) if available, otherwise select(2) */
337         {
338 #ifdef HAVE_POLL
339                 struct pollfd input_fd;
340
341                 input_fd.fd = PQsocket(streamConn);
342                 input_fd.events = POLLIN | POLLERR;
343                 input_fd.revents = 0;
344
345                 ret = poll(&input_fd, 1, timeout_ms);
346 #else                                                   /* !HAVE_POLL */
347
348                 fd_set          input_mask;
349                 struct timeval timeout;
350                 struct timeval *ptr_timeout;
351
352                 FD_ZERO(&input_mask);
353                 FD_SET(PQsocket(streamConn), &input_mask);
354
355                 if (timeout_ms < 0)
356                         ptr_timeout = NULL;
357                 else
358                 {
359                         timeout.tv_sec = timeout_ms / 1000;
360                         timeout.tv_usec = (timeout_ms % 1000) * 1000;
361                         ptr_timeout = &timeout;
362                 }
363
364                 ret = select(PQsocket(streamConn) + 1, &input_mask,
365                                          NULL, NULL, ptr_timeout);
366 #endif   /* HAVE_POLL */
367         }
368
369         if (ret == 0 || (ret < 0 && errno == EINTR))
370                 return false;
371         if (ret < 0)
372                 ereport(ERROR,
373                                 (errcode_for_socket_access(),
374                                  errmsg("select() failed: %m")));
375         return true;
376 }
377
378 /*
379  * Send a query and wait for the results by using the asynchronous libpq
380  * functions and the backend version of select().
381  *
382  * We must not use the regular blocking libpq functions like PQexec()
383  * since they are uninterruptible by signals on some platforms, such as
384  * Windows.
385  *
386  * We must also not use vanilla select() here since it cannot handle the
387  * signal emulation layer on Windows.
388  *
389  * The function is modeled on PQexec() in libpq, but only implements
390  * those parts that are in use in the walreceiver.
391  *
392  * Queries are always executed on the connection in streamConn.
393  */
394 static PGresult *
395 libpqrcv_PQexec(const char *query)
396 {
397         PGresult   *result = NULL;
398         PGresult   *lastResult = NULL;
399
400         /*
401          * PQexec() silently discards any prior query results on the connection.
402          * This is not required for walreceiver since it's expected that walsender
403          * won't generate any such junk results.
404          */
405
406         /*
407          * Submit a query. Since we don't use non-blocking mode, this also can
408          * block. But its risk is relatively small, so we ignore that for now.
409          */
410         if (!PQsendQuery(streamConn, query))
411                 return NULL;
412
413         for (;;)
414         {
415                 /*
416                  * Receive data until PQgetResult is ready to get the result without
417                  * blocking.
418                  */
419                 while (PQisBusy(streamConn))
420                 {
421                         /*
422                          * We don't need to break down the sleep into smaller increments,
423                          * and check for interrupts after each nap, since we can just
424                          * elog(FATAL) within SIGTERM signal handler if the signal arrives
425                          * in the middle of establishment of replication connection.
426                          */
427                         if (!libpq_select(-1))
428                                 continue;               /* interrupted */
429                         if (PQconsumeInput(streamConn) == 0)
430                                 return NULL;    /* trouble */
431                 }
432
433                 /*
434                  * Emulate the PQexec()'s behavior of returning the last result when
435                  * there are many. Since walsender will never generate multiple
436                  * results, we skip the concatenation of error messages.
437                  */
438                 result = PQgetResult(streamConn);
439                 if (result == NULL)
440                         break;                          /* query is complete */
441
442                 PQclear(lastResult);
443                 lastResult = result;
444
445                 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
446                         PQresultStatus(lastResult) == PGRES_COPY_OUT ||
447                         PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
448                         PQstatus(streamConn) == CONNECTION_BAD)
449                         break;
450         }
451
452         return lastResult;
453 }
454
455 /*
456  * Disconnect connection to primary, if any.
457  */
458 static void
459 libpqrcv_disconnect(void)
460 {
461         PQfinish(streamConn);
462         streamConn = NULL;
463 }
464
465 /*
466  * Receive a message available from XLOG stream, blocking for
467  * maximum of 'timeout' ms.
468  *
469  * Returns:
470  *
471  *       If data was received, returns the length of the data. *buffer is set to
472  *       point to a buffer holding the received message. The buffer is only valid
473  *       until the next libpqrcv_* call.
474  *
475  *       0 if no data was available within timeout, or wait was interrupted
476  *       by signal.
477  *
478  *       -1 if the server ended the COPY.
479  *
480  * ereports on error.
481  */
482 static int
483 libpqrcv_receive(int timeout, char **buffer)
484 {
485         int                     rawlen;
486
487         if (recvBuf != NULL)
488                 PQfreemem(recvBuf);
489         recvBuf = NULL;
490
491         /* Try to receive a CopyData message */
492         rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
493         if (rawlen == 0)
494         {
495                 /*
496                  * No data available yet. If the caller requested to block, wait for
497                  * more data to arrive.
498                  */
499                 if (timeout > 0)
500                 {
501                         if (!libpq_select(timeout))
502                                 return 0;
503                 }
504
505                 if (PQconsumeInput(streamConn) == 0)
506                         ereport(ERROR,
507                                         (errmsg("could not receive data from WAL stream: %s",
508                                                         PQerrorMessage(streamConn))));
509
510                 /* Now that we've consumed some input, try again */
511                 rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
512                 if (rawlen == 0)
513                         return 0;
514         }
515         if (rawlen == -1)                       /* end-of-streaming or error */
516         {
517                 PGresult   *res;
518
519                 res = PQgetResult(streamConn);
520                 if (PQresultStatus(res) == PGRES_COMMAND_OK ||
521                         PQresultStatus(res) == PGRES_COPY_IN)
522                 {
523                         PQclear(res);
524                         return -1;
525                 }
526                 else
527                 {
528                         PQclear(res);
529                         ereport(ERROR,
530                                         (errmsg("could not receive data from WAL stream: %s",
531                                                         PQerrorMessage(streamConn))));
532                 }
533         }
534         if (rawlen < -1)
535                 ereport(ERROR,
536                                 (errmsg("could not receive data from WAL stream: %s",
537                                                 PQerrorMessage(streamConn))));
538
539         /* Return received messages to caller */
540         *buffer = recvBuf;
541         return rawlen;
542 }
543
544 /*
545  * Send a message to XLOG stream.
546  *
547  * ereports on error.
548  */
549 static void
550 libpqrcv_send(const char *buffer, int nbytes)
551 {
552         if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
553                 PQflush(streamConn))
554                 ereport(ERROR,
555                                 (errmsg("could not send data to WAL stream: %s",
556                                                 PQerrorMessage(streamConn))));
557 }