/* Prototypes for private functions */
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
+static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
/*
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
+ ProcessWalRcvInterrupts();
}
/* If socket is ready, advance the libpq state machine */
{
PGresult *res;
+ /*
+ * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
+ * block, but the risk seems small.
+ */
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
PQflush(conn->streamConn))
ereport(ERROR,
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
* also possible in case we aborted the copy in mid-stream.
*/
- res = PQgetResult(conn->streamConn);
+ res = libpqrcv_PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
PQclear(res);
/* the result set should be followed by CommandComplete */
- res = PQgetResult(conn->streamConn);
+ res = libpqrcv_PQgetResult(conn->streamConn);
}
else if (PQresultStatus(res) == PGRES_COPY_OUT)
{
pchomp(PQerrorMessage(conn->streamConn)))));
/* CommandComplete should follow */
- res = PQgetResult(conn->streamConn);
+ res = libpqrcv_PQgetResult(conn->streamConn);
}
if (PQresultStatus(res) != PGRES_COMMAND_OK)
PQclear(res);
/* Verify that there are no more results */
- res = PQgetResult(conn->streamConn);
+ res = libpqrcv_PQgetResult(conn->streamConn);
if (res != NULL)
ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s",
* The function is modeled on PQexec() in libpq, but only implements
* those parts that are in use in the walreceiver api.
*
- * Queries are always executed on the connection in streamConn.
+ * May return NULL, rather than an error result, on failure.
*/
static PGresult *
libpqrcv_PQexec(PGconn *streamConn, const char *query)
{
- PGresult *result = NULL;
PGresult *lastResult = NULL;
/*
*/
/*
- * Submit a query. Since we don't use non-blocking mode, this also can
- * block. But its risk is relatively small, so we ignore that for now.
+ * Submit the query. Since we don't use non-blocking mode, this could
+ * theoretically block. In practice, since we don't send very long query
+ * strings, the risk seems negligible.
*/
if (!PQsendQuery(streamConn, query))
return NULL;
for (;;)
{
- /*
- * Receive data until PQgetResult is ready to get the result without
- * blocking.
- */
- while (PQisBusy(streamConn))
- {
- int rc;
-
- /*
- * We don't need to break down the sleep into smaller increments,
- * since we'll get interrupted by signals and can either handle
- * interrupts here or elog(FATAL) within SIGTERM signal handler if
- * the signal arrives in the middle of establishment of
- * replication connection.
- */
- rc = WaitLatchOrSocket(MyLatch,
- WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
- WL_LATCH_SET,
- PQsocket(streamConn),
- 0,
- WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
- /* Emergency bailout? */
- if (rc & WL_POSTMASTER_DEATH)
- exit(1);
-
- /* Interrupted? */
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
- }
+ /* Wait for, and collect, the next PGresult. */
+ PGresult *result;
- /* Consume whatever data is available from the socket */
- if (PQconsumeInput(streamConn) == 0)
- {
- /* trouble; drop whatever we had and return NULL */
- PQclear(lastResult);
- return NULL;
- }
- }
+ result = libpqrcv_PQgetResult(streamConn);
+ if (result == NULL)
+ break; /* query is complete, or failure */
/*
* Emulate PQexec()'s behavior of returning the last result when there
* are many. We are fine with returning just last error message.
*/
- result = PQgetResult(streamConn);
- if (result == NULL)
- break; /* query is complete */
-
PQclear(lastResult);
lastResult = result;
return lastResult;
}
+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static PGresult *
+libpqrcv_PQgetResult(PGconn *streamConn)
+{
+ /*
+ * Collect data until PQgetResult is ready to get the result without
+ * blocking.
+ */
+ while (PQisBusy(streamConn))
+ {
+ int rc;
+
+ /*
+ * We don't need to break down the sleep into smaller increments,
+ * since we'll get interrupted by signals and can handle any
+ * interrupts here.
+ */
+ rc = WaitLatchOrSocket(MyLatch,
+ WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+ WL_LATCH_SET,
+ PQsocket(streamConn),
+ 0,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+
+ /* Emergency bailout? */
+ if (rc & WL_POSTMASTER_DEATH)
+ exit(1);
+
+ /* Interrupted? */
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessWalRcvInterrupts();
+ }
+
+ /* Consume whatever data is available from the socket */
+ if (PQconsumeInput(streamConn) == 0)
+ {
+ /* trouble; return NULL */
+ return NULL;
+ }
+ }
+
+ /* Now we can collect and return the next PGresult */
+ return PQgetResult(streamConn);
+}
+
/*
* Disconnect connection to primary, if any.
*/
{
PGresult *res;
- res = PQgetResult(conn->streamConn);
+ res = libpqrcv_PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
/* Verify that there are no more results. */
- res = PQgetResult(conn->streamConn);
+ res = libpqrcv_PQgetResult(conn->streamConn);
if (res != NULL)
{
PQclear(res);
{
char *cstrs[MaxTupleAttributeNumber];
- CHECK_FOR_INTERRUPTS();
+ ProcessWalRcvInterrupts();
/* Do the allocations in temporary context. */
oldcontext = MemoryContextSwitchTo(rowcontext);
static StringInfoData reply_message;
static StringInfoData incoming_message;
-/*
- * About SIGTERM handling:
- *
- * We can't just exit(1) within SIGTERM signal handler, because the signal
- * might arrive in the middle of some critical operation, like while we're
- * holding a spinlock. We also can't just set a flag in signal handler and
- * check it in the main loop, because we perform some blocking operations
- * like libpqrcv_PQexec(), which can take a long time to finish.
- *
- * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
- * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
- * sets got_SIGTERM flag, which is checked in the main loop when convenient.
- *
- * This is very much like what regular backends do with ImmediateInterruptOK,
- * ProcessInterrupts() etc.
- */
-static volatile bool WalRcvImmediateInterruptOK = false;
-
/* Prototypes for private functions */
-static void ProcessWalRcvInterrupts(void);
-static void EnableWalRcvImmediateExit(void);
-static void DisableWalRcvImmediateExit(void);
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
-static void
+/*
+ * Process any interrupts the walreceiver process may have received.
+ * This should be called any time the process's latch has become set.
+ *
+ * Currently, only SIGTERM is of interest. We can't just exit(1) within the
+ * SIGTERM signal handler, because the signal might arrive in the middle of
+ * some critical operation, like while we're holding a spinlock. Instead, the
+ * signal handler sets a flag variable as well as setting the process's latch.
+ * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
+ * latch has become set. Operations that could block for a long time, such as
+ * reading from a remote server, must pay attention to the latch too; see
+ * libpqrcv_PQgetResult for example.
+ */
+void
ProcessWalRcvInterrupts(void)
{
/*
if (got_SIGTERM)
{
- WalRcvImmediateInterruptOK = false;
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating walreceiver process due to administrator command")));
}
}
-static void
-EnableWalRcvImmediateExit(void)
-{
- WalRcvImmediateInterruptOK = true;
- ProcessWalRcvInterrupts();
-}
-
-static void
-DisableWalRcvImmediateExit(void)
-{
- WalRcvImmediateInterruptOK = false;
- ProcessWalRcvInterrupts();
-}
/* Main entry point for walreceiver process */
void
PG_SETMASK(&UnBlockSig);
/* Establish the connection to the primary for XLOG streaming */
- EnableWalRcvImmediateExit();
wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
if (!wrconn)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s", err)));
- DisableWalRcvImmediateExit();
/*
* Save user-visible connection string. This clobbers the original
* Check that we're connected to a valid server using the
* IDENTIFY_SYSTEM replication command.
*/
- EnableWalRcvImmediateExit();
primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
&server_version);
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid)));
}
- DisableWalRcvImmediateExit();
/*
* Confirm that the current timeline of the primary is the same or
if (rc & WL_LATCH_SET)
{
ResetLatch(walrcv->latch);
+ ProcessWalRcvInterrupts();
+
if (walrcv->force_reply)
{
/*
* The backend finished streaming. Exit streaming COPY-mode from
* our side, too.
*/
- EnableWalRcvImmediateExit();
walrcv_endstreaming(wrconn, &primaryTLI);
- DisableWalRcvImmediateExit();
/*
* If the server had switched to a new timeline that we didn't
(errmsg("fetching timeline history file for timeline %u from primary server",
tli)));
- EnableWalRcvImmediateExit();
walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
- DisableWalRcvImmediateExit();
/*
* Check that the filename on the master matches what we
errno = save_errno;
}
-/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
+/* SIGTERM: set flag for ProcessWalRcvInterrupts */
static void
WalRcvShutdownHandler(SIGNAL_ARGS)
{
if (WalRcv->latch)
SetLatch(WalRcv->latch);
- /* Don't joggle the elbow of proc_exit */
- if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
- ProcessWalRcvInterrupts();
-
errno = save_errno;
}