]> granicus.if.org Git - postgresql/blobdiff - src/backend/libpq/pqcomm.c
Post-PG 10 beta1 pgindent run
[postgresql] / src / backend / libpq / pqcomm.c
index 8800fa6f213c4cfe1937c6380e447b51b446f24e..d1cc38beb2b25d6e38417a30e0651db7673c2c34 100644 (file)
  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
  * All that remains is similarities of names to trap the unwary...
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- *     $PostgreSQL: pgsql/src/backend/libpq/pqcomm.c,v 1.200 2010/01/02 16:57:45 momjian Exp $
+ *     src/backend/libpq/pqcomm.c
  *
  *-------------------------------------------------------------------------
  */
@@ -42,7 +42,7 @@
  *             StreamServerPort        - Open postmaster's server port
  *             StreamConnection        - Create new connection with client
  *             StreamClose                     - Close a client/backend connection
- *             TouchSocketFile         - Protect socket file against /tmp cleaners
+ *             TouchSocketFiles        - Protect socket files against /tmp cleaners
  *             pq_init                 - initialize libpq at backend startup
  *             pq_comm_reset   - reset libpq during error recovery
  *             pq_close                - shutdown libpq at backend exit
  *             pq_peekbyte             - peek at next byte from connection
  *             pq_putbytes             - send bytes to connection (not flushed until pq_flush)
  *             pq_flush                - flush pending output
+ *             pq_flush_if_writable - flush pending output if writable without blocking
+ *             pq_getbyte_if_available - get a byte if available without blocking
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *             pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
+ *             pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
  *             pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
  *             pq_endcopyout   - end a COPY OUT transfer
  *
 #ifdef HAVE_UTIME_H
 #include <utime.h>
 #endif
+#ifdef _MSC_VER                                        /* mstcpip.h is missing on mingw */
+#include <mstcpip.h>
+#endif
 
-#include "libpq/ip.h"
+#include "common/ip.h"
 #include "libpq/libpq.h"
 #include "miscadmin.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
+#include "utils/memutils.h"
 
 /*
  * Configuration options
 int                    Unix_socket_permissions;
 char      *Unix_socket_group;
 
-
-/* Where the Unix socket file is */
-static char sock_path[MAXPGPATH];
-
+/* Where the Unix socket files are (list of palloc'd strings) */
+static List *sock_paths = NIL;
 
 /*
- * Buffers for low-level I/O
+ * Buffers for low-level I/O.
+ *
+ * The receive buffer is fixed size. Send buffer is usually 8k, but can be
+ * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
  */
 
-#define PQ_BUFFER_SIZE 8192
+#define PQ_SEND_BUFFER_SIZE 8192
+#define PQ_RECV_BUFFER_SIZE 8192
 
-static char PqSendBuffer[PQ_BUFFER_SIZE];
+static char *PqSendBuffer;
+static int     PqSendBufferSize;       /* Size send buffer */
 static int     PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
+static int     PqSendStart;            /* Next index to send a byte in PqSendBuffer */
 
-static char PqRecvBuffer[PQ_BUFFER_SIZE];
+static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int     PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
 static int     PqRecvLength;           /* End of data available in PqRecvBuffer */
 
 /*
  * Message status
  */
-static bool PqCommBusy;
-static bool DoingCopyOut;
+static bool PqCommBusy;                        /* busy sending data to the client */
+static bool PqCommReadingMsg;  /* in the middle of reading a message */
+static bool DoingCopyOut;              /* in old-protocol COPY OUT processing */
 
 
 /* Internal functions */
-static void pq_close(int code, Datum arg);
+static void socket_comm_reset(void);
+static void socket_close(int code, Datum arg);
+static void socket_set_nonblocking(bool nonblocking);
+static int     socket_flush(void);
+static int     socket_flush_if_writable(void);
+static bool socket_is_send_pending(void);
+static int     socket_putmessage(char msgtype, const char *s, size_t len);
+static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void socket_startcopyout(void);
+static void socket_endcopyout(bool errorAbort);
 static int     internal_putbytes(const char *s, size_t len);
 static int     internal_flush(void);
 
 #ifdef HAVE_UNIX_SOCKETS
-static int     Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
-static int     Setup_AF_UNIX(void);
+static int     Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
+static int     Setup_AF_UNIX(char *sock_path);
 #endif   /* HAVE_UNIX_SOCKETS */
 
+static PQcommMethods PqCommSocketMethods = {
+       socket_comm_reset,
+       socket_flush,
+       socket_flush_if_writable,
+       socket_is_send_pending,
+       socket_putmessage,
+       socket_putmessage_noblock,
+       socket_startcopyout,
+       socket_endcopyout
+};
+
+PQcommMethods *PqCommMethods = &PqCommSocketMethods;
+
+WaitEventSet *FeBeWaitSet;
+
 
 /* --------------------------------
  *             pq_init - initialize libpq at backend startup
@@ -138,22 +174,50 @@ static int        Setup_AF_UNIX(void);
 void
 pq_init(void)
 {
-       PqSendPointer = PqRecvPointer = PqRecvLength = 0;
+       /* initialize state variables */
+       PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
+       PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+       PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
        PqCommBusy = false;
+       PqCommReadingMsg = false;
        DoingCopyOut = false;
-       on_proc_exit(pq_close, 0);
+
+       /* set up process-exit hook to close the socket */
+       on_proc_exit(socket_close, 0);
+
+       /*
+        * In backends (as soon as forked) we operate the underlying socket in
+        * nonblocking mode and use latches to implement blocking semantics if
+        * needed. That allows us to provide safely interruptible reads and
+        * writes.
+        *
+        * Use COMMERROR on failure, because ERROR would try to send the error to
+        * the client, which might require changing the mode again, leading to
+        * infinite recursion.
+        */
+#ifndef WIN32
+       if (!pg_set_noblock(MyProcPort->sock))
+               ereport(COMMERROR,
+                               (errmsg("could not set socket to nonblocking mode: %m")));
+#endif
+
+       FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+       AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
+                                         NULL, NULL);
+       AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
+       AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
 }
 
 /* --------------------------------
- *             pq_comm_reset - reset libpq during error recovery
+ *             socket_comm_reset - reset libpq during error recovery
  *
  * This is called from error recovery at the outer idle loop.  It's
  * just to get us out of trouble if we somehow manage to elog() from
  * inside a pqcomm.c routine (which ideally will never happen, but...)
  * --------------------------------
  */
-void
-pq_comm_reset(void)
+static void
+socket_comm_reset(void)
 {
        /* Do not throw away pending data, but do reset the busy flag */
        PqCommBusy = false;
@@ -162,34 +226,47 @@ pq_comm_reset(void)
 }
 
 /* --------------------------------
- *             pq_close - shutdown libpq at backend exit
+ *             socket_close - shutdown libpq at backend exit
  *
- * Note: in a standalone backend MyProcPort will be null,
- * don't crash during exit...
+ * This is the one pg_on_exit_callback in place during BackendInitialize().
+ * That function's unusual signal handling constrains that this callback be
+ * safe to run at any instant.
  * --------------------------------
  */
 static void
-pq_close(int code, Datum arg)
+socket_close(int code, Datum arg)
 {
+       /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
        if (MyProcPort != NULL)
        {
 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
 #ifdef ENABLE_GSS
                OM_uint32       min_s;
 
-               /* Shutdown GSSAPI layer */
+               /*
+                * Shutdown GSSAPI layer.  This section does nothing when interrupting
+                * BackendInitialize(), because pg_GSS_recvauth() makes first use of
+                * "ctx" and "cred".
+                */
                if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
                        gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
 
                if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
                        gss_release_cred(&min_s, &MyProcPort->gss->cred);
 #endif   /* ENABLE_GSS */
-               /* GSS and SSPI share the port->gss struct */
 
+               /*
+                * GSS and SSPI share the port->gss struct.  Since nowhere else does a
+                * postmaster child free this, doing so is safe when interrupting
+                * BackendInitialize().
+                */
                free(MyProcPort->gss);
 #endif   /* ENABLE_GSS || ENABLE_SSPI */
 
-               /* Cleanly shut down SSL layer */
+               /*
+                * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
+                * call this, so this is safe when interrupting BackendInitialize().
+                */
                secure_close(MyProcPort);
 
                /*
@@ -199,9 +276,10 @@ pq_close(int code, Datum arg)
                 * transport layer reports connection closure, and you can be sure the
                 * backend has exited.
                 *
-                * We do set sock to -1 to prevent any further I/O, though.
+                * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
+                * though.
                 */
-               MyProcPort->sock = -1;
+               MyProcPort->sock = PGINVALID_SOCKET;
        }
 }
 
@@ -215,40 +293,34 @@ pq_close(int code, Datum arg)
  */
 
 
-/* StreamDoUnlink()
- * Shutdown routine for backend connection
- * If a Unix socket is used for communication, explicitly close it.
- */
-#ifdef HAVE_UNIX_SOCKETS
-static void
-StreamDoUnlink(int code, Datum arg)
-{
-       Assert(sock_path[0]);
-       unlink(sock_path);
-}
-#endif   /* HAVE_UNIX_SOCKETS */
-
 /*
  * StreamServerPort -- open a "listening" port to accept connections.
  *
- * Successfully opened sockets are added to the ListenSocket[] array,
- * at the first position that isn't -1.
+ * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
+ * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
+ * specified.  For TCP ports, hostName is either NULL for all interfaces or
+ * the interface to listen on, and unixSocketDir is ignored (can be NULL).
+ *
+ * Successfully opened sockets are added to the ListenSocket[] array (of
+ * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
  *
  * RETURNS: STATUS_OK or STATUS_ERROR
  */
 
 int
 StreamServerPort(int family, char *hostName, unsigned short portNumber,
-                                char *unixSocketName,
-                                int ListenSocket[], int MaxListen)
+                                char *unixSocketDir,
+                                pgsocket ListenSocket[], int MaxListen)
 {
-       int                     fd,
-                               err;
+       pgsocket        fd;
+       int                     err;
        int                     maxconn;
        int                     ret;
        char            portNumberStr[32];
        const char *familyDesc;
        char            familyDescBuf[64];
+       const char *addrDesc;
+       char            addrBuf[NI_MAXHOST];
        char       *service;
        struct addrinfo *addrs = NULL,
                           *addr;
@@ -256,6 +328,9 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
        int                     listen_index = 0;
        int                     added = 0;
 
+#ifdef HAVE_UNIX_SOCKETS
+       char            unixSocketPath[MAXPGPATH];
+#endif
 #if !defined(WIN32) || defined(IPV6_V6ONLY)
        int                     one = 1;
 #endif
@@ -269,10 +344,22 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
 #ifdef HAVE_UNIX_SOCKETS
        if (family == AF_UNIX)
        {
-               /* Lock_AF_UNIX will also fill in sock_path. */
-               if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
+               /*
+                * Create unixSocketPath from portNumber and unixSocketDir and lock
+                * that file path
+                */
+               UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
+               if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
+               {
+                       ereport(LOG,
+                                       (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
+                                                       unixSocketPath,
+                                                       (int) (UNIXSOCK_PATH_BUFLEN - 1))));
                        return STATUS_ERROR;
-               service = sock_path;
+               }
+               if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
+                       return STATUS_ERROR;
+               service = unixSocketPath;
        }
        else
 #endif   /* HAVE_UNIX_SOCKETS */
@@ -311,7 +398,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                /* See if there is still room to add 1 more socket. */
                for (; listen_index < MaxListen; listen_index++)
                {
-                       if (ListenSocket[listen_index] == -1)
+                       if (ListenSocket[listen_index] == PGINVALID_SOCKET)
                                break;
                }
                if (listen_index >= MaxListen)
@@ -322,7 +409,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                        break;
                }
 
-               /* set up family name for possible error messages */
+               /* set up address family name for log messages */
                switch (addr->ai_family)
                {
                        case AF_INET:
@@ -346,13 +433,28 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                                break;
                }
 
-               if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
+               /* set up text form of address for log messages */
+#ifdef HAVE_UNIX_SOCKETS
+               if (addr->ai_family == AF_UNIX)
+                       addrDesc = unixSocketPath;
+               else
+#endif
+               {
+                       pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
+                                                          addr->ai_addrlen,
+                                                          addrBuf, sizeof(addrBuf),
+                                                          NULL, 0,
+                                                          NI_NUMERICHOST);
+                       addrDesc = addrBuf;
+               }
+
+               if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                       /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not create %s socket: %m",
-                                                       familyDesc)));
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                 errmsg("could not create %s socket for address \"%s\": %m",
+                                                familyDesc, addrDesc)));
                        continue;
                }
 
@@ -376,7 +478,9 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
-                                                errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
+                               /* translator: first %s is IPv4, IPv6, or Unix */
+                                                errmsg("setsockopt(SO_REUSEADDR) failed for %s address \"%s\": %m",
+                                                               familyDesc, addrDesc)));
                                closesocket(fd);
                                continue;
                        }
@@ -391,7 +495,9 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
-                                                errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
+                               /* translator: first %s is IPv4, IPv6, or Unix */
+                                                errmsg("setsockopt(IPV6_V6ONLY) failed for %s address \"%s\": %m",
+                                                               familyDesc, addrDesc)));
                                closesocket(fd);
                                continue;
                        }
@@ -401,7 +507,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                /*
                 * Note: This might fail on some OS's, like Linux older than
                 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
-                * ipv4 addresses to ipv6.      It will show ::ffff:ipv4 for all ipv4
+                * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
                 * connections.
                 */
                err = bind(fd, addr->ai_addr, addr->ai_addrlen);
@@ -409,13 +515,13 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                       /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not bind %s socket: %m",
-                                                       familyDesc),
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                        errmsg("could not bind %s address \"%s\": %m",
+                                                       familyDesc, addrDesc),
                                         (IS_AF_UNIX(addr->ai_family)) ?
                                  errhint("Is another postmaster already running on port %d?"
                                                  " If not, remove socket file \"%s\" and retry.",
-                                                 (int) portNumber, sock_path) :
+                                                 (int) portNumber, service) :
                                  errhint("Is another postmaster already running on port %d?"
                                                  " If not, wait a few seconds and retry.",
                                                  (int) portNumber)));
@@ -426,7 +532,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
 #ifdef HAVE_UNIX_SOCKETS
                if (addr->ai_family == AF_UNIX)
                {
-                       if (Setup_AF_UNIX() != STATUS_OK)
+                       if (Setup_AF_UNIX(service) != STATUS_OK)
                        {
                                closesocket(fd);
                                break;
@@ -448,12 +554,25 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                       /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not listen on %s socket: %m",
-                                                       familyDesc)));
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                        errmsg("could not listen on %s address \"%s\": %m",
+                                                       familyDesc, addrDesc)));
                        closesocket(fd);
                        continue;
                }
+
+#ifdef HAVE_UNIX_SOCKETS
+               if (addr->ai_family == AF_UNIX)
+                       ereport(LOG,
+                                       (errmsg("listening on Unix socket \"%s\"",
+                                                       addrDesc)));
+               else
+#endif
+                       ereport(LOG,
+                       /* translator: first %s is IPv4 or IPv6 */
+                                       (errmsg("listening on %s address \"%s\", port %d",
+                                                       familyDesc, addrDesc, (int) portNumber)));
+
                ListenSocket[listen_index] = fd;
                added++;
        }
@@ -473,20 +592,28 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
  * Lock_AF_UNIX -- configure unix socket file path
  */
 static int
-Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
+Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
 {
-       UNIXSOCK_PATH(sock_path, portNumber, unixSocketName);
-
        /*
         * Grab an interlock file associated with the socket file.
+        *
+        * Note: there are two reasons for using a socket lock file, rather than
+        * trying to interlock directly on the socket itself.  First, it's a lot
+        * more portable, and second, it lets us remove any pre-existing socket
+        * file without race conditions.
         */
-       CreateSocketLockFile(sock_path, true);
+       CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
 
        /*
         * Once we have the interlock, we can safely delete any pre-existing
         * socket file to avoid failure at bind() time.
         */
-       unlink(sock_path);
+       (void) unlink(unixSocketPath);
+
+       /*
+        * Remember socket file pathnames for later maintenance.
+        */
+       sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
 
        return STATUS_OK;
 }
@@ -496,11 +623,8 @@ Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
  * Setup_AF_UNIX -- configure unix socket permissions
  */
 static int
-Setup_AF_UNIX(void)
+Setup_AF_UNIX(char *sock_path)
 {
-       /* Arrange to unlink the socket file at exit */
-       on_proc_exit(StreamDoUnlink, 0);
-
        /*
         * Fix socket ownership/permission if requested.  Note we must do this
         * before we listen() to avoid a window where unwanted connections could
@@ -570,13 +694,13 @@ Setup_AF_UNIX(void)
  * RETURNS: STATUS_OK or STATUS_ERROR
  */
 int
-StreamConnection(int server_fd, Port *port)
+StreamConnection(pgsocket server_fd, Port *port)
 {
        /* accept connection and fill in the client (remote) address */
        port->raddr.salen = sizeof(port->raddr.addr);
        if ((port->sock = accept(server_fd,
                                                         (struct sockaddr *) & port->raddr.addr,
-                                                        &port->raddr.salen)) < 0)
+                                                        &port->raddr.salen)) == PGINVALID_SOCKET)
        {
                ereport(LOG,
                                (errcode_for_socket_access(),
@@ -593,16 +717,6 @@ StreamConnection(int server_fd, Port *port)
                return STATUS_ERROR;
        }
 
-#ifdef SCO_ACCEPT_BUG
-
-       /*
-        * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
-        * shouldn't hurt to catch it for all versions of those platforms.
-        */
-       if (port->raddr.addr.ss_family == 0)
-               port->raddr.addr.ss_family = AF_UNIX;
-#endif
-
        /* fill in the server (local) address */
        port->laddr.salen = sizeof(port->laddr.addr);
        if (getsockname(port->sock,
@@ -617,6 +731,11 @@ StreamConnection(int server_fd, Port *port)
        if (!IS_AF_UNIX(port->laddr.addr.ss_family))
        {
                int                     on;
+#ifdef WIN32
+               int                     oldopt;
+               int                     optlen;
+               int                     newopt;
+#endif
 
 #ifdef TCP_NODELAY
                on = 1;
@@ -638,16 +757,43 @@ StreamConnection(int server_fd, Port *port)
 #ifdef WIN32
 
                /*
-                * This is a Win32 socket optimization.  The ideal size is 32k.
-                * http://support.microsoft.com/kb/823764/EN-US/
+                * This is a Win32 socket optimization.  The OS send buffer should be
+                * large enough to send the whole Postgres send buffer in one go, or
+                * performance suffers.  The Postgres send buffer can be enlarged if a
+                * very large message needs to be sent, but we won't attempt to
+                * enlarge the OS buffer if that happens, so somewhat arbitrarily
+                * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
+                * (That's 32kB with the current default).
+                *
+                * The default OS buffer size used to be 8kB in earlier Windows
+                * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
+                * be necessary to change it in later versions anymore.  Changing it
+                * unnecessarily can even reduce performance, because setting
+                * SO_SNDBUF in the application disables the "dynamic send buffering"
+                * feature that was introduced in Windows 7.  So before fiddling with
+                * SO_SNDBUF, check if the current buffer size is already large enough
+                * and only increase it if necessary.
+                *
+                * See https://support.microsoft.com/kb/823764/EN-US/ and
+                * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
                 */
-               on = PQ_BUFFER_SIZE * 4;
-               if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &on,
-                                          sizeof(on)) < 0)
+               optlen = sizeof(oldopt);
+               if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
+                                          &optlen) < 0)
                {
-                       elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
+                       elog(LOG, "getsockopt(SO_SNDBUF) failed: %m");
                        return STATUS_ERROR;
                }
+               newopt = PQ_SEND_BUFFER_SIZE * 4;
+               if (oldopt < newopt)
+               {
+                       if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
+                                                  sizeof(newopt)) < 0)
+                       {
+                               elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
+                               return STATUS_ERROR;
+                       }
+               }
 #endif
 
                /*
@@ -676,26 +822,30 @@ StreamConnection(int server_fd, Port *port)
  * we do NOT want to send anything to the far end.
  */
 void
-StreamClose(int sock)
+StreamClose(pgsocket sock)
 {
        closesocket(sock);
 }
 
 /*
- * TouchSocketFile -- mark socket file as recently accessed
+ * TouchSocketFiles -- mark socket files as recently accessed
  *
  * This routine should be called every so often to ensure that the socket
- * file has a recent mod date (ordinary operations on sockets usually won't
- * change the mod date).  That saves it from being removed by
+ * files have a recent mod date (ordinary operations on sockets usually won't
+ * change the mod date).  That saves them from being removed by
  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
  * never have put the socket file in /tmp...)
  */
 void
-TouchSocketFile(void)
+TouchSocketFiles(void)
 {
-       /* Do nothing if we did not create a socket... */
-       if (sock_path[0] != '\0')
+       ListCell   *l;
+
+       /* Loop through all created sockets... */
+       foreach(l, sock_paths)
        {
+               char       *sock_path = (char *) lfirst(l);
+
                /*
                 * utime() is POSIX standard, utimes() is a common alternative. If we
                 * have neither, there's no way to affect the mod or access time of
@@ -713,6 +863,26 @@ TouchSocketFile(void)
        }
 }
 
+/*
+ * RemoveSocketFiles -- unlink socket files at postmaster shutdown
+ */
+void
+RemoveSocketFiles(void)
+{
+       ListCell   *l;
+
+       /* Loop through all created sockets... */
+       foreach(l, sock_paths)
+       {
+               char       *sock_path = (char *) lfirst(l);
+
+               /* Ignore any error. */
+               (void) unlink(sock_path);
+       }
+       /* Since we're about to exit, no need to reclaim storage */
+       sock_paths = NIL;
+}
+
 
 /* --------------------------------
  * Low-level I/O routines begin here.
@@ -722,6 +892,23 @@ TouchSocketFile(void)
  * --------------------------------
  */
 
+/* --------------------------------
+ *                       socket_set_nonblocking - set socket blocking/non-blocking
+ *
+ * Sets the socket non-blocking if nonblocking is TRUE, or sets it
+ * blocking otherwise.
+ * --------------------------------
+ */
+static void
+socket_set_nonblocking(bool nonblocking)
+{
+       if (MyProcPort == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+                                errmsg("there is no client connection")));
+
+       MyProcPort->noblock = nonblocking;
+}
 
 /* --------------------------------
  *             pq_recvbuf - load some bytes into the input buffer
@@ -746,13 +933,16 @@ pq_recvbuf(void)
                        PqRecvLength = PqRecvPointer = 0;
        }
 
+       /* Ensure that we're in blocking mode */
+       socket_set_nonblocking(false);
+
        /* Can fill buffer from PqRecvLength and upwards */
        for (;;)
        {
                int                     r;
 
                r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-                                               PQ_BUFFER_SIZE - PqRecvLength);
+                                               PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
                if (r < 0)
                {
@@ -790,6 +980,8 @@ pq_recvbuf(void)
 int
 pq_getbyte(void)
 {
+       Assert(PqCommReadingMsg);
+
        while (PqRecvPointer >= PqRecvLength)
        {
                if (pq_recvbuf())               /* If nothing in buffer, then recv some */
@@ -807,6 +999,8 @@ pq_getbyte(void)
 int
 pq_peekbyte(void)
 {
+       Assert(PqCommReadingMsg);
+
        while (PqRecvPointer >= PqRecvLength)
        {
                if (pq_recvbuf())               /* If nothing in buffer, then recv some */
@@ -815,6 +1009,62 @@ pq_peekbyte(void)
        return (unsigned char) PqRecvBuffer[PqRecvPointer];
 }
 
+/* --------------------------------
+ *             pq_getbyte_if_available - get a single byte from connection,
+ *                     if available
+ *
+ * The received byte is stored in *c. Returns 1 if a byte was read,
+ * 0 if no data was available, or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_getbyte_if_available(unsigned char *c)
+{
+       int                     r;
+
+       Assert(PqCommReadingMsg);
+
+       if (PqRecvPointer < PqRecvLength)
+       {
+               *c = PqRecvBuffer[PqRecvPointer++];
+               return 1;
+       }
+
+       /* Put the socket into non-blocking mode */
+       socket_set_nonblocking(true);
+
+       r = secure_read(MyProcPort, c, 1);
+       if (r < 0)
+       {
+               /*
+                * Ok if no data available without blocking or interrupted (though
+                * EINTR really shouldn't happen with a non-blocking socket). Report
+                * other errors.
+                */
+               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+                       r = 0;
+               else
+               {
+                       /*
+                        * Careful: an ereport() that tries to write to the client would
+                        * cause recursion to here, leading to stack overflow and core
+                        * dump!  This message must go *only* to the postmaster log.
+                        */
+                       ereport(COMMERROR,
+                                       (errcode_for_socket_access(),
+                                        errmsg("could not receive data from client: %m")));
+                       r = EOF;
+               }
+       }
+       else if (r == 0)
+       {
+               /* EOF detected */
+               r = EOF;
+       }
+
+       return r;
+}
+
 /* --------------------------------
  *             pq_getbytes             - get a known number of bytes from connection
  *
@@ -826,6 +1076,8 @@ pq_getbytes(char *s, size_t len)
 {
        size_t          amount;
 
+       Assert(PqCommReadingMsg);
+
        while (len > 0)
        {
                while (PqRecvPointer >= PqRecvLength)
@@ -858,6 +1110,8 @@ pq_discardbytes(size_t len)
 {
        size_t          amount;
 
+       Assert(PqCommReadingMsg);
+
        while (len > 0)
        {
                while (PqRecvPointer >= PqRecvLength)
@@ -894,6 +1148,8 @@ pq_getstring(StringInfo s)
 {
        int                     i;
 
+       Assert(PqCommReadingMsg);
+
        resetStringInfo(s);
 
        /* Read until we get the terminating '\0' */
@@ -925,6 +1181,58 @@ pq_getstring(StringInfo s)
 }
 
 
+/* --------------------------------
+ *             pq_startmsgread - begin reading a message from the client.
+ *
+ *             This must be called before any of the pq_get* functions.
+ * --------------------------------
+ */
+void
+pq_startmsgread(void)
+{
+       /*
+        * There shouldn't be a read active already, but let's check just to be
+        * sure.
+        */
+       if (PqCommReadingMsg)
+               ereport(FATAL,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg("terminating connection because protocol synchronization was lost")));
+
+       PqCommReadingMsg = true;
+}
+
+
+/* --------------------------------
+ *             pq_endmsgread   - finish reading message.
+ *
+ *             This must be called after reading a V2 protocol message with
+ *             pq_getstring() and friends, to indicate that we have read the whole
+ *             message. In V3 protocol, pq_getmessage() does this implicitly.
+ * --------------------------------
+ */
+void
+pq_endmsgread(void)
+{
+       Assert(PqCommReadingMsg);
+
+       PqCommReadingMsg = false;
+}
+
+/* --------------------------------
+ *             pq_is_reading_msg - are we currently reading a message?
+ *
+ * This is used in error recovery at the outer idle loop to detect if we have
+ * lost protocol sync, and need to terminate the connection. pq_startmsgread()
+ * will check for that too, but it's nicer to detect it earlier.
+ * --------------------------------
+ */
+bool
+pq_is_reading_msg(void)
+{
+       return PqCommReadingMsg;
+}
+
 /* --------------------------------
  *             pq_getmessage   - get a message with length word from connection
  *
@@ -946,6 +1254,8 @@ pq_getmessage(StringInfo s, int maxlen)
 {
        int32           len;
 
+       Assert(PqCommReadingMsg);
+
        resetStringInfo(s);
 
        /* Read message length word */
@@ -973,7 +1283,7 @@ pq_getmessage(StringInfo s, int maxlen)
        if (len > 0)
        {
                /*
-                * Allocate space for message.  If we run out of room (ridiculously
+                * Allocate space for message.  If we run out of room (ridiculously
                 * large message), we will elog(ERROR), but we want to discard the
                 * message body so as not to lose communication sync.
                 */
@@ -987,6 +1297,9 @@ pq_getmessage(StringInfo s, int maxlen)
                                ereport(COMMERROR,
                                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                 errmsg("incomplete message from client")));
+
+                       /* we discarded the rest of the message so we're back in sync. */
+                       PqCommReadingMsg = false;
                        PG_RE_THROW();
                }
                PG_END_TRY();
@@ -1004,6 +1317,9 @@ pq_getmessage(StringInfo s, int maxlen)
                s->data[len] = '\0';
        }
 
+       /* finished reading the message. */
+       PqCommReadingMsg = false;
+
        return 0;
 }
 
@@ -1038,10 +1354,13 @@ internal_putbytes(const char *s, size_t len)
        while (len > 0)
        {
                /* If buffer is full, then flush it out */
-               if (PqSendPointer >= PQ_BUFFER_SIZE)
+               if (PqSendPointer >= PqSendBufferSize)
+               {
+                       socket_set_nonblocking(false);
                        if (internal_flush())
                                return EOF;
-               amount = PQ_BUFFER_SIZE - PqSendPointer;
+               }
+               amount = PqSendBufferSize - PqSendPointer;
                if (amount > len)
                        amount = len;
                memcpy(PqSendBuffer + PqSendPointer, s, amount);
@@ -1053,13 +1372,13 @@ internal_putbytes(const char *s, size_t len)
 }
 
 /* --------------------------------
- *             pq_flush                - flush pending output
+ *             socket_flush            - flush pending output
  *
  *             returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_flush(void)
+static int
+socket_flush(void)
 {
        int                     res;
 
@@ -1067,17 +1386,25 @@ pq_flush(void)
        if (PqCommBusy)
                return 0;
        PqCommBusy = true;
+       socket_set_nonblocking(false);
        res = internal_flush();
        PqCommBusy = false;
        return res;
 }
 
+/* --------------------------------
+ *             internal_flush - flush pending output
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
 static int
 internal_flush(void)
 {
        static int      last_reported_send_errno = 0;
 
-       char       *bufptr = PqSendBuffer;
+       char       *bufptr = PqSendBuffer + PqSendStart;
        char       *bufend = PqSendBuffer + PqSendPointer;
 
        while (bufptr < bufend)
@@ -1091,6 +1418,16 @@ internal_flush(void)
                        if (errno == EINTR)
                                continue;               /* Ok if we were interrupted */
 
+                       /*
+                        * Ok if no data writable without blocking, and the socket is in
+                        * non-blocking mode.
+                        */
+                       if (errno == EAGAIN ||
+                               errno == EWOULDBLOCK)
+                       {
+                               return 0;
+                       }
+
                        /*
                         * Careful: an ereport() that tries to write to the client would
                         * cause recursion to here, leading to stack overflow and core
@@ -1110,20 +1447,62 @@ internal_flush(void)
 
                        /*
                         * We drop the buffered data anyway so that processing can
-                        * continue, even though we'll probably quit soon.
+                        * continue, even though we'll probably quit soon. We also set a
+                        * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
+                        * the connection.
                         */
-                       PqSendPointer = 0;
+                       PqSendStart = PqSendPointer = 0;
+                       ClientConnectionLost = 1;
+                       InterruptPending = 1;
                        return EOF;
                }
 
                last_reported_send_errno = 0;   /* reset after any successful send */
                bufptr += r;
+               PqSendStart += r;
        }
 
-       PqSendPointer = 0;
+       PqSendStart = PqSendPointer = 0;
        return 0;
 }
 
+/* --------------------------------
+ *             pq_flush_if_writable - flush pending output if writable without blocking
+ *
+ * Returns 0 if OK, or EOF if trouble.
+ * --------------------------------
+ */
+static int
+socket_flush_if_writable(void)
+{
+       int                     res;
+
+       /* Quick exit if nothing to do */
+       if (PqSendPointer == PqSendStart)
+               return 0;
+
+       /* No-op if reentrant call */
+       if (PqCommBusy)
+               return 0;
+
+       /* Temporarily put the socket into non-blocking mode */
+       socket_set_nonblocking(true);
+
+       PqCommBusy = true;
+       res = internal_flush();
+       PqCommBusy = false;
+       return res;
+}
+
+/* --------------------------------
+ *     socket_is_send_pending  - is there any pending data in the output buffer?
+ * --------------------------------
+ */
+static bool
+socket_is_send_pending(void)
+{
+       return (PqSendStart < PqSendPointer);
+}
 
 /* --------------------------------
  * Message-level I/O routines begin here.
@@ -1134,7 +1513,7 @@ internal_flush(void)
 
 
 /* --------------------------------
- *             pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
+ *             socket_putmessage - send a normal message (suppressed in COPY OUT mode)
  *
  *             If msgtype is not '\0', it is a message type code to place before
  *             the message body.  If msgtype is '\0', then the message has no type
@@ -1158,8 +1537,8 @@ internal_flush(void)
  *             returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_putmessage(char msgtype, const char *s, size_t len)
+static int
+socket_putmessage(char msgtype, const char *s, size_t len)
 {
        if (DoingCopyOut || PqCommBusy)
                return 0;
@@ -1186,18 +1565,46 @@ fail:
 }
 
 /* --------------------------------
- *             pq_startcopyout - inform libpq that an old-style COPY OUT transfer
+ *             pq_putmessage_noblock   - like pq_putmessage, but never blocks
+ *
+ *             If the output buffer is too small to hold the message, the buffer
+ *             is enlarged.
+ */
+static void
+socket_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+       int res         PG_USED_FOR_ASSERTS_ONLY;
+       int                     required;
+
+       /*
+        * Ensure we have enough space in the output buffer for the message header
+        * as well as the message itself.
+        */
+       required = PqSendPointer + 1 + 4 + len;
+       if (required > PqSendBufferSize)
+       {
+               PqSendBuffer = repalloc(PqSendBuffer, required);
+               PqSendBufferSize = required;
+       }
+       res = pq_putmessage(msgtype, s, len);
+       Assert(res == 0);                       /* should not fail when the message fits in
+                                                                * buffer */
+}
+
+
+/* --------------------------------
+ *             socket_startcopyout - inform libpq that an old-style COPY OUT transfer
  *                     is beginning
  * --------------------------------
  */
-void
-pq_startcopyout(void)
+static void
+socket_startcopyout(void)
 {
        DoingCopyOut = true;
 }
 
 /* --------------------------------
- *             pq_endcopyout   - end an old-style COPY OUT transfer
+ *             socket_endcopyout       - end an old-style COPY OUT transfer
  *
  *             If errorAbort is indicated, we are aborting a COPY OUT due to an error,
  *             and must send a terminator line.  Since a partial data line might have
@@ -1206,8 +1613,8 @@ pq_startcopyout(void)
  *             not allow binary transfers, so a textual terminator is always correct.
  * --------------------------------
  */
-void
-pq_endcopyout(bool errorAbort)
+static void
+socket_endcopyout(bool errorAbort)
 {
        if (!DoingCopyOut)
                return;
@@ -1217,15 +1624,59 @@ pq_endcopyout(bool errorAbort)
        DoingCopyOut = false;
 }
 
-
 /*
  * Support for TCP Keepalive parameters
  */
 
+/*
+ * On Windows, we need to set both idle and interval at the same time.
+ * We also cannot reset them to the default (setting to zero will
+ * actually set them to zero, not default), therefore we fallback to
+ * the out-of-the-box default instead.
+ */
+#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
+static int
+pq_setkeepaliveswin32(Port *port, int idle, int interval)
+{
+       struct tcp_keepalive ka;
+       DWORD           retsize;
+
+       if (idle <= 0)
+               idle = 2 * 60 * 60;             /* default = 2 hours */
+       if (interval <= 0)
+               interval = 1;                   /* default = 1 second */
+
+       ka.onoff = 1;
+       ka.keepalivetime = idle * 1000;
+       ka.keepaliveinterval = interval * 1000;
+
+       if (WSAIoctl(port->sock,
+                                SIO_KEEPALIVE_VALS,
+                                (LPVOID) &ka,
+                                sizeof(ka),
+                                NULL,
+                                0,
+                                &retsize,
+                                NULL,
+                                NULL)
+               != 0)
+       {
+               elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %ui",
+                        WSAGetLastError());
+               return STATUS_ERROR;
+       }
+       if (port->keepalives_idle != idle)
+               port->keepalives_idle = idle;
+       if (port->keepalives_interval != interval)
+               port->keepalives_interval = interval;
+       return STATUS_OK;
+}
+#endif
+
 int
 pq_getkeepalivesidle(Port *port)
 {
-#ifdef TCP_KEEPIDLE
+#if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(WIN32)
        if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
                return 0;
 
@@ -1234,8 +1685,10 @@ pq_getkeepalivesidle(Port *port)
 
        if (port->default_keepalives_idle == 0)
        {
+#ifndef WIN32
                ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
 
+#ifdef TCP_KEEPIDLE
                if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
                                           (char *) &port->default_keepalives_idle,
                                           &size) < 0)
@@ -1243,6 +1696,19 @@ pq_getkeepalivesidle(Port *port)
                        elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
                        port->default_keepalives_idle = -1; /* don't know */
                }
+#else
+               if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
+                                          (char *) &port->default_keepalives_idle,
+                                          &size) < 0)
+               {
+                       elog(LOG, "getsockopt(TCP_KEEPALIVE) failed: %m");
+                       port->default_keepalives_idle = -1; /* don't know */
+               }
+#endif   /* TCP_KEEPIDLE */
+#else                                                  /* WIN32 */
+               /* We can't get the defaults on Windows, so return "don't know" */
+               port->default_keepalives_idle = -1;
+#endif   /* WIN32 */
        }
 
        return port->default_keepalives_idle;
@@ -1257,10 +1723,11 @@ pq_setkeepalivesidle(int idle, Port *port)
        if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
                return STATUS_OK;
 
-#ifdef TCP_KEEPIDLE
+#if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(SIO_KEEPALIVE_VALS)
        if (idle == port->keepalives_idle)
                return STATUS_OK;
 
+#ifndef WIN32
        if (port->default_keepalives_idle <= 0)
        {
                if (pq_getkeepalivesidle(port) < 0)
@@ -1275,29 +1742,40 @@ pq_setkeepalivesidle(int idle, Port *port)
        if (idle == 0)
                idle = port->default_keepalives_idle;
 
+#ifdef TCP_KEEPIDLE
        if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
                                   (char *) &idle, sizeof(idle)) < 0)
        {
                elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
                return STATUS_ERROR;
        }
+#else
+       if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
+                                  (char *) &idle, sizeof(idle)) < 0)
+       {
+               elog(LOG, "setsockopt(TCP_KEEPALIVE) failed: %m");
+               return STATUS_ERROR;
+       }
+#endif
 
        port->keepalives_idle = idle;
-#else
+#else                                                  /* WIN32 */
+       return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
+#endif
+#else                                                  /* TCP_KEEPIDLE || SIO_KEEPALIVE_VALS */
        if (idle != 0)
        {
-               elog(LOG, "setsockopt(TCP_KEEPIDLE) not supported");
+               elog(LOG, "setting the keepalive idle time is not supported");
                return STATUS_ERROR;
        }
 #endif
-
        return STATUS_OK;
 }
 
 int
 pq_getkeepalivesinterval(Port *port)
 {
-#ifdef TCP_KEEPINTVL
+#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
        if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
                return 0;
 
@@ -1306,6 +1784,7 @@ pq_getkeepalivesinterval(Port *port)
 
        if (port->default_keepalives_interval == 0)
        {
+#ifndef WIN32
                ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
 
                if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
@@ -1315,6 +1794,10 @@ pq_getkeepalivesinterval(Port *port)
                        elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
                        port->default_keepalives_interval = -1;         /* don't know */
                }
+#else
+               /* We can't get the defaults on Windows, so return "don't know" */
+               port->default_keepalives_interval = -1;
+#endif   /* WIN32 */
        }
 
        return port->default_keepalives_interval;
@@ -1329,10 +1812,11 @@ pq_setkeepalivesinterval(int interval, Port *port)
        if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
                return STATUS_OK;
 
-#ifdef TCP_KEEPINTVL
+#if defined(TCP_KEEPINTVL) || defined (SIO_KEEPALIVE_VALS)
        if (interval == port->keepalives_interval)
                return STATUS_OK;
 
+#ifndef WIN32
        if (port->default_keepalives_interval <= 0)
        {
                if (pq_getkeepalivesinterval(port) < 0)
@@ -1355,6 +1839,9 @@ pq_setkeepalivesinterval(int interval, Port *port)
        }
 
        port->keepalives_interval = interval;
+#else                                                  /* WIN32 */
+       return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
+#endif
 #else
        if (interval != 0)
        {