]> granicus.if.org Git - postgresql/blob - src/backend/libpq/pqcomm.c
Properly declare FeBeWaitSet.
[postgresql] / src / backend / libpq / pqcomm.c
1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *        Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the old COPY OUT protocol.
10  * Unfortunately, COPY OUT was designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an ereport(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
16  * set the DoingCopyOut flag.)
17  *
18  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19  * pq_putbytes(), especially if the message would require multiple calls
20  * to send.  Instead, use the routines in pqformat.c to construct the message
21  * in a buffer and then emit it in one call to pq_putmessage.  This ensures
22  * that the channel will not be clogged by an incomplete message if execution
23  * is aborted by ereport(ERROR) partway through the message.  The only
24  * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
25  *
26  * At one time, libpq was shared between frontend and backend, but now
27  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28  * All that remains is similarities of names to trap the unwary...
29  *
30  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
31  * Portions Copyright (c) 1994, Regents of the University of California
32  *
33  *      src/backend/libpq/pqcomm.c
34  *
35  *-------------------------------------------------------------------------
36  */
37
38 /*------------------------
39  * INTERFACE ROUTINES
40  *
41  * setup/teardown:
42  *              StreamServerPort        - Open postmaster's server port
43  *              StreamConnection        - Create new connection with client
44  *              StreamClose                     - Close a client/backend connection
45  *              TouchSocketFiles        - Protect socket files against /tmp cleaners
46  *              pq_init                 - initialize libpq at backend startup
47  *              pq_comm_reset   - reset libpq during error recovery
48  *              pq_close                - shutdown libpq at backend exit
49  *
50  * low-level I/O:
51  *              pq_getbytes             - get a known number of bytes from connection
52  *              pq_getstring    - get a null terminated string from connection
53  *              pq_getmessage   - get a message with length word from connection
54  *              pq_getbyte              - get next byte from connection
55  *              pq_peekbyte             - peek at next byte from connection
56  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
57  *              pq_flush                - flush pending output
58  *              pq_flush_if_writable - flush pending output if writable without blocking
59  *              pq_getbyte_if_available - get a byte if available without blocking
60  *
61  * message-level I/O (and old-style-COPY-OUT cruft):
62  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
63  *              pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
64  *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
65  *              pq_endcopyout   - end a COPY OUT transfer
66  *
67  *------------------------
68  */
69 #include "postgres.h"
70
71 #include <signal.h>
72 #include <fcntl.h>
73 #include <grp.h>
74 #include <unistd.h>
75 #include <sys/file.h>
76 #include <sys/socket.h>
77 #include <sys/stat.h>
78 #include <sys/time.h>
79 #include <netdb.h>
80 #include <netinet/in.h>
81 #ifdef HAVE_NETINET_TCP_H
82 #include <netinet/tcp.h>
83 #endif
84 #include <arpa/inet.h>
85 #ifdef HAVE_UTIME_H
86 #include <utime.h>
87 #endif
88 #ifdef WIN32_ONLY_COMPILER              /* mstcpip.h is missing on mingw */
89 #include <mstcpip.h>
90 #endif
91
92 #include "libpq/ip.h"
93 #include "libpq/libpq.h"
94 #include "miscadmin.h"
95 #include "storage/ipc.h"
96 #include "utils/guc.h"
97 #include "utils/memutils.h"
98
/*
 * Configuration options
 *
 * Unix_socket_permissions is the mode passed to chmod() on each Unix-domain
 * socket file by Setup_AF_UNIX().  Unix_socket_group is either a decimal
 * group id or a group name; an empty string means the socket file's group
 * is left alone.
 */
int                     Unix_socket_permissions;
char       *Unix_socket_group;

/*
 * Where the Unix socket files are (list of palloc'd strings).
 * Lock_AF_UNIX() appends one entry per socket path it prepares.
 */
static List *sock_paths = NIL;

/*
 * Buffers for low-level I/O.
 *
 * The receive buffer is fixed size. Send buffer is usually 8k, but can be
 * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
 *
 * pq_init() allocates the send buffer at PQ_SEND_BUFFER_SIZE bytes in
 * TopMemoryContext and zeroes all four buffer indexes below.
 */

#define PQ_SEND_BUFFER_SIZE 8192
#define PQ_RECV_BUFFER_SIZE 8192

static char *PqSendBuffer;
static int      PqSendBufferSize;       /* Current allocated size of PqSendBuffer */
static int      PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
static int      PqSendStart;            /* Next index to send a byte in PqSendBuffer */

static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int      PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
static int      PqRecvLength;           /* End of data available in PqRecvBuffer */

/*
 * Message status
 *
 * All three flags are cleared by pq_init().
 */
static bool PqCommBusy;                 /* busy sending data to the client */
static bool PqCommReadingMsg;   /* in the middle of reading a message */
static bool DoingCopyOut;               /* in old-protocol COPY OUT processing */
133
134
135 /* Internal functions */
136 static void socket_comm_reset(void);
137 static void socket_close(int code, Datum arg);
138 static void socket_set_nonblocking(bool nonblocking);
139 static int      socket_flush(void);
140 static int      socket_flush_if_writable(void);
141 static bool socket_is_send_pending(void);
142 static int      socket_putmessage(char msgtype, const char *s, size_t len);
143 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
144 static void socket_startcopyout(void);
145 static void socket_endcopyout(bool errorAbort);
146 static int      internal_putbytes(const char *s, size_t len);
147 static int      internal_flush(void);
148 static void socket_set_nonblocking(bool nonblocking);
149
150 #ifdef HAVE_UNIX_SOCKETS
151 static int      Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
152 static int      Setup_AF_UNIX(char *sock_path);
153 #endif   /* HAVE_UNIX_SOCKETS */
154
/*
 * Method table for the plain socket transport.  The initializer is
 * positional, so the entries must stay in the same order as the members of
 * PQcommMethods (declared outside this file -- NOTE(review): confirm the
 * order against that declaration whenever either side changes).
 */
static PQcommMethods PqCommSocketMethods = {
        socket_comm_reset,
        socket_flush,
        socket_flush_if_writable,
        socket_is_send_pending,
        socket_putmessage,
        socket_putmessage_noblock,
        socket_startcopyout,
        socket_endcopyout
};

/* Active method table; initially points at the socket implementation above */
PQcommMethods *PqCommMethods = &PqCommSocketMethods;

/*
 * Wait event set for the client connection.  pq_init() creates it and adds,
 * in this order, the client socket, MyLatch and postmaster death.
 */
WaitEventSet *FeBeWaitSet;
169
170
171 /* --------------------------------
172  *              pq_init - initialize libpq at backend startup
173  * --------------------------------
174  */
175 void
176 pq_init(void)
177 {
178         /* initialize state variables */
179         PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
180         PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
181         PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
182         PqCommBusy = false;
183         PqCommReadingMsg = false;
184         DoingCopyOut = false;
185
186         /* set up process-exit hook to close the socket */
187         on_proc_exit(socket_close, 0);
188
189         /*
190          * In backends (as soon as forked) we operate the underlying socket in
191          * nonblocking mode and use latches to implement blocking semantics if
192          * needed. That allows us to provide safely interruptible reads and
193          * writes.
194          *
195          * Use COMMERROR on failure, because ERROR would try to send the error to
196          * the client, which might require changing the mode again, leading to
197          * infinite recursion.
198          */
199 #ifndef WIN32
200         if (!pg_set_noblock(MyProcPort->sock))
201                 ereport(COMMERROR,
202                                 (errmsg("could not set socket to nonblocking mode: %m")));
203 #endif
204
205         FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
206         AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
207                                           NULL, NULL);
208         AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
209         AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
210 }
211
212 /* --------------------------------
213  *              socket_comm_reset - reset libpq during error recovery
214  *
215  * This is called from error recovery at the outer idle loop.  It's
216  * just to get us out of trouble if we somehow manage to elog() from
217  * inside a pqcomm.c routine (which ideally will never happen, but...)
218  * --------------------------------
219  */
220 static void
221 socket_comm_reset(void)
222 {
223         /* Do not throw away pending data, but do reset the busy flag */
224         PqCommBusy = false;
225         /* We can abort any old-style COPY OUT, too */
226         pq_endcopyout(true);
227 }
228
229 /* --------------------------------
230  *              socket_close - shutdown libpq at backend exit
231  *
232  * This is the one pg_on_exit_callback in place during BackendInitialize().
233  * That function's unusual signal handling constrains that this callback be
234  * safe to run at any instant.
235  * --------------------------------
236  */
237 static void
238 socket_close(int code, Datum arg)
239 {
240         /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
241         if (MyProcPort != NULL)
242         {
243 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
244 #ifdef ENABLE_GSS
245                 OM_uint32       min_s;
246
247                 /*
248                  * Shutdown GSSAPI layer.  This section does nothing when interrupting
249                  * BackendInitialize(), because pg_GSS_recvauth() makes first use of
250                  * "ctx" and "cred".
251                  */
252                 if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
253                         gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
254
255                 if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
256                         gss_release_cred(&min_s, &MyProcPort->gss->cred);
257 #endif   /* ENABLE_GSS */
258
259                 /*
260                  * GSS and SSPI share the port->gss struct.  Since nowhere else does a
261                  * postmaster child free this, doing so is safe when interrupting
262                  * BackendInitialize().
263                  */
264                 free(MyProcPort->gss);
265 #endif   /* ENABLE_GSS || ENABLE_SSPI */
266
267                 /*
268                  * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
269                  * call this, so this is safe when interrupting BackendInitialize().
270                  */
271                 secure_close(MyProcPort);
272
273                 /*
274                  * Formerly we did an explicit close() here, but it seems better to
275                  * leave the socket open until the process dies.  This allows clients
276                  * to perform a "synchronous close" if they care --- wait till the
277                  * transport layer reports connection closure, and you can be sure the
278                  * backend has exited.
279                  *
280                  * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
281                  * though.
282                  */
283                 MyProcPort->sock = PGINVALID_SOCKET;
284         }
285 }
286
287
288
289 /*
290  * Streams -- wrapper around Unix socket system calls
291  *
292  *
293  *              Stream functions are used for vanilla TCP connection protocol.
294  */
295
296
297 /*
298  * StreamServerPort -- open a "listening" port to accept connections.
299  *
300  * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
301  * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
302  * specified.  For TCP ports, hostName is either NULL for all interfaces or
303  * the interface to listen on, and unixSocketDir is ignored (can be NULL).
304  *
305  * Successfully opened sockets are added to the ListenSocket[] array (of
306  * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
307  *
308  * RETURNS: STATUS_OK or STATUS_ERROR
309  */
310
311 int
312 StreamServerPort(int family, char *hostName, unsigned short portNumber,
313                                  char *unixSocketDir,
314                                  pgsocket ListenSocket[], int MaxListen)
315 {
316         pgsocket        fd;
317         int                     err;
318         int                     maxconn;
319         int                     ret;
320         char            portNumberStr[32];
321         const char *familyDesc;
322         char            familyDescBuf[64];
323         char       *service;
324         struct addrinfo *addrs = NULL,
325                            *addr;
326         struct addrinfo hint;
327         int                     listen_index = 0;
328         int                     added = 0;
329
330 #ifdef HAVE_UNIX_SOCKETS
331         char            unixSocketPath[MAXPGPATH];
332 #endif
333 #if !defined(WIN32) || defined(IPV6_V6ONLY)
334         int                     one = 1;
335 #endif
336
337         /* Initialize hint structure */
338         MemSet(&hint, 0, sizeof(hint));
339         hint.ai_family = family;
340         hint.ai_flags = AI_PASSIVE;
341         hint.ai_socktype = SOCK_STREAM;
342
343 #ifdef HAVE_UNIX_SOCKETS
344         if (family == AF_UNIX)
345         {
346                 /*
347                  * Create unixSocketPath from portNumber and unixSocketDir and lock
348                  * that file path
349                  */
350                 UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
351                 if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
352                 {
353                         ereport(LOG,
354                                         (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
355                                                         unixSocketPath,
356                                                         (int) (UNIXSOCK_PATH_BUFLEN - 1))));
357                         return STATUS_ERROR;
358                 }
359                 if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
360                         return STATUS_ERROR;
361                 service = unixSocketPath;
362         }
363         else
364 #endif   /* HAVE_UNIX_SOCKETS */
365         {
366                 snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
367                 service = portNumberStr;
368         }
369
370         ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
371         if (ret || !addrs)
372         {
373                 if (hostName)
374                         ereport(LOG,
375                                         (errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
376                                                         hostName, service, gai_strerror(ret))));
377                 else
378                         ereport(LOG,
379                                  (errmsg("could not translate service \"%s\" to address: %s",
380                                                  service, gai_strerror(ret))));
381                 if (addrs)
382                         pg_freeaddrinfo_all(hint.ai_family, addrs);
383                 return STATUS_ERROR;
384         }
385
386         for (addr = addrs; addr; addr = addr->ai_next)
387         {
388                 if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
389                 {
390                         /*
391                          * Only set up a unix domain socket when they really asked for it.
392                          * The service/port is different in that case.
393                          */
394                         continue;
395                 }
396
397                 /* See if there is still room to add 1 more socket. */
398                 for (; listen_index < MaxListen; listen_index++)
399                 {
400                         if (ListenSocket[listen_index] == PGINVALID_SOCKET)
401                                 break;
402                 }
403                 if (listen_index >= MaxListen)
404                 {
405                         ereport(LOG,
406                                         (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
407                                                         MaxListen)));
408                         break;
409                 }
410
411                 /* set up family name for possible error messages */
412                 switch (addr->ai_family)
413                 {
414                         case AF_INET:
415                                 familyDesc = _("IPv4");
416                                 break;
417 #ifdef HAVE_IPV6
418                         case AF_INET6:
419                                 familyDesc = _("IPv6");
420                                 break;
421 #endif
422 #ifdef HAVE_UNIX_SOCKETS
423                         case AF_UNIX:
424                                 familyDesc = _("Unix");
425                                 break;
426 #endif
427                         default:
428                                 snprintf(familyDescBuf, sizeof(familyDescBuf),
429                                                  _("unrecognized address family %d"),
430                                                  addr->ai_family);
431                                 familyDesc = familyDescBuf;
432                                 break;
433                 }
434
435                 if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
436                 {
437                         ereport(LOG,
438                                         (errcode_for_socket_access(),
439                         /* translator: %s is IPv4, IPv6, or Unix */
440                                          errmsg("could not create %s socket: %m",
441                                                         familyDesc)));
442                         continue;
443                 }
444
445 #ifndef WIN32
446
447                 /*
448                  * Without the SO_REUSEADDR flag, a new postmaster can't be started
449                  * right away after a stop or crash, giving "address already in use"
450                  * error on TCP ports.
451                  *
452                  * On win32, however, this behavior only happens if the
453                  * SO_EXLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
454                  * servers to listen on the same address, resulting in unpredictable
455                  * behavior. With no flags at all, win32 behaves as Unix with
456                  * SO_REUSEADDR.
457                  */
458                 if (!IS_AF_UNIX(addr->ai_family))
459                 {
460                         if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
461                                                         (char *) &one, sizeof(one))) == -1)
462                         {
463                                 ereport(LOG,
464                                                 (errcode_for_socket_access(),
465                                                  errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
466                                 closesocket(fd);
467                                 continue;
468                         }
469                 }
470 #endif
471
472 #ifdef IPV6_V6ONLY
473                 if (addr->ai_family == AF_INET6)
474                 {
475                         if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
476                                                    (char *) &one, sizeof(one)) == -1)
477                         {
478                                 ereport(LOG,
479                                                 (errcode_for_socket_access(),
480                                                  errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
481                                 closesocket(fd);
482                                 continue;
483                         }
484                 }
485 #endif
486
487                 /*
488                  * Note: This might fail on some OS's, like Linux older than
489                  * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
490                  * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
491                  * connections.
492                  */
493                 err = bind(fd, addr->ai_addr, addr->ai_addrlen);
494                 if (err < 0)
495                 {
496                         ereport(LOG,
497                                         (errcode_for_socket_access(),
498                         /* translator: %s is IPv4, IPv6, or Unix */
499                                          errmsg("could not bind %s socket: %m",
500                                                         familyDesc),
501                                          (IS_AF_UNIX(addr->ai_family)) ?
502                                   errhint("Is another postmaster already running on port %d?"
503                                                   " If not, remove socket file \"%s\" and retry.",
504                                                   (int) portNumber, service) :
505                                   errhint("Is another postmaster already running on port %d?"
506                                                   " If not, wait a few seconds and retry.",
507                                                   (int) portNumber)));
508                         closesocket(fd);
509                         continue;
510                 }
511
512 #ifdef HAVE_UNIX_SOCKETS
513                 if (addr->ai_family == AF_UNIX)
514                 {
515                         if (Setup_AF_UNIX(service) != STATUS_OK)
516                         {
517                                 closesocket(fd);
518                                 break;
519                         }
520                 }
521 #endif
522
523                 /*
524                  * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
525                  * intended to provide a clamp on the request on platforms where an
526                  * overly large request provokes a kernel error (are there any?).
527                  */
528                 maxconn = MaxBackends * 2;
529                 if (maxconn > PG_SOMAXCONN)
530                         maxconn = PG_SOMAXCONN;
531
532                 err = listen(fd, maxconn);
533                 if (err < 0)
534                 {
535                         ereport(LOG,
536                                         (errcode_for_socket_access(),
537                         /* translator: %s is IPv4, IPv6, or Unix */
538                                          errmsg("could not listen on %s socket: %m",
539                                                         familyDesc)));
540                         closesocket(fd);
541                         continue;
542                 }
543                 ListenSocket[listen_index] = fd;
544                 added++;
545         }
546
547         pg_freeaddrinfo_all(hint.ai_family, addrs);
548
549         if (!added)
550                 return STATUS_ERROR;
551
552         return STATUS_OK;
553 }
554
555
556 #ifdef HAVE_UNIX_SOCKETS
557
558 /*
559  * Lock_AF_UNIX -- configure unix socket file path
560  */
561 static int
562 Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
563 {
564         /*
565          * Grab an interlock file associated with the socket file.
566          *
567          * Note: there are two reasons for using a socket lock file, rather than
568          * trying to interlock directly on the socket itself.  First, it's a lot
569          * more portable, and second, it lets us remove any pre-existing socket
570          * file without race conditions.
571          */
572         CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
573
574         /*
575          * Once we have the interlock, we can safely delete any pre-existing
576          * socket file to avoid failure at bind() time.
577          */
578         (void) unlink(unixSocketPath);
579
580         /*
581          * Remember socket file pathnames for later maintenance.
582          */
583         sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
584
585         return STATUS_OK;
586 }
587
588
589 /*
590  * Setup_AF_UNIX -- configure unix socket permissions
591  */
592 static int
593 Setup_AF_UNIX(char *sock_path)
594 {
595         /*
596          * Fix socket ownership/permission if requested.  Note we must do this
597          * before we listen() to avoid a window where unwanted connections could
598          * get accepted.
599          */
600         Assert(Unix_socket_group);
601         if (Unix_socket_group[0] != '\0')
602         {
603 #ifdef WIN32
604                 elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
605 #else
606                 char       *endptr;
607                 unsigned long val;
608                 gid_t           gid;
609
610                 val = strtoul(Unix_socket_group, &endptr, 10);
611                 if (*endptr == '\0')
612                 {                                               /* numeric group id */
613                         gid = val;
614                 }
615                 else
616                 {                                               /* convert group name to id */
617                         struct group *gr;
618
619                         gr = getgrnam(Unix_socket_group);
620                         if (!gr)
621                         {
622                                 ereport(LOG,
623                                                 (errmsg("group \"%s\" does not exist",
624                                                                 Unix_socket_group)));
625                                 return STATUS_ERROR;
626                         }
627                         gid = gr->gr_gid;
628                 }
629                 if (chown(sock_path, -1, gid) == -1)
630                 {
631                         ereport(LOG,
632                                         (errcode_for_file_access(),
633                                          errmsg("could not set group of file \"%s\": %m",
634                                                         sock_path)));
635                         return STATUS_ERROR;
636                 }
637 #endif
638         }
639
640         if (chmod(sock_path, Unix_socket_permissions) == -1)
641         {
642                 ereport(LOG,
643                                 (errcode_for_file_access(),
644                                  errmsg("could not set permissions of file \"%s\": %m",
645                                                 sock_path)));
646                 return STATUS_ERROR;
647         }
648         return STATUS_OK;
649 }
650 #endif   /* HAVE_UNIX_SOCKETS */
651
652
653 /*
654  * StreamConnection -- create a new connection with client using
655  *              server port.  Set port->sock to the FD of the new connection.
656  *
657  * ASSUME: that this doesn't need to be non-blocking because
658  *              the Postmaster uses select() to tell when the server master
659  *              socket is ready for accept().
660  *
661  * RETURNS: STATUS_OK or STATUS_ERROR
662  */
663 int
664 StreamConnection(pgsocket server_fd, Port *port)
665 {
666         /* accept connection and fill in the client (remote) address */
667         port->raddr.salen = sizeof(port->raddr.addr);
668         if ((port->sock = accept(server_fd,
669                                                          (struct sockaddr *) & port->raddr.addr,
670                                                          &port->raddr.salen)) == PGINVALID_SOCKET)
671         {
672                 ereport(LOG,
673                                 (errcode_for_socket_access(),
674                                  errmsg("could not accept new connection: %m")));
675
676                 /*
677                  * If accept() fails then postmaster.c will still see the server
678                  * socket as read-ready, and will immediately try again.  To avoid
679                  * uselessly sucking lots of CPU, delay a bit before trying again.
680                  * (The most likely reason for failure is being out of kernel file
681                  * table slots; we can do little except hope some will get freed up.)
682                  */
683                 pg_usleep(100000L);             /* wait 0.1 sec */
684                 return STATUS_ERROR;
685         }
686
687 #ifdef SCO_ACCEPT_BUG
688
689         /*
690          * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
691          * shouldn't hurt to catch it for all versions of those platforms.
692          */
693         if (port->raddr.addr.ss_family == 0)
694                 port->raddr.addr.ss_family = AF_UNIX;
695 #endif
696
697         /* fill in the server (local) address */
698         port->laddr.salen = sizeof(port->laddr.addr);
699         if (getsockname(port->sock,
700                                         (struct sockaddr *) & port->laddr.addr,
701                                         &port->laddr.salen) < 0)
702         {
703                 elog(LOG, "getsockname() failed: %m");
704                 return STATUS_ERROR;
705         }
706
707         /* select NODELAY and KEEPALIVE options if it's a TCP connection */
708         if (!IS_AF_UNIX(port->laddr.addr.ss_family))
709         {
710                 int                     on;
711 #ifdef WIN32
712                 int                     oldopt;
713                 int                     optlen;
714                 int                     newopt;
715 #endif
716
717 #ifdef  TCP_NODELAY
718                 on = 1;
719                 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
720                                            (char *) &on, sizeof(on)) < 0)
721                 {
722                         elog(LOG, "setsockopt(TCP_NODELAY) failed: %m");
723                         return STATUS_ERROR;
724                 }
725 #endif
726                 on = 1;
727                 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
728                                            (char *) &on, sizeof(on)) < 0)
729                 {
730                         elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
731                         return STATUS_ERROR;
732                 }
733
734 #ifdef WIN32
735
736                 /*
737                  * This is a Win32 socket optimization.  The OS send buffer should be
738                  * large enough to send the whole Postgres send buffer in one go, or
739                  * performance suffers.  The Postgres send buffer can be enlarged if a
740                  * very large message needs to be sent, but we won't attempt to
741                  * enlarge the OS buffer if that happens, so somewhat arbitrarily
742                  * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
743                  * (That's 32kB with the current default).
744                  *
745                  * The default OS buffer size used to be 8kB in earlier Windows
746                  * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
747                  * be necessary to change it in later versions anymore.  Changing it
748                  * unnecessarily can even reduce performance, because setting
749                  * SO_SNDBUF in the application disables the "dynamic send buffering"
750                  * feature that was introduced in Windows 7.  So before fiddling with
751                  * SO_SNDBUF, check if the current buffer size is already large enough
752                  * and only increase it if necessary.
753                  *
754                  * See https://support.microsoft.com/kb/823764/EN-US/ and
755                  * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
756                  */
757                 optlen = sizeof(oldopt);
758                 if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
759                                            &optlen) < 0)
760                 {
761                         elog(LOG, "getsockopt(SO_SNDBUF) failed: %m");
762                         return STATUS_ERROR;
763                 }
764                 newopt = PQ_SEND_BUFFER_SIZE * 4;
765                 if (oldopt < newopt)
766                 {
767                         if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
768                                                    sizeof(newopt)) < 0)
769                         {
770                                 elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
771                                 return STATUS_ERROR;
772                         }
773                 }
774 #endif
775
776                 /*
777                  * Also apply the current keepalive parameters.  If we fail to set a
778                  * parameter, don't error out, because these aren't universally
779                  * supported.  (Note: you might think we need to reset the GUC
780                  * variables to 0 in such a case, but it's not necessary because the
781                  * show hooks for these variables report the truth anyway.)
782                  */
783                 (void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
784                 (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
785                 (void) pq_setkeepalivescount(tcp_keepalives_count, port);
786         }
787
788         return STATUS_OK;
789 }
790
791 /*
792  * StreamClose -- close a client/backend connection
793  *
794  * NOTE: this is NOT used to terminate a session; it is just used to release
795  * the file descriptor in a process that should no longer have the socket
796  * open.  (For example, the postmaster calls this after passing ownership
797  * of the connection to a child process.)  It is expected that someone else
798  * still has the socket open.  So, we only want to close the descriptor,
799  * we do NOT want to send anything to the far end.
800  */
801 void
802 StreamClose(pgsocket sock)
803 {
804         closesocket(sock);
805 }
806
807 /*
808  * TouchSocketFiles -- mark socket files as recently accessed
809  *
810  * This routine should be called every so often to ensure that the socket
811  * files have a recent mod date (ordinary operations on sockets usually won't
812  * change the mod date).  That saves them from being removed by
813  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
814  * never have put the socket file in /tmp...)
815  */
816 void
817 TouchSocketFiles(void)
818 {
819         ListCell   *l;
820
821         /* Loop through all created sockets... */
822         foreach(l, sock_paths)
823         {
824                 char       *sock_path = (char *) lfirst(l);
825
826                 /*
827                  * utime() is POSIX standard, utimes() is a common alternative. If we
828                  * have neither, there's no way to affect the mod or access time of
829                  * the socket :-(
830                  *
831                  * In either path, we ignore errors; there's no point in complaining.
832                  */
833 #ifdef HAVE_UTIME
834                 utime(sock_path, NULL);
835 #else                                                   /* !HAVE_UTIME */
836 #ifdef HAVE_UTIMES
837                 utimes(sock_path, NULL);
838 #endif   /* HAVE_UTIMES */
839 #endif   /* HAVE_UTIME */
840         }
841 }
842
843 /*
844  * RemoveSocketFiles -- unlink socket files at postmaster shutdown
845  */
846 void
847 RemoveSocketFiles(void)
848 {
849         ListCell   *l;
850
851         /* Loop through all created sockets... */
852         foreach(l, sock_paths)
853         {
854                 char       *sock_path = (char *) lfirst(l);
855
856                 /* Ignore any error. */
857                 (void) unlink(sock_path);
858         }
859         /* Since we're about to exit, no need to reclaim storage */
860         sock_paths = NIL;
861 }
862
863
864 /* --------------------------------
865  * Low-level I/O routines begin here.
866  *
867  * These routines communicate with a frontend client across a connection
868  * already established by the preceding routines.
869  * --------------------------------
870  */
871
872 /* --------------------------------
873  *                        socket_set_nonblocking - set socket blocking/non-blocking
874  *
875  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
876  * blocking otherwise.
877  * --------------------------------
878  */
879 static void
880 socket_set_nonblocking(bool nonblocking)
881 {
882         if (MyProcPort == NULL)
883                 ereport(ERROR,
884                                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
885                                  errmsg("there is no client connection")));
886
887         MyProcPort->noblock = nonblocking;
888 }
889
890 /* --------------------------------
891  *              pq_recvbuf - load some bytes into the input buffer
892  *
893  *              returns 0 if OK, EOF if trouble
894  * --------------------------------
895  */
896 static int
897 pq_recvbuf(void)
898 {
899         if (PqRecvPointer > 0)
900         {
901                 if (PqRecvLength > PqRecvPointer)
902                 {
903                         /* still some unread data, left-justify it in the buffer */
904                         memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
905                                         PqRecvLength - PqRecvPointer);
906                         PqRecvLength -= PqRecvPointer;
907                         PqRecvPointer = 0;
908                 }
909                 else
910                         PqRecvLength = PqRecvPointer = 0;
911         }
912
913         /* Ensure that we're in blocking mode */
914         socket_set_nonblocking(false);
915
916         /* Can fill buffer from PqRecvLength and upwards */
917         for (;;)
918         {
919                 int                     r;
920
921                 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
922                                                 PQ_RECV_BUFFER_SIZE - PqRecvLength);
923
924                 if (r < 0)
925                 {
926                         if (errno == EINTR)
927                                 continue;               /* Ok if interrupted */
928
929                         /*
930                          * Careful: an ereport() that tries to write to the client would
931                          * cause recursion to here, leading to stack overflow and core
932                          * dump!  This message must go *only* to the postmaster log.
933                          */
934                         ereport(COMMERROR,
935                                         (errcode_for_socket_access(),
936                                          errmsg("could not receive data from client: %m")));
937                         return EOF;
938                 }
939                 if (r == 0)
940                 {
941                         /*
942                          * EOF detected.  We used to write a log message here, but it's
943                          * better to expect the ultimate caller to do that.
944                          */
945                         return EOF;
946                 }
947                 /* r contains number of bytes read, so just incr length */
948                 PqRecvLength += r;
949                 return 0;
950         }
951 }
952
953 /* --------------------------------
954  *              pq_getbyte      - get a single byte from connection, or return EOF
955  * --------------------------------
956  */
957 int
958 pq_getbyte(void)
959 {
960         Assert(PqCommReadingMsg);
961
962         while (PqRecvPointer >= PqRecvLength)
963         {
964                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
965                         return EOF;                     /* Failed to recv data */
966         }
967         return (unsigned char) PqRecvBuffer[PqRecvPointer++];
968 }
969
970 /* --------------------------------
971  *              pq_peekbyte             - peek at next byte from connection
972  *
973  *       Same as pq_getbyte() except we don't advance the pointer.
974  * --------------------------------
975  */
976 int
977 pq_peekbyte(void)
978 {
979         Assert(PqCommReadingMsg);
980
981         while (PqRecvPointer >= PqRecvLength)
982         {
983                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
984                         return EOF;                     /* Failed to recv data */
985         }
986         return (unsigned char) PqRecvBuffer[PqRecvPointer];
987 }
988
989 /* --------------------------------
990  *              pq_getbyte_if_available - get a single byte from connection,
991  *                      if available
992  *
993  * The received byte is stored in *c. Returns 1 if a byte was read,
994  * 0 if no data was available, or EOF if trouble.
995  * --------------------------------
996  */
997 int
998 pq_getbyte_if_available(unsigned char *c)
999 {
1000         int                     r;
1001
1002         Assert(PqCommReadingMsg);
1003
1004         if (PqRecvPointer < PqRecvLength)
1005         {
1006                 *c = PqRecvBuffer[PqRecvPointer++];
1007                 return 1;
1008         }
1009
1010         /* Put the socket into non-blocking mode */
1011         socket_set_nonblocking(true);
1012
1013         r = secure_read(MyProcPort, c, 1);
1014         if (r < 0)
1015         {
1016                 /*
1017                  * Ok if no data available without blocking or interrupted (though
1018                  * EINTR really shouldn't happen with a non-blocking socket). Report
1019                  * other errors.
1020                  */
1021                 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1022                         r = 0;
1023                 else
1024                 {
1025                         /*
1026                          * Careful: an ereport() that tries to write to the client would
1027                          * cause recursion to here, leading to stack overflow and core
1028                          * dump!  This message must go *only* to the postmaster log.
1029                          */
1030                         ereport(COMMERROR,
1031                                         (errcode_for_socket_access(),
1032                                          errmsg("could not receive data from client: %m")));
1033                         r = EOF;
1034                 }
1035         }
1036         else if (r == 0)
1037         {
1038                 /* EOF detected */
1039                 r = EOF;
1040         }
1041
1042         return r;
1043 }
1044
1045 /* --------------------------------
1046  *              pq_getbytes             - get a known number of bytes from connection
1047  *
1048  *              returns 0 if OK, EOF if trouble
1049  * --------------------------------
1050  */
1051 int
1052 pq_getbytes(char *s, size_t len)
1053 {
1054         size_t          amount;
1055
1056         Assert(PqCommReadingMsg);
1057
1058         while (len > 0)
1059         {
1060                 while (PqRecvPointer >= PqRecvLength)
1061                 {
1062                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1063                                 return EOF;             /* Failed to recv data */
1064                 }
1065                 amount = PqRecvLength - PqRecvPointer;
1066                 if (amount > len)
1067                         amount = len;
1068                 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1069                 PqRecvPointer += amount;
1070                 s += amount;
1071                 len -= amount;
1072         }
1073         return 0;
1074 }
1075
1076 /* --------------------------------
1077  *              pq_discardbytes         - throw away a known number of bytes
1078  *
1079  *              same as pq_getbytes except we do not copy the data to anyplace.
1080  *              this is used for resynchronizing after read errors.
1081  *
1082  *              returns 0 if OK, EOF if trouble
1083  * --------------------------------
1084  */
1085 static int
1086 pq_discardbytes(size_t len)
1087 {
1088         size_t          amount;
1089
1090         Assert(PqCommReadingMsg);
1091
1092         while (len > 0)
1093         {
1094                 while (PqRecvPointer >= PqRecvLength)
1095                 {
1096                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1097                                 return EOF;             /* Failed to recv data */
1098                 }
1099                 amount = PqRecvLength - PqRecvPointer;
1100                 if (amount > len)
1101                         amount = len;
1102                 PqRecvPointer += amount;
1103                 len -= amount;
1104         }
1105         return 0;
1106 }
1107
1108 /* --------------------------------
1109  *              pq_getstring    - get a null terminated string from connection
1110  *
1111  *              The return value is placed in an expansible StringInfo, which has
1112  *              already been initialized by the caller.
1113  *
1114  *              This is used only for dealing with old-protocol clients.  The idea
1115  *              is to produce a StringInfo that looks the same as we would get from
1116  *              pq_getmessage() with a newer client; we will then process it with
1117  *              pq_getmsgstring.  Therefore, no character set conversion is done here,
1118  *              even though this is presumably useful only for text.
1119  *
1120  *              returns 0 if OK, EOF if trouble
1121  * --------------------------------
1122  */
1123 int
1124 pq_getstring(StringInfo s)
1125 {
1126         int                     i;
1127
1128         Assert(PqCommReadingMsg);
1129
1130         resetStringInfo(s);
1131
1132         /* Read until we get the terminating '\0' */
1133         for (;;)
1134         {
1135                 while (PqRecvPointer >= PqRecvLength)
1136                 {
1137                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1138                                 return EOF;             /* Failed to recv data */
1139                 }
1140
1141                 for (i = PqRecvPointer; i < PqRecvLength; i++)
1142                 {
1143                         if (PqRecvBuffer[i] == '\0')
1144                         {
1145                                 /* include the '\0' in the copy */
1146                                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1147                                                                            i - PqRecvPointer + 1);
1148                                 PqRecvPointer = i + 1;  /* advance past \0 */
1149                                 return 0;
1150                         }
1151                 }
1152
1153                 /* If we're here we haven't got the \0 in the buffer yet. */
1154                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1155                                                            PqRecvLength - PqRecvPointer);
1156                 PqRecvPointer = PqRecvLength;
1157         }
1158 }
1159
1160
1161 /* --------------------------------
1162  *              pq_startmsgread - begin reading a message from the client.
1163  *
1164  *              This must be called before any of the pq_get* functions.
1165  * --------------------------------
1166  */
1167 void
1168 pq_startmsgread(void)
1169 {
1170         /*
1171          * There shouldn't be a read active already, but let's check just to be
1172          * sure.
1173          */
1174         if (PqCommReadingMsg)
1175                 ereport(FATAL,
1176                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1177                    errmsg("terminating connection because protocol synchronization was lost")));
1178
1179         PqCommReadingMsg = true;
1180 }
1181
1182
1183 /* --------------------------------
1184  *              pq_endmsgread   - finish reading message.
1185  *
1186  *              This must be called after reading a V2 protocol message with
1187  *              pq_getstring() and friends, to indicate that we have read the whole
1188  *              message. In V3 protocol, pq_getmessage() does this implicitly.
1189  * --------------------------------
1190  */
1191 void
1192 pq_endmsgread(void)
1193 {
1194         Assert(PqCommReadingMsg);
1195
1196         PqCommReadingMsg = false;
1197 }
1198
1199 /* --------------------------------
1200  *              pq_is_reading_msg - are we currently reading a message?
1201  *
1202  * This is used in error recovery at the outer idle loop to detect if we have
1203  * lost protocol sync, and need to terminate the connection. pq_startmsgread()
1204  * will check for that too, but it's nicer to detect it earlier.
1205  * --------------------------------
1206  */
1207 bool
1208 pq_is_reading_msg(void)
1209 {
1210         return PqCommReadingMsg;
1211 }
1212
1213 /* --------------------------------
1214  *              pq_getmessage   - get a message with length word from connection
1215  *
1216  *              The return value is placed in an expansible StringInfo, which has
1217  *              already been initialized by the caller.
1218  *              Only the message body is placed in the StringInfo; the length word
1219  *              is removed.  Also, s->cursor is initialized to zero for convenience
1220  *              in scanning the message contents.
1221  *
1222  *              If maxlen is not zero, it is an upper limit on the length of the
1223  *              message we are willing to accept.  We abort the connection (by
1224  *              returning EOF) if client tries to send more than that.
1225  *
1226  *              returns 0 if OK, EOF if trouble
1227  * --------------------------------
1228  */
1229 int
1230 pq_getmessage(StringInfo s, int maxlen)
1231 {
1232         int32           len;
1233
1234         Assert(PqCommReadingMsg);
1235
1236         resetStringInfo(s);
1237
1238         /* Read message length word */
1239         if (pq_getbytes((char *) &len, 4) == EOF)
1240         {
1241                 ereport(COMMERROR,
1242                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1243                                  errmsg("unexpected EOF within message length word")));
1244                 return EOF;
1245         }
1246
1247         len = ntohl(len);
1248
1249         if (len < 4 ||
1250                 (maxlen > 0 && len > maxlen))
1251         {
1252                 ereport(COMMERROR,
1253                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1254                                  errmsg("invalid message length")));
1255                 return EOF;
1256         }
1257
1258         len -= 4;                                       /* discount length itself */
1259
1260         if (len > 0)
1261         {
1262                 /*
1263                  * Allocate space for message.  If we run out of room (ridiculously
1264                  * large message), we will elog(ERROR), but we want to discard the
1265                  * message body so as not to lose communication sync.
1266                  */
1267                 PG_TRY();
1268                 {
1269                         enlargeStringInfo(s, len);
1270                 }
1271                 PG_CATCH();
1272                 {
1273                         if (pq_discardbytes(len) == EOF)
1274                                 ereport(COMMERROR,
1275                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1276                                                  errmsg("incomplete message from client")));
1277
1278                         /* we discarded the rest of the message so we're back in sync. */
1279                         PqCommReadingMsg = false;
1280                         PG_RE_THROW();
1281                 }
1282                 PG_END_TRY();
1283
1284                 /* And grab the message */
1285                 if (pq_getbytes(s->data, len) == EOF)
1286                 {
1287                         ereport(COMMERROR,
1288                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
1289                                          errmsg("incomplete message from client")));
1290                         return EOF;
1291                 }
1292                 s->len = len;
1293                 /* Place a trailing null per StringInfo convention */
1294                 s->data[len] = '\0';
1295         }
1296
1297         /* finished reading the message. */
1298         PqCommReadingMsg = false;
1299
1300         return 0;
1301 }
1302
1303
1304 /* --------------------------------
1305  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
1306  *
1307  *              returns 0 if OK, EOF if trouble
1308  * --------------------------------
1309  */
1310 int
1311 pq_putbytes(const char *s, size_t len)
1312 {
1313         int                     res;
1314
1315         /* Should only be called by old-style COPY OUT */
1316         Assert(DoingCopyOut);
1317         /* No-op if reentrant call */
1318         if (PqCommBusy)
1319                 return 0;
1320         PqCommBusy = true;
1321         res = internal_putbytes(s, len);
1322         PqCommBusy = false;
1323         return res;
1324 }
1325
1326 static int
1327 internal_putbytes(const char *s, size_t len)
1328 {
1329         size_t          amount;
1330
1331         while (len > 0)
1332         {
1333                 /* If buffer is full, then flush it out */
1334                 if (PqSendPointer >= PqSendBufferSize)
1335                 {
1336                         socket_set_nonblocking(false);
1337                         if (internal_flush())
1338                                 return EOF;
1339                 }
1340                 amount = PqSendBufferSize - PqSendPointer;
1341                 if (amount > len)
1342                         amount = len;
1343                 memcpy(PqSendBuffer + PqSendPointer, s, amount);
1344                 PqSendPointer += amount;
1345                 s += amount;
1346                 len -= amount;
1347         }
1348         return 0;
1349 }
1350
1351 /* --------------------------------
1352  *              socket_flush            - flush pending output
1353  *
1354  *              returns 0 if OK, EOF if trouble
1355  * --------------------------------
1356  */
1357 static int
1358 socket_flush(void)
1359 {
1360         int                     res;
1361
1362         /* No-op if reentrant call */
1363         if (PqCommBusy)
1364                 return 0;
1365         PqCommBusy = true;
1366         socket_set_nonblocking(false);
1367         res = internal_flush();
1368         PqCommBusy = false;
1369         return res;
1370 }
1371
1372 /* --------------------------------
1373  *              internal_flush - flush pending output
1374  *
1375  * Returns 0 if OK (meaning everything was sent, or operation would block
1376  * and the socket is in non-blocking mode), or EOF if trouble.
1377  * --------------------------------
1378  */
1379 static int
1380 internal_flush(void)
1381 {
1382         static int      last_reported_send_errno = 0;
1383
1384         char       *bufptr = PqSendBuffer + PqSendStart;
1385         char       *bufend = PqSendBuffer + PqSendPointer;
1386
1387         while (bufptr < bufend)
1388         {
1389                 int                     r;
1390
1391                 r = secure_write(MyProcPort, bufptr, bufend - bufptr);
1392
1393                 if (r <= 0)
1394                 {
1395                         if (errno == EINTR)
1396                                 continue;               /* Ok if we were interrupted */
1397
1398                         /*
1399                          * Ok if no data writable without blocking, and the socket is in
1400                          * non-blocking mode.
1401                          */
1402                         if (errno == EAGAIN ||
1403                                 errno == EWOULDBLOCK)
1404                         {
1405                                 return 0;
1406                         }
1407
1408                         /*
1409                          * Careful: an ereport() that tries to write to the client would
1410                          * cause recursion to here, leading to stack overflow and core
1411                          * dump!  This message must go *only* to the postmaster log.
1412                          *
1413                          * If a client disconnects while we're in the midst of output, we
1414                          * might write quite a bit of data before we get to a safe query
1415                          * abort point.  So, suppress duplicate log messages.
1416                          */
1417                         if (errno != last_reported_send_errno)
1418                         {
1419                                 last_reported_send_errno = errno;
1420                                 ereport(COMMERROR,
1421                                                 (errcode_for_socket_access(),
1422                                                  errmsg("could not send data to client: %m")));
1423                         }
1424
1425                         /*
1426                          * We drop the buffered data anyway so that processing can
1427                          * continue, even though we'll probably quit soon. We also set a
1428                          * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
1429                          * the connection.
1430                          */
1431                         PqSendStart = PqSendPointer = 0;
1432                         ClientConnectionLost = 1;
1433                         InterruptPending = 1;
1434                         return EOF;
1435                 }
1436
1437                 last_reported_send_errno = 0;   /* reset after any successful send */
1438                 bufptr += r;
1439                 PqSendStart += r;
1440         }
1441
1442         PqSendStart = PqSendPointer = 0;
1443         return 0;
1444 }
1445
1446 /* --------------------------------
1447  *              pq_flush_if_writable - flush pending output if writable without blocking
1448  *
1449  * Returns 0 if OK, or EOF if trouble.
1450  * --------------------------------
1451  */
1452 static int
1453 socket_flush_if_writable(void)
1454 {
1455         int                     res;
1456
1457         /* Quick exit if nothing to do */
1458         if (PqSendPointer == PqSendStart)
1459                 return 0;
1460
1461         /* No-op if reentrant call */
1462         if (PqCommBusy)
1463                 return 0;
1464
1465         /* Temporarily put the socket into non-blocking mode */
1466         socket_set_nonblocking(true);
1467
1468         PqCommBusy = true;
1469         res = internal_flush();
1470         PqCommBusy = false;
1471         return res;
1472 }
1473
1474 /* --------------------------------
1475  *      socket_is_send_pending  - is there any pending data in the output buffer?
1476  * --------------------------------
1477  */
1478 static bool
1479 socket_is_send_pending(void)
1480 {
1481         return (PqSendStart < PqSendPointer);
1482 }
1483
1484 /* --------------------------------
1485  * Message-level I/O routines begin here.
1486  *
1487  * These routines understand about the old-style COPY OUT protocol.
1488  * --------------------------------
1489  */
1490
1491
1492 /* --------------------------------
1493  *              socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1494  *
1495  *              If msgtype is not '\0', it is a message type code to place before
1496  *              the message body.  If msgtype is '\0', then the message has no type
1497  *              code (this is only valid in pre-3.0 protocols).
1498  *
1499  *              len is the length of the message body data at *s.  In protocol 3.0
1500  *              and later, a message length word (equal to len+4 because it counts
1501  *              itself too) is inserted by this routine.
1502  *
1503  *              All normal messages are suppressed while old-style COPY OUT is in
1504  *              progress.  (In practice only a few notice messages might get emitted
1505  *              then; dropping them is annoying, but at least they will still appear
1506  *              in the postmaster log.)
1507  *
1508  *              We also suppress messages generated while pqcomm.c is busy.  This
1509  *              avoids any possibility of messages being inserted within other
1510  *              messages.  The only known trouble case arises if SIGQUIT occurs
1511  *              during a pqcomm.c routine --- quickdie() will try to send a warning
1512  *              message, and the most reasonable approach seems to be to drop it.
1513  *
1514  *              returns 0 if OK, EOF if trouble
1515  * --------------------------------
1516  */
1517 static int
1518 socket_putmessage(char msgtype, const char *s, size_t len)
1519 {
1520         if (DoingCopyOut || PqCommBusy)
1521                 return 0;
1522         PqCommBusy = true;
1523         if (msgtype)
1524                 if (internal_putbytes(&msgtype, 1))
1525                         goto fail;
1526         if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1527         {
1528                 uint32          n32;
1529
1530                 n32 = htonl((uint32) (len + 4));
1531                 if (internal_putbytes((char *) &n32, 4))
1532                         goto fail;
1533         }
1534         if (internal_putbytes(s, len))
1535                 goto fail;
1536         PqCommBusy = false;
1537         return 0;
1538
1539 fail:
1540         PqCommBusy = false;
1541         return EOF;
1542 }
1543
1544 /* --------------------------------
1545  *              pq_putmessage_noblock   - like pq_putmessage, but never blocks
1546  *
1547  *              If the output buffer is too small to hold the message, the buffer
1548  *              is enlarged.
1549  */
1550 static void
1551 socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1552 {
1553         int res         PG_USED_FOR_ASSERTS_ONLY;
1554         int                     required;
1555
1556         /*
1557          * Ensure we have enough space in the output buffer for the message header
1558          * as well as the message itself.
1559          */
1560         required = PqSendPointer + 1 + 4 + len;
1561         if (required > PqSendBufferSize)
1562         {
1563                 PqSendBuffer = repalloc(PqSendBuffer, required);
1564                 PqSendBufferSize = required;
1565         }
1566         res = pq_putmessage(msgtype, s, len);
1567         Assert(res == 0);                       /* should not fail when the message fits in
1568                                                                  * buffer */
1569 }
1570
1571
1572 /* --------------------------------
1573  *              socket_startcopyout - inform libpq that an old-style COPY OUT transfer
1574  *                      is beginning
1575  * --------------------------------
1576  */
1577 static void
1578 socket_startcopyout(void)
1579 {
1580         DoingCopyOut = true;
1581 }
1582
1583 /* --------------------------------
1584  *              socket_endcopyout       - end an old-style COPY OUT transfer
1585  *
1586  *              If errorAbort is indicated, we are aborting a COPY OUT due to an error,
1587  *              and must send a terminator line.  Since a partial data line might have
1588  *              been emitted, send a couple of newlines first (the first one could
1589  *              get absorbed by a backslash...)  Note that old-style COPY OUT does
1590  *              not allow binary transfers, so a textual terminator is always correct.
1591  * --------------------------------
1592  */
1593 static void
1594 socket_endcopyout(bool errorAbort)
1595 {
1596         if (!DoingCopyOut)
1597                 return;
1598         if (errorAbort)
1599                 pq_putbytes("\n\n\\.\n", 5);
1600         /* in non-error case, copy.c will have emitted the terminator line */
1601         DoingCopyOut = false;
1602 }
1603
1604 /*
1605  * Support for TCP Keepalive parameters
1606  */
1607
/*
 * On Windows the keepalive idle time and interval can only be set together,
 * through a single SIO_KEEPALIVE_VALS request.  There is also no way to ask
 * for "the system default": passing zero really sets zero.  A non-positive
 * request is therefore mapped to the out-of-the-box defaults (2 hours idle,
 * 1 second interval).
 *
 * idle and interval are given in seconds.  On success the values actually
 * applied are recorded in *port and STATUS_OK is returned.  If WSAIoctl
 * fails, the Winsock error code is logged and STATUS_ERROR is returned
 * without touching *port.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
	struct tcp_keepalive ka;
	DWORD		retsize;

	if (idle <= 0)
		idle = 2 * 60 * 60;		/* default = 2 hours */
	if (interval <= 0)
		interval = 1;			/* default = 1 second */

	/* the ioctl takes milliseconds */
	ka.onoff = 1;
	ka.keepalivetime = idle * 1000;
	ka.keepaliveinterval = interval * 1000;

	if (WSAIoctl(port->sock,
				 SIO_KEEPALIVE_VALS,
				 (LPVOID) &ka,
				 sizeof(ka),
				 NULL,
				 0,
				 &retsize,
				 NULL,
				 NULL)
		!= 0)
	{
		/*
		 * WSAGetLastError() returns an int, so print it with %d.  (The
		 * former "%ui" printed it as unsigned followed by a stray 'i'.)
		 */
		elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %d",
			 WSAGetLastError());
		return STATUS_ERROR;
	}

	/* remember what is now in effect */
	port->keepalives_idle = idle;
	port->keepalives_interval = interval;
	return STATUS_OK;
}
#endif
1652
1653 int
1654 pq_getkeepalivesidle(Port *port)
1655 {
1656 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(WIN32)
1657         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1658                 return 0;
1659
1660         if (port->keepalives_idle != 0)
1661                 return port->keepalives_idle;
1662
1663         if (port->default_keepalives_idle == 0)
1664         {
1665 #ifndef WIN32
1666                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
1667
1668 #ifdef TCP_KEEPIDLE
1669                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
1670                                            (char *) &port->default_keepalives_idle,
1671                                            &size) < 0)
1672                 {
1673                         elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
1674                         port->default_keepalives_idle = -1; /* don't know */
1675                 }
1676 #else
1677                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
1678                                            (char *) &port->default_keepalives_idle,
1679                                            &size) < 0)
1680                 {
1681                         elog(LOG, "getsockopt(TCP_KEEPALIVE) failed: %m");
1682                         port->default_keepalives_idle = -1; /* don't know */
1683                 }
1684 #endif   /* TCP_KEEPIDLE */
1685 #else                                                   /* WIN32 */
1686                 /* We can't get the defaults on Windows, so return "don't know" */
1687                 port->default_keepalives_idle = -1;
1688 #endif   /* WIN32 */
1689         }
1690
1691         return port->default_keepalives_idle;
1692 #else
1693         return 0;
1694 #endif
1695 }
1696
1697 int
1698 pq_setkeepalivesidle(int idle, Port *port)
1699 {
1700         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1701                 return STATUS_OK;
1702
1703 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(SIO_KEEPALIVE_VALS)
1704         if (idle == port->keepalives_idle)
1705                 return STATUS_OK;
1706
1707 #ifndef WIN32
1708         if (port->default_keepalives_idle <= 0)
1709         {
1710                 if (pq_getkeepalivesidle(port) < 0)
1711                 {
1712                         if (idle == 0)
1713                                 return STATUS_OK;               /* default is set but unknown */
1714                         else
1715                                 return STATUS_ERROR;
1716                 }
1717         }
1718
1719         if (idle == 0)
1720                 idle = port->default_keepalives_idle;
1721
1722 #ifdef TCP_KEEPIDLE
1723         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
1724                                    (char *) &idle, sizeof(idle)) < 0)
1725         {
1726                 elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
1727                 return STATUS_ERROR;
1728         }
1729 #else
1730         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
1731                                    (char *) &idle, sizeof(idle)) < 0)
1732         {
1733                 elog(LOG, "setsockopt(TCP_KEEPALIVE) failed: %m");
1734                 return STATUS_ERROR;
1735         }
1736 #endif
1737
1738         port->keepalives_idle = idle;
1739 #else                                                   /* WIN32 */
1740         return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
1741 #endif
1742 #else                                                   /* TCP_KEEPIDLE || SIO_KEEPALIVE_VALS */
1743         if (idle != 0)
1744         {
1745                 elog(LOG, "setting the keepalive idle time is not supported");
1746                 return STATUS_ERROR;
1747         }
1748 #endif
1749         return STATUS_OK;
1750 }
1751
1752 int
1753 pq_getkeepalivesinterval(Port *port)
1754 {
1755 #if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1756         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1757                 return 0;
1758
1759         if (port->keepalives_interval != 0)
1760                 return port->keepalives_interval;
1761
1762         if (port->default_keepalives_interval == 0)
1763         {
1764 #ifndef WIN32
1765                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
1766
1767                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1768                                            (char *) &port->default_keepalives_interval,
1769                                            &size) < 0)
1770                 {
1771                         elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
1772                         port->default_keepalives_interval = -1;         /* don't know */
1773                 }
1774 #else
1775                 /* We can't get the defaults on Windows, so return "don't know" */
1776                 port->default_keepalives_interval = -1;
1777 #endif   /* WIN32 */
1778         }
1779
1780         return port->default_keepalives_interval;
1781 #else
1782         return 0;
1783 #endif
1784 }
1785
1786 int
1787 pq_setkeepalivesinterval(int interval, Port *port)
1788 {
1789         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1790                 return STATUS_OK;
1791
1792 #if defined(TCP_KEEPINTVL) || defined (SIO_KEEPALIVE_VALS)
1793         if (interval == port->keepalives_interval)
1794                 return STATUS_OK;
1795
1796 #ifndef WIN32
1797         if (port->default_keepalives_interval <= 0)
1798         {
1799                 if (pq_getkeepalivesinterval(port) < 0)
1800                 {
1801                         if (interval == 0)
1802                                 return STATUS_OK;               /* default is set but unknown */
1803                         else
1804                                 return STATUS_ERROR;
1805                 }
1806         }
1807
1808         if (interval == 0)
1809                 interval = port->default_keepalives_interval;
1810
1811         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1812                                    (char *) &interval, sizeof(interval)) < 0)
1813         {
1814                 elog(LOG, "setsockopt(TCP_KEEPINTVL) failed: %m");
1815                 return STATUS_ERROR;
1816         }
1817
1818         port->keepalives_interval = interval;
1819 #else                                                   /* WIN32 */
1820         return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
1821 #endif
1822 #else
1823         if (interval != 0)
1824         {
1825                 elog(LOG, "setsockopt(TCP_KEEPINTVL) not supported");
1826                 return STATUS_ERROR;
1827         }
1828 #endif
1829
1830         return STATUS_OK;
1831 }
1832
1833 int
1834 pq_getkeepalivescount(Port *port)
1835 {
1836 #ifdef TCP_KEEPCNT
1837         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1838                 return 0;
1839
1840         if (port->keepalives_count != 0)
1841                 return port->keepalives_count;
1842
1843         if (port->default_keepalives_count == 0)
1844         {
1845                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
1846
1847                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1848                                            (char *) &port->default_keepalives_count,
1849                                            &size) < 0)
1850                 {
1851                         elog(LOG, "getsockopt(TCP_KEEPCNT) failed: %m");
1852                         port->default_keepalives_count = -1;            /* don't know */
1853                 }
1854         }
1855
1856         return port->default_keepalives_count;
1857 #else
1858         return 0;
1859 #endif
1860 }
1861
1862 int
1863 pq_setkeepalivescount(int count, Port *port)
1864 {
1865         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1866                 return STATUS_OK;
1867
1868 #ifdef TCP_KEEPCNT
1869         if (count == port->keepalives_count)
1870                 return STATUS_OK;
1871
1872         if (port->default_keepalives_count <= 0)
1873         {
1874                 if (pq_getkeepalivescount(port) < 0)
1875                 {
1876                         if (count == 0)
1877                                 return STATUS_OK;               /* default is set but unknown */
1878                         else
1879                                 return STATUS_ERROR;
1880                 }
1881         }
1882
1883         if (count == 0)
1884                 count = port->default_keepalives_count;
1885
1886         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1887                                    (char *) &count, sizeof(count)) < 0)
1888         {
1889                 elog(LOG, "setsockopt(TCP_KEEPCNT) failed: %m");
1890                 return STATUS_ERROR;
1891         }
1892
1893         port->keepalives_count = count;
1894 #else
1895         if (count != 0)
1896         {
1897                 elog(LOG, "setsockopt(TCP_KEEPCNT) not supported");
1898                 return STATUS_ERROR;
1899         }
1900 #endif
1901
1902         return STATUS_OK;
1903 }