1 /*-------------------------------------------------------------------------
5 * This file contains the libpq-specific parts of walreceiver. It's
6 * loaded as a dynamic module to avoid linking the main server binary with
9 * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
13 * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
15 *-------------------------------------------------------------------------
23 #include "pqexpbuffer.h"
24 #include "access/xlog.h"
25 #include "miscadmin.h"
27 #include "replication/walreceiver.h"
28 #include "storage/proc.h"
29 #include "utils/builtins.h"
/*
 * Per-connection state kept by this module.  The member declarations are
 * not visible in this excerpt; code elsewhere in the file accesses
 * conn->streamConn, conn->logical and conn->recvBuf.
 */
35 struct WalReceiverConn
37 /* Current connection to the primary, if any */
39 /* Used to remember if the connection is logical or physical */
41 /* Buffer for currently read records */
45 /* Prototypes for interface functions */
46 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
47 bool logical, const char *appname);
48 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
49 static char *libpqrcv_identify_system(WalReceiverConn *conn,
50 TimeLineID *primary_tli);
51 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
52 TimeLineID tli, char **filename,
53 char **content, int *len);
54 static bool libpqrcv_startstreaming(WalReceiverConn *conn,
55 TimeLineID tli, XLogRecPtr startpoint,
56 const char *slotname);
57 static void libpqrcv_endstreaming(WalReceiverConn *conn,
58 TimeLineID *next_tli);
59 static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
61 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
63 static void libpqrcv_disconnect(WalReceiverConn *conn);
/*
 * Table of this module's interface functions.  Its address is assigned to
 * WalReceiverFunctions by the module initialization code.
 * NOTE(review): some initializer lines are not visible in this excerpt, so
 * the complete slot order cannot be checked from here.
 */
65 static WalReceiverFunctionsType PQWalReceiverFunctions = {
67 libpqrcv_get_conninfo,
68 libpqrcv_identify_system,
69 libpqrcv_readtimelinehistoryfile,
70 libpqrcv_startstreaming,
71 libpqrcv_endstreaming,
77 /* Prototypes for private functions */
78 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
81 * Module initialization function
/*
 * Install this module's function table in WalReceiverFunctions.  If the
 * pointer is already non-NULL an ERROR is raised instead, so the table is
 * never silently replaced.
 */
86 if (WalReceiverFunctions != NULL)
87 elog(ERROR, "libpqwalreceiver already loaded");
88 WalReceiverFunctions = &PQWalReceiverFunctions;
92 * Establish the connection to the primary server for XLOG streaming
/*
 * libpqrcv_connect
 *
 * Open a libpq connection and wrap it in a WalReceiverConn allocated with
 * palloc0().
 *
 * conninfo: connection string; presumably stored into the keys/vals arrays
 *           on a line that is not visible in this excerpt (TODO confirm).
 * logical:  selects the value sent for the "replication" option
 *           ("database" when true, "true" otherwise) and is remembered in
 *           conn->logical.
 * appname:  presumably the value paired with "fallback_application_name";
 *           the assignment is not visible here (TODO confirm).
 *
 * The option arrays are handed to PQconnectdbParams() with expand_dbname
 * enabled.  When PQstatus() reports anything other than CONNECTION_OK the
 * libpq error text is reported as
 * "could not connect to the primary server: ...".
 */
94 static WalReceiverConn *
95 libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
97 WalReceiverConn *conn;
103 * We use the expand_dbname parameter to process the connection string (or
104 * URI), and pass some extra options. The deliberately undocumented
105 * parameter "replication=true" makes it a replication connection. The
106 * database name is ignored by the server in replication mode, but specify
107 * "replication" for .pgpass lookup.
111 keys[++i] = "replication";
112 vals[i] = logical ? "database" : "true";
115 keys[++i] = "dbname";
116 vals[i] = "replication";
118 keys[++i] = "fallback_application_name";
123 conn = palloc0(sizeof(WalReceiverConn));
124 conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
125 if (PQstatus(conn->streamConn) != CONNECTION_OK)
127 (errmsg("could not connect to the primary server: %s",
128 PQerrorMessage(conn->streamConn))));
129 conn->logical = logical;
135 * Return a user-displayable conninfo string. Any security-sensitive fields
/*
 * libpqrcv_get_conninfo
 *
 * Assemble a display string from the options that PQconninfo() reports for
 * conn->streamConn (which must be non-NULL, see the Assert).
 *
 * For each option:
 *   - it is skipped when its dispchar contains the letter D, or when its
 *     value is NULL or the empty string;
 *   - its value is replaced by "********" when its dispchar contains an
 *     asterisk;
 *   - it is appended to the buffer, preceded by a single space unless the
 *     buffer is still empty.
 *
 * A NULL result from PQconninfo() is reported as
 * "could not parse connection string: out of memory".
 *
 * retval is NULL if the PQExpBuffer is marked broken, otherwise a pstrdup()
 * copy of the buffer contents.  The option array is released with
 * PQconninfoFree() and the buffer with termPQExpBuffer().
 */
139 libpqrcv_get_conninfo(WalReceiverConn *conn)
141 PQconninfoOption *conn_opts;
142 PQconninfoOption *conn_opt;
146 Assert(conn->streamConn != NULL);
148 initPQExpBuffer(&buf);
149 conn_opts = PQconninfo(conn->streamConn);
151 if (conn_opts == NULL)
153 (errmsg("could not parse connection string: %s",
154 _("out of memory"))));
156 /* build a clean connection string from pieces */
157 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
161 /* Skip debug and empty options */
162 if (strchr(conn_opt->dispchar, 'D') ||
163 conn_opt->val == NULL ||
164 conn_opt->val[0] == '\0')
167 /* Obfuscate security-sensitive options */
168 obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
170 appendPQExpBuffer(&buf, "%s%s=%s",
171 buf.len == 0 ? "" : " ",
173 obfuscate ? "********" : conn_opt->val);
176 PQconninfoFree(conn_opts);
178 retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
179 termPQExpBuffer(&buf);
184 * Check that primary's system identifier matches ours, and fetch the current
185 * timeline ID of the primary.
/*
 * libpqrcv_identify_system
 *
 * Run IDENTIFY_SYSTEM on conn->streamConn through libpqrcv_PQexec().
 *
 * The reply must have status PGRES_TUPLES_OK and consist of exactly one row
 * with at least three fields; anything else is reported as an error.
 *
 * Returns a pstrdup() copy of column 0 (the system identifier) and stores
 * column 1, parsed with pg_atoi(), in *primary_tli.
 *
 * The errdetail arguments are given in the order the format string names
 * them: rows received, fields received, rows expected (1), minimum fields
 * expected (3).
 */
188 libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
194 * Get the system identifier and timeline ID as a DataRow message from the
197 res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
198 if (PQresultStatus(res) != PGRES_TUPLES_OK)
202 (errmsg("could not receive database system identifier and timeline ID from "
203 "the primary server: %s",
204 PQerrorMessage(conn->streamConn))));
206 if (PQnfields(res) < 3 || PQntuples(res) != 1)
208 int ntuples = PQntuples(res);
209 int nfields = PQnfields(res);
213 (errmsg("invalid response from primary server"),
214 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
215 ntuples, nfields, 1, 3)));
217 primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
218 *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
221 return primary_sysid;
225 * Start streaming WAL data from given startpoint and timeline.
227 * Returns true if we switched successfully to copy-both mode. False
228 * means the server received the command and executed it successfully, but
229 * didn't switch to copy-mode. That means that there was no WAL on the
230 * requested timeline and starting point, because the server switched to
231 * another timeline at or before the requested starting point. On failure,
/*
 * libpqrcv_startstreaming
 *
 * Build and send a START_REPLICATION command on a physical connection
 * (Assert(!conn->logical)).
 *
 * tli, startpoint: timeline and WAL position to start from; the position is
 *                  printed as its high and low 32-bit halves in hex.
 * slotname:        when not NULL, the command includes SLOT "<slotname>".
 *
 * Visible result handling: PGRES_COMMAND_OK is tested first; otherwise any
 * status other than PGRES_COPY_BOTH is reported as
 * "could not start WAL streaming: ...".
 * NOTE(review): the statements that produce the return value are not
 * visible in this excerpt.
 */
235 libpqrcv_startstreaming(WalReceiverConn *conn,
236 TimeLineID tli, XLogRecPtr startpoint,
237 const char *slotname)
242 Assert(!conn->logical);
244 initStringInfo(&cmd);
246 /* Start streaming from the point requested by startup process */
247 if (slotname != NULL)
248 appendStringInfo(&cmd,
249 "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
251 (uint32) (startpoint >> 32), (uint32) startpoint,
254 appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u",
255 (uint32) (startpoint >> 32), (uint32) startpoint,
257 res = libpqrcv_PQexec(conn->streamConn, cmd.data);
260 if (PQresultStatus(res) == PGRES_COMMAND_OK)
265 else if (PQresultStatus(res) != PGRES_COPY_BOTH)
269 (errmsg("could not start WAL streaming: %s",
270 PQerrorMessage(conn->streamConn))));
277 * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
278 * reported by the server, or 0 if it did not report it.
/*
 * libpqrcv_endstreaming
 *
 * End the COPY from this side with PQputCopyEnd() followed by PQflush(),
 * then read the results that follow:
 *   - PGRES_TUPLES_OK: the result must have at least 2 fields and exactly
 *     1 row, otherwise "unexpected result set after end-of-streaming" is
 *     reported.  Column 0 is parsed with pg_atoi() into *next_tli and the
 *     next result is fetched.
 *   - PGRES_COPY_OUT: PQendcopy() is called and the next result is fetched.
 * The result examined after that must be PGRES_COMMAND_OK.  One further
 * PQgetResult() call is made to verify that nothing else is pending.
 * Failures are reported together with the libpq error message.
 *
 * NOTE(review): the branch that sets *next_tli when no timeline is reported
 * is not visible in this excerpt.
 */
281 libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
285 if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
286 PQflush(conn->streamConn))
288 (errmsg("could not send end-of-streaming message to primary: %s",
289 PQerrorMessage(conn->streamConn))));
294 * After COPY is finished, we should receive a result set indicating the
295 * next timeline's ID, or just CommandComplete if the server was shut
298 * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
299 * would also be possible. However, at the moment this function is only
300 * called after receiving CopyDone from the backend - the walreceiver
301 * never terminates replication on its own initiative.
303 res = PQgetResult(conn->streamConn);
304 if (PQresultStatus(res) == PGRES_TUPLES_OK)
307 * Read the next timeline's ID. The server also sends the timeline's
308 * starting point, but it is ignored.
310 if (PQnfields(res) < 2 || PQntuples(res) != 1)
312 (errmsg("unexpected result set after end-of-streaming")));
313 *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
316 /* the result set should be followed by CommandComplete */
317 res = PQgetResult(conn->streamConn);
319 else if (PQresultStatus(res) == PGRES_COPY_OUT)
324 PQendcopy(conn->streamConn);
326 /* CommandComplete should follow */
327 res = PQgetResult(conn->streamConn);
330 if (PQresultStatus(res) != PGRES_COMMAND_OK)
332 (errmsg("error reading result of streaming command: %s",
333 PQerrorMessage(conn->streamConn))));
336 /* Verify that there are no more results */
337 res = PQgetResult(conn->streamConn);
340 (errmsg("unexpected result after CommandComplete: %s",
341 PQerrorMessage(conn->streamConn))));
345 * Fetch the timeline history file for 'tli' from primary.
/*
 * libpqrcv_readtimelinehistoryfile
 *
 * Send "TIMELINE_HISTORY <tli>" on a physical connection
 * (Assert(!conn->logical)) and hand the single result row to the caller.
 *
 * The result must have status PGRES_TUPLES_OK and contain exactly 2 fields
 * and 1 row; otherwise an error is reported.
 *
 * Outputs:
 *   *filename - pstrdup() copy of column 0
 *   *len      - PQgetlength() of column 1
 *   *content  - palloc() buffer of exactly *len bytes, filled from column 1
 *               with memcpy(); no terminator is added here
 */
348 libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
349 TimeLineID tli, char **filename,
350 char **content, int *len)
355 Assert(!conn->logical);
358 * Request the primary to send over the history file for given timeline.
360 snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
361 res = libpqrcv_PQexec(conn->streamConn, cmd);
362 if (PQresultStatus(res) != PGRES_TUPLES_OK)
366 (errmsg("could not receive timeline history file from "
367 "the primary server: %s",
368 PQerrorMessage(conn->streamConn))));
370 if (PQnfields(res) != 2 || PQntuples(res) != 1)
372 int ntuples = PQntuples(res);
373 int nfields = PQnfields(res);
377 (errmsg("invalid response from primary server"),
378 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
381 *filename = pstrdup(PQgetvalue(res, 0, 0));
383 *len = PQgetlength(res, 0, 1);
384 *content = palloc(*len);
385 memcpy(*content, PQgetvalue(res, 0, 1), *len);
390 * Send a query and wait for the results by using the asynchronous libpq
391 * functions and socket readiness events.
393 * We must not use the regular blocking libpq functions like PQexec()
394 * since they are uninterruptible by signals on some platforms, such as
397 * The function is modeled on PQexec() in libpq, but only implements
398 * those parts that are in use in the walreceiver.
400 * Queries are always executed on the connection in streamConn.
/*
 * libpqrcv_PQexec
 *
 * Submit query on streamConn with PQsendQuery() and collect the results
 * with the asynchronous libpq calls.
 *
 * While PQisBusy() reports that a result is not ready, the process latch is
 * reset and WaitLatchOrSocket() waits on the connection socket.  The wait
 * result is checked for WL_POSTMASTER_DEATH and WL_LATCH_SET; the latter
 * leads to CHECK_FOR_INTERRUPTS().  If PQconsumeInput() returns 0 the
 * function returns NULL.
 *
 * Results are fetched with PQgetResult().  The status of lastResult is
 * compared against the three COPY states, and the connection against
 * CONNECTION_BAD, presumably to stop collecting at that point (TODO confirm;
 * the statement guarded by that condition is not visible in this excerpt).
 *
 * NOTE(review): the loop structure, the handling of a failed PQsendQuery()
 * and the final return are not visible in this excerpt.
 */
403 libpqrcv_PQexec(PGconn *streamConn, const char *query)
405 PGresult *result = NULL;
406 PGresult *lastResult = NULL;
409 * PQexec() silently discards any prior query results on the connection.
410 * This is not required for walreceiver since it's expected that walsender
411 * won't generate any such junk results.
415 * Submit a query. Since we don't use non-blocking mode, this also can
416 * block. But its risk is relatively small, so we ignore that for now.
418 if (!PQsendQuery(streamConn, query))
424 * Receive data until PQgetResult is ready to get the result without
427 while (PQisBusy(streamConn))
432 * We don't need to break down the sleep into smaller increments,
433 * since we'll get interrupted by signals and can either handle
434 * interrupts here or elog(FATAL) within SIGTERM signal handler if
435 * the signal arrives in the middle of establishment of
436 * replication connection.
438 ResetLatch(&MyProc->procLatch);
439 rc = WaitLatchOrSocket(&MyProc->procLatch,
440 WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
442 PQsocket(streamConn),
444 WAIT_EVENT_LIBPQWALRECEIVER_READ);
445 if (rc & WL_POSTMASTER_DEATH)
449 if (rc & WL_LATCH_SET)
451 CHECK_FOR_INTERRUPTS();
454 if (PQconsumeInput(streamConn) == 0)
455 return NULL; /* trouble */
459 * Emulate the PQexec()'s behavior of returning the last result when
460 * there are many. Since walsender will never generate multiple
461 * results, we skip the concatenation of error messages.
463 result = PQgetResult(streamConn);
465 break; /* query is complete */
470 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
471 PQresultStatus(lastResult) == PGRES_COPY_OUT ||
472 PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
473 PQstatus(streamConn) == CONNECTION_BAD)
481 * Disconnect connection to primary, if any.
/*
 * libpqrcv_disconnect
 *
 * Close the libpq connection with PQfinish() and, if a receive buffer is
 * still held in conn->recvBuf, release it with PQfreemem().
 */
484 libpqrcv_disconnect(WalReceiverConn *conn)
486 PQfinish(conn->streamConn);
487 if (conn->recvBuf != NULL)
488 PQfreemem(conn->recvBuf);
493 * Receive a message available from XLOG stream.
497 * If data was received, returns the length of the data. *buffer is set to
498 * point to a buffer holding the received message. The buffer is only valid
499 * until the next libpqrcv_* call.
501 * If no data was available immediately, returns 0, and *wait_fd is set to a
502 * socket descriptor which can be waited on before trying again.
504 * -1 if the server ended the COPY.
/*
 * libpqrcv_receive
 *
 * Try to fetch one CopyData message from conn->streamConn.
 *
 * Any buffer left in conn->recvBuf by an earlier call is released with
 * PQfreemem() first.  PQgetCopyData() is called with its third argument set
 * to 1; PQconsumeInput() is then used to read more input and
 * PQgetCopyData() is tried again.  A PQconsumeInput() result of 0 is
 * reported as "could not receive data from WAL stream: ...".
 *
 * When the caller should wait, *wait_fd is set to the connection socket.
 * A raw length of -1 means end-of-streaming or error: the next result is
 * fetched with PQgetResult() and its status is compared against
 * PGRES_COMMAND_OK and PGRES_COPY_IN.
 * On the data path *buffer is pointed at conn->recvBuf.
 *
 * NOTE(review): the return statements and the remaining parameters of the
 * signature are not visible in this excerpt.
 */
509 libpqrcv_receive(WalReceiverConn *conn, char **buffer,
514 if (conn->recvBuf != NULL)
515 PQfreemem(conn->recvBuf);
516 conn->recvBuf = NULL;
518 /* Try to receive a CopyData message */
519 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
522 /* Try consuming some data. */
523 if (PQconsumeInput(conn->streamConn) == 0)
525 (errmsg("could not receive data from WAL stream: %s",
526 PQerrorMessage(conn->streamConn))));
528 /* Now that we've consumed some input, try again */
529 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
532 /* Tell caller to try again when our socket is ready. */
533 *wait_fd = PQsocket(conn->streamConn);
537 if (rawlen == -1) /* end-of-streaming or error */
541 res = PQgetResult(conn->streamConn);
542 if (PQresultStatus(res) == PGRES_COMMAND_OK ||
543 PQresultStatus(res) == PGRES_COPY_IN)
552 (errmsg("could not receive data from WAL stream: %s",
553 PQerrorMessage(conn->streamConn))));
558 (errmsg("could not receive data from WAL stream: %s",
559 PQerrorMessage(conn->streamConn))));
561 /* Return received messages to caller */
562 *buffer = conn->recvBuf;
567 * Send a message to XLOG stream.
/*
 * libpqrcv_send
 *
 * Queue nbytes bytes from buffer with PQputCopyData() and push them out with
 * PQflush().  A PQputCopyData() result of zero or less, or a nonzero
 * PQflush() result, is treated as failure and reported as
 * "could not send data to WAL stream: ..." with the libpq error message.
 */
572 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
574 if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
575 PQflush(conn->streamConn))
577 (errmsg("could not send data to WAL stream: %s",
578 PQerrorMessage(conn->streamConn))));