]> granicus.if.org Git - postgresql/commitdiff
Add wrapper function libpqrcv_PQexec() in the walreceiver that uses async
authorMagnus Hagander <magnus@hagander.net>
Mon, 19 Apr 2010 14:10:45 +0000 (14:10 +0000)
committerMagnus Hagander <magnus@hagander.net>
Mon, 19 Apr 2010 14:10:45 +0000 (14:10 +0000)
libpq to send queries, making the waiting for responses interruptible on
platforms where PQexec() can't normally be interrupted by signals, such
as win32.

Fujii Masao and Magnus Hagander

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c

index 9e318e669d099bbb375c52580b910fb23c63cb39..d41858e49a201de8975d04514a75cd40189f98ff 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.8 2010/03/21 00:17:58 petere Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
 static bool libpq_select(int timeout_ms);
+static PGresult *libpqrcv_PQexec(const char *query);
 
 /*
  * Module load callback
@@ -97,7 +98,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
         * Get the system identifier and timeline ID as a DataRow message from the
         * primary server.
         */
-       res = PQexec(streamConn, "IDENTIFY_SYSTEM");
+       res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
                PQclear(res);
@@ -149,11 +150,14 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
        /* Start streaming from the point requested by startup process */
        snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
                         startpoint.xlogid, startpoint.xrecoff);
-       res = PQexec(streamConn, cmd);
+       res = libpqrcv_PQexec(cmd);
        if (PQresultStatus(res) != PGRES_COPY_OUT)
+       {
+               PQclear(res);
                ereport(ERROR,
                                (errmsg("could not start WAL streaming: %s",
                                                PQerrorMessage(streamConn))));
+       }
        PQclear(res);
 
        justconnected = true;
@@ -224,6 +228,84 @@ libpq_select(int timeout_ms)
        return true;
 }
 
+/*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * We must not use the regular blocking libpq functions like PQexec()
+ * since they are uninterruptible by signals on some platforms, such as
+ * Windows.
+ *
+ * We must also not use vanilla select() here since it cannot handle the
+ * signal emulation layer on Windows.
+ *
+ * The function is modeled on PQexec() in libpq, but only implements
+ * those parts that are in use in the walreceiver.
+ *
+ * Queries are always executed on the connection in streamConn.
+ */
+static PGresult *
+libpqrcv_PQexec(const char *query)
+{
+       PGresult   *result              = NULL;
+       PGresult   *lastResult  = NULL;
+
+       /*
+        * PQexec() silently discards any prior query results on the
+        * connection. This is not required for walreceiver since it's
+        * expected that walsender won't generate any such junk results.
+        */
+
+       /*
+        * Submit a query. Since we don't use non-blocking mode, this also
+        * can block. But its risk is relatively small, so we ignore that
+        * for now.
+        */
+       if (!PQsendQuery(streamConn, query))
+               return NULL;
+
+       for (;;)
+       {
+               /*
+                * Receive data until PQgetResult is ready to get the result
+                * without blocking.
+                */
+               while (PQisBusy(streamConn))
+               {
+                       /*
+                        * We don't need to break down the sleep into smaller increments,
+                        * and check for interrupts after each nap, since we can just
+                        * elog(FATAL) within SIGTERM signal handler if the signal
+                        * arrives in the middle of establishment of replication connection.
+                        */
+                       if (!libpq_select(-1))
+                               continue;               /* interrupted */
+                       if (PQconsumeInput(streamConn) == 0)
+                               return NULL;    /* trouble */
+               }
+
+               /*
+                * Emulate the PQexec()'s behavior of returning the last result
+                * when there are many.
+                * Since walsender will never generate multiple results, we skip
+                * the concatenation of error messages.
+                */
+               result = PQgetResult(streamConn);
+               if (result == NULL)
+                       break;                          /* query is complete */
+
+               PQclear(lastResult);
+               lastResult = result;
+
+               if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+                       PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+                       PQstatus(streamConn) == CONNECTION_BAD)
+                       break;
+       }
+
+       return lastResult;
+}
+
 /*
  * Disconnect connection to primary, if any.
  */
index 090111bb1126d6ad48f582d91e7b263885163192..f2694db8733b39af6da53744100b40844da3ca80 100644 (file)
@@ -29,7 +29,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.8 2010/04/13 08:16:09 mha Exp $
+ *       $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -86,8 +86,8 @@ static void DisableWalRcvImmediateExit(void);
  * We can't just exit(1) within SIGTERM signal handler, because the signal
  * might arrive in the middle of some critical operation, like while we're
  * holding a spinlock. We also can't just set a flag in signal handler and
- * check it in the main loop, because we perform some blocking libpq
- * operations like PQexec(), which can take a long time to finish.
+ * check it in the main loop, because we perform some blocking operations
+ * like libpqrcv_PQexec(), which can take a long time to finish.
  *
  * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
  * safe for the signal handler to elog(FATAL) immediately. Otherwise it just