*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.8 2010/03/21 00:17:58 petere Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
*
*-------------------------------------------------------------------------
*/
/* Prototypes for private functions */
static bool libpq_select(int timeout_ms);
+static PGresult *libpqrcv_PQexec(const char *query);
/*
* Module load callback
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
- res = PQexec(streamConn, "IDENTIFY_SYSTEM");
+ res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
- res = PQexec(streamConn, cmd);
+ res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
+ {
+ PQclear(res);
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
PQerrorMessage(streamConn))));
+ }
PQclear(res);
justconnected = true;
return true;
}
+/*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * We must not use the regular blocking libpq functions like PQexec()
+ * since they are uninterruptible by signals on some platforms, such as
+ * Windows.
+ *
+ * We must also not use vanilla select() here since it cannot handle the
+ * signal emulation layer on Windows.
+ *
+ * The function is modeled on PQexec() in libpq, but only implements
+ * those parts that are in use in the walreceiver.
+ *
+ * Queries are always executed on the connection in streamConn.
+ */
+static PGresult *
+libpqrcv_PQexec(const char *query)
+{
+ PGresult *result = NULL;
+ PGresult *lastResult = NULL;
+
+ /*
+ * PQexec() silently discards any prior query results on the
+ * connection. This is not required for walreceiver since it's
+ * expected that walsender won't generate any such junk results.
+ */
+
+ /*
+ * Submit a query. Since we don't use non-blocking mode, this also
+ * can block. But its risk is relatively small, so we ignore that
+ * for now.
+ */
+ if (!PQsendQuery(streamConn, query))
+ return NULL;
+
+ for (;;)
+ {
+ /*
+ * Receive data until PQgetResult is ready to get the result
+ * without blocking.
+ */
+ while (PQisBusy(streamConn))
+ {
+ /*
+ * We don't need to break down the sleep into smaller increments,
+ * and check for interrupts after each nap, since we can just
+ * elog(FATAL) within SIGTERM signal handler if the signal
+ * arrives in the middle of establishment of replication connection.
+ */
+ if (!libpq_select(-1))
+ continue; /* interrupted */
+ if (PQconsumeInput(streamConn) == 0)
+ return NULL; /* trouble */
+ }
+
+ /*
+ * Emulate the PQexec()'s behavior of returning the last result
+ * when there are many.
+ * Since walsender will never generate multiple results, we skip
+ * the concatenation of error messages.
+ */
+ result = PQgetResult(streamConn);
+ if (result == NULL)
+ break; /* query is complete */
+
+ PQclear(lastResult);
+ lastResult = result;
+
+ if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+ PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+ PQstatus(streamConn) == CONNECTION_BAD)
+ break;
+ }
+
+ return lastResult;
+}
+
/*
* Disconnect connection to primary, if any.
*/
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.8 2010/04/13 08:16:09 mha Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
*
*-------------------------------------------------------------------------
*/
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
- * check it in the main loop, because we perform some blocking libpq
- * operations like PQexec(), which can take a long time to finish.
+ * check it in the main loop, because we perform some blocking operations
+ * like libpqrcv_PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just