]> granicus.if.org Git - postgresql/blob - src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
Fix bug in checking of IDENTIFY_SYSTEM result.
[postgresql] / src / backend / replication / libpqwalreceiver / libpqwalreceiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *        src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18
19 #include <unistd.h>
20 #include <sys/time.h>
21
22 #include "libpq-fe.h"
23 #include "access/xlog.h"
24 #include "miscadmin.h"
25 #include "replication/walreceiver.h"
26 #include "utils/builtins.h"
27
28 #ifdef HAVE_POLL_H
29 #include <poll.h>
30 #endif
31 #ifdef HAVE_SYS_POLL_H
32 #include <sys/poll.h>
33 #endif
34 #ifdef HAVE_SYS_SELECT_H
35 #include <sys/select.h>
36 #endif
37
38 PG_MODULE_MAGIC;
39
40 void            _PG_init(void);
41
42 /* Current connection to the primary, if any */
43 static PGconn *streamConn = NULL;
44
45 /* Buffer for currently read records */
46 static char *recvBuf = NULL;
47
48 /* Prototypes for interface functions */
49 static void libpqrcv_connect(char *conninfo);
50 static void libpqrcv_identify_system(TimeLineID *primary_tli);
51 static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
52 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
53                                                 char *slotname);
54 static void libpqrcv_endstreaming(TimeLineID *next_tli);
55 static int      libpqrcv_receive(int timeout, char **buffer);
56 static void libpqrcv_send(const char *buffer, int nbytes);
57 static void libpqrcv_disconnect(void);
58
59 /* Prototypes for private functions */
60 static bool libpq_select(int timeout_ms);
61 static PGresult *libpqrcv_PQexec(const char *query);
62
63 /*
64  * Module load callback
65  */
66 void
67 _PG_init(void)
68 {
69         /* Tell walreceiver how to reach us */
70         if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
71                 walrcv_readtimelinehistoryfile != NULL ||
72                 walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
73                 walrcv_receive != NULL || walrcv_send != NULL ||
74                 walrcv_disconnect != NULL)
75                 elog(ERROR, "libpqwalreceiver already loaded");
76         walrcv_connect = libpqrcv_connect;
77         walrcv_identify_system = libpqrcv_identify_system;
78         walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
79         walrcv_startstreaming = libpqrcv_startstreaming;
80         walrcv_endstreaming = libpqrcv_endstreaming;
81         walrcv_receive = libpqrcv_receive;
82         walrcv_send = libpqrcv_send;
83         walrcv_disconnect = libpqrcv_disconnect;
84 }
85
86 /*
87  * Establish the connection to the primary server for XLOG streaming
88  */
89 static void
90 libpqrcv_connect(char *conninfo)
91 {
92         char            conninfo_repl[MAXCONNINFO + 75];
93
94         /*
95          * Connect using deliberately undocumented parameter: replication. The
96          * database name is ignored by the server in replication mode, but specify
97          * "replication" for .pgpass lookup.
98          */
99         snprintf(conninfo_repl, sizeof(conninfo_repl),
100                          "%s dbname=replication replication=true fallback_application_name=walreceiver",
101                          conninfo);
102
103         streamConn = PQconnectdb(conninfo_repl);
104         if (PQstatus(streamConn) != CONNECTION_OK)
105                 ereport(ERROR,
106                                 (errmsg("could not connect to the primary server: %s",
107                                                 PQerrorMessage(streamConn))));
108 }
109
110 /*
111  * Check that primary's system identifier matches ours, and fetch the current
112  * timeline ID of the primary.
113  */
114 static void
115 libpqrcv_identify_system(TimeLineID *primary_tli)
116 {
117         PGresult   *res;
118         char       *primary_sysid;
119         char            standby_sysid[32];
120
121         /*
122          * Get the system identifier and timeline ID as a DataRow message from the
123          * primary server.
124          */
125         res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
126         if (PQresultStatus(res) != PGRES_TUPLES_OK)
127         {
128                 PQclear(res);
129                 ereport(ERROR,
130                                 (errmsg("could not receive database system identifier and timeline ID from "
131                                                 "the primary server: %s",
132                                                 PQerrorMessage(streamConn))));
133         }
134         if (PQnfields(res) < 4 || PQntuples(res) != 1)
135         {
136                 int                     ntuples = PQntuples(res);
137                 int                     nfields = PQnfields(res);
138
139                 PQclear(res);
140                 ereport(ERROR,
141                                 (errmsg("invalid response from primary server"),
142                                  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
143                                                    ntuples, nfields, 4, 1)));
144         }
145         primary_sysid = PQgetvalue(res, 0, 0);
146         *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
147
148         /*
149          * Confirm that the system identifier of the primary is the same as ours.
150          */
151         snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
152                          GetSystemIdentifier());
153         if (strcmp(primary_sysid, standby_sysid) != 0)
154         {
155                 primary_sysid = pstrdup(primary_sysid);
156                 PQclear(res);
157                 ereport(ERROR,
158                                 (errmsg("database system identifier differs between the primary and standby"),
159                                  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
160                                                    primary_sysid, standby_sysid)));
161         }
162         PQclear(res);
163 }
164
165 /*
166  * Start streaming WAL data from given startpoint and timeline.
167  *
168  * Returns true if we switched successfully to copy-both mode. False
169  * means the server received the command and executed it successfully, but
170  * didn't switch to copy-mode.  That means that there was no WAL on the
171  * requested timeline and starting point, because the server switched to
172  * another timeline at or before the requested starting point. On failure,
173  * throws an ERROR.
174  */
175 static bool
176 libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
177 {
178         char            cmd[256];
179         PGresult   *res;
180
181         /* Start streaming from the point requested by startup process */
182         if (slotname != NULL)
183                 snprintf(cmd, sizeof(cmd),
184                                  "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
185                                  (uint32) (startpoint >> 32), (uint32) startpoint, tli);
186         else
187                 snprintf(cmd, sizeof(cmd),
188                                  "START_REPLICATION %X/%X TIMELINE %u",
189                                  (uint32) (startpoint >> 32), (uint32) startpoint, tli);
190         res = libpqrcv_PQexec(cmd);
191
192         if (PQresultStatus(res) == PGRES_COMMAND_OK)
193         {
194                 PQclear(res);
195                 return false;
196         }
197         else if (PQresultStatus(res) != PGRES_COPY_BOTH)
198         {
199                 PQclear(res);
200                 ereport(ERROR,
201                                 (errmsg("could not start WAL streaming: %s",
202                                                 PQerrorMessage(streamConn))));
203         }
204         PQclear(res);
205         return true;
206 }
207
208 /*
209  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
210  * reported by the server, or 0 if it did not report it.
211  */
212 static void
213 libpqrcv_endstreaming(TimeLineID *next_tli)
214 {
215         PGresult   *res;
216
217         if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
218                 ereport(ERROR,
219                         (errmsg("could not send end-of-streaming message to primary: %s",
220                                         PQerrorMessage(streamConn))));
221
222         /*
223          * After COPY is finished, we should receive a result set indicating the
224          * next timeline's ID, or just CommandComplete if the server was shut
225          * down.
226          *
227          * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
228          * would also be possible. However, at the moment this function is only
229          * called after receiving CopyDone from the backend - the walreceiver
230          * never terminates replication on its own initiative.
231          */
232         res = PQgetResult(streamConn);
233         if (PQresultStatus(res) == PGRES_TUPLES_OK)
234         {
235                 /*
236                  * Read the next timeline's ID. The server also sends the timeline's
237                  * starting point, but it is ignored.
238                  */
239                 if (PQnfields(res) < 2 || PQntuples(res) != 1)
240                         ereport(ERROR,
241                                         (errmsg("unexpected result set after end-of-streaming")));
242                 *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
243                 PQclear(res);
244
245                 /* the result set should be followed by CommandComplete */
246                 res = PQgetResult(streamConn);
247         }
248         else
249                 *next_tli = 0;
250
251         if (PQresultStatus(res) != PGRES_COMMAND_OK)
252                 ereport(ERROR,
253                                 (errmsg("error reading result of streaming command: %s",
254                                                 PQerrorMessage(streamConn))));
255
256         /* Verify that there are no more results */
257         res = PQgetResult(streamConn);
258         if (res != NULL)
259                 ereport(ERROR,
260                                 (errmsg("unexpected result after CommandComplete: %s",
261                                                 PQerrorMessage(streamConn))));
262 }
263
264 /*
265  * Fetch the timeline history file for 'tli' from primary.
266  */
267 static void
268 libpqrcv_readtimelinehistoryfile(TimeLineID tli,
269                                                                  char **filename, char **content, int *len)
270 {
271         PGresult   *res;
272         char            cmd[64];
273
274         /*
275          * Request the primary to send over the history file for given timeline.
276          */
277         snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
278         res = libpqrcv_PQexec(cmd);
279         if (PQresultStatus(res) != PGRES_TUPLES_OK)
280         {
281                 PQclear(res);
282                 ereport(ERROR,
283                                 (errmsg("could not receive timeline history file from "
284                                                 "the primary server: %s",
285                                                 PQerrorMessage(streamConn))));
286         }
287         if (PQnfields(res) != 2 || PQntuples(res) != 1)
288         {
289                 int                     ntuples = PQntuples(res);
290                 int                     nfields = PQnfields(res);
291
292                 PQclear(res);
293                 ereport(ERROR,
294                                 (errmsg("invalid response from primary server"),
295                                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
296                                                    ntuples, nfields)));
297         }
298         *filename = pstrdup(PQgetvalue(res, 0, 0));
299
300         *len = PQgetlength(res, 0, 1);
301         *content = palloc(*len);
302         memcpy(*content, PQgetvalue(res, 0, 1), *len);
303         PQclear(res);
304 }
305
306 /*
307  * Wait until we can read WAL stream, or timeout.
308  *
309  * Returns true if data has become available for reading, false if timed out
310  * or interrupted by signal.
311  *
312  * This is based on pqSocketCheck.
313  */
314 static bool
315 libpq_select(int timeout_ms)
316 {
317         int                     ret;
318
319         Assert(streamConn != NULL);
320         if (PQsocket(streamConn) < 0)
321                 ereport(ERROR,
322                                 (errcode_for_socket_access(),
323                                  errmsg("socket not open")));
324
325         /* We use poll(2) if available, otherwise select(2) */
326         {
327 #ifdef HAVE_POLL
328                 struct pollfd input_fd;
329
330                 input_fd.fd = PQsocket(streamConn);
331                 input_fd.events = POLLIN | POLLERR;
332                 input_fd.revents = 0;
333
334                 ret = poll(&input_fd, 1, timeout_ms);
335 #else                                                   /* !HAVE_POLL */
336
337                 fd_set          input_mask;
338                 struct timeval timeout;
339                 struct timeval *ptr_timeout;
340
341                 FD_ZERO(&input_mask);
342                 FD_SET(PQsocket(streamConn), &input_mask);
343
344                 if (timeout_ms < 0)
345                         ptr_timeout = NULL;
346                 else
347                 {
348                         timeout.tv_sec = timeout_ms / 1000;
349                         timeout.tv_usec = (timeout_ms % 1000) * 1000;
350                         ptr_timeout = &timeout;
351                 }
352
353                 ret = select(PQsocket(streamConn) + 1, &input_mask,
354                                          NULL, NULL, ptr_timeout);
355 #endif   /* HAVE_POLL */
356         }
357
358         if (ret == 0 || (ret < 0 && errno == EINTR))
359                 return false;
360         if (ret < 0)
361                 ereport(ERROR,
362                                 (errcode_for_socket_access(),
363                                  errmsg("select() failed: %m")));
364         return true;
365 }
366
367 /*
368  * Send a query and wait for the results by using the asynchronous libpq
369  * functions and the backend version of select().
370  *
371  * We must not use the regular blocking libpq functions like PQexec()
372  * since they are uninterruptible by signals on some platforms, such as
373  * Windows.
374  *
375  * We must also not use vanilla select() here since it cannot handle the
376  * signal emulation layer on Windows.
377  *
378  * The function is modeled on PQexec() in libpq, but only implements
379  * those parts that are in use in the walreceiver.
380  *
381  * Queries are always executed on the connection in streamConn.
382  */
383 static PGresult *
384 libpqrcv_PQexec(const char *query)
385 {
386         PGresult   *result = NULL;
387         PGresult   *lastResult = NULL;
388
389         /*
390          * PQexec() silently discards any prior query results on the connection.
391          * This is not required for walreceiver since it's expected that walsender
392          * won't generate any such junk results.
393          */
394
395         /*
396          * Submit a query. Since we don't use non-blocking mode, this also can
397          * block. But its risk is relatively small, so we ignore that for now.
398          */
399         if (!PQsendQuery(streamConn, query))
400                 return NULL;
401
402         for (;;)
403         {
404                 /*
405                  * Receive data until PQgetResult is ready to get the result without
406                  * blocking.
407                  */
408                 while (PQisBusy(streamConn))
409                 {
410                         /*
411                          * We don't need to break down the sleep into smaller increments,
412                          * and check for interrupts after each nap, since we can just
413                          * elog(FATAL) within SIGTERM signal handler if the signal arrives
414                          * in the middle of establishment of replication connection.
415                          */
416                         if (!libpq_select(-1))
417                                 continue;               /* interrupted */
418                         if (PQconsumeInput(streamConn) == 0)
419                                 return NULL;    /* trouble */
420                 }
421
422                 /*
423                  * Emulate the PQexec()'s behavior of returning the last result when
424                  * there are many. Since walsender will never generate multiple
425                  * results, we skip the concatenation of error messages.
426                  */
427                 result = PQgetResult(streamConn);
428                 if (result == NULL)
429                         break;                          /* query is complete */
430
431                 PQclear(lastResult);
432                 lastResult = result;
433
434                 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
435                         PQresultStatus(lastResult) == PGRES_COPY_OUT ||
436                         PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
437                         PQstatus(streamConn) == CONNECTION_BAD)
438                         break;
439         }
440
441         return lastResult;
442 }
443
444 /*
445  * Disconnect connection to primary, if any.
446  */
447 static void
448 libpqrcv_disconnect(void)
449 {
450         PQfinish(streamConn);
451         streamConn = NULL;
452 }
453
454 /*
455  * Receive a message available from XLOG stream, blocking for
456  * maximum of 'timeout' ms.
457  *
458  * Returns:
459  *
460  *       If data was received, returns the length of the data. *buffer is set to
461  *       point to a buffer holding the received message. The buffer is only valid
462  *       until the next libpqrcv_* call.
463  *
464  *       0 if no data was available within timeout, or wait was interrupted
465  *       by signal.
466  *
467  *       -1 if the server ended the COPY.
468  *
469  * ereports on error.
470  */
471 static int
472 libpqrcv_receive(int timeout, char **buffer)
473 {
474         int                     rawlen;
475
476         if (recvBuf != NULL)
477                 PQfreemem(recvBuf);
478         recvBuf = NULL;
479
480         /* Try to receive a CopyData message */
481         rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
482         if (rawlen == 0)
483         {
484                 /*
485                  * No data available yet. If the caller requested to block, wait for
486                  * more data to arrive.
487                  */
488                 if (timeout > 0)
489                 {
490                         if (!libpq_select(timeout))
491                                 return 0;
492                 }
493
494                 if (PQconsumeInput(streamConn) == 0)
495                         ereport(ERROR,
496                                         (errmsg("could not receive data from WAL stream: %s",
497                                                         PQerrorMessage(streamConn))));
498
499                 /* Now that we've consumed some input, try again */
500                 rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
501                 if (rawlen == 0)
502                         return 0;
503         }
504         if (rawlen == -1)                       /* end-of-streaming or error */
505         {
506                 PGresult   *res;
507
508                 res = PQgetResult(streamConn);
509                 if (PQresultStatus(res) == PGRES_COMMAND_OK ||
510                         PQresultStatus(res) == PGRES_COPY_IN)
511                 {
512                         PQclear(res);
513                         return -1;
514                 }
515                 else
516                 {
517                         PQclear(res);
518                         ereport(ERROR,
519                                         (errmsg("could not receive data from WAL stream: %s",
520                                                         PQerrorMessage(streamConn))));
521                 }
522         }
523         if (rawlen < -1)
524                 ereport(ERROR,
525                                 (errmsg("could not receive data from WAL stream: %s",
526                                                 PQerrorMessage(streamConn))));
527
528         /* Return received messages to caller */
529         *buffer = recvBuf;
530         return rawlen;
531 }
532
533 /*
534  * Send a message to XLOG stream.
535  *
536  * ereports on error.
537  */
538 static void
539 libpqrcv_send(const char *buffer, int nbytes)
540 {
541         if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
542                 PQflush(streamConn))
543                 ereport(ERROR,
544                                 (errmsg("could not send data to WAL stream: %s",
545                                                 PQerrorMessage(streamConn))));
546 }