]> granicus.if.org Git - postgresql/blobdiff - src/backend/libpq/pqcomm.c
Post-PG 10 beta1 pgindent run
[postgresql] / src / backend / libpq / pqcomm.c
index 49fc229f90bdd5b25dc9e0defb761b842ffed68e..d1cc38beb2b25d6e38417a30e0651db7673c2c34 100644 (file)
  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
  * All that remains is similarities of names to trap the unwary...
  *
- * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- *     $PostgreSQL: pgsql/src/backend/libpq/pqcomm.c,v 1.167 2003/11/29 19:51:49 pgsql Exp $
+ *     src/backend/libpq/pqcomm.c
  *
  *-------------------------------------------------------------------------
  */
@@ -42,8 +42,9 @@
  *             StreamServerPort        - Open postmaster's server port
  *             StreamConnection        - Create new connection with client
  *             StreamClose                     - Close a client/backend connection
- *             TouchSocketFile         - Protect socket file against /tmp cleaners
+ *             TouchSocketFiles        - Protect socket files against /tmp cleaners
  *             pq_init                 - initialize libpq at backend startup
+ *             pq_comm_reset   - reset libpq during error recovery
  *             pq_close                - shutdown libpq at backend exit
  *
  * low-level I/O:
  *             pq_peekbyte             - peek at next byte from connection
  *             pq_putbytes             - send bytes to connection (not flushed until pq_flush)
  *             pq_flush                - flush pending output
+ *             pq_flush_if_writable - flush pending output if writable without blocking
+ *             pq_getbyte_if_available - get a byte if available without blocking
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *             pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
+ *             pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
  *             pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
  *             pq_endcopyout   - end a COPY OUT transfer
  *
@@ -65,7 +69,6 @@
 #include "postgres.h"
 
 #include <signal.h>
-#include <errno.h>
 #include <fcntl.h>
 #include <grp.h>
 #include <unistd.h>
 #ifdef HAVE_UTIME_H
 #include <utime.h>
 #endif
+#ifdef _MSC_VER                                        /* mstcpip.h is missing on mingw */
+#include <mstcpip.h>
+#endif
 
+#include "common/ip.h"
 #include "libpq/libpq.h"
 #include "miscadmin.h"
 #include "storage/ipc.h"
-
-
-static void pq_close(void);
-
-#ifdef HAVE_UNIX_SOCKETS
-static int     Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
-static int     Setup_AF_UNIX(void);
-#endif   /* HAVE_UNIX_SOCKETS */
-
+#include "utils/guc.h"
+#include "utils/memutils.h"
 
 /*
  * Configuration options
@@ -102,26 +102,69 @@ static int        Setup_AF_UNIX(void);
 int                    Unix_socket_permissions;
 char      *Unix_socket_group;
 
+/* Where the Unix socket files are (list of palloc'd strings) */
+static List *sock_paths = NIL;
 
 /*
- * Buffers for low-level I/O
+ * Buffers for low-level I/O.
+ *
+ * The receive buffer is fixed size. Send buffer is usually 8k, but can be
+ * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
  */
 
-#define PQ_BUFFER_SIZE 8192
+#define PQ_SEND_BUFFER_SIZE 8192
+#define PQ_RECV_BUFFER_SIZE 8192
 
-static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
-static int     PqSendPointer;          /* Next index to store a byte in
-                                                                * PqSendBuffer */
+static char *PqSendBuffer;
+static int     PqSendBufferSize;       /* Size send buffer */
+static int     PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
+static int     PqSendStart;            /* Next index to send a byte in PqSendBuffer */
 
-static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
-static int     PqRecvPointer;          /* Next index to read a byte from
-                                                                * PqRecvBuffer */
+static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
+static int     PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
 static int     PqRecvLength;           /* End of data available in PqRecvBuffer */
 
 /*
  * Message status
  */
-static bool DoingCopyOut;
+static bool PqCommBusy;                        /* busy sending data to the client */
+static bool PqCommReadingMsg;  /* in the middle of reading a message */
+static bool DoingCopyOut;              /* in old-protocol COPY OUT processing */
+
+
+/* Internal functions */
+static void socket_comm_reset(void);
+static void socket_close(int code, Datum arg);
+static void socket_set_nonblocking(bool nonblocking);
+static int     socket_flush(void);
+static int     socket_flush_if_writable(void);
+static bool socket_is_send_pending(void);
+static int     socket_putmessage(char msgtype, const char *s, size_t len);
+static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void socket_startcopyout(void);
+static void socket_endcopyout(bool errorAbort);
+static int     internal_putbytes(const char *s, size_t len);
+static int     internal_flush(void);
+
+#ifdef HAVE_UNIX_SOCKETS
+static int     Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
+static int     Setup_AF_UNIX(char *sock_path);
+#endif   /* HAVE_UNIX_SOCKETS */
+
+static PQcommMethods PqCommSocketMethods = {
+       socket_comm_reset,
+       socket_flush,
+       socket_flush_if_writable,
+       socket_is_send_pending,
+       socket_putmessage,
+       socket_putmessage_noblock,
+       socket_startcopyout,
+       socket_endcopyout
+};
+
+PQcommMethods *PqCommMethods = &PqCommSocketMethods;
+
+WaitEventSet *FeBeWaitSet;
 
 
 /* --------------------------------
@@ -131,37 +174,112 @@ static bool DoingCopyOut;
 void
 pq_init(void)
 {
-       PqSendPointer = PqRecvPointer = PqRecvLength = 0;
+       /* initialize state variables */
+       PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
+       PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+       PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
+       PqCommBusy = false;
+       PqCommReadingMsg = false;
        DoingCopyOut = false;
-       on_proc_exit(pq_close, 0);
+
+       /* set up process-exit hook to close the socket */
+       on_proc_exit(socket_close, 0);
+
+       /*
+        * In backends (as soon as forked) we operate the underlying socket in
+        * nonblocking mode and use latches to implement blocking semantics if
+        * needed. That allows us to provide safely interruptible reads and
+        * writes.
+        *
+        * Use COMMERROR on failure, because ERROR would try to send the error to
+        * the client, which might require changing the mode again, leading to
+        * infinite recursion.
+        */
+#ifndef WIN32
+       if (!pg_set_noblock(MyProcPort->sock))
+               ereport(COMMERROR,
+                               (errmsg("could not set socket to nonblocking mode: %m")));
+#endif
+
+       FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+       AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
+                                         NULL, NULL);
+       AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
+       AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
 }
 
+/* --------------------------------
+ *             socket_comm_reset - reset libpq during error recovery
+ *
+ * This is called from error recovery at the outer idle loop.  It's
+ * just to get us out of trouble if we somehow manage to elog() from
+ * inside a pqcomm.c routine (which ideally will never happen, but...)
+ * --------------------------------
+ */
+static void
+socket_comm_reset(void)
+{
+       /* Do not throw away pending data, but do reset the busy flag */
+       PqCommBusy = false;
+       /* We can abort any old-style COPY OUT, too */
+       pq_endcopyout(true);
+}
 
 /* --------------------------------
- *             pq_close - shutdown libpq at backend exit
+ *             socket_close - shutdown libpq at backend exit
  *
- * Note: in a standalone backend MyProcPort will be null,
- * don't crash during exit...
+ * This is the one pg_on_exit_callback in place during BackendInitialize().
+ * That function's unusual signal handling constrains that this callback be
+ * safe to run at any instant.
  * --------------------------------
  */
 static void
-pq_close(void)
+socket_close(int code, Datum arg)
 {
+       /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
        if (MyProcPort != NULL)
        {
-               /* Cleanly shut down SSL layer */
+#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
+#ifdef ENABLE_GSS
+               OM_uint32       min_s;
+
+               /*
+                * Shutdown GSSAPI layer.  This section does nothing when interrupting
+                * BackendInitialize(), because pg_GSS_recvauth() makes first use of
+                * "ctx" and "cred".
+                */
+               if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
+                       gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
+
+               if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
+                       gss_release_cred(&min_s, &MyProcPort->gss->cred);
+#endif   /* ENABLE_GSS */
+
+               /*
+                * GSS and SSPI share the port->gss struct.  Since nowhere else does a
+                * postmaster child free this, doing so is safe when interrupting
+                * BackendInitialize().
+                */
+               free(MyProcPort->gss);
+#endif   /* ENABLE_GSS || ENABLE_SSPI */
+
+               /*
+                * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
+                * call this, so this is safe when interrupting BackendInitialize().
+                */
                secure_close(MyProcPort);
 
                /*
-                * Formerly we did an explicit close() here, but it seems better
-                * to leave the socket open until the process dies.  This allows
-                * clients to perform a "synchronous close" if they care --- wait
-                * till the transport layer reports connection closure, and you
-                * can be sure the backend has exited.
+                * Formerly we did an explicit close() here, but it seems better to
+                * leave the socket open until the process dies.  This allows clients
+                * to perform a "synchronous close" if they care --- wait till the
+                * transport layer reports connection closure, and you can be sure the
+                * backend has exited.
                 *
-                * We do set sock to -1 to prevent any further I/O, though.
+                * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
+                * though.
                 */
-               MyProcPort->sock = -1;
+               MyProcPort->sock = PGINVALID_SOCKET;
        }
 }
 
@@ -174,44 +292,35 @@ pq_close(void)
  *             Stream functions are used for vanilla TCP connection protocol.
  */
 
-static char sock_path[MAXPGPATH];
-
-
-/* StreamDoUnlink()
- * Shutdown routine for backend connection
- * If a Unix socket is used for communication, explicitly close it.
- */
-#ifdef HAVE_UNIX_SOCKETS
-static void
-StreamDoUnlink(void)
-{
-       Assert(sock_path[0]);
-       unlink(sock_path);
-}
-#endif   /* HAVE_UNIX_SOCKETS */
 
 /*
  * StreamServerPort -- open a "listening" port to accept connections.
  *
- * Successfully opened sockets are added to the ListenSocket[] array,
- * at the first position that isn't -1.
+ * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
+ * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
+ * specified.  For TCP ports, hostName is either NULL for all interfaces or
+ * the interface to listen on, and unixSocketDir is ignored (can be NULL).
+ *
+ * Successfully opened sockets are added to the ListenSocket[] array (of
+ * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
  *
  * RETURNS: STATUS_OK or STATUS_ERROR
  */
 
 int
 StreamServerPort(int family, char *hostName, unsigned short portNumber,
-                                char *unixSocketName,
-                                int ListenSocket[], int MaxListen)
+                                char *unixSocketDir,
+                                pgsocket ListenSocket[], int MaxListen)
 {
-       int                     fd,
-                               err;
+       pgsocket        fd;
+       int                     err;
        int                     maxconn;
-       int                     one = 1;
        int                     ret;
        char            portNumberStr[32];
        const char *familyDesc;
        char            familyDescBuf[64];
+       const char *addrDesc;
+       char            addrBuf[NI_MAXHOST];
        char       *service;
        struct addrinfo *addrs = NULL,
                           *addr;
@@ -219,6 +328,13 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
        int                     listen_index = 0;
        int                     added = 0;
 
+#ifdef HAVE_UNIX_SOCKETS
+       char            unixSocketPath[MAXPGPATH];
+#endif
+#if !defined(WIN32) || defined(IPV6_V6ONLY)
+       int                     one = 1;
+#endif
+
        /* Initialize hint structure */
        MemSet(&hint, 0, sizeof(hint));
        hint.ai_family = family;
@@ -228,10 +344,22 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
 #ifdef HAVE_UNIX_SOCKETS
        if (family == AF_UNIX)
        {
-               /* Lock_AF_UNIX will also fill in sock_path. */
-               if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
+               /*
+                * Create unixSocketPath from portNumber and unixSocketDir and lock
+                * that file path
+                */
+               UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
+               if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
+               {
+                       ereport(LOG,
+                                       (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
+                                                       unixSocketPath,
+                                                       (int) (UNIXSOCK_PATH_BUFLEN - 1))));
                        return STATUS_ERROR;
-               service = sock_path;
+               }
+               if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
+                       return STATUS_ERROR;
+               service = unixSocketPath;
        }
        else
 #endif   /* HAVE_UNIX_SOCKETS */
@@ -240,7 +368,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                service = portNumberStr;
        }
 
-       ret = getaddrinfo_all(hostName, service, &hint, &addrs);
+       ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
        if (ret || !addrs)
        {
                if (hostName)
@@ -249,9 +377,10 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                                                        hostName, service, gai_strerror(ret))));
                else
                        ereport(LOG,
-                        (errmsg("could not translate service \"%s\" to address: %s",
-                                        service, gai_strerror(ret))));
-               freeaddrinfo_all(hint.ai_family, addrs);
+                                (errmsg("could not translate service \"%s\" to address: %s",
+                                                service, gai_strerror(ret))));
+               if (addrs)
+                       pg_freeaddrinfo_all(hint.ai_family, addrs);
                return STATUS_ERROR;
        }
 
@@ -260,8 +389,8 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
                {
                        /*
-                        * Only set up a unix domain socket when they really asked for
-                        * it.  The service/port is different in that case.
+                        * Only set up a unix domain socket when they really asked for it.
+                        * The service/port is different in that case.
                         */
                        continue;
                }
@@ -269,49 +398,79 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                /* See if there is still room to add 1 more socket. */
                for (; listen_index < MaxListen; listen_index++)
                {
-                       if (ListenSocket[listen_index] == -1)
+                       if (ListenSocket[listen_index] == PGINVALID_SOCKET)
                                break;
                }
                if (listen_index >= MaxListen)
                {
-                       /* Nothing found. */
+                       ereport(LOG,
+                                       (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
+                                                       MaxListen)));
                        break;
                }
 
-               /* set up family name for possible error messages */
+               /* set up address family name for log messages */
                switch (addr->ai_family)
                {
                        case AF_INET:
-                               familyDesc = gettext("IPv4");
+                               familyDesc = _("IPv4");
                                break;
 #ifdef HAVE_IPV6
                        case AF_INET6:
-                               familyDesc = gettext("IPv6");
+                               familyDesc = _("IPv6");
                                break;
 #endif
 #ifdef HAVE_UNIX_SOCKETS
                        case AF_UNIX:
-                               familyDesc = gettext("Unix");
+                               familyDesc = _("Unix");
                                break;
 #endif
                        default:
                                snprintf(familyDescBuf, sizeof(familyDescBuf),
-                                                gettext("unrecognized address family %d"),
+                                                _("unrecognized address family %d"),
                                                 addr->ai_family);
                                familyDesc = familyDescBuf;
                                break;
                }
 
-               if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
+               /* set up text form of address for log messages */
+#ifdef HAVE_UNIX_SOCKETS
+               if (addr->ai_family == AF_UNIX)
+                       addrDesc = unixSocketPath;
+               else
+#endif
+               {
+                       pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
+                                                          addr->ai_addrlen,
+                                                          addrBuf, sizeof(addrBuf),
+                                                          NULL, 0,
+                                                          NI_NUMERICHOST);
+                       addrDesc = addrBuf;
+               }
+
+               if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                                        /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not create %s socket: %m",
-                                                       familyDesc)));
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                 errmsg("could not create %s socket for address \"%s\": %m",
+                                                familyDesc, addrDesc)));
                        continue;
                }
 
+#ifndef WIN32
+
+               /*
+                * Without the SO_REUSEADDR flag, a new postmaster can't be started
+                * right away after a stop or crash, giving "address already in use"
+                * error on TCP ports.
+                *
+                * On win32, however, this behavior only happens if the
+                * SO_EXLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
+                * servers to listen on the same address, resulting in unpredictable
+                * behavior. With no flags at all, win32 behaves as Unix with
+                * SO_REUSEADDR.
+                */
                if (!IS_AF_UNIX(addr->ai_family))
                {
                        if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
@@ -319,11 +478,14 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
-                                                errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
+                               /* translator: first %s is IPv4, IPv6, or Unix */
+                                                errmsg("setsockopt(SO_REUSEADDR) failed for %s address \"%s\": %m",
+                                                               familyDesc, addrDesc)));
                                closesocket(fd);
                                continue;
                        }
                }
+#endif
 
 #ifdef IPV6_V6ONLY
                if (addr->ai_family == AF_INET6)
@@ -333,7 +495,9 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
-                                                errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
+                               /* translator: first %s is IPv4, IPv6, or Unix */
+                                                errmsg("setsockopt(IPV6_V6ONLY) failed for %s address \"%s\": %m",
+                                                               familyDesc, addrDesc)));
                                closesocket(fd);
                                continue;
                        }
@@ -342,25 +506,25 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
 
                /*
                 * Note: This might fail on some OS's, like Linux older than
-                * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and
-                * map ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all
-                * ipv4 connections.
+                * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
+                * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
+                * connections.
                 */
                err = bind(fd, addr->ai_addr, addr->ai_addrlen);
                if (err < 0)
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                                        /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not bind %s socket: %m",
-                                                       familyDesc),
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                        errmsg("could not bind %s address \"%s\": %m",
+                                                       familyDesc, addrDesc),
                                         (IS_AF_UNIX(addr->ai_family)) ?
-                         errhint("Is another postmaster already running on port %d?"
-                                         " If not, remove socket file \"%s\" and retry.",
-                                         (int) portNumber, sock_path) :
-                         errhint("Is another postmaster already running on port %d?"
-                                         " If not, wait a few seconds and retry.",
-                                         (int) portNumber)));
+                                 errhint("Is another postmaster already running on port %d?"
+                                                 " If not, remove socket file \"%s\" and retry.",
+                                                 (int) portNumber, service) :
+                                 errhint("Is another postmaster already running on port %d?"
+                                                 " If not, wait a few seconds and retry.",
+                                                 (int) portNumber)));
                        closesocket(fd);
                        continue;
                }
@@ -368,7 +532,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
 #ifdef HAVE_UNIX_SOCKETS
                if (addr->ai_family == AF_UNIX)
                {
-                       if (Setup_AF_UNIX() != STATUS_OK)
+                       if (Setup_AF_UNIX(service) != STATUS_OK)
                        {
                                closesocket(fd);
                                break;
@@ -377,10 +541,9 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
 #endif
 
                /*
-                * Select appropriate accept-queue length limit.  PG_SOMAXCONN is
-                * only intended to provide a clamp on the request on platforms
-                * where an overly large request provokes a kernel error (are
-                * there any?).
+                * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
+                * intended to provide a clamp on the request on platforms where an
+                * overly large request provokes a kernel error (are there any?).
                 */
                maxconn = MaxBackends * 2;
                if (maxconn > PG_SOMAXCONN)
@@ -391,17 +554,30 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
-                                        /* translator: %s is IPv4, IPv6, or Unix */
-                                        errmsg("could not listen on %s socket: %m",
-                                                       familyDesc)));
+                       /* translator: first %s is IPv4, IPv6, or Unix */
+                                        errmsg("could not listen on %s address \"%s\": %m",
+                                                       familyDesc, addrDesc)));
                        closesocket(fd);
                        continue;
                }
+
+#ifdef HAVE_UNIX_SOCKETS
+               if (addr->ai_family == AF_UNIX)
+                       ereport(LOG,
+                                       (errmsg("listening on Unix socket \"%s\"",
+                                                       addrDesc)));
+               else
+#endif
+                       ereport(LOG,
+                       /* translator: first %s is IPv4 or IPv6 */
+                                       (errmsg("listening on %s address \"%s\", port %d",
+                                                       familyDesc, addrDesc, (int) portNumber)));
+
                ListenSocket[listen_index] = fd;
                added++;
        }
 
-       freeaddrinfo_all(hint.ai_family, addrs);
+       pg_freeaddrinfo_all(hint.ai_family, addrs);
 
        if (!added)
                return STATUS_ERROR;
@@ -416,20 +592,28 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
  * Lock_AF_UNIX -- configure unix socket file path
  */
 static int
-Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
+Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
 {
-       UNIXSOCK_PATH(sock_path, portNumber, unixSocketName);
-
        /*
         * Grab an interlock file associated with the socket file.
+        *
+        * Note: there are two reasons for using a socket lock file, rather than
+        * trying to interlock directly on the socket itself.  First, it's a lot
+        * more portable, and second, it lets us remove any pre-existing socket
+        * file without race conditions.
         */
-       CreateSocketLockFile(sock_path, true);
+       CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
 
        /*
         * Once we have the interlock, we can safely delete any pre-existing
         * socket file to avoid failure at bind() time.
         */
-       unlink(sock_path);
+       (void) unlink(unixSocketPath);
+
+       /*
+        * Remember socket file pathnames for later maintenance.
+        */
+       sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
 
        return STATUS_OK;
 }
@@ -439,15 +623,12 @@ Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
  * Setup_AF_UNIX -- configure unix socket permissions
  */
 static int
-Setup_AF_UNIX(void)
+Setup_AF_UNIX(char *sock_path)
 {
-       /* Arrange to unlink the socket file at exit */
-       on_proc_exit(StreamDoUnlink, 0);
-
        /*
         * Fix socket ownership/permission if requested.  Note we must do this
-        * before we listen() to avoid a window where unwanted connections
-        * could get accepted.
+        * before we listen() to avoid a window where unwanted connections could
+        * get accepted.
         */
        Assert(Unix_socket_group);
        if (Unix_socket_group[0] != '\0')
@@ -456,7 +637,7 @@ Setup_AF_UNIX(void)
                elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
 #else
                char       *endptr;
-               unsigned long int val;
+               unsigned long val;
                gid_t           gid;
 
                val = strtoul(Unix_socket_group, &endptr, 10);
@@ -504,7 +685,7 @@ Setup_AF_UNIX(void)
 
 /*
  * StreamConnection -- create a new connection with client using
- *             server port.
+ *             server port.  Set port->sock to the FD of the new connection.
  *
  * ASSUME: that this doesn't need to be non-blocking because
  *             the Postmaster uses select() to tell when the server master
@@ -513,30 +694,29 @@ Setup_AF_UNIX(void)
  * RETURNS: STATUS_OK or STATUS_ERROR
  */
 int
-StreamConnection(int server_fd, Port *port)
+StreamConnection(pgsocket server_fd, Port *port)
 {
        /* accept connection and fill in the client (remote) address */
        port->raddr.salen = sizeof(port->raddr.addr);
        if ((port->sock = accept(server_fd,
                                                         (struct sockaddr *) & port->raddr.addr,
-                                                        &port->raddr.salen)) < 0)
+                                                        &port->raddr.salen)) == PGINVALID_SOCKET)
        {
                ereport(LOG,
                                (errcode_for_socket_access(),
                                 errmsg("could not accept new connection: %m")));
+
+               /*
+                * If accept() fails then postmaster.c will still see the server
+                * socket as read-ready, and will immediately try again.  To avoid
+                * uselessly sucking lots of CPU, delay a bit before trying again.
+                * (The most likely reason for failure is being out of kernel file
+                * table slots; we can do little except hope some will get freed up.)
+                */
+               pg_usleep(100000L);             /* wait 0.1 sec */
                return STATUS_ERROR;
        }
 
-#ifdef SCO_ACCEPT_BUG
-
-       /*
-        * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
-        * shouldn't hurt to catch it for all versions of those platforms.
-        */
-       if (port->raddr.addr.ss_family == 0)
-               port->raddr.addr.ss_family = AF_UNIX;
-#endif
-
        /* fill in the server (local) address */
        port->laddr.salen = sizeof(port->laddr.addr);
        if (getsockname(port->sock,
@@ -551,6 +731,11 @@ StreamConnection(int server_fd, Port *port)
        if (!IS_AF_UNIX(port->laddr.addr.ss_family))
        {
                int                     on;
+#ifdef WIN32
+               int                     oldopt;
+               int                     optlen;
+               int                     newopt;
+#endif
 
 #ifdef TCP_NODELAY
                on = 1;
@@ -568,6 +753,59 @@ StreamConnection(int server_fd, Port *port)
                        elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
                        return STATUS_ERROR;
                }
+
+#ifdef WIN32
+
+               /*
+                * This is a Win32 socket optimization.  The OS send buffer should be
+                * large enough to send the whole Postgres send buffer in one go, or
+                * performance suffers.  The Postgres send buffer can be enlarged if a
+                * very large message needs to be sent, but we won't attempt to
+                * enlarge the OS buffer if that happens, so somewhat arbitrarily
+                * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
+                * (That's 32kB with the current default).
+                *
+                * The default OS buffer size used to be 8kB in earlier Windows
+                * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
+                * be necessary to change it in later versions anymore.  Changing it
+                * unnecessarily can even reduce performance, because setting
+                * SO_SNDBUF in the application disables the "dynamic send buffering"
+                * feature that was introduced in Windows 7.  So before fiddling with
+                * SO_SNDBUF, check if the current buffer size is already large enough
+                * and only increase it if necessary.
+                *
+                * See https://support.microsoft.com/kb/823764/EN-US/ and
+                * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
+                */
+               optlen = sizeof(oldopt);
+               if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
+                                          &optlen) < 0)
+               {
+                       elog(LOG, "getsockopt(SO_SNDBUF) failed: %m");
+                       return STATUS_ERROR;
+               }
+               newopt = PQ_SEND_BUFFER_SIZE * 4;
+               if (oldopt < newopt)
+               {
+                       if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
+                                                  sizeof(newopt)) < 0)
+                       {
+                               elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
+                               return STATUS_ERROR;
+                       }
+               }
+#endif
+
+               /*
+                * Also apply the current keepalive parameters.  If we fail to set a
+                * parameter, don't error out, because these aren't universally
+                * supported.  (Note: you might think we need to reset the GUC
+                * variables to 0 in such a case, but it's not necessary because the
+                * show hooks for these variables report the truth anyway.)
+                */
+               (void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
+               (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
+               (void) pq_setkeepalivescount(tcp_keepalives_count, port);
        }
 
        return STATUS_OK;
@@ -584,30 +822,34 @@ StreamConnection(int server_fd, Port *port)
  * we do NOT want to send anything to the far end.
  */
 void
-StreamClose(int sock)
+StreamClose(pgsocket sock)
 {
        closesocket(sock);
 }
 
 /*
- * TouchSocketFile -- mark socket file as recently accessed
+ * TouchSocketFiles -- mark socket files as recently accessed
  *
  * This routine should be called every so often to ensure that the socket
- * file has a recent mod date (ordinary operations on sockets usually won't
- * change the mod date).  That saves it from being removed by
+ * files have a recent mod date (ordinary operations on sockets usually won't
+ * change the mod date).  That saves them from being removed by
  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
  * never have put the socket file in /tmp...)
  */
 void
-TouchSocketFile(void)
+TouchSocketFiles(void)
 {
-       /* Do nothing if we did not create a socket... */
-       if (sock_path[0] != '\0')
+       ListCell   *l;
+
+       /* Loop through all created sockets... */
+       foreach(l, sock_paths)
        {
+               char       *sock_path = (char *) lfirst(l);
+
                /*
-                * utime() is POSIX standard, utimes() is a common alternative. If
-                * we have neither, there's no way to affect the mod or access
-                * time of the socket :-(
+                * utime() is POSIX standard, utimes() is a common alternative. If we
+                * have neither, there's no way to affect the mod or access time of
+                * the socket :-(
                 *
                 * In either path, we ignore errors; there's no point in complaining.
                 */
@@ -621,6 +863,26 @@ TouchSocketFile(void)
        }
 }
 
+/*
+ * RemoveSocketFiles -- unlink socket files at postmaster shutdown
+ */
+void
+RemoveSocketFiles(void)
+{
+       ListCell   *l;
+
+       /* Loop through all created sockets... */
+       foreach(l, sock_paths)
+       {
+               char       *sock_path = (char *) lfirst(l);
+
+               /* Ignore any error. */
+               (void) unlink(sock_path);
+       }
+       /* Since we're about to exit, no need to reclaim storage */
+       sock_paths = NIL;
+}
+
 
 /* --------------------------------
  * Low-level I/O routines begin here.
@@ -630,6 +892,23 @@ TouchSocketFile(void)
  * --------------------------------
  */
 
+/* --------------------------------
+ *                       socket_set_nonblocking - set socket blocking/non-blocking
+ *
+ * Sets the socket non-blocking if nonblocking is TRUE, or sets it
+ * blocking otherwise.
+ * --------------------------------
+ */
+static void
+socket_set_nonblocking(bool nonblocking)
+{
+       if (MyProcPort == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+                                errmsg("there is no client connection")));
+
+       MyProcPort->noblock = nonblocking;
+}
 
 /* --------------------------------
  *             pq_recvbuf - load some bytes into the input buffer
@@ -654,13 +933,16 @@ pq_recvbuf(void)
                        PqRecvLength = PqRecvPointer = 0;
        }
 
+       /* Ensure that we're in blocking mode */
+       socket_set_nonblocking(false);
+
        /* Can fill buffer from PqRecvLength and upwards */
        for (;;)
        {
                int                     r;
 
                r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-                                               PQ_BUFFER_SIZE - PqRecvLength);
+                                               PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
                if (r < 0)
                {
@@ -668,10 +950,9 @@ pq_recvbuf(void)
                                continue;               /* Ok if interrupted */
 
                        /*
-                        * Careful: an ereport() that tries to write to the client
-                        * would cause recursion to here, leading to stack overflow
-                        * and core dump!  This message must go *only* to the
-                        * postmaster log.
+                        * Careful: an ereport() that tries to write to the client would
+                        * cause recursion to here, leading to stack overflow and core
+                        * dump!  This message must go *only* to the postmaster log.
                         */
                        ereport(COMMERROR,
                                        (errcode_for_socket_access(),
@@ -681,8 +962,8 @@ pq_recvbuf(void)
                if (r == 0)
                {
                        /*
-                        * EOF detected.  We used to write a log message here, but
-                        * it's better to expect the ultimate caller to do that.
+                        * EOF detected.  We used to write a log message here, but it's
+                        * better to expect the ultimate caller to do that.
                         */
                        return EOF;
                }
@@ -699,12 +980,14 @@ pq_recvbuf(void)
 int
 pq_getbyte(void)
 {
+       Assert(PqCommReadingMsg);
+
        while (PqRecvPointer >= PqRecvLength)
        {
                if (pq_recvbuf())               /* If nothing in buffer, then recv some */
                        return EOF;                     /* Failed to recv data */
        }
-       return PqRecvBuffer[PqRecvPointer++];
+       return (unsigned char) PqRecvBuffer[PqRecvPointer++];
 }
 
 /* --------------------------------
@@ -716,12 +999,70 @@ pq_getbyte(void)
 int
 pq_peekbyte(void)
 {
+       Assert(PqCommReadingMsg);
+
        while (PqRecvPointer >= PqRecvLength)
        {
                if (pq_recvbuf())               /* If nothing in buffer, then recv some */
                        return EOF;                     /* Failed to recv data */
        }
-       return PqRecvBuffer[PqRecvPointer];
+       return (unsigned char) PqRecvBuffer[PqRecvPointer];
+}
+
+/* --------------------------------
+ *             pq_getbyte_if_available - get a single byte from connection,
+ *                     if available
+ *
+ * The received byte is stored in *c. Returns 1 if a byte was read,
+ * 0 if no data was available, or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_getbyte_if_available(unsigned char *c)
+{
+       int                     r;
+
+       Assert(PqCommReadingMsg);
+
+       if (PqRecvPointer < PqRecvLength)
+       {
+               *c = PqRecvBuffer[PqRecvPointer++];
+               return 1;
+       }
+
+       /* Put the socket into non-blocking mode */
+       socket_set_nonblocking(true);
+
+       r = secure_read(MyProcPort, c, 1);
+       if (r < 0)
+       {
+               /*
+                * Ok if no data available without blocking or interrupted (though
+                * EINTR really shouldn't happen with a non-blocking socket). Report
+                * other errors.
+                */
+               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+                       r = 0;
+               else
+               {
+                       /*
+                        * Careful: an ereport() that tries to write to the client would
+                        * cause recursion to here, leading to stack overflow and core
+                        * dump!  This message must go *only* to the postmaster log.
+                        */
+                       ereport(COMMERROR,
+                                       (errcode_for_socket_access(),
+                                        errmsg("could not receive data from client: %m")));
+                       r = EOF;
+               }
+       }
+       else if (r == 0)
+       {
+               /* EOF detected */
+               r = EOF;
+       }
+
+       return r;
 }
 
 /* --------------------------------
@@ -735,6 +1076,8 @@ pq_getbytes(char *s, size_t len)
 {
        size_t          amount;
 
+       Assert(PqCommReadingMsg);
+
        while (len > 0)
        {
                while (PqRecvPointer >= PqRecvLength)
@@ -753,6 +1096,38 @@ pq_getbytes(char *s, size_t len)
        return 0;
 }
 
+/* --------------------------------
+ *             pq_discardbytes         - throw away a known number of bytes
+ *
+ *             same as pq_getbytes except we do not copy the data to anyplace.
+ *             this is used for resynchronizing after read errors.
+ *
+ *             returns 0 if OK, EOF if trouble
+ * --------------------------------
+ */
+static int
+pq_discardbytes(size_t len)
+{
+       size_t          amount;
+
+       Assert(PqCommReadingMsg);
+
+       while (len > 0)
+       {
+               while (PqRecvPointer >= PqRecvLength)
+               {
+                       if (pq_recvbuf())       /* If nothing in buffer, then recv some */
+                               return EOF;             /* Failed to recv data */
+               }
+               amount = PqRecvLength - PqRecvPointer;
+               if (amount > len)
+                       amount = len;
+               PqRecvPointer += amount;
+               len -= amount;
+       }
+       return 0;
+}
+
 /* --------------------------------
  *             pq_getstring    - get a null terminated string from connection
  *
@@ -773,10 +1148,9 @@ pq_getstring(StringInfo s)
 {
        int                     i;
 
-       /* Reset string to empty */
-       s->len = 0;
-       s->data[0] = '\0';
-       s->cursor = 0;
+       Assert(PqCommReadingMsg);
+
+       resetStringInfo(s);
 
        /* Read until we get the terminating '\0' */
        for (;;)
@@ -807,6 +1181,58 @@ pq_getstring(StringInfo s)
 }
 
 
+/* --------------------------------
+ *             pq_startmsgread - begin reading a message from the client.
+ *
+ *             This must be called before any of the pq_get* functions.
+ * --------------------------------
+ */
+void
+pq_startmsgread(void)
+{
+       /*
+        * There shouldn't be a read active already, but let's check just to be
+        * sure.
+        */
+       if (PqCommReadingMsg)
+               ereport(FATAL,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg("terminating connection because protocol synchronization was lost")));
+
+       PqCommReadingMsg = true;
+}
+
+
+/* --------------------------------
+ *             pq_endmsgread   - finish reading message.
+ *
+ *             This must be called after reading a V2 protocol message with
+ *             pq_getstring() and friends, to indicate that we have read the whole
+ *             message. In V3 protocol, pq_getmessage() does this implicitly.
+ * --------------------------------
+ */
+void
+pq_endmsgread(void)
+{
+       Assert(PqCommReadingMsg);
+
+       PqCommReadingMsg = false;
+}
+
+/* --------------------------------
+ *             pq_is_reading_msg - are we currently reading a message?
+ *
+ * This is used in error recovery at the outer idle loop to detect if we have
+ * lost protocol sync, and need to terminate the connection. pq_startmsgread()
+ * will check for that too, but it's nicer to detect it earlier.
+ * --------------------------------
+ */
+bool
+pq_is_reading_msg(void)
+{
+       return PqCommReadingMsg;
+}
+
 /* --------------------------------
  *             pq_getmessage   - get a message with length word from connection
  *
@@ -828,10 +1254,9 @@ pq_getmessage(StringInfo s, int maxlen)
 {
        int32           len;
 
-       /* Reset message buffer to empty */
-       s->len = 0;
-       s->data[0] = '\0';
-       s->cursor = 0;
+       Assert(PqCommReadingMsg);
+
+       resetStringInfo(s);
 
        /* Read message length word */
        if (pq_getbytes((char *) &len, 4) == EOF)
@@ -843,9 +1268,8 @@ pq_getmessage(StringInfo s, int maxlen)
        }
 
        len = ntohl(len);
-       len -= 4;                                       /* discount length itself */
 
-       if (len < 0 ||
+       if (len < 4 ||
                (maxlen > 0 && len > maxlen))
        {
                ereport(COMMERROR,
@@ -854,10 +1278,31 @@ pq_getmessage(StringInfo s, int maxlen)
                return EOF;
        }
 
+       len -= 4;                                       /* discount length itself */
+
        if (len > 0)
        {
-               /* Allocate space for message */
-               enlargeStringInfo(s, len);
+               /*
+                * Allocate space for message.  If we run out of room (ridiculously
+                * large message), we will elog(ERROR), but we want to discard the
+                * message body so as not to lose communication sync.
+                */
+               PG_TRY();
+               {
+                       enlargeStringInfo(s, len);
+               }
+               PG_CATCH();
+               {
+                       if (pq_discardbytes(len) == EOF)
+                               ereport(COMMERROR,
+                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                errmsg("incomplete message from client")));
+
+                       /* we discarded the rest of the message so we're back in sync. */
+                       PqCommReadingMsg = false;
+                       PG_RE_THROW();
+               }
+               PG_END_TRY();
 
                /* And grab the message */
                if (pq_getbytes(s->data, len) == EOF)
@@ -872,6 +1317,9 @@ pq_getmessage(StringInfo s, int maxlen)
                s->data[len] = '\0';
        }
 
+       /* finished reading the message. */
+       PqCommReadingMsg = false;
+
        return 0;
 }
 
@@ -884,15 +1332,35 @@ pq_getmessage(StringInfo s, int maxlen)
  */
 int
 pq_putbytes(const char *s, size_t len)
+{
+       int                     res;
+
+       /* Should only be called by old-style COPY OUT */
+       Assert(DoingCopyOut);
+       /* No-op if reentrant call */
+       if (PqCommBusy)
+               return 0;
+       PqCommBusy = true;
+       res = internal_putbytes(s, len);
+       PqCommBusy = false;
+       return res;
+}
+
+static int
+internal_putbytes(const char *s, size_t len)
 {
        size_t          amount;
 
        while (len > 0)
        {
-               if (PqSendPointer >= PQ_BUFFER_SIZE)
-                       if (pq_flush())         /* If buffer is full, then flush it out */
+               /* If buffer is full, then flush it out */
+               if (PqSendPointer >= PqSendBufferSize)
+               {
+                       socket_set_nonblocking(false);
+                       if (internal_flush())
                                return EOF;
-               amount = PQ_BUFFER_SIZE - PqSendPointer;
+               }
+               amount = PqSendBufferSize - PqSendPointer;
                if (amount > len)
                        amount = len;
                memcpy(PqSendBuffer + PqSendPointer, s, amount);
@@ -904,18 +1372,40 @@ pq_putbytes(const char *s, size_t len)
 }
 
 /* --------------------------------
- *             pq_flush                - flush pending output
+ *             socket_flush            - flush pending output
  *
  *             returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_flush(void)
+static int
+socket_flush(void)
+{
+       int                     res;
+
+       /* No-op if reentrant call */
+       if (PqCommBusy)
+               return 0;
+       PqCommBusy = true;
+       socket_set_nonblocking(false);
+       res = internal_flush();
+       PqCommBusy = false;
+       return res;
+}
+
+/* --------------------------------
+ *             internal_flush - flush pending output
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
+static int
+internal_flush(void)
 {
        static int      last_reported_send_errno = 0;
 
-       unsigned char *bufptr = PqSendBuffer;
-       unsigned char *bufend = PqSendBuffer + PqSendPointer;
+       char       *bufptr = PqSendBuffer + PqSendStart;
+       char       *bufend = PqSendBuffer + PqSendPointer;
 
        while (bufptr < bufend)
        {
@@ -929,14 +1419,23 @@ pq_flush(void)
                                continue;               /* Ok if we were interrupted */
 
                        /*
-                        * Careful: an ereport() that tries to write to the client
-                        * would cause recursion to here, leading to stack overflow
-                        * and core dump!  This message must go *only* to the
-                        * postmaster log.
+                        * Ok if no data writable without blocking, and the socket is in
+                        * non-blocking mode.
+                        */
+                       if (errno == EAGAIN ||
+                               errno == EWOULDBLOCK)
+                       {
+                               return 0;
+                       }
+
+                       /*
+                        * Careful: an ereport() that tries to write to the client would
+                        * cause recursion to here, leading to stack overflow and core
+                        * dump!  This message must go *only* to the postmaster log.
                         *
                         * If a client disconnects while we're in the midst of output, we
-                        * might write quite a bit of data before we get to a safe
-                        * query abort point.  So, suppress duplicate log messages.
+                        * might write quite a bit of data before we get to a safe query
+                        * abort point.  So, suppress duplicate log messages.
                         */
                        if (errno != last_reported_send_errno)
                        {
@@ -948,20 +1447,62 @@ pq_flush(void)
 
                        /*
                         * We drop the buffered data anyway so that processing can
-                        * continue, even though we'll probably quit soon.
+                        * continue, even though we'll probably quit soon. We also set a
+                        * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
+                        * the connection.
                         */
-                       PqSendPointer = 0;
+                       PqSendStart = PqSendPointer = 0;
+                       ClientConnectionLost = 1;
+                       InterruptPending = 1;
                        return EOF;
                }
 
                last_reported_send_errno = 0;   /* reset after any successful send */
                bufptr += r;
+               PqSendStart += r;
        }
 
-       PqSendPointer = 0;
+       PqSendStart = PqSendPointer = 0;
        return 0;
 }
 
+/* --------------------------------
+ *             pq_flush_if_writable - flush pending output if writable without blocking
+ *
+ * Returns 0 if OK, or EOF if trouble.
+ * --------------------------------
+ */
+static int
+socket_flush_if_writable(void)
+{
+       int                     res;
+
+       /* Quick exit if nothing to do */
+       if (PqSendPointer == PqSendStart)
+               return 0;
+
+       /* No-op if reentrant call */
+       if (PqCommBusy)
+               return 0;
+
+       /* Temporarily put the socket into non-blocking mode */
+       socket_set_nonblocking(true);
+
+       PqCommBusy = true;
+       res = internal_flush();
+       PqCommBusy = false;
+       return res;
+}
+
+/* --------------------------------
+ *     socket_is_send_pending  - is there any pending data in the output buffer?
+ * --------------------------------
+ */
+static bool
+socket_is_send_pending(void)
+{
+       return (PqSendStart < PqSendPointer);
+}
 
 /* --------------------------------
  * Message-level I/O routines begin here.
@@ -972,7 +1513,7 @@ pq_flush(void)
 
 
 /* --------------------------------
- *             pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
+ *             socket_putmessage - send a normal message (suppressed in COPY OUT mode)
  *
  *             If msgtype is not '\0', it is a message type code to place before
  *             the message body.  If msgtype is '\0', then the message has no type
@@ -987,41 +1528,83 @@ pq_flush(void)
  *             then; dropping them is annoying, but at least they will still appear
  *             in the postmaster log.)
  *
+ *             We also suppress messages generated while pqcomm.c is busy.  This
+ *             avoids any possibility of messages being inserted within other
+ *             messages.  The only known trouble case arises if SIGQUIT occurs
+ *             during a pqcomm.c routine --- quickdie() will try to send a warning
+ *             message, and the most reasonable approach seems to be to drop it.
+ *
  *             returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_putmessage(char msgtype, const char *s, size_t len)
+static int
+socket_putmessage(char msgtype, const char *s, size_t len)
 {
-       if (DoingCopyOut)
+       if (DoingCopyOut || PqCommBusy)
                return 0;
+       PqCommBusy = true;
        if (msgtype)
-               if (pq_putbytes(&msgtype, 1))
-                       return EOF;
+               if (internal_putbytes(&msgtype, 1))
+                       goto fail;
        if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
        {
                uint32          n32;
 
                n32 = htonl((uint32) (len + 4));
-               if (pq_putbytes((char *) &n32, 4))
-                       return EOF;
+               if (internal_putbytes((char *) &n32, 4))
+                       goto fail;
+       }
+       if (internal_putbytes(s, len))
+               goto fail;
+       PqCommBusy = false;
+       return 0;
+
+fail:
+       PqCommBusy = false;
+       return EOF;
+}
+
+/* --------------------------------
+ *             pq_putmessage_noblock   - like pq_putmessage, but never blocks
+ *
+ *             If the output buffer is too small to hold the message, the buffer
+ *             is enlarged.
+ */
+static void
+socket_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+       int res         PG_USED_FOR_ASSERTS_ONLY;
+       int                     required;
+
+       /*
+        * Ensure we have enough space in the output buffer for the message header
+        * as well as the message itself.
+        */
+       required = PqSendPointer + 1 + 4 + len;
+       if (required > PqSendBufferSize)
+       {
+               PqSendBuffer = repalloc(PqSendBuffer, required);
+               PqSendBufferSize = required;
        }
-       return pq_putbytes(s, len);
+       res = pq_putmessage(msgtype, s, len);
+       Assert(res == 0);                       /* should not fail when the message fits in
+                                                                * buffer */
 }
 
+
 /* --------------------------------
- *             pq_startcopyout - inform libpq that an old-style COPY OUT transfer
+ *             socket_startcopyout - inform libpq that an old-style COPY OUT transfer
  *                     is beginning
  * --------------------------------
  */
-void
-pq_startcopyout(void)
+static void
+socket_startcopyout(void)
 {
        DoingCopyOut = true;
 }
 
 /* --------------------------------
- *             pq_endcopyout   - end an old-style COPY OUT transfer
+ *             socket_endcopyout       - end an old-style COPY OUT transfer
  *
  *             If errorAbort is indicated, we are aborting a COPY OUT due to an error,
  *             and must send a terminator line.  Since a partial data line might have
@@ -1030,13 +1613,314 @@ pq_startcopyout(void)
  *             not allow binary transfers, so a textual terminator is always correct.
  * --------------------------------
  */
-void
-pq_endcopyout(bool errorAbort)
+static void
+socket_endcopyout(bool errorAbort)
 {
        if (!DoingCopyOut)
                return;
-       DoingCopyOut = false;
        if (errorAbort)
                pq_putbytes("\n\n\\.\n", 5);
        /* in non-error case, copy.c will have emitted the terminator line */
+       DoingCopyOut = false;
+}
+
+/*
+ * Support for TCP Keepalive parameters
+ */
+
+/*
+ * On Windows, we need to set both idle and interval at the same time.
+ * We also cannot reset them to the default (setting to zero will
+ * actually set them to zero, not default), therefore we fallback to
+ * the out-of-the-box default instead.
+ */
+#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
+static int
+pq_setkeepaliveswin32(Port *port, int idle, int interval)
+{
+       struct tcp_keepalive ka;
+       DWORD           retsize;
+
+       if (idle <= 0)
+               idle = 2 * 60 * 60;             /* default = 2 hours */
+       if (interval <= 0)
+               interval = 1;                   /* default = 1 second */
+
+       ka.onoff = 1;
+       ka.keepalivetime = idle * 1000;
+       ka.keepaliveinterval = interval * 1000;
+
+       if (WSAIoctl(port->sock,
+                                SIO_KEEPALIVE_VALS,
+                                (LPVOID) &ka,
+                                sizeof(ka),
+                                NULL,
+                                0,
+                                &retsize,
+                                NULL,
+                                NULL)
+               != 0)
+       {
+               elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %ui",
+                        WSAGetLastError());
+               return STATUS_ERROR;
+       }
+       if (port->keepalives_idle != idle)
+               port->keepalives_idle = idle;
+       if (port->keepalives_interval != interval)
+               port->keepalives_interval = interval;
+       return STATUS_OK;
+}
+#endif
+
+int
+pq_getkeepalivesidle(Port *port)
+{
+#if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(WIN32)
+       if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
+               return 0;
+
+       if (port->keepalives_idle != 0)
+               return port->keepalives_idle;
+
+       if (port->default_keepalives_idle == 0)
+       {
+#ifndef WIN32
+               ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
+
+#ifdef TCP_KEEPIDLE
+               if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
+                                          (char *) &port->default_keepalives_idle,
+                                          &size) < 0)
+               {
+                       elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
+                       port->default_keepalives_idle = -1; /* don't know */
+               }
+#else
+               if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
+                                          (char *) &port->default_keepalives_idle,
+                                          &size) < 0)
+               {
+                       elog(LOG, "getsockopt(TCP_KEEPALIVE) failed: %m");
+                       port->default_keepalives_idle = -1; /* don't know */
+               }
+#endif   /* TCP_KEEPIDLE */
+#else                                                  /* WIN32 */
+               /* We can't get the defaults on Windows, so return "don't know" */
+               port->default_keepalives_idle = -1;
+#endif   /* WIN32 */
+       }
+
+       return port->default_keepalives_idle;
+#else
+       return 0;
+#endif
+}
+
+int
+pq_setkeepalivesidle(int idle, Port *port)
+{
+       if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
+               return STATUS_OK;
+
+#if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(SIO_KEEPALIVE_VALS)
+       if (idle == port->keepalives_idle)
+               return STATUS_OK;
+
+#ifndef WIN32
+       if (port->default_keepalives_idle <= 0)
+       {
+               if (pq_getkeepalivesidle(port) < 0)
+               {
+                       if (idle == 0)
+                               return STATUS_OK;               /* default is set but unknown */
+                       else
+                               return STATUS_ERROR;
+               }
+       }
+
+       if (idle == 0)
+               idle = port->default_keepalives_idle;
+
+#ifdef TCP_KEEPIDLE
+       if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
+                                  (char *) &idle, sizeof(idle)) < 0)
+       {
+               elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
+               return STATUS_ERROR;
+       }
+#else
+       if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
+                                  (char *) &idle, sizeof(idle)) < 0)
+       {
+               elog(LOG, "setsockopt(TCP_KEEPALIVE) failed: %m");
+               return STATUS_ERROR;
+       }
+#endif
+
+       port->keepalives_idle = idle;
+#else                                                  /* WIN32 */
+       return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
+#endif
+#else                                                  /* TCP_KEEPIDLE || SIO_KEEPALIVE_VALS */
+       if (idle != 0)
+       {
+               elog(LOG, "setting the keepalive idle time is not supported");
+               return STATUS_ERROR;
+       }
+#endif
+       return STATUS_OK;
+}
+
+int
+pq_getkeepalivesinterval(Port *port)
+{
+#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
+       if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
+               return 0;
+
+       if (port->keepalives_interval != 0)
+               return port->keepalives_interval;
+
+       if (port->default_keepalives_interval == 0)
+       {
+#ifndef WIN32
+               ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
+
+               if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
+                                          (char *) &port->default_keepalives_interval,
+                                          &size) < 0)
+               {
+                       elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
+                       port->default_keepalives_interval = -1;         /* don't know */
+               }
+#else
+               /* We can't get the defaults on Windows, so return "don't know" */
+               port->default_keepalives_interval = -1;
+#endif   /* WIN32 */
+       }
+
+       return port->default_keepalives_interval;
+#else
+       return 0;
+#endif
+}
+
+int
+pq_setkeepalivesinterval(int interval, Port *port)
+{
+       if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
+               return STATUS_OK;
+
+#if defined(TCP_KEEPINTVL) || defined (SIO_KEEPALIVE_VALS)
+       if (interval == port->keepalives_interval)
+               return STATUS_OK;
+
+#ifndef WIN32
+       if (port->default_keepalives_interval <= 0)
+       {
+               if (pq_getkeepalivesinterval(port) < 0)
+               {
+                       if (interval == 0)
+                               return STATUS_OK;               /* default is set but unknown */
+                       else
+                               return STATUS_ERROR;
+               }
+       }
+
+       if (interval == 0)
+               interval = port->default_keepalives_interval;
+
+       if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
+                                  (char *) &interval, sizeof(interval)) < 0)
+       {
+               elog(LOG, "setsockopt(TCP_KEEPINTVL) failed: %m");
+               return STATUS_ERROR;
+       }
+
+       port->keepalives_interval = interval;
+#else                                                  /* WIN32 */
+       return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
+#endif
+#else
+       if (interval != 0)
+       {
+               elog(LOG, "setsockopt(TCP_KEEPINTVL) not supported");
+               return STATUS_ERROR;
+       }
+#endif
+
+       return STATUS_OK;
+}
+
+int
+pq_getkeepalivescount(Port *port)
+{
+#ifdef TCP_KEEPCNT
+       if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
+               return 0;
+
+       if (port->keepalives_count != 0)
+               return port->keepalives_count;
+
+       if (port->default_keepalives_count == 0)
+       {
+               ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
+
+               if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
+                                          (char *) &port->default_keepalives_count,
+                                          &size) < 0)
+               {
+                       elog(LOG, "getsockopt(TCP_KEEPCNT) failed: %m");
+                       port->default_keepalives_count = -1;            /* don't know */
+               }
+       }
+
+       return port->default_keepalives_count;
+#else
+       return 0;
+#endif
+}
+
+int
+pq_setkeepalivescount(int count, Port *port)
+{
+       if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
+               return STATUS_OK;
+
+#ifdef TCP_KEEPCNT
+       if (count == port->keepalives_count)
+               return STATUS_OK;
+
+       if (port->default_keepalives_count <= 0)
+       {
+               if (pq_getkeepalivescount(port) < 0)
+               {
+                       if (count == 0)
+                               return STATUS_OK;               /* default is set but unknown */
+                       else
+                               return STATUS_ERROR;
+               }
+       }
+
+       if (count == 0)
+               count = port->default_keepalives_count;
+
+       if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
+                                  (char *) &count, sizeof(count)) < 0)
+       {
+               elog(LOG, "setsockopt(TCP_KEEPCNT) failed: %m");
+               return STATUS_ERROR;
+       }
+
+       port->keepalives_count = count;
+#else
+       if (count != 0)
+       {
+               elog(LOG, "setsockopt(TCP_KEEPCNT) not supported");
+               return STATUS_ERROR;
+       }
+#endif
+
+       return STATUS_OK;
 }