]> granicus.if.org Git - postgresql/commitdiff
Use asynchronous connect API in libpqwalreceiver
authorPeter Eisentraut <peter_e@gmx.net>
Fri, 3 Mar 2017 14:07:22 +0000 (09:07 -0500)
committerPeter Eisentraut <peter_e@gmx.net>
Fri, 3 Mar 2017 14:13:58 +0000 (09:13 -0500)
This makes the connection attempt from CREATE SUBSCRIPTION and from
WalReceiver interruptable by the user in case the libpq connection is
hanging.  The previous coding required immediate shutdown (SIGQUIT) of
PostgreSQL in that situation.

From: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Tested-by: Thom Brown <thom@linux.com>
src/backend/postmaster/pgstat.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/include/pgstat.h

index ada374c0c4402da9e0cccaaf4baf2c7ccf8dd063..2fb9a8bf580639d50ca1ea224cae434d9678bfd1 100644 (file)
@@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w)
                case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
                        event_name = "WalReceiverWaitStart";
                        break;
-               case WAIT_EVENT_LIBPQWALRECEIVER_READ:
-                       event_name = "LibPQWalReceiverRead";
+               case WAIT_EVENT_LIBPQWALRECEIVER:
+                       event_name = "LibPQWalReceiver";
                        break;
                case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
                        event_name = "WalSenderWaitForWAL";
index daae3f70e73baa6ddad65c8fcd9698b87d4292e5..048d2aaa76b8cca5bfd02d2487f5827c2879301a 100644 (file)
@@ -113,6 +113,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
                                 char **err)
 {
        WalReceiverConn *conn;
+       PostgresPollingStatusType status;
        const char *keys[5];
        const char *vals[5];
        int                     i = 0;
@@ -146,7 +147,51 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
        Assert(i < sizeof(keys));
 
        conn = palloc0(sizeof(WalReceiverConn));
-       conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
+       conn->streamConn = PQconnectStartParams(keys, vals,
+                                                                                       /* expand_dbname = */ true);
+       if (PQstatus(conn->streamConn) == CONNECTION_BAD)
+       {
+               *err = pchomp(PQerrorMessage(conn->streamConn));
+               return NULL;
+       }
+
+       /* Poll connection. */
+       do
+       {
+               /* Determine current state of the connection. */
+               status = PQconnectPoll(conn->streamConn);
+
+               /* Sleep a bit if waiting for socket. */
+               if (status == PGRES_POLLING_READING ||
+                       status == PGRES_POLLING_WRITING)
+               {
+                       int             extra_flag;
+                       int             rc;
+
+                       extra_flag = (status == PGRES_POLLING_READING
+                                                 ? WL_SOCKET_READABLE
+                                                 : WL_SOCKET_WRITEABLE);
+
+                       ResetLatch(&MyProc->procLatch);
+                       rc = WaitLatchOrSocket(&MyProc->procLatch,
+                                                                  WL_POSTMASTER_DEATH |
+                                                                  WL_LATCH_SET | extra_flag,
+                                                                  PQsocket(conn->streamConn),
+                                                                  0,
+                                                                  WAIT_EVENT_LIBPQWALRECEIVER);
+
+                       /* Emergency bailout. */
+                       if (rc & WL_POSTMASTER_DEATH)
+                               exit(1);
+
+                       /* Interrupted. */
+                       if (rc & WL_LATCH_SET)
+                               CHECK_FOR_INTERRUPTS();
+               }
+
+               /* Otherwise loop until we have OK or FAILED status. */
+       } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+
        if (PQstatus(conn->streamConn) != CONNECTION_OK)
        {
                *err = pchomp(PQerrorMessage(conn->streamConn));
@@ -529,7 +574,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
                                                                   WL_LATCH_SET,
                                                                   PQsocket(streamConn),
                                                                   0,
-                                                                  WAIT_EVENT_LIBPQWALRECEIVER_READ);
+                                                                  WAIT_EVENT_LIBPQWALRECEIVER);
                        if (rc & WL_POSTMASTER_DEATH)
                                exit(1);
 
index 8b710ecb24e2011e2b9861fc7d73c7e2466467b3..0062fb8af24d0af80e4acd089f83b32013b81168 100644 (file)
@@ -764,7 +764,7 @@ typedef enum
        WAIT_EVENT_CLIENT_WRITE,
        WAIT_EVENT_SSL_OPEN_SERVER,
        WAIT_EVENT_WAL_RECEIVER_WAIT_START,
-       WAIT_EVENT_LIBPQWALRECEIVER_READ,
+       WAIT_EVENT_LIBPQWALRECEIVER,
        WAIT_EVENT_WAL_SENDER_WAIT_WAL,
        WAIT_EVENT_WAL_SENDER_WRITE_DATA
 } WaitEventClient;