]> granicus.if.org Git - postgresql/blobdiff - src/backend/libpq/pqcomm.c
Post-PG 10 beta1 pgindent run
[postgresql] / src / backend / libpq / pqcomm.c
index d9dda21d93c3c1128d744d6486d914cf5ef2af21..d1cc38beb2b25d6e38417a30e0651db7673c2c34 100644 (file)
@@ -27,7 +27,7 @@
  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
  * All that remains is similarities of names to trap the unwary...
  *
- * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *     src/backend/libpq/pqcomm.c
 #ifdef HAVE_UTIME_H
 #include <utime.h>
 #endif
-#ifdef WIN32_ONLY_COMPILER             /* mstcpip.h is missing on mingw */
+#ifdef _MSC_VER                                        /* mstcpip.h is missing on mingw */
 #include <mstcpip.h>
 #endif
 
-#include "libpq/ip.h"
+#include "common/ip.h"
 #include "libpq/libpq.h"
 #include "miscadmin.h"
 #include "storage/ipc.h"
 int                    Unix_socket_permissions;
 char      *Unix_socket_group;
 
-
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
 
-
 /*
  * Buffers for low-level I/O.
  *
@@ -129,21 +127,45 @@ static int        PqRecvLength;           /* End of data available in PqRecvBuffer */
 /*
  * Message status
  */
-static bool PqCommBusy;
-static bool DoingCopyOut;
+static bool PqCommBusy;                        /* busy sending data to the client */
+static bool PqCommReadingMsg;  /* in the middle of reading a message */
+static bool DoingCopyOut;              /* in old-protocol COPY OUT processing */
 
 
 /* Internal functions */
-static void pq_close(int code, Datum arg);
+static void socket_comm_reset(void);
+static void socket_close(int code, Datum arg);
+static void socket_set_nonblocking(bool nonblocking);
+static int     socket_flush(void);
+static int     socket_flush_if_writable(void);
+static bool socket_is_send_pending(void);
+static int     socket_putmessage(char msgtype, const char *s, size_t len);
+static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void socket_startcopyout(void);
+static void socket_endcopyout(bool errorAbort);
 static int     internal_putbytes(const char *s, size_t len);
 static int     internal_flush(void);
-static void pq_set_nonblocking(bool nonblocking);
 
 #ifdef HAVE_UNIX_SOCKETS
 static int     Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
 static int     Setup_AF_UNIX(char *sock_path);
 #endif   /* HAVE_UNIX_SOCKETS */
 
+static PQcommMethods PqCommSocketMethods = {
+       socket_comm_reset,
+       socket_flush,
+       socket_flush_if_writable,
+       socket_is_send_pending,
+       socket_putmessage,
+       socket_putmessage_noblock,
+       socket_startcopyout,
+       socket_endcopyout
+};
+
+PQcommMethods *PqCommMethods = &PqCommSocketMethods;
+
+WaitEventSet *FeBeWaitSet;
+
 
 /* --------------------------------
  *             pq_init - initialize libpq at backend startup
@@ -152,24 +174,50 @@ static int        Setup_AF_UNIX(char *sock_path);
 void
 pq_init(void)
 {
+       /* initialize state variables */
        PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
        PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
        PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
        PqCommBusy = false;
+       PqCommReadingMsg = false;
        DoingCopyOut = false;
-       on_proc_exit(pq_close, 0);
+
+       /* set up process-exit hook to close the socket */
+       on_proc_exit(socket_close, 0);
+
+       /*
+        * In backends (as soon as forked) we operate the underlying socket in
+        * nonblocking mode and use latches to implement blocking semantics if
+        * needed. That allows us to provide safely interruptible reads and
+        * writes.
+        *
+        * Use COMMERROR on failure, because ERROR would try to send the error to
+        * the client, which might require changing the mode again, leading to
+        * infinite recursion.
+        */
+#ifndef WIN32
+       if (!pg_set_noblock(MyProcPort->sock))
+               ereport(COMMERROR,
+                               (errmsg("could not set socket to nonblocking mode: %m")));
+#endif
+
+       FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+       AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
+                                         NULL, NULL);
+       AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
+       AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
 }
 
 /* --------------------------------
- *             pq_comm_reset - reset libpq during error recovery
+ *             socket_comm_reset - reset libpq during error recovery
  *
  * This is called from error recovery at the outer idle loop.  It's
  * just to get us out of trouble if we somehow manage to elog() from
  * inside a pqcomm.c routine (which ideally will never happen, but...)
  * --------------------------------
  */
-void
-pq_comm_reset(void)
+static void
+socket_comm_reset(void)
 {
        /* Do not throw away pending data, but do reset the busy flag */
        PqCommBusy = false;
@@ -178,34 +226,47 @@ pq_comm_reset(void)
 }
 
 /* --------------------------------
- *             pq_close - shutdown libpq at backend exit
+ *             socket_close - shutdown libpq at backend exit
  *
- * Note: in a standalone backend MyProcPort will be null,
- * don't crash during exit...
+ * This is the one pg_on_exit_callback in place during BackendInitialize().
+ * That function's unusual signal handling constrains that this callback be
+ * safe to run at any instant.
  * --------------------------------
  */
 static void
-pq_close(int code, Datum arg)
+socket_close(int code, Datum arg)
 {
+       /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
        if (MyProcPort != NULL)
        {
 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
 #ifdef ENABLE_GSS
                OM_uint32       min_s;
 
-               /* Shutdown GSSAPI layer */
+               /*
+                * Shutdown GSSAPI layer.  This section does nothing when interrupting
+                * BackendInitialize(), because pg_GSS_recvauth() makes first use of
+                * "ctx" and "cred".
+                */
                if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
                        gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
 
                if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
                        gss_release_cred(&min_s, &MyProcPort->gss->cred);
 #endif   /* ENABLE_GSS */
-               /* GSS and SSPI share the port->gss struct */
 
+               /*
+                * GSS and SSPI share the port->gss struct.  Since nowhere else does a
+                * postmaster child free this, doing so is safe when interrupting
+                * BackendInitialize().
+                */
                free(MyProcPort->gss);
 #endif   /* ENABLE_GSS || ENABLE_SSPI */
 
-               /* Cleanly shut down SSL layer */
+               /*
+                * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
+                * call this, so this is safe when interrupting BackendInitialize().
+                */
                secure_close(MyProcPort);
 
                /*
@@ -232,28 +293,6 @@ pq_close(int code, Datum arg)
  */
 
 
-/* StreamDoUnlink()
- * Shutdown routine for backend connection
- * If any Unix sockets are used for communication, explicitly close them.
- */
-#ifdef HAVE_UNIX_SOCKETS
-static void
-StreamDoUnlink(int code, Datum arg)
-{
-       ListCell   *l;
-
-       /* Loop through all created sockets... */
-       foreach(l, sock_paths)
-       {
-               char       *sock_path = (char *) lfirst(l);
-
-               unlink(sock_path);
-       }
-       /* Since we're about to exit, no need to reclaim storage */
-       sock_paths = NIL;
-}
-#endif   /* HAVE_UNIX_SOCKETS */
-
 /*
  * StreamServerPort -- open a "listening" port to accept connections.
  *
@@ -280,6 +319,8 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
        char            portNumberStr[32];
        const char *familyDesc;
        char            familyDescBuf[64];
+       const char *addrDesc;
+       char            addrBuf[NI_MAXHOST];
        char       *service;
        struct addrinfo *addrs = NULL,
                           *addr;
@@ -368,7 +409,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                        break;
                }
 
-               /* set up family name for possible error messages */
+               /* set up address family name for log messages */
                switch (addr->ai_family)
                {
                        case AF_INET:
@@ -392,13 +433,28 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                                break;
                }
 
-               if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
+               /* set up text form of address for log messages */
+#ifdef HAVE_UNIX_SOCKETS
+               if (addr->ai_family == AF_UNIX)
+                       addrDesc = unixSocketPath;
+               else
+#endif
+               {
+                       pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
+                                                          addr->ai_addrlen,
+                                                          addrBuf, sizeof(addrBuf),
+                                                          NULL, 0,
+                                                          NI_NUMERICHOST);
+                       addrDesc = addrBuf;
+               }
+
+               if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                       /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not create %s socket: %m",
-                                                       familyDesc)));
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                 errmsg("could not create %s socket for address \"%s\": %m",
+                                                familyDesc, addrDesc)));
                        continue;
                }
 
@@ -422,7 +478,9 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
-                                                errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
+                               /* translator: first %s is IPv4, IPv6, or Unix */
+                                                errmsg("setsockopt(SO_REUSEADDR) failed for %s address \"%s\": %m",
+                                                               familyDesc, addrDesc)));
                                closesocket(fd);
                                continue;
                        }
@@ -437,7 +495,9 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
-                                                errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
+                               /* translator: first %s is IPv4, IPv6, or Unix */
+                                                errmsg("setsockopt(IPV6_V6ONLY) failed for %s address \"%s\": %m",
+                                                               familyDesc, addrDesc)));
                                closesocket(fd);
                                continue;
                        }
@@ -447,7 +507,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                /*
                 * Note: This might fail on some OS's, like Linux older than
                 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
-                * ipv4 addresses to ipv6.      It will show ::ffff:ipv4 for all ipv4
+                * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
                 * connections.
                 */
                err = bind(fd, addr->ai_addr, addr->ai_addrlen);
@@ -455,9 +515,9 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                       /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not bind %s socket: %m",
-                                                       familyDesc),
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                        errmsg("could not bind %s address \"%s\": %m",
+                                                       familyDesc, addrDesc),
                                         (IS_AF_UNIX(addr->ai_family)) ?
                                  errhint("Is another postmaster already running on port %d?"
                                                  " If not, remove socket file \"%s\" and retry.",
@@ -494,12 +554,25 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                       /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not listen on %s socket: %m",
-                                                       familyDesc)));
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                        errmsg("could not listen on %s address \"%s\": %m",
+                                                       familyDesc, addrDesc)));
                        closesocket(fd);
                        continue;
                }
+
+#ifdef HAVE_UNIX_SOCKETS
+               if (addr->ai_family == AF_UNIX)
+                       ereport(LOG,
+                                       (errmsg("listening on Unix socket \"%s\"",
+                                                       addrDesc)));
+               else
+#endif
+                       ereport(LOG,
+                       /* translator: first %s is IPv4 or IPv6 */
+                                       (errmsg("listening on %s address \"%s\", port %d",
+                                                       familyDesc, addrDesc, (int) portNumber)));
+
                ListenSocket[listen_index] = fd;
                added++;
        }
@@ -535,16 +608,11 @@ Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
         * Once we have the interlock, we can safely delete any pre-existing
         * socket file to avoid failure at bind() time.
         */
-       unlink(unixSocketPath);
+       (void) unlink(unixSocketPath);
 
        /*
-        * Arrange to unlink the socket file(s) at proc_exit.  If this is the
-        * first one, set up the on_proc_exit function to do it; then add this
-        * socket file to the list of files to unlink.
+        * Remember socket file pathnames for later maintenance.
         */
-       if (sock_paths == NIL)
-               on_proc_exit(StreamDoUnlink, 0);
-
        sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
 
        return STATUS_OK;
@@ -632,7 +700,7 @@ StreamConnection(pgsocket server_fd, Port *port)
        port->raddr.salen = sizeof(port->raddr.addr);
        if ((port->sock = accept(server_fd,
                                                         (struct sockaddr *) & port->raddr.addr,
-                                                        &port->raddr.salen)) < 0)
+                                                        &port->raddr.salen)) == PGINVALID_SOCKET)
        {
                ereport(LOG,
                                (errcode_for_socket_access(),
@@ -649,16 +717,6 @@ StreamConnection(pgsocket server_fd, Port *port)
                return STATUS_ERROR;
        }
 
-#ifdef SCO_ACCEPT_BUG
-
-       /*
-        * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
-        * shouldn't hurt to catch it for all versions of those platforms.
-        */
-       if (port->raddr.addr.ss_family == 0)
-               port->raddr.addr.ss_family = AF_UNIX;
-#endif
-
        /* fill in the server (local) address */
        port->laddr.salen = sizeof(port->laddr.addr);
        if (getsockname(port->sock,
@@ -673,6 +731,11 @@ StreamConnection(pgsocket server_fd, Port *port)
        if (!IS_AF_UNIX(port->laddr.addr.ss_family))
        {
                int                     on;
+#ifdef WIN32
+               int                     oldopt;
+               int                     optlen;
+               int                     newopt;
+#endif
 
 #ifdef TCP_NODELAY
                on = 1;
@@ -694,16 +757,43 @@ StreamConnection(pgsocket server_fd, Port *port)
 #ifdef WIN32
 
                /*
-                * This is a Win32 socket optimization.  The ideal size is 32k.
-                * http://support.microsoft.com/kb/823764/EN-US/
+                * This is a Win32 socket optimization.  The OS send buffer should be
+                * large enough to send the whole Postgres send buffer in one go, or
+                * performance suffers.  The Postgres send buffer can be enlarged if a
+                * very large message needs to be sent, but we won't attempt to
+                * enlarge the OS buffer if that happens, so somewhat arbitrarily
+                * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
+                * (That's 32kB with the current default).
+                *
+                * The default OS buffer size used to be 8kB in earlier Windows
+                * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
+                * be necessary to change it in later versions anymore.  Changing it
+                * unnecessarily can even reduce performance, because setting
+                * SO_SNDBUF in the application disables the "dynamic send buffering"
+                * feature that was introduced in Windows 7.  So before fiddling with
+                * SO_SNDBUF, check if the current buffer size is already large enough
+                * and only increase it if necessary.
+                *
+                * See https://support.microsoft.com/kb/823764/EN-US/ and
+                * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
                 */
-               on = PQ_SEND_BUFFER_SIZE * 4;
-               if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &on,
-                                          sizeof(on)) < 0)
+               optlen = sizeof(oldopt);
+               if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
+                                          &optlen) < 0)
                {
-                       elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
+                       elog(LOG, "getsockopt(SO_SNDBUF) failed: %m");
                        return STATUS_ERROR;
                }
+               newopt = PQ_SEND_BUFFER_SIZE * 4;
+               if (oldopt < newopt)
+               {
+                       if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
+                                                  sizeof(newopt)) < 0)
+                       {
+                               elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
+                               return STATUS_ERROR;
+                       }
+               }
 #endif
 
                /*
@@ -773,6 +863,26 @@ TouchSocketFiles(void)
        }
 }
 
+/*
+ * RemoveSocketFiles -- unlink socket files at postmaster shutdown
+ */
+void
+RemoveSocketFiles(void)
+{
+       ListCell   *l;
+
+       /* Loop through all created sockets... */
+       foreach(l, sock_paths)
+       {
+               char       *sock_path = (char *) lfirst(l);
+
+               /* Ignore any error. */
+               (void) unlink(sock_path);
+       }
+       /* Since we're about to exit, no need to reclaim storage */
+       sock_paths = NIL;
+}
+
 
 /* --------------------------------
  * Low-level I/O routines begin here.
@@ -783,40 +893,20 @@ TouchSocketFiles(void)
  */
 
 /* --------------------------------
- *                       pq_set_nonblocking - set socket blocking/non-blocking
+ *                       socket_set_nonblocking - set socket blocking/non-blocking
  *
  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
  * blocking otherwise.
  * --------------------------------
  */
 static void
-pq_set_nonblocking(bool nonblocking)
+socket_set_nonblocking(bool nonblocking)
 {
-       if (MyProcPort->noblock == nonblocking)
-               return;
-
-#ifdef WIN32
-       pgwin32_noblock = nonblocking ? 1 : 0;
-#else
+       if (MyProcPort == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+                                errmsg("there is no client connection")));
 
-       /*
-        * Use COMMERROR on failure, because ERROR would try to send the error to
-        * the client, which might require changing the mode again, leading to
-        * infinite recursion.
-        */
-       if (nonblocking)
-       {
-               if (!pg_set_noblock(MyProcPort->sock))
-                       ereport(COMMERROR,
-                                 (errmsg("could not set socket to non-blocking mode: %m")));
-       }
-       else
-       {
-               if (!pg_set_block(MyProcPort->sock))
-                       ereport(COMMERROR,
-                                       (errmsg("could not set socket to blocking mode: %m")));
-       }
-#endif
        MyProcPort->noblock = nonblocking;
 }
 
@@ -844,7 +934,7 @@ pq_recvbuf(void)
        }
 
        /* Ensure that we're in blocking mode */
-       pq_set_nonblocking(false);
+       socket_set_nonblocking(false);
 
        /* Can fill buffer from PqRecvLength and upwards */
        for (;;)
@@ -890,6 +980,8 @@ pq_recvbuf(void)
 int
 pq_getbyte(void)
 {
+       Assert(PqCommReadingMsg);
+
        while (PqRecvPointer >= PqRecvLength)
        {
                if (pq_recvbuf())               /* If nothing in buffer, then recv some */
@@ -907,6 +999,8 @@ pq_getbyte(void)
 int
 pq_peekbyte(void)
 {
+       Assert(PqCommReadingMsg);
+
        while (PqRecvPointer >= PqRecvLength)
        {
                if (pq_recvbuf())               /* If nothing in buffer, then recv some */
@@ -928,6 +1022,8 @@ pq_getbyte_if_available(unsigned char *c)
 {
        int                     r;
 
+       Assert(PqCommReadingMsg);
+
        if (PqRecvPointer < PqRecvLength)
        {
                *c = PqRecvBuffer[PqRecvPointer++];
@@ -935,7 +1031,7 @@ pq_getbyte_if_available(unsigned char *c)
        }
 
        /* Put the socket into non-blocking mode */
-       pq_set_nonblocking(true);
+       socket_set_nonblocking(true);
 
        r = secure_read(MyProcPort, c, 1);
        if (r < 0)
@@ -980,6 +1076,8 @@ pq_getbytes(char *s, size_t len)
 {
        size_t          amount;
 
+       Assert(PqCommReadingMsg);
+
        while (len > 0)
        {
                while (PqRecvPointer >= PqRecvLength)
@@ -1012,6 +1110,8 @@ pq_discardbytes(size_t len)
 {
        size_t          amount;
 
+       Assert(PqCommReadingMsg);
+
        while (len > 0)
        {
                while (PqRecvPointer >= PqRecvLength)
@@ -1048,6 +1148,8 @@ pq_getstring(StringInfo s)
 {
        int                     i;
 
+       Assert(PqCommReadingMsg);
+
        resetStringInfo(s);
 
        /* Read until we get the terminating '\0' */
@@ -1079,6 +1181,58 @@ pq_getstring(StringInfo s)
 }
 
 
+/* --------------------------------
+ *             pq_startmsgread - begin reading a message from the client.
+ *
+ *             This must be called before any of the pq_get* functions.
+ * --------------------------------
+ */
+void
+pq_startmsgread(void)
+{
+       /*
+        * There shouldn't be a read active already, but let's check just to be
+        * sure.
+        */
+       if (PqCommReadingMsg)
+               ereport(FATAL,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg("terminating connection because protocol synchronization was lost")));
+
+       PqCommReadingMsg = true;
+}
+
+
+/* --------------------------------
+ *             pq_endmsgread   - finish reading message.
+ *
+ *             This must be called after reading a V2 protocol message with
+ *             pq_getstring() and friends, to indicate that we have read the whole
+ *             message. In V3 protocol, pq_getmessage() does this implicitly.
+ * --------------------------------
+ */
+void
+pq_endmsgread(void)
+{
+       Assert(PqCommReadingMsg);
+
+       PqCommReadingMsg = false;
+}
+
+/* --------------------------------
+ *             pq_is_reading_msg - are we currently reading a message?
+ *
+ * This is used in error recovery at the outer idle loop to detect if we have
+ * lost protocol sync, and need to terminate the connection. pq_startmsgread()
+ * will check for that too, but it's nicer to detect it earlier.
+ * --------------------------------
+ */
+bool
+pq_is_reading_msg(void)
+{
+       return PqCommReadingMsg;
+}
+
 /* --------------------------------
  *             pq_getmessage   - get a message with length word from connection
  *
@@ -1100,6 +1254,8 @@ pq_getmessage(StringInfo s, int maxlen)
 {
        int32           len;
 
+       Assert(PqCommReadingMsg);
+
        resetStringInfo(s);
 
        /* Read message length word */
@@ -1127,7 +1283,7 @@ pq_getmessage(StringInfo s, int maxlen)
        if (len > 0)
        {
                /*
-                * Allocate space for message.  If we run out of room (ridiculously
+                * Allocate space for message.  If we run out of room (ridiculously
                 * large message), we will elog(ERROR), but we want to discard the
                 * message body so as not to lose communication sync.
                 */
@@ -1141,6 +1297,9 @@ pq_getmessage(StringInfo s, int maxlen)
                                ereport(COMMERROR,
                                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                 errmsg("incomplete message from client")));
+
+                       /* we discarded the rest of the message so we're back in sync. */
+                       PqCommReadingMsg = false;
                        PG_RE_THROW();
                }
                PG_END_TRY();
@@ -1158,6 +1317,9 @@ pq_getmessage(StringInfo s, int maxlen)
                s->data[len] = '\0';
        }
 
+       /* finished reading the message. */
+       PqCommReadingMsg = false;
+
        return 0;
 }
 
@@ -1194,7 +1356,7 @@ internal_putbytes(const char *s, size_t len)
                /* If buffer is full, then flush it out */
                if (PqSendPointer >= PqSendBufferSize)
                {
-                       pq_set_nonblocking(false);
+                       socket_set_nonblocking(false);
                        if (internal_flush())
                                return EOF;
                }
@@ -1210,13 +1372,13 @@ internal_putbytes(const char *s, size_t len)
 }
 
 /* --------------------------------
- *             pq_flush                - flush pending output
+ *             socket_flush            - flush pending output
  *
  *             returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_flush(void)
+static int
+socket_flush(void)
 {
        int                     res;
 
@@ -1224,7 +1386,7 @@ pq_flush(void)
        if (PqCommBusy)
                return 0;
        PqCommBusy = true;
-       pq_set_nonblocking(false);
+       socket_set_nonblocking(false);
        res = internal_flush();
        PqCommBusy = false;
        return res;
@@ -1310,8 +1472,8 @@ internal_flush(void)
  * Returns 0 if OK, or EOF if trouble.
  * --------------------------------
  */
-int
-pq_flush_if_writable(void)
+static int
+socket_flush_if_writable(void)
 {
        int                     res;
 
@@ -1324,7 +1486,7 @@ pq_flush_if_writable(void)
                return 0;
 
        /* Temporarily put the socket into non-blocking mode */
-       pq_set_nonblocking(true);
+       socket_set_nonblocking(true);
 
        PqCommBusy = true;
        res = internal_flush();
@@ -1333,11 +1495,11 @@ pq_flush_if_writable(void)
 }
 
 /* --------------------------------
- *             pq_is_send_pending      - is there any pending data in the output buffer?
+ *     socket_is_send_pending  - is there any pending data in the output buffer?
  * --------------------------------
  */
-bool
-pq_is_send_pending(void)
+static bool
+socket_is_send_pending(void)
 {
        return (PqSendStart < PqSendPointer);
 }
@@ -1351,7 +1513,7 @@ pq_is_send_pending(void)
 
 
 /* --------------------------------
- *             pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
+ *             socket_putmessage - send a normal message (suppressed in COPY OUT mode)
  *
  *             If msgtype is not '\0', it is a message type code to place before
  *             the message body.  If msgtype is '\0', then the message has no type
@@ -1375,8 +1537,8 @@ pq_is_send_pending(void)
  *             returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_putmessage(char msgtype, const char *s, size_t len)
+static int
+socket_putmessage(char msgtype, const char *s, size_t len)
 {
        if (DoingCopyOut || PqCommBusy)
                return 0;
@@ -1408,8 +1570,8 @@ fail:
  *             If the output buffer is too small to hold the message, the buffer
  *             is enlarged.
  */
-void
-pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+static void
+socket_putmessage_noblock(char msgtype, const char *s, size_t len)
 {
        int res         PG_USED_FOR_ASSERTS_ONLY;
        int                     required;
@@ -1431,18 +1593,18 @@ pq_putmessage_noblock(char msgtype, const char *s, size_t len)
 
 
 /* --------------------------------
- *             pq_startcopyout - inform libpq that an old-style COPY OUT transfer
+ *             socket_startcopyout - inform libpq that an old-style COPY OUT transfer
  *                     is beginning
  * --------------------------------
  */
-void
-pq_startcopyout(void)
+static void
+socket_startcopyout(void)
 {
        DoingCopyOut = true;
 }
 
 /* --------------------------------
- *             pq_endcopyout   - end an old-style COPY OUT transfer
+ *             socket_endcopyout       - end an old-style COPY OUT transfer
  *
  *             If errorAbort is indicated, we are aborting a COPY OUT due to an error,
  *             and must send a terminator line.  Since a partial data line might have
@@ -1451,8 +1613,8 @@ pq_startcopyout(void)
  *             not allow binary transfers, so a textual terminator is always correct.
  * --------------------------------
  */
-void
-pq_endcopyout(bool errorAbort)
+static void
+socket_endcopyout(bool errorAbort)
 {
        if (!DoingCopyOut)
                return;
@@ -1462,7 +1624,6 @@ pq_endcopyout(bool errorAbort)
        DoingCopyOut = false;
 }
 
-
 /*
  * Support for TCP Keepalive parameters
  */
@@ -1470,7 +1631,7 @@ pq_endcopyout(bool errorAbort)
 /*
  * On Windows, we need to set both idle and interval at the same time.
  * We also cannot reset them to the default (setting to zero will
- * actually set them to zero, not default), therefor we fallback to
+ * actually set them to zero, not default), therefore we fallback to
  * the out-of-the-box default instead.
  */
 #if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)