]> granicus.if.org Git - postgresql/blob - src/backend/libpq/pqcomm.c
90b6946b386b1234613d61f8275bdefd6ed49094
[postgresql] / src / backend / libpq / pqcomm.c
1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *        Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the old COPY OUT protocol.
10  * Unfortunately, COPY OUT was designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an ereport(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
16  * set the DoingCopyOut flag.)
17  *
18  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19  * pq_putbytes(), especially if the message would require multiple calls
20  * to send.  Instead, use the routines in pqformat.c to construct the message
21  * in a buffer and then emit it in one call to pq_putmessage.  This ensures
22  * that the channel will not be clogged by an incomplete message if execution
23  * is aborted by ereport(ERROR) partway through the message.  The only
24  * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
25  *
26  * At one time, libpq was shared between frontend and backend, but now
27  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28  * All that remains is similarities of names to trap the unwary...
29  *
30  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
31  * Portions Copyright (c) 1994, Regents of the University of California
32  *
33  *      src/backend/libpq/pqcomm.c
34  *
35  *-------------------------------------------------------------------------
36  */
37
38 /*------------------------
39  * INTERFACE ROUTINES
40  *
41  * setup/teardown:
42  *              StreamServerPort        - Open postmaster's server port
43  *              StreamConnection        - Create new connection with client
44  *              StreamClose                     - Close a client/backend connection
45  *              TouchSocketFiles        - Protect socket files against /tmp cleaners
46  *              pq_init                 - initialize libpq at backend startup
47  *              pq_comm_reset   - reset libpq during error recovery
48  *              pq_close                - shutdown libpq at backend exit
49  *
50  * low-level I/O:
51  *              pq_getbytes             - get a known number of bytes from connection
52  *              pq_getstring    - get a null terminated string from connection
53  *              pq_getmessage   - get a message with length word from connection
54  *              pq_getbyte              - get next byte from connection
55  *              pq_peekbyte             - peek at next byte from connection
56  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
57  *              pq_flush                - flush pending output
58  *              pq_flush_if_writable - flush pending output if writable without blocking
59  *              pq_getbyte_if_available - get a byte if available without blocking
60  *
61  * message-level I/O (and old-style-COPY-OUT cruft):
62  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
63  *              pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
64  *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
65  *              pq_endcopyout   - end a COPY OUT transfer
66  *
67  *------------------------
68  */
69 #include "postgres.h"
70
71 #include <signal.h>
72 #include <fcntl.h>
73 #include <grp.h>
74 #include <unistd.h>
75 #include <sys/file.h>
76 #include <sys/socket.h>
77 #include <sys/stat.h>
78 #include <sys/time.h>
79 #include <netdb.h>
80 #include <netinet/in.h>
81 #ifdef HAVE_NETINET_TCP_H
82 #include <netinet/tcp.h>
83 #endif
84 #include <arpa/inet.h>
85 #ifdef HAVE_UTIME_H
86 #include <utime.h>
87 #endif
88 #ifdef WIN32_ONLY_COMPILER              /* mstcpip.h is missing on mingw */
89 #include <mstcpip.h>
90 #endif
91
92 #include "libpq/ip.h"
93 #include "libpq/libpq.h"
94 #include "miscadmin.h"
95 #include "storage/ipc.h"
96 #include "utils/guc.h"
97 #include "utils/memutils.h"
98
99 /*
100  * Configuration options
     *
     * Unix_socket_permissions is the mode handed to chmod() on each Unix-domain
     * socket file; Unix_socket_group is a group name or decimal gid applied
     * with chown(), where an empty string means "leave the group alone" (see
     * Setup_AF_UNIX).
101  */
102 int                     Unix_socket_permissions;
103 char       *Unix_socket_group;
104
105 /* Where the Unix socket files are (list of palloc'd strings) */
    /* Lock_AF_UNIX appends one pstrdup'd path per socket it prepares. */
106 static List *sock_paths = NIL;
107
108 /*
109  * Buffers for low-level I/O.
110  *
111  * The receive buffer is fixed size. Send buffer is usually 8k, but can be
112  * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
     *
     * The send buffer is allocated in TopMemoryContext by pq_init(), which also
     * zeroes all four index variables below.
113  */
114
115 #define PQ_SEND_BUFFER_SIZE 8192
116 #define PQ_RECV_BUFFER_SIZE 8192
117
118 static char *PqSendBuffer;
119 static int      PqSendBufferSize;       /* Current allocated size of PqSendBuffer */
120 static int      PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
121 static int      PqSendStart;            /* Next index to send a byte in PqSendBuffer */
122
123 static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
124 static int      PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
125 static int      PqRecvLength;           /* End of data available in PqRecvBuffer */
126
127 /*
128  * Message status
     *
     * All three flags are cleared by pq_init().
129  */
130 static bool PqCommBusy;                 /* busy sending data to the client */
131 static bool PqCommReadingMsg;   /* in the middle of reading a message */
132 static bool DoingCopyOut;               /* in old-protocol COPY OUT processing */
133
134
135 /* Internal functions */
    /* The socket_* routines are the entries of PqCommSocketMethods below. */
136 static void socket_comm_reset(void);
137 static void socket_close(int code, Datum arg);
138 static void socket_set_nonblocking(bool nonblocking);
139 static int      socket_flush(void);
140 static int      socket_flush_if_writable(void);
141 static bool socket_is_send_pending(void);
142 static int      socket_putmessage(char msgtype, const char *s, size_t len);
143 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
144 static void socket_startcopyout(void);
145 static void socket_endcopyout(bool errorAbort);
146 static int      internal_putbytes(const char *s, size_t len);
147 static int      internal_flush(void);
148
    /* Helpers used only by StreamServerPort() when setting up AF_UNIX sockets */
149 #ifdef HAVE_UNIX_SOCKETS
150 static int      Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
151 static int      Setup_AF_UNIX(char *sock_path);
152 #endif   /* HAVE_UNIX_SOCKETS */
153
    /*
     * Method table for the ordinary client-socket transport.  The initializer
     * is positional, so the order here must match the member order of
     * PQcommMethods (declared outside this file -- NOTE(review): keep in sync
     * with that declaration).
     */
154 static PQcommMethods PqCommSocketMethods = {
155         socket_comm_reset,
156         socket_flush,
157         socket_flush_if_writable,
158         socket_is_send_pending,
159         socket_putmessage,
160         socket_putmessage_noblock,
161         socket_startcopyout,
162         socket_endcopyout
163 };
164
    /* Active method table; starts out pointing at the socket implementation. */
165 PQcommMethods *PqCommMethods = &PqCommSocketMethods;
166
    /*
     * Wait event set built by pq_init(): client socket, process latch and
     * postmaster death.
     */
167 WaitEventSet *FeBeWaitSet;
168
169
170 /* --------------------------------
171  *              pq_init - initialize libpq at backend startup
     *
     * Allocates the send buffer, clears all buffer indexes and status flags,
     * registers socket_close() as a process-exit callback, switches the client
     * socket to nonblocking mode (non-Windows builds only) and creates
     * FeBeWaitSet.
     *
     * MyProcPort must already be set: it is dereferenced unconditionally.
172  * --------------------------------
173  */
174 void
175 pq_init(void)
176 {
177         /* initialize state variables */
178         PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
179         PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
180         PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
181         PqCommBusy = false;
182         PqCommReadingMsg = false;
183         DoingCopyOut = false;
184
185         /* set up process-exit hook to close the socket */
186         on_proc_exit(socket_close, 0);
187
188         /*
189          * In backends (as soon as forked) we operate the underlying socket in
190          * nonblocking mode and use latches to implement blocking semantics if
191          * needed. That allows us to provide safely interruptible reads and
192          * writes.
193          *
194          * Use COMMERROR on failure, because ERROR would try to send the error to
195          * the client, which might require changing the mode again, leading to
196          * infinite recursion.
197          */
198 #ifndef WIN32
199         if (!pg_set_noblock(MyProcPort->sock))
200                 ereport(COMMERROR,
201                                 (errmsg("could not set socket to nonblocking mode: %m")));
202 #endif
203
            /*
             * Build the wait set with room for exactly the three events added
             * below: the client socket, our process latch, and postmaster death.
             */
204         FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
205         AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
206                                           NULL, NULL);
207         AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
208         AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
209 }
210
211 /* --------------------------------
212  *              socket_comm_reset - reset libpq during error recovery
213  *
214  * This is called from error recovery at the outer idle loop.  It's
215  * just to get us out of trouble if we somehow manage to elog() from
216  * inside a pqcomm.c routine (which ideally will never happen, but...)
     *
     * Only PqCommBusy is cleared here; the send/receive buffer indexes and
     * PqCommReadingMsg are left as they are.
217  * --------------------------------
218  */
219 static void
220 socket_comm_reset(void)
221 {
222         /* Do not throw away pending data, but do reset the busy flag */
223         PqCommBusy = false;
224         /* We can abort any old-style COPY OUT, too */
            /* "true" is the errorAbort argument (cf. socket_endcopyout) */
225         pq_endcopyout(true);
226 }
227
228 /* --------------------------------
229  *              socket_close - shutdown libpq at backend exit
230  *
231  * This is the one pg_on_exit_callback in place during BackendInitialize().
232  * That function's unusual signal handling constrains that this callback be
233  * safe to run at any instant.
     *
     * Registered by pq_init() through on_proc_exit().  Neither "code" nor
     * "arg" is examined; they exist only to match the callback signature.
234  * --------------------------------
235  */
236 static void
237 socket_close(int code, Datum arg)
238 {
239         /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
240         if (MyProcPort != NULL)
241         {
242 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
243 #ifdef ENABLE_GSS
244                 OM_uint32       min_s;
245
246                 /*
247                  * Shutdown GSSAPI layer.  This section does nothing when interrupting
248                  * BackendInitialize(), because pg_GSS_recvauth() makes first use of
249                  * "ctx" and "cred".
250                  */
251                 if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
252                         gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
253
254                 if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
255                         gss_release_cred(&min_s, &MyProcPort->gss->cred);
256 #endif   /* ENABLE_GSS */
257
258                 /*
259                  * GSS and SSPI share the port->gss struct.  Since nowhere else does a
260                  * postmaster child free this, doing so is safe when interrupting
261                  * BackendInitialize().
262                  */
263                 free(MyProcPort->gss);
264 #endif   /* ENABLE_GSS || ENABLE_SSPI */
265
266                 /*
267                  * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
268                  * call this, so this is safe when interrupting BackendInitialize().
269                  */
270                 secure_close(MyProcPort);
271
272                 /*
273                  * Formerly we did an explicit close() here, but it seems better to
274                  * leave the socket open until the process dies.  This allows clients
275                  * to perform a "synchronous close" if they care --- wait till the
276                  * transport layer reports connection closure, and you can be sure the
277                  * backend has exited.
278                  *
279                  * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
280                  * though.
281                  */
282                 MyProcPort->sock = PGINVALID_SOCKET;
283         }
284 }
285
286
287
288 /*
289  * Streams -- wrapper around Unix socket system calls
290  *
291  *
292  *              Stream functions are used for vanilla TCP connection protocol.
293  */
294
295
296 /*
297  * StreamServerPort -- open a "listening" port to accept connections.
298  *
299  * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
300  * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
301  * specified.  For TCP ports, hostName is either NULL for all interfaces or
302  * the interface to listen on, and unixSocketDir is ignored (can be NULL).
303  *
304  * Successfully opened sockets are added to the ListenSocket[] array (of
305  * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
306  *
     * Each address returned by the resolver is tried in turn.  A failure of
     * socket(), setsockopt(), bind() or listen() is logged and that address
     * is skipped; running out of ListenSocket[] slots or a Setup_AF_UNIX()
     * failure stops the loop.  STATUS_OK is returned as long as at least one
     * socket was opened by this call.
     *
307  * RETURNS: STATUS_OK or STATUS_ERROR
308  */
309
310 int
311 StreamServerPort(int family, char *hostName, unsigned short portNumber,
312                                  char *unixSocketDir,
313                                  pgsocket ListenSocket[], int MaxListen)
314 {
315         pgsocket        fd;
316         int                     err;
317         int                     maxconn;
318         int                     ret;
319         char            portNumberStr[32];
320         const char *familyDesc;
321         char            familyDescBuf[64];
322         char       *service;
323         struct addrinfo *addrs = NULL,
324                            *addr;
325         struct addrinfo hint;
326         int                     listen_index = 0;
327         int                     added = 0;
328
329 #ifdef HAVE_UNIX_SOCKETS
330         char            unixSocketPath[MAXPGPATH];
331 #endif
        /* "one" is the option value for SO_REUSEADDR and IPV6_V6ONLY below */
332 #if !defined(WIN32) || defined(IPV6_V6ONLY)
333         int                     one = 1;
334 #endif
335
336         /* Initialize hint structure */
337         MemSet(&hint, 0, sizeof(hint));
338         hint.ai_family = family;
339         hint.ai_flags = AI_PASSIVE;
340         hint.ai_socktype = SOCK_STREAM;
341
342 #ifdef HAVE_UNIX_SOCKETS
343         if (family == AF_UNIX)
344         {
345                 /*
346                  * Create unixSocketPath from portNumber and unixSocketDir and lock
347                  * that file path
348                  */
349                 UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
350                 if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
351                 {
352                         ereport(LOG,
353                                         (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
354                                                         unixSocketPath,
355                                                         (int) (UNIXSOCK_PATH_BUFLEN - 1))));
356                         return STATUS_ERROR;
357                 }
358                 if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
359                         return STATUS_ERROR;
                    /* for AF_UNIX the "service" is the socket file path */
360                 service = unixSocketPath;
361         }
362         else
363 #endif   /* HAVE_UNIX_SOCKETS */
364         {
                    /* otherwise the "service" is the decimal port number */
365                 snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
366                 service = portNumberStr;
367         }
368
369         ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
370         if (ret || !addrs)
371         {
372                 if (hostName)
373                         ereport(LOG,
374                                         (errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
375                                                         hostName, service, gai_strerror(ret))));
376                 else
377                         ereport(LOG,
378                                  (errmsg("could not translate service \"%s\" to address: %s",
379                                                  service, gai_strerror(ret))));
380                 if (addrs)
381                         pg_freeaddrinfo_all(hint.ai_family, addrs);
382                 return STATUS_ERROR;
383         }
384
385         for (addr = addrs; addr; addr = addr->ai_next)
386         {
387                 if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
388                 {
389                         /*
390                          * Only set up a unix domain socket when they really asked for it.
391                          * The service/port is different in that case.
392                          */
393                         continue;
394                 }
395
396                 /* See if there is still room to add 1 more socket. */
397                 for (; listen_index < MaxListen; listen_index++)
398                 {
399                         if (ListenSocket[listen_index] == PGINVALID_SOCKET)
400                                 break;
401                 }
402                 if (listen_index >= MaxListen)
403                 {
404                         ereport(LOG,
405                                         (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
406                                                         MaxListen)));
                            /* no free slot: abandon the remaining addresses */
407                         break;
408                 }
409
410                 /* set up family name for possible error messages */
411                 switch (addr->ai_family)
412                 {
413                         case AF_INET:
414                                 familyDesc = _("IPv4");
415                                 break;
416 #ifdef HAVE_IPV6
417                         case AF_INET6:
418                                 familyDesc = _("IPv6");
419                                 break;
420 #endif
421 #ifdef HAVE_UNIX_SOCKETS
422                         case AF_UNIX:
423                                 familyDesc = _("Unix");
424                                 break;
425 #endif
426                         default:
427                                 snprintf(familyDescBuf, sizeof(familyDescBuf),
428                                                  _("unrecognized address family %d"),
429                                                  addr->ai_family);
430                                 familyDesc = familyDescBuf;
431                                 break;
432                 }
433
434                 if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
435                 {
436                         ereport(LOG,
437                                         (errcode_for_socket_access(),
438                         /* translator: %s is IPv4, IPv6, or Unix */
439                                          errmsg("could not create %s socket: %m",
440                                                         familyDesc)));
441                         continue;
442                 }
443
444 #ifndef WIN32
445
446                 /*
447                  * Without the SO_REUSEADDR flag, a new postmaster can't be started
448                  * right away after a stop or crash, giving "address already in use"
449                  * error on TCP ports.
450                  *
451                  * On win32, however, this behavior only happens if the
452                  * SO_EXCLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
453                  * servers to listen on the same address, resulting in unpredictable
454                  * behavior. With no flags at all, win32 behaves as Unix with
455                  * SO_REUSEADDR.
456                  */
457                 if (!IS_AF_UNIX(addr->ai_family))
458                 {
459                         if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
460                                                         (char *) &one, sizeof(one))) == -1)
461                         {
462                                 ereport(LOG,
463                                                 (errcode_for_socket_access(),
464                                                  errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
465                                 closesocket(fd);
466                                 continue;
467                         }
468                 }
469 #endif
470
                    /* Restrict an IPv6 socket to IPv6 traffic where the option exists */
471 #ifdef IPV6_V6ONLY
472                 if (addr->ai_family == AF_INET6)
473                 {
474                         if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
475                                                    (char *) &one, sizeof(one)) == -1)
476                         {
477                                 ereport(LOG,
478                                                 (errcode_for_socket_access(),
479                                                  errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
480                                 closesocket(fd);
481                                 continue;
482                         }
483                 }
484 #endif
485
486                 /*
487                  * Note: This might fail on some OS's, like Linux older than
488                  * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
489                  * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
490                  * connections.
491                  */
492                 err = bind(fd, addr->ai_addr, addr->ai_addrlen);
493                 if (err < 0)
494                 {
495                         ereport(LOG,
496                                         (errcode_for_socket_access(),
497                         /* translator: %s is IPv4, IPv6, or Unix */
498                                          errmsg("could not bind %s socket: %m",
499                                                         familyDesc),
500                                          (IS_AF_UNIX(addr->ai_family)) ?
501                                   errhint("Is another postmaster already running on port %d?"
502                                                   " If not, remove socket file \"%s\" and retry.",
503                                                   (int) portNumber, service) :
504                                   errhint("Is another postmaster already running on port %d?"
505                                                   " If not, wait a few seconds and retry.",
506                                                   (int) portNumber)));
507                         closesocket(fd);
508                         continue;
509                 }
510
511 #ifdef HAVE_UNIX_SOCKETS
512                 if (addr->ai_family == AF_UNIX)
513                 {
                            /*
                             * Apply group/permissions to the socket file before listen().
                             * Unlike the other failures in this loop, this one ends the
                             * loop (break) instead of moving on to the next address.
                             */
514                         if (Setup_AF_UNIX(service) != STATUS_OK)
515                         {
516                                 closesocket(fd);
517                                 break;
518                         }
519                 }
520 #endif
521
522                 /*
523                  * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
524                  * intended to provide a clamp on the request on platforms where an
525                  * overly large request provokes a kernel error (are there any?).
526                  */
527                 maxconn = MaxBackends * 2;
528                 if (maxconn > PG_SOMAXCONN)
529                         maxconn = PG_SOMAXCONN;
530
531                 err = listen(fd, maxconn);
532                 if (err < 0)
533                 {
534                         ereport(LOG,
535                                         (errcode_for_socket_access(),
536                         /* translator: %s is IPv4, IPv6, or Unix */
537                                          errmsg("could not listen on %s socket: %m",
538                                                         familyDesc)));
539                         closesocket(fd);
540                         continue;
541                 }
                    /*
                     * listen_index is not advanced here: the slot is now occupied, so
                     * the free-slot scan at the top of the next iteration steps past it.
                     */
542                 ListenSocket[listen_index] = fd;
543                 added++;
544         }
545
546         pg_freeaddrinfo_all(hint.ai_family, addrs);
547
            /* Succeed if at least one listening socket was opened by this call */
548         if (!added)
549                 return STATUS_ERROR;
550
551         return STATUS_OK;
552 }
553
554
555 #ifdef HAVE_UNIX_SOCKETS
556
557 /*
558  * Lock_AF_UNIX -- take the interlock for a Unix socket path and clear the way
     *
     * Creates the socket lock file for unixSocketPath, removes any socket file
     * already present at that path, and records a copy of the path in
     * sock_paths.
     *
     * As written this always returns STATUS_OK.  NOTE(review): presumably
     * CreateSocketLockFile() reports its own failures without returning --
     * confirm against its definition.
559  */
560 static int
561 Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
562 {
563         /*
564          * Grab an interlock file associated with the socket file.
565          *
566          * Note: there are two reasons for using a socket lock file, rather than
567          * trying to interlock directly on the socket itself.  First, it's a lot
568          * more portable, and second, it lets us remove any pre-existing socket
569          * file without race conditions.
570          */
571         CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
572
573         /*
574          * Once we have the interlock, we can safely delete any pre-existing
575          * socket file to avoid failure at bind() time.
             *
             * The result is deliberately ignored (hence the void cast).
576          */
577         (void) unlink(unixSocketPath);
578
579         /*
580          * Remember socket file pathnames for later maintenance.
581          */
582         sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
583
584         return STATUS_OK;
585 }
586
587
588 /*
589  * Setup_AF_UNIX -- configure unix socket permissions
     *
     * sock_path is the path of an already-bound Unix-domain socket file.  If
     * Unix_socket_group is non-empty, the file's group is changed to it; the
     * file's mode is then set to Unix_socket_permissions.
     *
     * RETURNS: STATUS_OK, or STATUS_ERROR (after logging) if the group cannot
     * be resolved or chown()/chmod() fails.
590  */
591 static int
592 Setup_AF_UNIX(char *sock_path)
593 {
594         /*
595          * Fix socket ownership/permission if requested.  Note we must do this
596          * before we listen() to avoid a window where unwanted connections could
597          * get accepted.
598          */
599         Assert(Unix_socket_group);
600         if (Unix_socket_group[0] != '\0')
601         {
602 #ifdef WIN32
603                 elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
604 #else
605                 char       *endptr;
606                 unsigned long val;
607                 gid_t           gid;
608
                    /*
                     * A setting that parses completely as a decimal number is used as
                     * a gid; anything else is looked up as a group name.
                     * NOTE(review): no ERANGE/overflow check is made on the strtoul()
                     * result before it is narrowed to gid_t.
                     */
609                 val = strtoul(Unix_socket_group, &endptr, 10);
610                 if (*endptr == '\0')
611                 {                                               /* numeric group id */
612                         gid = val;
613                 }
614                 else
615                 {                                               /* convert group name to id */
616                         struct group *gr;
617
618                         gr = getgrnam(Unix_socket_group);
619                         if (!gr)
620                         {
621                                 ereport(LOG,
622                                                 (errmsg("group \"%s\" does not exist",
623                                                                 Unix_socket_group)));
624                                 return STATUS_ERROR;
625                         }
626                         gid = gr->gr_gid;
627                 }
                    /* owner argument of -1: change only the group, keep the owner */
628                 if (chown(sock_path, -1, gid) == -1)
629                 {
630                         ereport(LOG,
631                                         (errcode_for_file_access(),
632                                          errmsg("could not set group of file \"%s\": %m",
633                                                         sock_path)));
634                         return STATUS_ERROR;
635                 }
636 #endif
637         }
638
            /* The mode is applied unconditionally, whether or not a group was set */
639         if (chmod(sock_path, Unix_socket_permissions) == -1)
640         {
641                 ereport(LOG,
642                                 (errcode_for_file_access(),
643                                  errmsg("could not set permissions of file \"%s\": %m",
644                                                 sock_path)));
645                 return STATUS_ERROR;
646         }
647         return STATUS_OK;
648 }
649 #endif   /* HAVE_UNIX_SOCKETS */
650
651
652 /*
653  * StreamConnection -- create a new connection with client using
654  *              server port.  Set port->sock to the FD of the new connection.
655  *
656  * ASSUME: that this doesn't need to be non-blocking because
657  *              the Postmaster uses select() to tell when the server master
658  *              socket is ready for accept().
659  *
660  * RETURNS: STATUS_OK or STATUS_ERROR
661  */
662 int
663 StreamConnection(pgsocket server_fd, Port *port)
664 {
665         /* accept connection and fill in the client (remote) address */
666         port->raddr.salen = sizeof(port->raddr.addr);
667         if ((port->sock = accept(server_fd,
668                                                          (struct sockaddr *) & port->raddr.addr,
669                                                          &port->raddr.salen)) == PGINVALID_SOCKET)
670         {
671                 ereport(LOG,
672                                 (errcode_for_socket_access(),
673                                  errmsg("could not accept new connection: %m")));
674
675                 /*
676                  * If accept() fails then postmaster.c will still see the server
677                  * socket as read-ready, and will immediately try again.  To avoid
678                  * uselessly sucking lots of CPU, delay a bit before trying again.
679                  * (The most likely reason for failure is being out of kernel file
680                  * table slots; we can do little except hope some will get freed up.)
681                  */
682                 pg_usleep(100000L);             /* wait 0.1 sec */
683                 return STATUS_ERROR;
684         }
685
686 #ifdef SCO_ACCEPT_BUG
687
688         /*
689          * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
690          * shouldn't hurt to catch it for all versions of those platforms.
691          */
692         if (port->raddr.addr.ss_family == 0)
693                 port->raddr.addr.ss_family = AF_UNIX;
694 #endif
695
696         /* fill in the server (local) address */
697         port->laddr.salen = sizeof(port->laddr.addr);
698         if (getsockname(port->sock,
699                                         (struct sockaddr *) & port->laddr.addr,
700                                         &port->laddr.salen) < 0)
701         {
702                 elog(LOG, "getsockname() failed: %m");
703                 return STATUS_ERROR;
704         }
705
706         /* select NODELAY and KEEPALIVE options if it's a TCP connection */
707         if (!IS_AF_UNIX(port->laddr.addr.ss_family))
708         {
709                 int                     on;
710 #ifdef WIN32
711                 int                     oldopt;
712                 int                     optlen;
713                 int                     newopt;
714 #endif
715
716 #ifdef  TCP_NODELAY
717                 on = 1;
718                 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
719                                            (char *) &on, sizeof(on)) < 0)
720                 {
721                         elog(LOG, "setsockopt(TCP_NODELAY) failed: %m");
722                         return STATUS_ERROR;
723                 }
724 #endif
725                 on = 1;
726                 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
727                                            (char *) &on, sizeof(on)) < 0)
728                 {
729                         elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
730                         return STATUS_ERROR;
731                 }
732
733 #ifdef WIN32
734
735                 /*
736                  * This is a Win32 socket optimization.  The OS send buffer should be
737                  * large enough to send the whole Postgres send buffer in one go, or
738                  * performance suffers.  The Postgres send buffer can be enlarged if a
739                  * very large message needs to be sent, but we won't attempt to
740                  * enlarge the OS buffer if that happens, so somewhat arbitrarily
741                  * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
742                  * (That's 32kB with the current default).
743                  *
744                  * The default OS buffer size used to be 8kB in earlier Windows
745                  * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
746                  * be necessary to change it in later versions anymore.  Changing it
747                  * unnecessarily can even reduce performance, because setting
748                  * SO_SNDBUF in the application disables the "dynamic send buffering"
749                  * feature that was introduced in Windows 7.  So before fiddling with
750                  * SO_SNDBUF, check if the current buffer size is already large enough
751                  * and only increase it if necessary.
752                  *
753                  * See https://support.microsoft.com/kb/823764/EN-US/ and
754                  * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
755                  */
756                 optlen = sizeof(oldopt);
757                 if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
758                                            &optlen) < 0)
759                 {
760                         elog(LOG, "getsockopt(SO_SNDBUF) failed: %m");
761                         return STATUS_ERROR;
762                 }
763                 newopt = PQ_SEND_BUFFER_SIZE * 4;
764                 if (oldopt < newopt)
765                 {
766                         if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
767                                                    sizeof(newopt)) < 0)
768                         {
769                                 elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
770                                 return STATUS_ERROR;
771                         }
772                 }
773 #endif
774
775                 /*
776                  * Also apply the current keepalive parameters.  If we fail to set a
777                  * parameter, don't error out, because these aren't universally
778                  * supported.  (Note: you might think we need to reset the GUC
779                  * variables to 0 in such a case, but it's not necessary because the
780                  * show hooks for these variables report the truth anyway.)
781                  */
782                 (void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
783                 (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
784                 (void) pq_setkeepalivescount(tcp_keepalives_count, port);
785         }
786
787         return STATUS_OK;
788 }
789
790 /*
791  * StreamClose -- close a client/backend connection
792  *
793  * NOTE: this is NOT used to terminate a session; it is just used to release
794  * the file descriptor in a process that should no longer have the socket
795  * open.  (For example, the postmaster calls this after passing ownership
796  * of the connection to a child process.)  It is expected that someone else
797  * still has the socket open.  So, we only want to close the descriptor,
798  * we do NOT want to send anything to the far end.
799  */
800 void
801 StreamClose(pgsocket sock)
802 {
803         closesocket(sock);
804 }
805
806 /*
807  * TouchSocketFiles -- mark socket files as recently accessed
808  *
809  * This routine should be called every so often to ensure that the socket
810  * files have a recent mod date (ordinary operations on sockets usually won't
811  * change the mod date).  That saves them from being removed by
812  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
813  * never have put the socket file in /tmp...)
814  */
815 void
816 TouchSocketFiles(void)
817 {
818         ListCell   *l;
819
820         /* Loop through all created sockets... */
821         foreach(l, sock_paths)
822         {
823                 char       *sock_path = (char *) lfirst(l);
824
825                 /*
826                  * utime() is POSIX standard, utimes() is a common alternative. If we
827                  * have neither, there's no way to affect the mod or access time of
828                  * the socket :-(
829                  *
830                  * In either path, we ignore errors; there's no point in complaining.
831                  */
832 #ifdef HAVE_UTIME
833                 utime(sock_path, NULL);
834 #else                                                   /* !HAVE_UTIME */
835 #ifdef HAVE_UTIMES
836                 utimes(sock_path, NULL);
837 #endif   /* HAVE_UTIMES */
838 #endif   /* HAVE_UTIME */
839         }
840 }
841
842 /*
843  * RemoveSocketFiles -- unlink socket files at postmaster shutdown
844  */
845 void
846 RemoveSocketFiles(void)
847 {
848         ListCell   *l;
849
850         /* Loop through all created sockets... */
851         foreach(l, sock_paths)
852         {
853                 char       *sock_path = (char *) lfirst(l);
854
855                 /* Ignore any error. */
856                 (void) unlink(sock_path);
857         }
858         /* Since we're about to exit, no need to reclaim storage */
859         sock_paths = NIL;
860 }
861
862
863 /* --------------------------------
864  * Low-level I/O routines begin here.
865  *
866  * These routines communicate with a frontend client across a connection
867  * already established by the preceding routines.
868  * --------------------------------
869  */
870
871 /* --------------------------------
872  *                        socket_set_nonblocking - set socket blocking/non-blocking
873  *
874  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
875  * blocking otherwise.
876  * --------------------------------
877  */
878 static void
879 socket_set_nonblocking(bool nonblocking)
880 {
881         if (MyProcPort == NULL)
882                 ereport(ERROR,
883                                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
884                                  errmsg("there is no client connection")));
885
886         MyProcPort->noblock = nonblocking;
887 }
888
889 /* --------------------------------
890  *              pq_recvbuf - load some bytes into the input buffer
891  *
892  *              returns 0 if OK, EOF if trouble
893  * --------------------------------
894  */
895 static int
896 pq_recvbuf(void)
897 {
898         if (PqRecvPointer > 0)
899         {
900                 if (PqRecvLength > PqRecvPointer)
901                 {
902                         /* still some unread data, left-justify it in the buffer */
903                         memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
904                                         PqRecvLength - PqRecvPointer);
905                         PqRecvLength -= PqRecvPointer;
906                         PqRecvPointer = 0;
907                 }
908                 else
909                         PqRecvLength = PqRecvPointer = 0;
910         }
911
912         /* Ensure that we're in blocking mode */
913         socket_set_nonblocking(false);
914
915         /* Can fill buffer from PqRecvLength and upwards */
916         for (;;)
917         {
918                 int                     r;
919
920                 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
921                                                 PQ_RECV_BUFFER_SIZE - PqRecvLength);
922
923                 if (r < 0)
924                 {
925                         if (errno == EINTR)
926                                 continue;               /* Ok if interrupted */
927
928                         /*
929                          * Careful: an ereport() that tries to write to the client would
930                          * cause recursion to here, leading to stack overflow and core
931                          * dump!  This message must go *only* to the postmaster log.
932                          */
933                         ereport(COMMERROR,
934                                         (errcode_for_socket_access(),
935                                          errmsg("could not receive data from client: %m")));
936                         return EOF;
937                 }
938                 if (r == 0)
939                 {
940                         /*
941                          * EOF detected.  We used to write a log message here, but it's
942                          * better to expect the ultimate caller to do that.
943                          */
944                         return EOF;
945                 }
946                 /* r contains number of bytes read, so just incr length */
947                 PqRecvLength += r;
948                 return 0;
949         }
950 }
951
952 /* --------------------------------
953  *              pq_getbyte      - get a single byte from connection, or return EOF
954  * --------------------------------
955  */
956 int
957 pq_getbyte(void)
958 {
959         Assert(PqCommReadingMsg);
960
961         while (PqRecvPointer >= PqRecvLength)
962         {
963                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
964                         return EOF;                     /* Failed to recv data */
965         }
966         return (unsigned char) PqRecvBuffer[PqRecvPointer++];
967 }
968
969 /* --------------------------------
970  *              pq_peekbyte             - peek at next byte from connection
971  *
972  *       Same as pq_getbyte() except we don't advance the pointer.
973  * --------------------------------
974  */
975 int
976 pq_peekbyte(void)
977 {
978         Assert(PqCommReadingMsg);
979
980         while (PqRecvPointer >= PqRecvLength)
981         {
982                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
983                         return EOF;                     /* Failed to recv data */
984         }
985         return (unsigned char) PqRecvBuffer[PqRecvPointer];
986 }
987
988 /* --------------------------------
989  *              pq_getbyte_if_available - get a single byte from connection,
990  *                      if available
991  *
992  * The received byte is stored in *c. Returns 1 if a byte was read,
993  * 0 if no data was available, or EOF if trouble.
994  * --------------------------------
995  */
996 int
997 pq_getbyte_if_available(unsigned char *c)
998 {
999         int                     r;
1000
1001         Assert(PqCommReadingMsg);
1002
1003         if (PqRecvPointer < PqRecvLength)
1004         {
1005                 *c = PqRecvBuffer[PqRecvPointer++];
1006                 return 1;
1007         }
1008
1009         /* Put the socket into non-blocking mode */
1010         socket_set_nonblocking(true);
1011
1012         r = secure_read(MyProcPort, c, 1);
1013         if (r < 0)
1014         {
1015                 /*
1016                  * Ok if no data available without blocking or interrupted (though
1017                  * EINTR really shouldn't happen with a non-blocking socket). Report
1018                  * other errors.
1019                  */
1020                 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1021                         r = 0;
1022                 else
1023                 {
1024                         /*
1025                          * Careful: an ereport() that tries to write to the client would
1026                          * cause recursion to here, leading to stack overflow and core
1027                          * dump!  This message must go *only* to the postmaster log.
1028                          */
1029                         ereport(COMMERROR,
1030                                         (errcode_for_socket_access(),
1031                                          errmsg("could not receive data from client: %m")));
1032                         r = EOF;
1033                 }
1034         }
1035         else if (r == 0)
1036         {
1037                 /* EOF detected */
1038                 r = EOF;
1039         }
1040
1041         return r;
1042 }
1043
1044 /* --------------------------------
1045  *              pq_getbytes             - get a known number of bytes from connection
1046  *
1047  *              returns 0 if OK, EOF if trouble
1048  * --------------------------------
1049  */
1050 int
1051 pq_getbytes(char *s, size_t len)
1052 {
1053         size_t          amount;
1054
1055         Assert(PqCommReadingMsg);
1056
1057         while (len > 0)
1058         {
1059                 while (PqRecvPointer >= PqRecvLength)
1060                 {
1061                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1062                                 return EOF;             /* Failed to recv data */
1063                 }
1064                 amount = PqRecvLength - PqRecvPointer;
1065                 if (amount > len)
1066                         amount = len;
1067                 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1068                 PqRecvPointer += amount;
1069                 s += amount;
1070                 len -= amount;
1071         }
1072         return 0;
1073 }
1074
1075 /* --------------------------------
1076  *              pq_discardbytes         - throw away a known number of bytes
1077  *
1078  *              same as pq_getbytes except we do not copy the data to anyplace.
1079  *              this is used for resynchronizing after read errors.
1080  *
1081  *              returns 0 if OK, EOF if trouble
1082  * --------------------------------
1083  */
1084 static int
1085 pq_discardbytes(size_t len)
1086 {
1087         size_t          amount;
1088
1089         Assert(PqCommReadingMsg);
1090
1091         while (len > 0)
1092         {
1093                 while (PqRecvPointer >= PqRecvLength)
1094                 {
1095                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1096                                 return EOF;             /* Failed to recv data */
1097                 }
1098                 amount = PqRecvLength - PqRecvPointer;
1099                 if (amount > len)
1100                         amount = len;
1101                 PqRecvPointer += amount;
1102                 len -= amount;
1103         }
1104         return 0;
1105 }
1106
1107 /* --------------------------------
1108  *              pq_getstring    - get a null terminated string from connection
1109  *
1110  *              The return value is placed in an expansible StringInfo, which has
1111  *              already been initialized by the caller.
1112  *
1113  *              This is used only for dealing with old-protocol clients.  The idea
1114  *              is to produce a StringInfo that looks the same as we would get from
1115  *              pq_getmessage() with a newer client; we will then process it with
1116  *              pq_getmsgstring.  Therefore, no character set conversion is done here,
1117  *              even though this is presumably useful only for text.
1118  *
1119  *              returns 0 if OK, EOF if trouble
1120  * --------------------------------
1121  */
1122 int
1123 pq_getstring(StringInfo s)
1124 {
1125         int                     i;
1126
1127         Assert(PqCommReadingMsg);
1128
1129         resetStringInfo(s);
1130
1131         /* Read until we get the terminating '\0' */
1132         for (;;)
1133         {
1134                 while (PqRecvPointer >= PqRecvLength)
1135                 {
1136                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1137                                 return EOF;             /* Failed to recv data */
1138                 }
1139
1140                 for (i = PqRecvPointer; i < PqRecvLength; i++)
1141                 {
1142                         if (PqRecvBuffer[i] == '\0')
1143                         {
1144                                 /* include the '\0' in the copy */
1145                                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1146                                                                            i - PqRecvPointer + 1);
1147                                 PqRecvPointer = i + 1;  /* advance past \0 */
1148                                 return 0;
1149                         }
1150                 }
1151
1152                 /* If we're here we haven't got the \0 in the buffer yet. */
1153                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1154                                                            PqRecvLength - PqRecvPointer);
1155                 PqRecvPointer = PqRecvLength;
1156         }
1157 }
1158
1159
1160 /* --------------------------------
1161  *              pq_startmsgread - begin reading a message from the client.
1162  *
1163  *              This must be called before any of the pq_get* functions.
1164  * --------------------------------
1165  */
1166 void
1167 pq_startmsgread(void)
1168 {
1169         /*
1170          * There shouldn't be a read active already, but let's check just to be
1171          * sure.
1172          */
1173         if (PqCommReadingMsg)
1174                 ereport(FATAL,
1175                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1176                                  errmsg("terminating connection because protocol synchronization was lost")));
1177
1178         PqCommReadingMsg = true;
1179 }
1180
1181
1182 /* --------------------------------
1183  *              pq_endmsgread   - finish reading message.
1184  *
1185  *              This must be called after reading a V2 protocol message with
1186  *              pq_getstring() and friends, to indicate that we have read the whole
1187  *              message. In V3 protocol, pq_getmessage() does this implicitly.
1188  * --------------------------------
1189  */
1190 void
1191 pq_endmsgread(void)
1192 {
1193         Assert(PqCommReadingMsg);
1194
1195         PqCommReadingMsg = false;
1196 }
1197
1198 /* --------------------------------
1199  *              pq_is_reading_msg - are we currently reading a message?
1200  *
1201  * This is used in error recovery at the outer idle loop to detect if we have
1202  * lost protocol sync, and need to terminate the connection. pq_startmsgread()
1203  * will check for that too, but it's nicer to detect it earlier.
1204  * --------------------------------
1205  */
1206 bool
1207 pq_is_reading_msg(void)
1208 {
1209         return PqCommReadingMsg;
1210 }
1211
1212 /* --------------------------------
1213  *              pq_getmessage   - get a message with length word from connection
1214  *
1215  *              The return value is placed in an expansible StringInfo, which has
1216  *              already been initialized by the caller.
1217  *              Only the message body is placed in the StringInfo; the length word
1218  *              is removed.  Also, s->cursor is initialized to zero for convenience
1219  *              in scanning the message contents.
1220  *
1221  *              If maxlen is not zero, it is an upper limit on the length of the
1222  *              message we are willing to accept.  We abort the connection (by
1223  *              returning EOF) if client tries to send more than that.
1224  *
1225  *              returns 0 if OK, EOF if trouble
1226  * --------------------------------
1227  */
1228 int
1229 pq_getmessage(StringInfo s, int maxlen)
1230 {
1231         int32           len;
1232
1233         Assert(PqCommReadingMsg);
1234
1235         resetStringInfo(s);
1236
1237         /* Read message length word */
1238         if (pq_getbytes((char *) &len, 4) == EOF)
1239         {
1240                 ereport(COMMERROR,
1241                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1242                                  errmsg("unexpected EOF within message length word")));
1243                 return EOF;
1244         }
1245
1246         len = ntohl(len);
1247
1248         if (len < 4 ||
1249                 (maxlen > 0 && len > maxlen))
1250         {
1251                 ereport(COMMERROR,
1252                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1253                                  errmsg("invalid message length")));
1254                 return EOF;
1255         }
1256
1257         len -= 4;                                       /* discount length itself */
1258
1259         if (len > 0)
1260         {
1261                 /*
1262                  * Allocate space for message.  If we run out of room (ridiculously
1263                  * large message), we will elog(ERROR), but we want to discard the
1264                  * message body so as not to lose communication sync.
1265                  */
1266                 PG_TRY();
1267                 {
1268                         enlargeStringInfo(s, len);
1269                 }
1270                 PG_CATCH();
1271                 {
1272                         if (pq_discardbytes(len) == EOF)
1273                                 ereport(COMMERROR,
1274                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1275                                                  errmsg("incomplete message from client")));
1276
1277                         /* we discarded the rest of the message so we're back in sync. */
1278                         PqCommReadingMsg = false;
1279                         PG_RE_THROW();
1280                 }
1281                 PG_END_TRY();
1282
1283                 /* And grab the message */
1284                 if (pq_getbytes(s->data, len) == EOF)
1285                 {
1286                         ereport(COMMERROR,
1287                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
1288                                          errmsg("incomplete message from client")));
1289                         return EOF;
1290                 }
1291                 s->len = len;
1292                 /* Place a trailing null per StringInfo convention */
1293                 s->data[len] = '\0';
1294         }
1295
1296         /* finished reading the message. */
1297         PqCommReadingMsg = false;
1298
1299         return 0;
1300 }
1301
1302
1303 /* --------------------------------
1304  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
1305  *
1306  *              returns 0 if OK, EOF if trouble
1307  * --------------------------------
1308  */
1309 int
1310 pq_putbytes(const char *s, size_t len)
1311 {
1312         int                     res;
1313
1314         /* Should only be called by old-style COPY OUT */
1315         Assert(DoingCopyOut);
1316         /* No-op if reentrant call */
1317         if (PqCommBusy)
1318                 return 0;
1319         PqCommBusy = true;
1320         res = internal_putbytes(s, len);
1321         PqCommBusy = false;
1322         return res;
1323 }
1324
1325 static int
1326 internal_putbytes(const char *s, size_t len)
1327 {
1328         size_t          amount;
1329
1330         while (len > 0)
1331         {
1332                 /* If buffer is full, then flush it out */
1333                 if (PqSendPointer >= PqSendBufferSize)
1334                 {
1335                         socket_set_nonblocking(false);
1336                         if (internal_flush())
1337                                 return EOF;
1338                 }
1339                 amount = PqSendBufferSize - PqSendPointer;
1340                 if (amount > len)
1341                         amount = len;
1342                 memcpy(PqSendBuffer + PqSendPointer, s, amount);
1343                 PqSendPointer += amount;
1344                 s += amount;
1345                 len -= amount;
1346         }
1347         return 0;
1348 }
1349
1350 /* --------------------------------
1351  *              socket_flush            - flush pending output
1352  *
1353  *              returns 0 if OK, EOF if trouble
1354  * --------------------------------
1355  */
1356 static int
1357 socket_flush(void)
1358 {
1359         int                     res;
1360
1361         /* No-op if reentrant call */
1362         if (PqCommBusy)
1363                 return 0;
1364         PqCommBusy = true;
1365         socket_set_nonblocking(false);
1366         res = internal_flush();
1367         PqCommBusy = false;
1368         return res;
1369 }
1370
1371 /* --------------------------------
1372  *              internal_flush - flush pending output
1373  *
1374  * Returns 0 if OK (meaning everything was sent, or operation would block
1375  * and the socket is in non-blocking mode), or EOF if trouble.
1376  * --------------------------------
1377  */
1378 static int
1379 internal_flush(void)
1380 {
1381         static int      last_reported_send_errno = 0;
1382
1383         char       *bufptr = PqSendBuffer + PqSendStart;
1384         char       *bufend = PqSendBuffer + PqSendPointer;
1385
1386         while (bufptr < bufend)
1387         {
1388                 int                     r;
1389
1390                 r = secure_write(MyProcPort, bufptr, bufend - bufptr);
1391
1392                 if (r <= 0)
1393                 {
1394                         if (errno == EINTR)
1395                                 continue;               /* Ok if we were interrupted */
1396
1397                         /*
1398                          * Ok if no data writable without blocking, and the socket is in
1399                          * non-blocking mode.
1400                          */
1401                         if (errno == EAGAIN ||
1402                                 errno == EWOULDBLOCK)
1403                         {
1404                                 return 0;
1405                         }
1406
1407                         /*
1408                          * Careful: an ereport() that tries to write to the client would
1409                          * cause recursion to here, leading to stack overflow and core
1410                          * dump!  This message must go *only* to the postmaster log.
1411                          *
1412                          * If a client disconnects while we're in the midst of output, we
1413                          * might write quite a bit of data before we get to a safe query
1414                          * abort point.  So, suppress duplicate log messages.
1415                          */
1416                         if (errno != last_reported_send_errno)
1417                         {
1418                                 last_reported_send_errno = errno;
1419                                 ereport(COMMERROR,
1420                                                 (errcode_for_socket_access(),
1421                                                  errmsg("could not send data to client: %m")));
1422                         }
1423
1424                         /*
1425                          * We drop the buffered data anyway so that processing can
1426                          * continue, even though we'll probably quit soon. We also set a
1427                          * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
1428                          * the connection.
1429                          */
1430                         PqSendStart = PqSendPointer = 0;
1431                         ClientConnectionLost = 1;
1432                         InterruptPending = 1;
1433                         return EOF;
1434                 }
1435
1436                 last_reported_send_errno = 0;   /* reset after any successful send */
1437                 bufptr += r;
1438                 PqSendStart += r;
1439         }
1440
1441         PqSendStart = PqSendPointer = 0;
1442         return 0;
1443 }
1444
1445 /* --------------------------------
1446  *              pq_flush_if_writable - flush pending output if writable without blocking
1447  *
1448  * Returns 0 if OK, or EOF if trouble.
1449  * --------------------------------
1450  */
1451 static int
1452 socket_flush_if_writable(void)
1453 {
1454         int                     res;
1455
1456         /* Quick exit if nothing to do */
1457         if (PqSendPointer == PqSendStart)
1458                 return 0;
1459
1460         /* No-op if reentrant call */
1461         if (PqCommBusy)
1462                 return 0;
1463
1464         /* Temporarily put the socket into non-blocking mode */
1465         socket_set_nonblocking(true);
1466
1467         PqCommBusy = true;
1468         res = internal_flush();
1469         PqCommBusy = false;
1470         return res;
1471 }
1472
1473 /* --------------------------------
1474  *      socket_is_send_pending  - is there any pending data in the output buffer?
1475  * --------------------------------
1476  */
1477 static bool
1478 socket_is_send_pending(void)
1479 {
1480         return (PqSendStart < PqSendPointer);
1481 }
1482
1483 /* --------------------------------
1484  * Message-level I/O routines begin here.
1485  *
1486  * These routines understand about the old-style COPY OUT protocol.
1487  * --------------------------------
1488  */
1489
1490
1491 /* --------------------------------
1492  *              socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1493  *
1494  *              If msgtype is not '\0', it is a message type code to place before
1495  *              the message body.  If msgtype is '\0', then the message has no type
1496  *              code (this is only valid in pre-3.0 protocols).
1497  *
1498  *              len is the length of the message body data at *s.  In protocol 3.0
1499  *              and later, a message length word (equal to len+4 because it counts
1500  *              itself too) is inserted by this routine.
1501  *
1502  *              All normal messages are suppressed while old-style COPY OUT is in
1503  *              progress.  (In practice only a few notice messages might get emitted
1504  *              then; dropping them is annoying, but at least they will still appear
1505  *              in the postmaster log.)
1506  *
1507  *              We also suppress messages generated while pqcomm.c is busy.  This
1508  *              avoids any possibility of messages being inserted within other
1509  *              messages.  The only known trouble case arises if SIGQUIT occurs
1510  *              during a pqcomm.c routine --- quickdie() will try to send a warning
1511  *              message, and the most reasonable approach seems to be to drop it.
1512  *
1513  *              returns 0 if OK, EOF if trouble
1514  * --------------------------------
1515  */
1516 static int
1517 socket_putmessage(char msgtype, const char *s, size_t len)
1518 {
1519         if (DoingCopyOut || PqCommBusy)
1520                 return 0;
1521         PqCommBusy = true;
1522         if (msgtype)
1523                 if (internal_putbytes(&msgtype, 1))
1524                         goto fail;
1525         if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1526         {
1527                 uint32          n32;
1528
1529                 n32 = htonl((uint32) (len + 4));
1530                 if (internal_putbytes((char *) &n32, 4))
1531                         goto fail;
1532         }
1533         if (internal_putbytes(s, len))
1534                 goto fail;
1535         PqCommBusy = false;
1536         return 0;
1537
1538 fail:
1539         PqCommBusy = false;
1540         return EOF;
1541 }
1542
1543 /* --------------------------------
1544  *              pq_putmessage_noblock   - like pq_putmessage, but never blocks
1545  *
1546  *              If the output buffer is too small to hold the message, the buffer
1547  *              is enlarged.
1548  */
1549 static void
1550 socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1551 {
1552         int res         PG_USED_FOR_ASSERTS_ONLY;
1553         int                     required;
1554
1555         /*
1556          * Ensure we have enough space in the output buffer for the message header
1557          * as well as the message itself.
1558          */
1559         required = PqSendPointer + 1 + 4 + len;
1560         if (required > PqSendBufferSize)
1561         {
1562                 PqSendBuffer = repalloc(PqSendBuffer, required);
1563                 PqSendBufferSize = required;
1564         }
1565         res = pq_putmessage(msgtype, s, len);
1566         Assert(res == 0);                       /* should not fail when the message fits in
1567                                                                  * buffer */
1568 }
1569
1570
1571 /* --------------------------------
1572  *              socket_startcopyout - inform libpq that an old-style COPY OUT transfer
1573  *                      is beginning
1574  * --------------------------------
1575  */
1576 static void
1577 socket_startcopyout(void)
1578 {
1579         DoingCopyOut = true;
1580 }
1581
1582 /* --------------------------------
1583  *              socket_endcopyout       - end an old-style COPY OUT transfer
1584  *
1585  *              If errorAbort is indicated, we are aborting a COPY OUT due to an error,
1586  *              and must send a terminator line.  Since a partial data line might have
1587  *              been emitted, send a couple of newlines first (the first one could
1588  *              get absorbed by a backslash...)  Note that old-style COPY OUT does
1589  *              not allow binary transfers, so a textual terminator is always correct.
1590  * --------------------------------
1591  */
1592 static void
1593 socket_endcopyout(bool errorAbort)
1594 {
1595         if (!DoingCopyOut)
1596                 return;
1597         if (errorAbort)
1598                 pq_putbytes("\n\n\\.\n", 5);
1599         /* in non-error case, copy.c will have emitted the terminator line */
1600         DoingCopyOut = false;
1601 }
1602
1603 /*
1604  * Support for TCP Keepalive parameters
1605  */
1606
/*
 * pq_setkeepaliveswin32 - apply keepalive idle time and interval on Windows
 *
 * On Windows, we need to set both idle and interval at the same time.
 * We also cannot reset them to the default (setting to zero will
 * actually set them to zero, not default), therefore we fall back to
 * the out-of-the-box default instead.
 *
 * idle and interval are given in seconds; a value <= 0 selects the
 * fallback default (2 hours and 1 second respectively).  On success the
 * values actually applied are recorded in port->keepalives_idle and
 * port->keepalives_interval.
 *
 * Returns STATUS_OK on success, or STATUS_ERROR (after logging) if the
 * WSAIoctl call fails.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
	struct tcp_keepalive ka;
	DWORD		retsize;

	if (idle <= 0)
		idle = 2 * 60 * 60;		/* default = 2 hours */
	if (interval <= 0)
		interval = 1;			/* default = 1 second */

	/* The values handed to WSAIoctl are seconds scaled to milliseconds. */
	ka.onoff = 1;
	ka.keepalivetime = idle * 1000;
	ka.keepaliveinterval = interval * 1000;

	if (WSAIoctl(port->sock,
				 SIO_KEEPALIVE_VALS,
				 (LPVOID) &ka,
				 sizeof(ka),
				 NULL,
				 0,
				 &retsize,
				 NULL,
				 NULL)
		!= 0)
	{
		/*
		 * WSAGetLastError() returns an int, so print it with %d.  (The
		 * previous "%ui" printed it as unsigned followed by a stray 'i'.)
		 */
		elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %d",
			 WSAGetLastError());
		return STATUS_ERROR;
	}

	/* Remember what is now in effect on the socket. */
	port->keepalives_idle = idle;
	port->keepalives_interval = interval;
	return STATUS_OK;
}
#endif
1651
1652 int
1653 pq_getkeepalivesidle(Port *port)
1654 {
1655 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(WIN32)
1656         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1657                 return 0;
1658
1659         if (port->keepalives_idle != 0)
1660                 return port->keepalives_idle;
1661
1662         if (port->default_keepalives_idle == 0)
1663         {
1664 #ifndef WIN32
1665                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
1666
1667 #ifdef TCP_KEEPIDLE
1668                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
1669                                            (char *) &port->default_keepalives_idle,
1670                                            &size) < 0)
1671                 {
1672                         elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
1673                         port->default_keepalives_idle = -1; /* don't know */
1674                 }
1675 #else
1676                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
1677                                            (char *) &port->default_keepalives_idle,
1678                                            &size) < 0)
1679                 {
1680                         elog(LOG, "getsockopt(TCP_KEEPALIVE) failed: %m");
1681                         port->default_keepalives_idle = -1; /* don't know */
1682                 }
1683 #endif   /* TCP_KEEPIDLE */
1684 #else                                                   /* WIN32 */
1685                 /* We can't get the defaults on Windows, so return "don't know" */
1686                 port->default_keepalives_idle = -1;
1687 #endif   /* WIN32 */
1688         }
1689
1690         return port->default_keepalives_idle;
1691 #else
1692         return 0;
1693 #endif
1694 }
1695
1696 int
1697 pq_setkeepalivesidle(int idle, Port *port)
1698 {
1699         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1700                 return STATUS_OK;
1701
1702 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(SIO_KEEPALIVE_VALS)
1703         if (idle == port->keepalives_idle)
1704                 return STATUS_OK;
1705
1706 #ifndef WIN32
1707         if (port->default_keepalives_idle <= 0)
1708         {
1709                 if (pq_getkeepalivesidle(port) < 0)
1710                 {
1711                         if (idle == 0)
1712                                 return STATUS_OK;               /* default is set but unknown */
1713                         else
1714                                 return STATUS_ERROR;
1715                 }
1716         }
1717
1718         if (idle == 0)
1719                 idle = port->default_keepalives_idle;
1720
1721 #ifdef TCP_KEEPIDLE
1722         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
1723                                    (char *) &idle, sizeof(idle)) < 0)
1724         {
1725                 elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
1726                 return STATUS_ERROR;
1727         }
1728 #else
1729         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
1730                                    (char *) &idle, sizeof(idle)) < 0)
1731         {
1732                 elog(LOG, "setsockopt(TCP_KEEPALIVE) failed: %m");
1733                 return STATUS_ERROR;
1734         }
1735 #endif
1736
1737         port->keepalives_idle = idle;
1738 #else                                                   /* WIN32 */
1739         return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
1740 #endif
1741 #else                                                   /* TCP_KEEPIDLE || SIO_KEEPALIVE_VALS */
1742         if (idle != 0)
1743         {
1744                 elog(LOG, "setting the keepalive idle time is not supported");
1745                 return STATUS_ERROR;
1746         }
1747 #endif
1748         return STATUS_OK;
1749 }
1750
1751 int
1752 pq_getkeepalivesinterval(Port *port)
1753 {
1754 #if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1755         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1756                 return 0;
1757
1758         if (port->keepalives_interval != 0)
1759                 return port->keepalives_interval;
1760
1761         if (port->default_keepalives_interval == 0)
1762         {
1763 #ifndef WIN32
1764                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
1765
1766                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1767                                            (char *) &port->default_keepalives_interval,
1768                                            &size) < 0)
1769                 {
1770                         elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
1771                         port->default_keepalives_interval = -1;         /* don't know */
1772                 }
1773 #else
1774                 /* We can't get the defaults on Windows, so return "don't know" */
1775                 port->default_keepalives_interval = -1;
1776 #endif   /* WIN32 */
1777         }
1778
1779         return port->default_keepalives_interval;
1780 #else
1781         return 0;
1782 #endif
1783 }
1784
1785 int
1786 pq_setkeepalivesinterval(int interval, Port *port)
1787 {
1788         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1789                 return STATUS_OK;
1790
1791 #if defined(TCP_KEEPINTVL) || defined (SIO_KEEPALIVE_VALS)
1792         if (interval == port->keepalives_interval)
1793                 return STATUS_OK;
1794
1795 #ifndef WIN32
1796         if (port->default_keepalives_interval <= 0)
1797         {
1798                 if (pq_getkeepalivesinterval(port) < 0)
1799                 {
1800                         if (interval == 0)
1801                                 return STATUS_OK;               /* default is set but unknown */
1802                         else
1803                                 return STATUS_ERROR;
1804                 }
1805         }
1806
1807         if (interval == 0)
1808                 interval = port->default_keepalives_interval;
1809
1810         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1811                                    (char *) &interval, sizeof(interval)) < 0)
1812         {
1813                 elog(LOG, "setsockopt(TCP_KEEPINTVL) failed: %m");
1814                 return STATUS_ERROR;
1815         }
1816
1817         port->keepalives_interval = interval;
1818 #else                                                   /* WIN32 */
1819         return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
1820 #endif
1821 #else
1822         if (interval != 0)
1823         {
1824                 elog(LOG, "setsockopt(TCP_KEEPINTVL) not supported");
1825                 return STATUS_ERROR;
1826         }
1827 #endif
1828
1829         return STATUS_OK;
1830 }
1831
1832 int
1833 pq_getkeepalivescount(Port *port)
1834 {
1835 #ifdef TCP_KEEPCNT
1836         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1837                 return 0;
1838
1839         if (port->keepalives_count != 0)
1840                 return port->keepalives_count;
1841
1842         if (port->default_keepalives_count == 0)
1843         {
1844                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
1845
1846                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1847                                            (char *) &port->default_keepalives_count,
1848                                            &size) < 0)
1849                 {
1850                         elog(LOG, "getsockopt(TCP_KEEPCNT) failed: %m");
1851                         port->default_keepalives_count = -1;            /* don't know */
1852                 }
1853         }
1854
1855         return port->default_keepalives_count;
1856 #else
1857         return 0;
1858 #endif
1859 }
1860
1861 int
1862 pq_setkeepalivescount(int count, Port *port)
1863 {
1864         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1865                 return STATUS_OK;
1866
1867 #ifdef TCP_KEEPCNT
1868         if (count == port->keepalives_count)
1869                 return STATUS_OK;
1870
1871         if (port->default_keepalives_count <= 0)
1872         {
1873                 if (pq_getkeepalivescount(port) < 0)
1874                 {
1875                         if (count == 0)
1876                                 return STATUS_OK;               /* default is set but unknown */
1877                         else
1878                                 return STATUS_ERROR;
1879                 }
1880         }
1881
1882         if (count == 0)
1883                 count = port->default_keepalives_count;
1884
1885         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1886                                    (char *) &count, sizeof(count)) < 0)
1887         {
1888                 elog(LOG, "setsockopt(TCP_KEEPCNT) failed: %m");
1889                 return STATUS_ERROR;
1890         }
1891
1892         port->keepalives_count = count;
1893 #else
1894         if (count != 0)
1895         {
1896                 elog(LOG, "setsockopt(TCP_KEEPCNT) not supported");
1897                 return STATUS_ERROR;
1898         }
1899 #endif
1900
1901         return STATUS_OK;
1902 }