1 /*-------------------------------------------------------------------------
5 * This file contains the libpq-specific parts of walreceiver. It's
6 * loaded as a dynamic module to avoid linking the main server binary with
9 * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
13 * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
15 *-------------------------------------------------------------------------
23 #include "access/xlog.h"
24 #include "miscadmin.h"
25 #include "replication/walreceiver.h"
26 #include "utils/builtins.h"
31 #ifdef HAVE_SYS_POLL_H
34 #ifdef HAVE_SYS_SELECT_H
35 #include <sys/select.h>
/*
 * File-scope state shared by the libpqrcv_* functions below.
 */
42 /* Current connection to the primary, if any */
/* Assigned in libpqrcv_connect() and passed to PQfinish() in libpqrcv_disconnect(). */
43 static PGconn *streamConn = NULL;
45 /* Buffer for currently read records */
/* Its address is handed to PQgetCopyData() in libpqrcv_receive(), which fills it in. */
46 static char *recvBuf = NULL;
/*
 * Forward declarations.  The "interface" functions are the ones the module
 * load callback stores into the walrcv_* function pointers; the "private"
 * functions are static helpers called only from within this file.
 */
48 /* Prototypes for interface functions */
49 static void libpqrcv_connect(char *conninfo);
50 static void libpqrcv_identify_system(TimeLineID *primary_tli);
51 static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
/*
 * NOTE(review): the parameter list of the next prototype continues on a line
 * not shown in this extract; the definition further down takes a third
 * "char *slotname" argument.
 */
52 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
54 static void libpqrcv_endstreaming(TimeLineID *next_tli);
55 static int libpqrcv_receive(int timeout, char **buffer);
56 static void libpqrcv_send(const char *buffer, int nbytes);
57 static void libpqrcv_disconnect(void);
59 /* Prototypes for private functions */
60 static bool libpq_select(int timeout_ms);
61 static PGresult *libpqrcv_PQexec(const char *query);
/*
 * Registers this module's implementations with walreceiver by filling in
 * the eight walrcv_* function pointers.  If any of those pointers is already
 * non-NULL, the module is treated as already loaded and an ERROR is raised
 * instead of overwriting them.
 *
 * NOTE(review): the function's own header line is not visible in this
 * extract; presumably this is the module's _PG_init() -- confirm.
 */
64 * Module load callback
69 /* Tell walreceiver how to reach us */
/* Refuse to install the hooks twice: any non-NULL pointer means a provider is already registered. */
70 if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
71 walrcv_readtimelinehistoryfile != NULL ||
72 walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
73 walrcv_receive != NULL || walrcv_send != NULL ||
74 walrcv_disconnect != NULL)
75 elog(ERROR, "libpqwalreceiver already loaded");
/* Point each hook at the matching libpq-based implementation from this file. */
76 walrcv_connect = libpqrcv_connect;
77 walrcv_identify_system = libpqrcv_identify_system;
78 walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
79 walrcv_startstreaming = libpqrcv_startstreaming;
80 walrcv_endstreaming = libpqrcv_endstreaming;
81 walrcv_receive = libpqrcv_receive;
82 walrcv_send = libpqrcv_send;
83 walrcv_disconnect = libpqrcv_disconnect;
/*
 * libpqrcv_connect
 *
 * Opens the connection used for WAL streaming and stores it in the
 * file-scope streamConn.  Parallel keys[]/vals[] arrays are handed to
 * PQconnectdbParams() with expand_dbname enabled; the entries visible here
 * include the "replication" keyword, the value "replication" (the database
 * name used for .pgpass lookup, per the comment below) and
 * fallback_application_name = "walreceiver".  If the resulting connection
 * status is not CONNECTION_OK, the libpq error text is reported with the
 * message "could not connect to the primary server".
 *
 * NOTE(review): the remaining keys[]/vals[] assignments (presumably including
 * the one that passes conninfo itself) and the reporting call that wraps
 * errmsg() are on lines not shown in this extract.
 */
87 * Establish the connection to the primary server for XLOG streaming
90 libpqrcv_connect(char *conninfo)
96 * We use the expand_dbname parameter to process the connection string
97 * (or URI), and pass some extra options. The deliberately undocumented
98 * parameter "replication=true" makes it a replication connection.
99 * The database name is ignored by the server in replication mode, but
100 * specify "replication" for .pgpass lookup.
104 keys[1] = "replication";
107 vals[2] = "replication";
108 keys[3] = "fallback_application_name";
109 vals[3] = "walreceiver";
/* The result is kept in streamConn even on failure so that PQerrorMessage() can describe what went wrong. */
113 streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
114 if (PQstatus(streamConn) != CONNECTION_OK)
116 (errmsg("could not connect to the primary server: %s",
117 PQerrorMessage(streamConn))));
/*
 * libpqrcv_identify_system
 *
 * Runs IDENTIFY_SYSTEM on the streaming connection and validates the reply:
 * the result must be PGRES_TUPLES_OK with exactly one row and at least three
 * fields.  The first column is the primary's system identifier and the
 * second its timeline ID, which is stored in *primary_tli.  The primary's
 * identifier is then compared, as text, with this server's
 * GetSystemIdentifier(); a mismatch is reported as an error.
 *
 * NOTE(review): local declarations, result cleanup and the reporting calls
 * that wrap errmsg() are on lines not shown in this extract.
 */
121 * Check that primary's system identifier matches ours, and fetch the current
122 * timeline ID of the primary.
125 libpqrcv_identify_system(TimeLineID *primary_tli)
129 char standby_sysid[32];
132 * Get the system identifier and timeline ID as a DataRow message from the
135 res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
136 if (PQresultStatus(res) != PGRES_TUPLES_OK)
140 (errmsg("could not receive database system identifier and timeline ID from "
141 "the primary server: %s",
142 PQerrorMessage(streamConn))));
144 if (PQnfields(res) < 3 || PQntuples(res) != 1)
/* Capture the counts in locals so they can still be reported after the result is no longer needed. */
146 int ntuples = PQntuples(res);
147 int nfields = PQnfields(res);
151 (errmsg("invalid response from primary server"),
152 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
/* Expected shape mirrors the check above: exactly 1 row, at least 3 fields. */
153 ntuples, nfields, 1, 3)));
155 primary_sysid = PQgetvalue(res, 0, 0);
/* Timeline IDs are parsed as 32-bit values, the same way libpqrcv_endstreaming() does it. */
156 *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), sizeof(uint32), 0);
159 * Confirm that the system identifier of the primary is the same as ours.
/* Render our own identifier as decimal text so the two can be compared with strcmp(). */
161 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
162 GetSystemIdentifier());
163 if (strcmp(primary_sysid, standby_sysid) != 0)
/* Take a private copy of the identifier before reporting -- presumably because the PGresult that owns it is released on a line not shown here. */
165 primary_sysid = pstrdup(primary_sysid);
168 (errmsg("database system identifier differs between the primary and standby"),
169 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
170 primary_sysid, standby_sysid)));
/*
 * libpqrcv_startstreaming
 *
 * Formats a START_REPLICATION command into cmd and sends it with
 * libpqrcv_PQexec().  When slotname is non-NULL the command carries a
 * SLOT "<name>" clause.  The start position is printed as two 32-bit
 * hexadecimal halves (high/low) followed by "TIMELINE <tli>".
 *
 * The result status decides the outcome: PGRES_COMMAND_OK is handled
 * separately from PGRES_COPY_BOTH, and any other status is reported with
 * "could not start WAL streaming".  See the original comment below for the
 * meaning of the true/false return value.
 *
 * NOTE(review): the declaration of cmd, the "else" pairing the two
 * snprintf() calls, the branch bodies and the return statements are on
 * lines not shown in this extract.
 */
176 * Start streaming WAL data from given startpoint and timeline.
178 * Returns true if we switched successfully to copy-both mode. False
179 * means the server received the command and executed it successfully, but
180 * didn't switch to copy-mode. That means that there was no WAL on the
181 * requested timeline and starting point, because the server switched to
182 * another timeline at or before the requested starting point. On failure,
186 libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
191 /* Start streaming from the point requested by startup process */
192 if (slotname != NULL)
193 snprintf(cmd, sizeof(cmd),
194 "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
195 (uint32) (startpoint >> 32), (uint32) startpoint, tli);
197 snprintf(cmd, sizeof(cmd),
198 "START_REPLICATION %X/%X TIMELINE %u",
199 (uint32) (startpoint >> 32), (uint32) startpoint, tli);
200 res = libpqrcv_PQexec(cmd);
/* COMMAND_OK means the command completed without entering copy mode; per the header comment that is the "false" outcome. */
202 if (PQresultStatus(res) == PGRES_COMMAND_OK)
207 else if (PQresultStatus(res) != PGRES_COPY_BOTH)
211 (errmsg("could not start WAL streaming: %s",
212 PQerrorMessage(streamConn))));
/*
 * libpqrcv_endstreaming
 *
 * Ends the COPY on the streaming connection and drains the results that
 * follow it:
 *   1. Sends CopyEnd with PQputCopyEnd() and flushes it with PQflush();
 *      failure of either is reported.
 *   2. Reads the next result.  If it is PGRES_TUPLES_OK it must contain
 *      exactly one row with at least two fields; the first field is parsed
 *      as a 32-bit integer into *next_tli and a further result is fetched.
 *   3. Requires the current result to be PGRES_COMMAND_OK.
 *   4. Fetches once more to verify that nothing else is pending.
 *
 * NOTE(review): result cleanup, the branch that covers the "server did not
 * report a timeline" case, the condition guarding the final error, and the
 * reporting calls that wrap errmsg() are on lines not shown in this extract.
 */
219 * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
220 * reported by the server, or 0 if it did not report it.
223 libpqrcv_endstreaming(TimeLineID *next_tli)
/* PQflush() returns non-zero when the data could not be (fully) sent, so either call failing lands in the error report. */
227 if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
229 (errmsg("could not send end-of-streaming message to primary: %s",
230 PQerrorMessage(streamConn))));
233 * After COPY is finished, we should receive a result set indicating the
234 * next timeline's ID, or just CommandComplete if the server was shut
237 * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
238 * would also be possible. However, at the moment this function is only
239 * called after receiving CopyDone from the backend - the walreceiver
240 * never terminates replication on its own initiative.
242 res = PQgetResult(streamConn);
243 if (PQresultStatus(res) == PGRES_TUPLES_OK)
246 * Read the next timeline's ID. The server also sends the timeline's
247 * starting point, but it is ignored.
249 if (PQnfields(res) < 2 || PQntuples(res) != 1)
251 (errmsg("unexpected result set after end-of-streaming")));
252 *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
255 /* the result set should be followed by CommandComplete */
256 res = PQgetResult(streamConn);
261 if (PQresultStatus(res) != PGRES_COMMAND_OK)
263 (errmsg("error reading result of streaming command: %s",
264 PQerrorMessage(streamConn))));
266 /* Verify that there are no more results */
267 res = PQgetResult(streamConn);
270 (errmsg("unexpected result after CommandComplete: %s",
271 PQerrorMessage(streamConn))));
/*
 * libpqrcv_readtimelinehistoryfile
 *
 * Sends "TIMELINE_HISTORY <tli>" through libpqrcv_PQexec() and returns the
 * reply to the caller through the three output parameters:
 *   *filename  - pstrdup()'d copy of the first column
 *   *len       - byte length of the second column, from PQgetlength()
 *   *content   - palloc()'d buffer of exactly *len bytes holding the second
 *                column; it is copied with memcpy() and no terminating NUL
 *                is appended here
 *
 * The reply must be PGRES_TUPLES_OK with exactly one row and exactly two
 * fields; anything else is reported as an error.
 *
 * NOTE(review): the declaration of cmd, the errdetail() arguments, result
 * cleanup and the reporting calls that wrap errmsg() are on lines not shown
 * in this extract.
 */
275 * Fetch the timeline history file for 'tli' from primary.
278 libpqrcv_readtimelinehistoryfile(TimeLineID tli,
279 char **filename, char **content, int *len)
285 * Request the primary to send over the history file for given timeline.
287 snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
288 res = libpqrcv_PQexec(cmd);
289 if (PQresultStatus(res) != PGRES_TUPLES_OK)
293 (errmsg("could not receive timeline history file from "
294 "the primary server: %s",
295 PQerrorMessage(streamConn))));
297 if (PQnfields(res) != 2 || PQntuples(res) != 1)
/* Capture the counts in locals so they can be reported in the error detail. */
299 int ntuples = PQntuples(res);
300 int nfields = PQnfields(res);
304 (errmsg("invalid response from primary server"),
305 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
/* Copy both columns out of the PGresult so the caller's data does not point into libpq-owned storage. */
308 *filename = pstrdup(PQgetvalue(res, 0, 0));
310 *len = PQgetlength(res, 0, 1);
311 *content = palloc(*len);
312 memcpy(*content, PQgetvalue(res, 0, 1), *len);
/*
 * libpq_select
 *
 * Waits for the streaming connection's socket to become readable, for at
 * most timeout_ms milliseconds.  streamConn must be non-NULL (asserted), and
 * a negative PQsocket() value is reported as "socket not open".
 *
 * Two implementations are selected at compile time: one builds a single
 * pollfd watching POLLIN | POLLERR and calls poll(); the other builds an
 * fd_set containing only the socket plus a struct timeval derived from
 * timeout_ms and calls select().  Both leave their return value in ret.
 *
 * A ret of 0 (timeout) or a negative ret with errno == EINTR takes the
 * first branch of the final check; per the original comment below that is
 * the "return false" case.
 *
 * NOTE(review): the "#if" that opens the conditional section, the
 * declarations of ret and input_mask, any special handling of negative
 * timeouts on the select() path, and the return statements are on lines not
 * shown in this extract.
 */
317 * Wait until we can read WAL stream, or timeout.
319 * Returns true if data has become available for reading, false if timed out
320 * or interrupted by signal.
322 * This is based on pqSocketCheck.
325 libpq_select(int timeout_ms)
329 Assert(streamConn != NULL);
330 if (PQsocket(streamConn) < 0)
332 (errcode_for_socket_access(),
333 errmsg("socket not open")));
335 /* We use poll(2) if available, otherwise select(2) */
338 struct pollfd input_fd;
340 input_fd.fd = PQsocket(streamConn);
341 input_fd.events = POLLIN | POLLERR;
342 input_fd.revents = 0;
/* poll() takes the timeout in milliseconds, so timeout_ms is passed through unchanged. */
344 ret = poll(&input_fd, 1, timeout_ms);
345 #else /* !HAVE_POLL */
348 struct timeval timeout;
349 struct timeval *ptr_timeout;
351 FD_ZERO(&input_mask);
352 FD_SET(PQsocket(streamConn), &input_mask);
/* select() wants seconds plus microseconds, so split the millisecond value. */
358 timeout.tv_sec = timeout_ms / 1000;
359 timeout.tv_usec = (timeout_ms % 1000) * 1000;
360 ptr_timeout = &timeout;
/* The first argument to select() is the highest descriptor number plus one. */
363 ret = select(PQsocket(streamConn) + 1, &input_mask,
364 NULL, NULL, ptr_timeout);
365 #endif /* HAVE_POLL */
368 if (ret == 0 || (ret < 0 && errno == EINTR))
/* NOTE(review): this report sits after the #endif, so the "select() failed" wording is also used when poll() was the call that failed. */
372 (errcode_for_socket_access(),
373 errmsg("select() failed: %m")));
/*
 * libpqrcv_PQexec
 *
 * Executes query on streamConn using libpq's asynchronous interface:
 *   1. PQsendQuery() submits the query.
 *   2. While PQisBusy() reports that a result is not ready, wait on the
 *      socket with libpq_select(-1) and feed arriving data to
 *      PQconsumeInput().  A wait that returns false is simply retried; a
 *      PQconsumeInput() failure makes the function return NULL.
 *   3. PQgetResult() fetches results; collection stops once the query is
 *      complete, or when the last result is a COPY IN/OUT/BOTH status or
 *      the connection has gone CONNECTION_BAD.
 *
 * NOTE(review): the enclosing result loop, the handling of a failed
 * PQsendQuery(), the bookkeeping between result and lastResult, and the
 * final return are on lines not shown in this extract.
 */
378 * Send a query and wait for the results by using the asynchronous libpq
379 * functions and the backend version of select().
381 * We must not use the regular blocking libpq functions like PQexec()
382 * since they are uninterruptible by signals on some platforms, such as
385 * We must also not use vanilla select() here since it cannot handle the
386 * signal emulation layer on Windows.
388 * The function is modeled on PQexec() in libpq, but only implements
389 * those parts that are in use in the walreceiver.
391 * Queries are always executed on the connection in streamConn.
394 libpqrcv_PQexec(const char *query)
396 PGresult *result = NULL;
397 PGresult *lastResult = NULL;
400 * PQexec() silently discards any prior query results on the connection.
401 * This is not required for walreceiver since it's expected that walsender
402 * won't generate any such junk results.
406 * Submit a query. Since we don't use non-blocking mode, this also can
407 * block. But its risk is relatively small, so we ignore that for now.
409 if (!PQsendQuery(streamConn, query))
415 * Receive data until PQgetResult is ready to get the result without
418 while (PQisBusy(streamConn))
421 * We don't need to break down the sleep into smaller increments,
422 * and check for interrupts after each nap, since we can just
423 * elog(FATAL) within SIGTERM signal handler if the signal arrives
424 * in the middle of establishment of replication connection.
/* The -1 timeout is handed to libpq_select() unchanged; on its poll() path a negative timeout means "wait without a time limit". */
426 if (!libpq_select(-1))
427 continue; /* interrupted */
428 if (PQconsumeInput(streamConn) == 0)
429 return NULL; /* trouble */
433 * Emulate the PQexec()'s behavior of returning the last result when
434 * there are many. Since walsender will never generate multiple
435 * results, we skip the concatenation of error messages.
437 result = PQgetResult(streamConn);
439 break; /* query is complete */
/* A COPY status or a dead connection means no further results can be collected here, so stop. */
444 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
445 PQresultStatus(lastResult) == PGRES_COPY_OUT ||
446 PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
447 PQstatus(streamConn) == CONNECTION_BAD)
/*
 * libpqrcv_disconnect
 *
 * Closes the streaming connection by passing streamConn to PQfinish().
 * Takes no arguments and operates only on the file-scope connection.
 *
 * NOTE(review): whether streamConn is reset afterwards is not visible in
 * this extract.
 */
455 * Disconnect connection to primary, if any.
458 libpqrcv_disconnect(void)
460 PQfinish(streamConn);
/*
 * libpqrcv_receive
 *
 * Fetches the next CopyData message from the streaming connection.
 *   1. Tries PQgetCopyData() first (third argument 1, i.e. libpq's
 *      non-blocking mode), storing the message in the file-scope recvBuf.
 *   2. If nothing was available, waits with libpq_select(timeout), pulls
 *      pending bytes in with PQconsumeInput() (failure is reported), and
 *      tries PQgetCopyData() again.
 *   3. A length of -1 means the COPY ended or failed: the following result
 *      is fetched, and PGRES_COMMAND_OK or PGRES_COPY_IN is treated as a
 *      normal end of streaming, anything else as an error.
 *
 * Return value and *buffer semantics are as described in the original
 * comment below.
 *
 * NOTE(review): the conditions that separate these steps, the return
 * statements, the assignment to *buffer, result cleanup and the reporting
 * calls that wrap errmsg() are on lines not shown in this extract.
 */
465 * Receive a message available from XLOG stream, blocking for
466 * maximum of 'timeout' ms.
470 * If data was received, returns the length of the data. *buffer is set to
471 * point to a buffer holding the received message. The buffer is only valid
472 * until the next libpqrcv_* call.
474 * 0 if no data was available within timeout, or wait was interrupted
477 * -1 if the server ended the COPY.
482 libpqrcv_receive(int timeout, char **buffer)
490 /* Try to receive a CopyData message */
491 rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
495 * No data available yet. If the caller requested to block, wait for
496 * more data to arrive.
/* libpq_select() returning false means timeout or interruption, i.e. there is still nothing to read. */
500 if (!libpq_select(timeout))
504 if (PQconsumeInput(streamConn) == 0)
506 (errmsg("could not receive data from WAL stream: %s",
507 PQerrorMessage(streamConn))));
509 /* Now that we've consumed some input, try again */
510 rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
514 if (rawlen == -1) /* end-of-streaming or error */
/* The result that follows the COPY tells a clean end of streaming apart from a failure. */
518 res = PQgetResult(streamConn);
519 if (PQresultStatus(res) == PGRES_COMMAND_OK ||
520 PQresultStatus(res) == PGRES_COPY_IN)
529 (errmsg("could not receive data from WAL stream: %s",
530 PQerrorMessage(streamConn))));
535 (errmsg("could not receive data from WAL stream: %s",
536 PQerrorMessage(streamConn))));
538 /* Return received messages to caller */
544 * Send a message to XLOG stream.
549 libpqrcv_send(const char *buffer, int nbytes)
551 if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
554 (errmsg("could not send data to WAL stream: %s",
555 PQerrorMessage(streamConn))));