]> granicus.if.org Git - postgresql/blob - src/backend/libpq/pqcomm.c
Don't set SO_SNDBUF on recent Windows versions that have a bigger default.
[postgresql] / src / backend / libpq / pqcomm.c
1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *        Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the old COPY OUT protocol.
10  * Unfortunately, COPY OUT was designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an ereport(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
16  * set the DoingCopyOut flag.)
17  *
18  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19  * pq_putbytes(), especially if the message would require multiple calls
20  * to send.  Instead, use the routines in pqformat.c to construct the message
21  * in a buffer and then emit it in one call to pq_putmessage.  This ensures
22  * that the channel will not be clogged by an incomplete message if execution
23  * is aborted by ereport(ERROR) partway through the message.  The only
24  * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
25  *
26  * At one time, libpq was shared between frontend and backend, but now
27  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28  * All that remains is similarities of names to trap the unwary...
29  *
30  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
31  * Portions Copyright (c) 1994, Regents of the University of California
32  *
33  *      src/backend/libpq/pqcomm.c
34  *
35  *-------------------------------------------------------------------------
36  */
37
38 /*------------------------
39  * INTERFACE ROUTINES
40  *
41  * setup/teardown:
42  *              StreamServerPort        - Open postmaster's server port
43  *              StreamConnection        - Create new connection with client
44  *              StreamClose                     - Close a client/backend connection
45  *              TouchSocketFiles        - Protect socket files against /tmp cleaners
46  *              pq_init                 - initialize libpq at backend startup
47  *              pq_comm_reset   - reset libpq during error recovery
48  *              pq_close                - shutdown libpq at backend exit
49  *
50  * low-level I/O:
51  *              pq_getbytes             - get a known number of bytes from connection
52  *              pq_getstring    - get a null terminated string from connection
53  *              pq_getmessage   - get a message with length word from connection
54  *              pq_getbyte              - get next byte from connection
55  *              pq_peekbyte             - peek at next byte from connection
56  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
57  *              pq_flush                - flush pending output
58  *              pq_flush_if_writable - flush pending output if writable without blocking
59  *              pq_getbyte_if_available - get a byte if available without blocking
60  *
61  * message-level I/O (and old-style-COPY-OUT cruft):
62  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
63  *              pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
64  *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
65  *              pq_endcopyout   - end a COPY OUT transfer
66  *
67  *------------------------
68  */
69 #include "postgres.h"
70
71 #include <signal.h>
72 #include <fcntl.h>
73 #include <grp.h>
74 #include <unistd.h>
75 #include <sys/file.h>
76 #include <sys/socket.h>
77 #include <sys/stat.h>
78 #include <sys/time.h>
79 #include <netdb.h>
80 #include <netinet/in.h>
81 #ifdef HAVE_NETINET_TCP_H
82 #include <netinet/tcp.h>
83 #endif
84 #include <arpa/inet.h>
85 #ifdef HAVE_UTIME_H
86 #include <utime.h>
87 #endif
88 #ifdef WIN32_ONLY_COMPILER              /* mstcpip.h is missing on mingw */
89 #include <mstcpip.h>
90 #endif
91
92 #include "libpq/ip.h"
93 #include "libpq/libpq.h"
94 #include "miscadmin.h"
95 #include "storage/ipc.h"
96 #include "utils/guc.h"
97 #include "utils/memutils.h"
98
99 /*
100  * Configuration options
101  */
102 int                     Unix_socket_permissions;
103 char       *Unix_socket_group;
104
105 /* Where the Unix socket files are (list of palloc'd strings) */
106 static List *sock_paths = NIL;
107
108 /*
109  * Buffers for low-level I/O.
110  *
111  * The receive buffer is fixed size. Send buffer is usually 8k, but can be
112  * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
113  */
114
115 #define PQ_SEND_BUFFER_SIZE 8192
116 #define PQ_RECV_BUFFER_SIZE 8192
117
118 static char *PqSendBuffer;
119 static int      PqSendBufferSize;       /* Size send buffer */
120 static int      PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
121 static int      PqSendStart;            /* Next index to send a byte in PqSendBuffer */
122
123 static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
124 static int      PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
125 static int      PqRecvLength;           /* End of data available in PqRecvBuffer */
126
127 /*
128  * Message status
129  */
130 static bool PqCommBusy;                 /* busy sending data to the client */
131 static bool PqCommReadingMsg;   /* in the middle of reading a message */
132 static bool DoingCopyOut;               /* in old-protocol COPY OUT processing */
133
134
135 /* Internal functions */
136 static void socket_comm_reset(void);
137 static void socket_close(int code, Datum arg);
138 static void socket_set_nonblocking(bool nonblocking);
139 static int      socket_flush(void);
140 static int      socket_flush_if_writable(void);
141 static bool socket_is_send_pending(void);
142 static int      socket_putmessage(char msgtype, const char *s, size_t len);
143 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
144 static void socket_startcopyout(void);
145 static void socket_endcopyout(bool errorAbort);
146 static int      internal_putbytes(const char *s, size_t len);
147 static int      internal_flush(void);
148 static void socket_set_nonblocking(bool nonblocking);
149
150 #ifdef HAVE_UNIX_SOCKETS
151 static int      Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
152 static int      Setup_AF_UNIX(char *sock_path);
153 #endif   /* HAVE_UNIX_SOCKETS */
154
155 static PQcommMethods PqCommSocketMethods = {
156         socket_comm_reset,
157         socket_flush,
158         socket_flush_if_writable,
159         socket_is_send_pending,
160         socket_putmessage,
161         socket_putmessage_noblock,
162         socket_startcopyout,
163         socket_endcopyout
164 };
165
166 PQcommMethods *PqCommMethods = &PqCommSocketMethods;
167
168
169
170 /* --------------------------------
171  *              pq_init - initialize libpq at backend startup
172  * --------------------------------
173  */
174 void
175 pq_init(void)
176 {
177         PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
178         PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
179         PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
180         PqCommBusy = false;
181         PqCommReadingMsg = false;
182         DoingCopyOut = false;
183         on_proc_exit(socket_close, 0);
184
185         /*
186          * In backends (as soon as forked) we operate the underlying socket in
187          * nonblocking mode and use latches to implement blocking semantics if
188          * needed. That allows us to provide safely interruptible reads and
189          * writes.
190          *
191          * Use COMMERROR on failure, because ERROR would try to send the error to
192          * the client, which might require changing the mode again, leading to
193          * infinite recursion.
194          */
195 #ifndef WIN32
196         if (!pg_set_noblock(MyProcPort->sock))
197                 ereport(COMMERROR,
198                                 (errmsg("could not set socket to nonblocking mode: %m")));
199 #endif
200
201 }
202
203 /* --------------------------------
204  *              socket_comm_reset - reset libpq during error recovery
205  *
206  * This is called from error recovery at the outer idle loop.  It's
207  * just to get us out of trouble if we somehow manage to elog() from
208  * inside a pqcomm.c routine (which ideally will never happen, but...)
209  * --------------------------------
210  */
211 static void
212 socket_comm_reset(void)
213 {
214         /* Do not throw away pending data, but do reset the busy flag */
215         PqCommBusy = false;
216         /* We can abort any old-style COPY OUT, too */
217         pq_endcopyout(true);
218 }
219
220 /* --------------------------------
221  *              socket_close - shutdown libpq at backend exit
222  *
223  * This is the one pg_on_exit_callback in place during BackendInitialize().
224  * That function's unusual signal handling constrains that this callback be
225  * safe to run at any instant.
226  * --------------------------------
227  */
228 static void
229 socket_close(int code, Datum arg)
230 {
231         /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
232         if (MyProcPort != NULL)
233         {
234 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
235 #ifdef ENABLE_GSS
236                 OM_uint32       min_s;
237
238                 /*
239                  * Shutdown GSSAPI layer.  This section does nothing when interrupting
240                  * BackendInitialize(), because pg_GSS_recvauth() makes first use of
241                  * "ctx" and "cred".
242                  */
243                 if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
244                         gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
245
246                 if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
247                         gss_release_cred(&min_s, &MyProcPort->gss->cred);
248 #endif   /* ENABLE_GSS */
249
250                 /*
251                  * GSS and SSPI share the port->gss struct.  Since nowhere else does a
252                  * postmaster child free this, doing so is safe when interrupting
253                  * BackendInitialize().
254                  */
255                 free(MyProcPort->gss);
256 #endif   /* ENABLE_GSS || ENABLE_SSPI */
257
258                 /*
259                  * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
260                  * call this, so this is safe when interrupting BackendInitialize().
261                  */
262                 secure_close(MyProcPort);
263
264                 /*
265                  * Formerly we did an explicit close() here, but it seems better to
266                  * leave the socket open until the process dies.  This allows clients
267                  * to perform a "synchronous close" if they care --- wait till the
268                  * transport layer reports connection closure, and you can be sure the
269                  * backend has exited.
270                  *
271                  * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
272                  * though.
273                  */
274                 MyProcPort->sock = PGINVALID_SOCKET;
275         }
276 }
277
278
279
280 /*
281  * Streams -- wrapper around Unix socket system calls
282  *
283  *
284  *              Stream functions are used for vanilla TCP connection protocol.
285  */
286
287
/*
 * StreamDoUnlink -- process-exit callback that removes Unix socket files
 *
 * Unlinks every path recorded in sock_paths and then forgets the list.
 * Neither argument is used.
 */
#ifdef HAVE_UNIX_SOCKETS
static void
StreamDoUnlink(int code, Datum arg)
{
	ListCell   *cell;

	/* Remove each socket file that was created. */
	foreach(cell, sock_paths)
		unlink((char *) lfirst(cell));

	/* The process is about to exit, so the list storage is not reclaimed. */
	sock_paths = NIL;
}
#endif   /* HAVE_UNIX_SOCKETS */
309
310 /*
311  * StreamServerPort -- open a "listening" port to accept connections.
312  *
313  * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
314  * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
315  * specified.  For TCP ports, hostName is either NULL for all interfaces or
316  * the interface to listen on, and unixSocketDir is ignored (can be NULL).
317  *
318  * Successfully opened sockets are added to the ListenSocket[] array (of
319  * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
320  *
321  * RETURNS: STATUS_OK or STATUS_ERROR
322  */
323
324 int
325 StreamServerPort(int family, char *hostName, unsigned short portNumber,
326                                  char *unixSocketDir,
327                                  pgsocket ListenSocket[], int MaxListen)
328 {
329         pgsocket        fd;
330         int                     err;
331         int                     maxconn;
332         int                     ret;
333         char            portNumberStr[32];
334         const char *familyDesc;
335         char            familyDescBuf[64];
336         char       *service;
337         struct addrinfo *addrs = NULL,
338                            *addr;
339         struct addrinfo hint;
340         int                     listen_index = 0;
341         int                     added = 0;
342
343 #ifdef HAVE_UNIX_SOCKETS
344         char            unixSocketPath[MAXPGPATH];
345 #endif
346 #if !defined(WIN32) || defined(IPV6_V6ONLY)
347         int                     one = 1;
348 #endif
349
350         /* Initialize hint structure */
351         MemSet(&hint, 0, sizeof(hint));
352         hint.ai_family = family;
353         hint.ai_flags = AI_PASSIVE;
354         hint.ai_socktype = SOCK_STREAM;
355
356 #ifdef HAVE_UNIX_SOCKETS
357         if (family == AF_UNIX)
358         {
359                 /*
360                  * Create unixSocketPath from portNumber and unixSocketDir and lock
361                  * that file path
362                  */
363                 UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
364                 if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
365                 {
366                         ereport(LOG,
367                                         (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
368                                                         unixSocketPath,
369                                                         (int) (UNIXSOCK_PATH_BUFLEN - 1))));
370                         return STATUS_ERROR;
371                 }
372                 if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
373                         return STATUS_ERROR;
374                 service = unixSocketPath;
375         }
376         else
377 #endif   /* HAVE_UNIX_SOCKETS */
378         {
379                 snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
380                 service = portNumberStr;
381         }
382
383         ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
384         if (ret || !addrs)
385         {
386                 if (hostName)
387                         ereport(LOG,
388                                         (errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
389                                                         hostName, service, gai_strerror(ret))));
390                 else
391                         ereport(LOG,
392                                  (errmsg("could not translate service \"%s\" to address: %s",
393                                                  service, gai_strerror(ret))));
394                 if (addrs)
395                         pg_freeaddrinfo_all(hint.ai_family, addrs);
396                 return STATUS_ERROR;
397         }
398
399         for (addr = addrs; addr; addr = addr->ai_next)
400         {
401                 if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
402                 {
403                         /*
404                          * Only set up a unix domain socket when they really asked for it.
405                          * The service/port is different in that case.
406                          */
407                         continue;
408                 }
409
410                 /* See if there is still room to add 1 more socket. */
411                 for (; listen_index < MaxListen; listen_index++)
412                 {
413                         if (ListenSocket[listen_index] == PGINVALID_SOCKET)
414                                 break;
415                 }
416                 if (listen_index >= MaxListen)
417                 {
418                         ereport(LOG,
419                                         (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
420                                                         MaxListen)));
421                         break;
422                 }
423
424                 /* set up family name for possible error messages */
425                 switch (addr->ai_family)
426                 {
427                         case AF_INET:
428                                 familyDesc = _("IPv4");
429                                 break;
430 #ifdef HAVE_IPV6
431                         case AF_INET6:
432                                 familyDesc = _("IPv6");
433                                 break;
434 #endif
435 #ifdef HAVE_UNIX_SOCKETS
436                         case AF_UNIX:
437                                 familyDesc = _("Unix");
438                                 break;
439 #endif
440                         default:
441                                 snprintf(familyDescBuf, sizeof(familyDescBuf),
442                                                  _("unrecognized address family %d"),
443                                                  addr->ai_family);
444                                 familyDesc = familyDescBuf;
445                                 break;
446                 }
447
448                 if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
449                 {
450                         ereport(LOG,
451                                         (errcode_for_socket_access(),
452                         /* translator: %s is IPv4, IPv6, or Unix */
453                                          errmsg("could not create %s socket: %m",
454                                                         familyDesc)));
455                         continue;
456                 }
457
458 #ifndef WIN32
459
460                 /*
461                  * Without the SO_REUSEADDR flag, a new postmaster can't be started
462                  * right away after a stop or crash, giving "address already in use"
463                  * error on TCP ports.
464                  *
465                  * On win32, however, this behavior only happens if the
466                  * SO_EXLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
467                  * servers to listen on the same address, resulting in unpredictable
468                  * behavior. With no flags at all, win32 behaves as Unix with
469                  * SO_REUSEADDR.
470                  */
471                 if (!IS_AF_UNIX(addr->ai_family))
472                 {
473                         if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
474                                                         (char *) &one, sizeof(one))) == -1)
475                         {
476                                 ereport(LOG,
477                                                 (errcode_for_socket_access(),
478                                                  errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
479                                 closesocket(fd);
480                                 continue;
481                         }
482                 }
483 #endif
484
485 #ifdef IPV6_V6ONLY
486                 if (addr->ai_family == AF_INET6)
487                 {
488                         if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
489                                                    (char *) &one, sizeof(one)) == -1)
490                         {
491                                 ereport(LOG,
492                                                 (errcode_for_socket_access(),
493                                                  errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
494                                 closesocket(fd);
495                                 continue;
496                         }
497                 }
498 #endif
499
500                 /*
501                  * Note: This might fail on some OS's, like Linux older than
502                  * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
503                  * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
504                  * connections.
505                  */
506                 err = bind(fd, addr->ai_addr, addr->ai_addrlen);
507                 if (err < 0)
508                 {
509                         ereport(LOG,
510                                         (errcode_for_socket_access(),
511                         /* translator: %s is IPv4, IPv6, or Unix */
512                                          errmsg("could not bind %s socket: %m",
513                                                         familyDesc),
514                                          (IS_AF_UNIX(addr->ai_family)) ?
515                                   errhint("Is another postmaster already running on port %d?"
516                                                   " If not, remove socket file \"%s\" and retry.",
517                                                   (int) portNumber, service) :
518                                   errhint("Is another postmaster already running on port %d?"
519                                                   " If not, wait a few seconds and retry.",
520                                                   (int) portNumber)));
521                         closesocket(fd);
522                         continue;
523                 }
524
525 #ifdef HAVE_UNIX_SOCKETS
526                 if (addr->ai_family == AF_UNIX)
527                 {
528                         if (Setup_AF_UNIX(service) != STATUS_OK)
529                         {
530                                 closesocket(fd);
531                                 break;
532                         }
533                 }
534 #endif
535
536                 /*
537                  * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
538                  * intended to provide a clamp on the request on platforms where an
539                  * overly large request provokes a kernel error (are there any?).
540                  */
541                 maxconn = MaxBackends * 2;
542                 if (maxconn > PG_SOMAXCONN)
543                         maxconn = PG_SOMAXCONN;
544
545                 err = listen(fd, maxconn);
546                 if (err < 0)
547                 {
548                         ereport(LOG,
549                                         (errcode_for_socket_access(),
550                         /* translator: %s is IPv4, IPv6, or Unix */
551                                          errmsg("could not listen on %s socket: %m",
552                                                         familyDesc)));
553                         closesocket(fd);
554                         continue;
555                 }
556                 ListenSocket[listen_index] = fd;
557                 added++;
558         }
559
560         pg_freeaddrinfo_all(hint.ai_family, addrs);
561
562         if (!added)
563                 return STATUS_ERROR;
564
565         return STATUS_OK;
566 }
567
568
569 #ifdef HAVE_UNIX_SOCKETS
570
571 /*
572  * Lock_AF_UNIX -- configure unix socket file path
573  */
574 static int
575 Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
576 {
577         /*
578          * Grab an interlock file associated with the socket file.
579          *
580          * Note: there are two reasons for using a socket lock file, rather than
581          * trying to interlock directly on the socket itself.  First, it's a lot
582          * more portable, and second, it lets us remove any pre-existing socket
583          * file without race conditions.
584          */
585         CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
586
587         /*
588          * Once we have the interlock, we can safely delete any pre-existing
589          * socket file to avoid failure at bind() time.
590          */
591         unlink(unixSocketPath);
592
593         /*
594          * Arrange to unlink the socket file(s) at proc_exit.  If this is the
595          * first one, set up the on_proc_exit function to do it; then add this
596          * socket file to the list of files to unlink.
597          */
598         if (sock_paths == NIL)
599                 on_proc_exit(StreamDoUnlink, 0);
600
601         sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
602
603         return STATUS_OK;
604 }
605
606
607 /*
608  * Setup_AF_UNIX -- configure unix socket permissions
609  */
610 static int
611 Setup_AF_UNIX(char *sock_path)
612 {
613         /*
614          * Fix socket ownership/permission if requested.  Note we must do this
615          * before we listen() to avoid a window where unwanted connections could
616          * get accepted.
617          */
618         Assert(Unix_socket_group);
619         if (Unix_socket_group[0] != '\0')
620         {
621 #ifdef WIN32
622                 elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
623 #else
624                 char       *endptr;
625                 unsigned long val;
626                 gid_t           gid;
627
628                 val = strtoul(Unix_socket_group, &endptr, 10);
629                 if (*endptr == '\0')
630                 {                                               /* numeric group id */
631                         gid = val;
632                 }
633                 else
634                 {                                               /* convert group name to id */
635                         struct group *gr;
636
637                         gr = getgrnam(Unix_socket_group);
638                         if (!gr)
639                         {
640                                 ereport(LOG,
641                                                 (errmsg("group \"%s\" does not exist",
642                                                                 Unix_socket_group)));
643                                 return STATUS_ERROR;
644                         }
645                         gid = gr->gr_gid;
646                 }
647                 if (chown(sock_path, -1, gid) == -1)
648                 {
649                         ereport(LOG,
650                                         (errcode_for_file_access(),
651                                          errmsg("could not set group of file \"%s\": %m",
652                                                         sock_path)));
653                         return STATUS_ERROR;
654                 }
655 #endif
656         }
657
658         if (chmod(sock_path, Unix_socket_permissions) == -1)
659         {
660                 ereport(LOG,
661                                 (errcode_for_file_access(),
662                                  errmsg("could not set permissions of file \"%s\": %m",
663                                                 sock_path)));
664                 return STATUS_ERROR;
665         }
666         return STATUS_OK;
667 }
668 #endif   /* HAVE_UNIX_SOCKETS */
669
670
671 /*
672  * StreamConnection -- create a new connection with client using
673  *              server port.  Set port->sock to the FD of the new connection.
674  *
675  * ASSUME: that this doesn't need to be non-blocking because
676  *              the Postmaster uses select() to tell when the server master
677  *              socket is ready for accept().
678  *
679  * RETURNS: STATUS_OK or STATUS_ERROR
680  */
681 int
682 StreamConnection(pgsocket server_fd, Port *port)
683 {
684         /* accept connection and fill in the client (remote) address */
685         port->raddr.salen = sizeof(port->raddr.addr);
686         if ((port->sock = accept(server_fd,
687                                                          (struct sockaddr *) & port->raddr.addr,
688                                                          &port->raddr.salen)) == PGINVALID_SOCKET)
689         {
690                 ereport(LOG,
691                                 (errcode_for_socket_access(),
692                                  errmsg("could not accept new connection: %m")));
693
694                 /*
695                  * If accept() fails then postmaster.c will still see the server
696                  * socket as read-ready, and will immediately try again.  To avoid
697                  * uselessly sucking lots of CPU, delay a bit before trying again.
698                  * (The most likely reason for failure is being out of kernel file
699                  * table slots; we can do little except hope some will get freed up.)
700                  */
701                 pg_usleep(100000L);             /* wait 0.1 sec */
702                 return STATUS_ERROR;
703         }
704
705 #ifdef SCO_ACCEPT_BUG
706
707         /*
708          * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
709          * shouldn't hurt to catch it for all versions of those platforms.
710          */
711         if (port->raddr.addr.ss_family == 0)
712                 port->raddr.addr.ss_family = AF_UNIX;
713 #endif
714
715         /* fill in the server (local) address */
716         port->laddr.salen = sizeof(port->laddr.addr);
717         if (getsockname(port->sock,
718                                         (struct sockaddr *) & port->laddr.addr,
719                                         &port->laddr.salen) < 0)
720         {
721                 elog(LOG, "getsockname() failed: %m");
722                 return STATUS_ERROR;
723         }
724
725         /* select NODELAY and KEEPALIVE options if it's a TCP connection */
726         if (!IS_AF_UNIX(port->laddr.addr.ss_family))
727         {
728                 int                     on;
729 #ifdef WIN32
730                 int                     oldopt;
731                 int                     optlen;
732                 int                     newopt;
733 #endif
734
735 #ifdef  TCP_NODELAY
736                 on = 1;
737                 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
738                                            (char *) &on, sizeof(on)) < 0)
739                 {
740                         elog(LOG, "setsockopt(TCP_NODELAY) failed: %m");
741                         return STATUS_ERROR;
742                 }
743 #endif
744                 on = 1;
745                 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
746                                            (char *) &on, sizeof(on)) < 0)
747                 {
748                         elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
749                         return STATUS_ERROR;
750                 }
751
752 #ifdef WIN32
753
754                 /*
755                  * This is a Win32 socket optimization.  The OS send buffer should be
756                  * large enough to send the whole Postgres send buffer in one go, or
757                  * performance suffers.  The Postgres send buffer can be enlarged if a
758                  * very large message needs to be sent, but we won't attempt to
759                  * enlarge the OS buffer if that happens, so somewhat arbitrarily
760                  * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
761                  * (That's 32kB with the current default).
762                  *
763                  * The default OS buffer size used to be 8kB in earlier Windows
764                  * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
765                  * be necessary to change it in later versions anymore.  Changing it
766                  * unnecessarily can even reduce performance, because setting
767                  * SO_SNDBUF in the application disables the "dynamic send buffering"
768                  * feature that was introduced in Windows 7.  So before fiddling with
769                  * SO_SNDBUF, check if the current buffer size is already large enough
770                  * and only increase it if necessary.
771                  *
772                  * See https://support.microsoft.com/kb/823764/EN-US/ and
773                  * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
774                  */
775                 optlen = sizeof(oldopt);
776                 if (getsockopt(server_fd, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
777                                            &optlen) < 0)
778                 {
779                         elog(LOG, "getsockopt(SO_SNDBUF) failed: %m");
780                         return STATUS_ERROR;
781                 }
782                 newopt = PQ_SEND_BUFFER_SIZE * 4;
783                 if (oldopt < newopt)
784                 {
785                         if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
786                                                    sizeof(newopt)) < 0)
787                         {
788                                 elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
789                                 return STATUS_ERROR;
790                         }
791                 }
792 #endif
793
794                 /*
795                  * Also apply the current keepalive parameters.  If we fail to set a
796                  * parameter, don't error out, because these aren't universally
797                  * supported.  (Note: you might think we need to reset the GUC
798                  * variables to 0 in such a case, but it's not necessary because the
799                  * show hooks for these variables report the truth anyway.)
800                  */
801                 (void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
802                 (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
803                 (void) pq_setkeepalivescount(tcp_keepalives_count, port);
804         }
805
806         return STATUS_OK;
807 }
808
809 /*
810  * StreamClose -- close a client/backend connection
811  *
812  * NOTE: this is NOT used to terminate a session; it is just used to release
813  * the file descriptor in a process that should no longer have the socket
814  * open.  (For example, the postmaster calls this after passing ownership
815  * of the connection to a child process.)  It is expected that someone else
816  * still has the socket open.  So, we only want to close the descriptor,
817  * we do NOT want to send anything to the far end.
818  */
819 void
820 StreamClose(pgsocket sock)
821 {
822         closesocket(sock);
823 }
824
825 /*
826  * TouchSocketFiles -- mark socket files as recently accessed
827  *
828  * This routine should be called every so often to ensure that the socket
829  * files have a recent mod date (ordinary operations on sockets usually won't
830  * change the mod date).  That saves them from being removed by
831  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
832  * never have put the socket file in /tmp...)
833  */
834 void
835 TouchSocketFiles(void)
836 {
837         ListCell   *l;
838
839         /* Loop through all created sockets... */
840         foreach(l, sock_paths)
841         {
842                 char       *sock_path = (char *) lfirst(l);
843
844                 /*
845                  * utime() is POSIX standard, utimes() is a common alternative. If we
846                  * have neither, there's no way to affect the mod or access time of
847                  * the socket :-(
848                  *
849                  * In either path, we ignore errors; there's no point in complaining.
850                  */
851 #ifdef HAVE_UTIME
852                 utime(sock_path, NULL);
853 #else                                                   /* !HAVE_UTIME */
854 #ifdef HAVE_UTIMES
855                 utimes(sock_path, NULL);
856 #endif   /* HAVE_UTIMES */
857 #endif   /* HAVE_UTIME */
858         }
859 }
860
861
862 /* --------------------------------
863  * Low-level I/O routines begin here.
864  *
865  * These routines communicate with a frontend client across a connection
866  * already established by the preceding routines.
867  * --------------------------------
868  */
869
870 /* --------------------------------
871  *                        socket_set_nonblocking - set socket blocking/non-blocking
872  *
873  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
874  * blocking otherwise.
875  * --------------------------------
876  */
877 static void
878 socket_set_nonblocking(bool nonblocking)
879 {
880         if (MyProcPort == NULL)
881                 ereport(ERROR,
882                                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
883                                  errmsg("there is no client connection")));
884
885         MyProcPort->noblock = nonblocking;
886 }
887
888 /* --------------------------------
889  *              pq_recvbuf - load some bytes into the input buffer
890  *
891  *              returns 0 if OK, EOF if trouble
892  * --------------------------------
893  */
894 static int
895 pq_recvbuf(void)
896 {
897         if (PqRecvPointer > 0)
898         {
899                 if (PqRecvLength > PqRecvPointer)
900                 {
901                         /* still some unread data, left-justify it in the buffer */
902                         memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
903                                         PqRecvLength - PqRecvPointer);
904                         PqRecvLength -= PqRecvPointer;
905                         PqRecvPointer = 0;
906                 }
907                 else
908                         PqRecvLength = PqRecvPointer = 0;
909         }
910
911         /* Ensure that we're in blocking mode */
912         socket_set_nonblocking(false);
913
914         /* Can fill buffer from PqRecvLength and upwards */
915         for (;;)
916         {
917                 int                     r;
918
919                 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
920                                                 PQ_RECV_BUFFER_SIZE - PqRecvLength);
921
922                 if (r < 0)
923                 {
924                         if (errno == EINTR)
925                                 continue;               /* Ok if interrupted */
926
927                         /*
928                          * Careful: an ereport() that tries to write to the client would
929                          * cause recursion to here, leading to stack overflow and core
930                          * dump!  This message must go *only* to the postmaster log.
931                          */
932                         ereport(COMMERROR,
933                                         (errcode_for_socket_access(),
934                                          errmsg("could not receive data from client: %m")));
935                         return EOF;
936                 }
937                 if (r == 0)
938                 {
939                         /*
940                          * EOF detected.  We used to write a log message here, but it's
941                          * better to expect the ultimate caller to do that.
942                          */
943                         return EOF;
944                 }
945                 /* r contains number of bytes read, so just incr length */
946                 PqRecvLength += r;
947                 return 0;
948         }
949 }
950
951 /* --------------------------------
952  *              pq_getbyte      - get a single byte from connection, or return EOF
953  * --------------------------------
954  */
955 int
956 pq_getbyte(void)
957 {
958         Assert(PqCommReadingMsg);
959
960         while (PqRecvPointer >= PqRecvLength)
961         {
962                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
963                         return EOF;                     /* Failed to recv data */
964         }
965         return (unsigned char) PqRecvBuffer[PqRecvPointer++];
966 }
967
968 /* --------------------------------
969  *              pq_peekbyte             - peek at next byte from connection
970  *
971  *       Same as pq_getbyte() except we don't advance the pointer.
972  * --------------------------------
973  */
974 int
975 pq_peekbyte(void)
976 {
977         Assert(PqCommReadingMsg);
978
979         while (PqRecvPointer >= PqRecvLength)
980         {
981                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
982                         return EOF;                     /* Failed to recv data */
983         }
984         return (unsigned char) PqRecvBuffer[PqRecvPointer];
985 }
986
987 /* --------------------------------
988  *              pq_getbyte_if_available - get a single byte from connection,
989  *                      if available
990  *
991  * The received byte is stored in *c. Returns 1 if a byte was read,
992  * 0 if no data was available, or EOF if trouble.
993  * --------------------------------
994  */
995 int
996 pq_getbyte_if_available(unsigned char *c)
997 {
998         int                     r;
999
1000         Assert(PqCommReadingMsg);
1001
1002         if (PqRecvPointer < PqRecvLength)
1003         {
1004                 *c = PqRecvBuffer[PqRecvPointer++];
1005                 return 1;
1006         }
1007
1008         /* Put the socket into non-blocking mode */
1009         socket_set_nonblocking(true);
1010
1011         r = secure_read(MyProcPort, c, 1);
1012         if (r < 0)
1013         {
1014                 /*
1015                  * Ok if no data available without blocking or interrupted (though
1016                  * EINTR really shouldn't happen with a non-blocking socket). Report
1017                  * other errors.
1018                  */
1019                 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1020                         r = 0;
1021                 else
1022                 {
1023                         /*
1024                          * Careful: an ereport() that tries to write to the client would
1025                          * cause recursion to here, leading to stack overflow and core
1026                          * dump!  This message must go *only* to the postmaster log.
1027                          */
1028                         ereport(COMMERROR,
1029                                         (errcode_for_socket_access(),
1030                                          errmsg("could not receive data from client: %m")));
1031                         r = EOF;
1032                 }
1033         }
1034         else if (r == 0)
1035         {
1036                 /* EOF detected */
1037                 r = EOF;
1038         }
1039
1040         return r;
1041 }
1042
1043 /* --------------------------------
1044  *              pq_getbytes             - get a known number of bytes from connection
1045  *
1046  *              returns 0 if OK, EOF if trouble
1047  * --------------------------------
1048  */
1049 int
1050 pq_getbytes(char *s, size_t len)
1051 {
1052         size_t          amount;
1053
1054         Assert(PqCommReadingMsg);
1055
1056         while (len > 0)
1057         {
1058                 while (PqRecvPointer >= PqRecvLength)
1059                 {
1060                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1061                                 return EOF;             /* Failed to recv data */
1062                 }
1063                 amount = PqRecvLength - PqRecvPointer;
1064                 if (amount > len)
1065                         amount = len;
1066                 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1067                 PqRecvPointer += amount;
1068                 s += amount;
1069                 len -= amount;
1070         }
1071         return 0;
1072 }
1073
1074 /* --------------------------------
1075  *              pq_discardbytes         - throw away a known number of bytes
1076  *
1077  *              same as pq_getbytes except we do not copy the data to anyplace.
1078  *              this is used for resynchronizing after read errors.
1079  *
1080  *              returns 0 if OK, EOF if trouble
1081  * --------------------------------
1082  */
1083 static int
1084 pq_discardbytes(size_t len)
1085 {
1086         size_t          amount;
1087
1088         Assert(PqCommReadingMsg);
1089
1090         while (len > 0)
1091         {
1092                 while (PqRecvPointer >= PqRecvLength)
1093                 {
1094                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1095                                 return EOF;             /* Failed to recv data */
1096                 }
1097                 amount = PqRecvLength - PqRecvPointer;
1098                 if (amount > len)
1099                         amount = len;
1100                 PqRecvPointer += amount;
1101                 len -= amount;
1102         }
1103         return 0;
1104 }
1105
1106 /* --------------------------------
1107  *              pq_getstring    - get a null terminated string from connection
1108  *
1109  *              The return value is placed in an expansible StringInfo, which has
1110  *              already been initialized by the caller.
1111  *
1112  *              This is used only for dealing with old-protocol clients.  The idea
1113  *              is to produce a StringInfo that looks the same as we would get from
1114  *              pq_getmessage() with a newer client; we will then process it with
1115  *              pq_getmsgstring.  Therefore, no character set conversion is done here,
1116  *              even though this is presumably useful only for text.
1117  *
1118  *              returns 0 if OK, EOF if trouble
1119  * --------------------------------
1120  */
1121 int
1122 pq_getstring(StringInfo s)
1123 {
1124         int                     i;
1125
1126         Assert(PqCommReadingMsg);
1127
1128         resetStringInfo(s);
1129
1130         /* Read until we get the terminating '\0' */
1131         for (;;)
1132         {
1133                 while (PqRecvPointer >= PqRecvLength)
1134                 {
1135                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1136                                 return EOF;             /* Failed to recv data */
1137                 }
1138
1139                 for (i = PqRecvPointer; i < PqRecvLength; i++)
1140                 {
1141                         if (PqRecvBuffer[i] == '\0')
1142                         {
1143                                 /* include the '\0' in the copy */
1144                                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1145                                                                            i - PqRecvPointer + 1);
1146                                 PqRecvPointer = i + 1;  /* advance past \0 */
1147                                 return 0;
1148                         }
1149                 }
1150
1151                 /* If we're here we haven't got the \0 in the buffer yet. */
1152                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1153                                                            PqRecvLength - PqRecvPointer);
1154                 PqRecvPointer = PqRecvLength;
1155         }
1156 }
1157
1158
1159 /* --------------------------------
1160  *              pq_startmsgread - begin reading a message from the client.
1161  *
1162  *              This must be called before any of the pq_get* functions.
1163  * --------------------------------
1164  */
1165 void
1166 pq_startmsgread(void)
1167 {
1168         /*
1169          * There shouldn't be a read active already, but let's check just to be
1170          * sure.
1171          */
1172         if (PqCommReadingMsg)
1173                 ereport(FATAL,
1174                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1175                    errmsg("terminating connection because protocol sync was lost")));
1176
1177         PqCommReadingMsg = true;
1178 }
1179
1180
1181 /* --------------------------------
1182  *              pq_endmsgread   - finish reading message.
1183  *
1184  *              This must be called after reading a V2 protocol message with
1185  *              pq_getstring() and friends, to indicate that we have read the whole
1186  *              message. In V3 protocol, pq_getmessage() does this implicitly.
1187  * --------------------------------
1188  */
1189 void
1190 pq_endmsgread(void)
1191 {
1192         Assert(PqCommReadingMsg);
1193
1194         PqCommReadingMsg = false;
1195 }
1196
1197 /* --------------------------------
1198  *              pq_is_reading_msg - are we currently reading a message?
1199  *
1200  * This is used in error recovery at the outer idle loop to detect if we have
1201  * lost protocol sync, and need to terminate the connection. pq_startmsgread()
1202  * will check for that too, but it's nicer to detect it earlier.
1203  * --------------------------------
1204  */
1205 bool
1206 pq_is_reading_msg(void)
1207 {
1208         return PqCommReadingMsg;
1209 }
1210
1211 /* --------------------------------
1212  *              pq_getmessage   - get a message with length word from connection
1213  *
1214  *              The return value is placed in an expansible StringInfo, which has
1215  *              already been initialized by the caller.
1216  *              Only the message body is placed in the StringInfo; the length word
1217  *              is removed.  Also, s->cursor is initialized to zero for convenience
1218  *              in scanning the message contents.
1219  *
1220  *              If maxlen is not zero, it is an upper limit on the length of the
1221  *              message we are willing to accept.  We abort the connection (by
1222  *              returning EOF) if client tries to send more than that.
1223  *
1224  *              returns 0 if OK, EOF if trouble
1225  * --------------------------------
1226  */
1227 int
1228 pq_getmessage(StringInfo s, int maxlen)
1229 {
1230         int32           len;
1231
1232         Assert(PqCommReadingMsg);
1233
1234         resetStringInfo(s);
1235
1236         /* Read message length word */
1237         if (pq_getbytes((char *) &len, 4) == EOF)
1238         {
1239                 ereport(COMMERROR,
1240                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1241                                  errmsg("unexpected EOF within message length word")));
1242                 return EOF;
1243         }
1244
1245         len = ntohl(len);
1246
1247         if (len < 4 ||
1248                 (maxlen > 0 && len > maxlen))
1249         {
1250                 ereport(COMMERROR,
1251                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1252                                  errmsg("invalid message length")));
1253                 return EOF;
1254         }
1255
1256         len -= 4;                                       /* discount length itself */
1257
1258         if (len > 0)
1259         {
1260                 /*
1261                  * Allocate space for message.  If we run out of room (ridiculously
1262                  * large message), we will elog(ERROR), but we want to discard the
1263                  * message body so as not to lose communication sync.
1264                  */
1265                 PG_TRY();
1266                 {
1267                         enlargeStringInfo(s, len);
1268                 }
1269                 PG_CATCH();
1270                 {
1271                         if (pq_discardbytes(len) == EOF)
1272                                 ereport(COMMERROR,
1273                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1274                                                  errmsg("incomplete message from client")));
1275
1276                         /* we discarded the rest of the message so we're back in sync. */
1277                         PqCommReadingMsg = false;
1278                         PG_RE_THROW();
1279                 }
1280                 PG_END_TRY();
1281
1282                 /* And grab the message */
1283                 if (pq_getbytes(s->data, len) == EOF)
1284                 {
1285                         ereport(COMMERROR,
1286                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
1287                                          errmsg("incomplete message from client")));
1288                         return EOF;
1289                 }
1290                 s->len = len;
1291                 /* Place a trailing null per StringInfo convention */
1292                 s->data[len] = '\0';
1293         }
1294
1295         /* finished reading the message. */
1296         PqCommReadingMsg = false;
1297
1298         return 0;
1299 }
1300
1301
1302 /* --------------------------------
1303  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
1304  *
1305  *              returns 0 if OK, EOF if trouble
1306  * --------------------------------
1307  */
1308 int
1309 pq_putbytes(const char *s, size_t len)
1310 {
1311         int                     res;
1312
1313         /* Should only be called by old-style COPY OUT */
1314         Assert(DoingCopyOut);
1315         /* No-op if reentrant call */
1316         if (PqCommBusy)
1317                 return 0;
1318         PqCommBusy = true;
1319         res = internal_putbytes(s, len);
1320         PqCommBusy = false;
1321         return res;
1322 }
1323
1324 static int
1325 internal_putbytes(const char *s, size_t len)
1326 {
1327         size_t          amount;
1328
1329         while (len > 0)
1330         {
1331                 /* If buffer is full, then flush it out */
1332                 if (PqSendPointer >= PqSendBufferSize)
1333                 {
1334                         socket_set_nonblocking(false);
1335                         if (internal_flush())
1336                                 return EOF;
1337                 }
1338                 amount = PqSendBufferSize - PqSendPointer;
1339                 if (amount > len)
1340                         amount = len;
1341                 memcpy(PqSendBuffer + PqSendPointer, s, amount);
1342                 PqSendPointer += amount;
1343                 s += amount;
1344                 len -= amount;
1345         }
1346         return 0;
1347 }
1348
1349 /* --------------------------------
1350  *              socket_flush            - flush pending output
1351  *
1352  *              returns 0 if OK, EOF if trouble
1353  * --------------------------------
1354  */
1355 static int
1356 socket_flush(void)
1357 {
1358         int                     res;
1359
1360         /* No-op if reentrant call */
1361         if (PqCommBusy)
1362                 return 0;
1363         PqCommBusy = true;
1364         socket_set_nonblocking(false);
1365         res = internal_flush();
1366         PqCommBusy = false;
1367         return res;
1368 }
1369
1370 /* --------------------------------
1371  *              internal_flush - flush pending output
1372  *
1373  * Returns 0 if OK (meaning everything was sent, or operation would block
1374  * and the socket is in non-blocking mode), or EOF if trouble.
1375  * --------------------------------
1376  */
1377 static int
1378 internal_flush(void)
1379 {
1380         static int      last_reported_send_errno = 0;
1381
1382         char       *bufptr = PqSendBuffer + PqSendStart;
1383         char       *bufend = PqSendBuffer + PqSendPointer;
1384
1385         while (bufptr < bufend)
1386         {
1387                 int                     r;
1388
1389                 r = secure_write(MyProcPort, bufptr, bufend - bufptr);
1390
1391                 if (r <= 0)
1392                 {
1393                         if (errno == EINTR)
1394                                 continue;               /* Ok if we were interrupted */
1395
1396                         /*
1397                          * Ok if no data writable without blocking, and the socket is in
1398                          * non-blocking mode.
1399                          */
1400                         if (errno == EAGAIN ||
1401                                 errno == EWOULDBLOCK)
1402                         {
1403                                 return 0;
1404                         }
1405
1406                         /*
1407                          * Careful: an ereport() that tries to write to the client would
1408                          * cause recursion to here, leading to stack overflow and core
1409                          * dump!  This message must go *only* to the postmaster log.
1410                          *
1411                          * If a client disconnects while we're in the midst of output, we
1412                          * might write quite a bit of data before we get to a safe query
1413                          * abort point.  So, suppress duplicate log messages.
1414                          */
1415                         if (errno != last_reported_send_errno)
1416                         {
1417                                 last_reported_send_errno = errno;
1418                                 ereport(COMMERROR,
1419                                                 (errcode_for_socket_access(),
1420                                                  errmsg("could not send data to client: %m")));
1421                         }
1422
1423                         /*
1424                          * We drop the buffered data anyway so that processing can
1425                          * continue, even though we'll probably quit soon. We also set a
1426                          * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
1427                          * the connection.
1428                          */
1429                         PqSendStart = PqSendPointer = 0;
1430                         ClientConnectionLost = 1;
1431                         InterruptPending = 1;
1432                         return EOF;
1433                 }
1434
1435                 last_reported_send_errno = 0;   /* reset after any successful send */
1436                 bufptr += r;
1437                 PqSendStart += r;
1438         }
1439
1440         PqSendStart = PqSendPointer = 0;
1441         return 0;
1442 }
1443
1444 /* --------------------------------
1445  *              pq_flush_if_writable - flush pending output if writable without blocking
1446  *
1447  * Returns 0 if OK, or EOF if trouble.
1448  * --------------------------------
1449  */
1450 static int
1451 socket_flush_if_writable(void)
1452 {
1453         int                     res;
1454
1455         /* Quick exit if nothing to do */
1456         if (PqSendPointer == PqSendStart)
1457                 return 0;
1458
1459         /* No-op if reentrant call */
1460         if (PqCommBusy)
1461                 return 0;
1462
1463         /* Temporarily put the socket into non-blocking mode */
1464         socket_set_nonblocking(true);
1465
1466         PqCommBusy = true;
1467         res = internal_flush();
1468         PqCommBusy = false;
1469         return res;
1470 }
1471
1472 /* --------------------------------
1473  *      socket_is_send_pending  - is there any pending data in the output buffer?
1474  * --------------------------------
1475  */
1476 static bool
1477 socket_is_send_pending(void)
1478 {
1479         return (PqSendStart < PqSendPointer);
1480 }
1481
1482 /* --------------------------------
1483  * Message-level I/O routines begin here.
1484  *
1485  * These routines understand about the old-style COPY OUT protocol.
1486  * --------------------------------
1487  */
1488
1489
1490 /* --------------------------------
1491  *              socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1492  *
1493  *              If msgtype is not '\0', it is a message type code to place before
1494  *              the message body.  If msgtype is '\0', then the message has no type
1495  *              code (this is only valid in pre-3.0 protocols).
1496  *
1497  *              len is the length of the message body data at *s.  In protocol 3.0
1498  *              and later, a message length word (equal to len+4 because it counts
1499  *              itself too) is inserted by this routine.
1500  *
1501  *              All normal messages are suppressed while old-style COPY OUT is in
1502  *              progress.  (In practice only a few notice messages might get emitted
1503  *              then; dropping them is annoying, but at least they will still appear
1504  *              in the postmaster log.)
1505  *
1506  *              We also suppress messages generated while pqcomm.c is busy.  This
1507  *              avoids any possibility of messages being inserted within other
1508  *              messages.  The only known trouble case arises if SIGQUIT occurs
1509  *              during a pqcomm.c routine --- quickdie() will try to send a warning
1510  *              message, and the most reasonable approach seems to be to drop it.
1511  *
1512  *              returns 0 if OK, EOF if trouble
1513  * --------------------------------
1514  */
1515 static int
1516 socket_putmessage(char msgtype, const char *s, size_t len)
1517 {
1518         if (DoingCopyOut || PqCommBusy)
1519                 return 0;
1520         PqCommBusy = true;
1521         if (msgtype)
1522                 if (internal_putbytes(&msgtype, 1))
1523                         goto fail;
1524         if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1525         {
1526                 uint32          n32;
1527
1528                 n32 = htonl((uint32) (len + 4));
1529                 if (internal_putbytes((char *) &n32, 4))
1530                         goto fail;
1531         }
1532         if (internal_putbytes(s, len))
1533                 goto fail;
1534         PqCommBusy = false;
1535         return 0;
1536
1537 fail:
1538         PqCommBusy = false;
1539         return EOF;
1540 }
1541
1542 /* --------------------------------
1543  *              pq_putmessage_noblock   - like pq_putmessage, but never blocks
1544  *
1545  *              If the output buffer is too small to hold the message, the buffer
1546  *              is enlarged.
1547  */
1548 static void
1549 socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1550 {
1551         int res         PG_USED_FOR_ASSERTS_ONLY;
1552         int                     required;
1553
1554         /*
1555          * Ensure we have enough space in the output buffer for the message header
1556          * as well as the message itself.
1557          */
1558         required = PqSendPointer + 1 + 4 + len;
1559         if (required > PqSendBufferSize)
1560         {
1561                 PqSendBuffer = repalloc(PqSendBuffer, required);
1562                 PqSendBufferSize = required;
1563         }
1564         res = pq_putmessage(msgtype, s, len);
1565         Assert(res == 0);                       /* should not fail when the message fits in
1566                                                                  * buffer */
1567 }
1568
1569
1570 /* --------------------------------
1571  *              socket_startcopyout - inform libpq that an old-style COPY OUT transfer
1572  *                      is beginning
1573  * --------------------------------
1574  */
1575 static void
1576 socket_startcopyout(void)
1577 {
1578         DoingCopyOut = true;
1579 }
1580
1581 /* --------------------------------
1582  *              socket_endcopyout       - end an old-style COPY OUT transfer
1583  *
1584  *              If errorAbort is indicated, we are aborting a COPY OUT due to an error,
1585  *              and must send a terminator line.  Since a partial data line might have
1586  *              been emitted, send a couple of newlines first (the first one could
1587  *              get absorbed by a backslash...)  Note that old-style COPY OUT does
1588  *              not allow binary transfers, so a textual terminator is always correct.
1589  * --------------------------------
1590  */
1591 static void
1592 socket_endcopyout(bool errorAbort)
1593 {
1594         if (!DoingCopyOut)
1595                 return;
1596         if (errorAbort)
1597                 pq_putbytes("\n\n\\.\n", 5);
1598         /* in non-error case, copy.c will have emitted the terminator line */
1599         DoingCopyOut = false;
1600 }
1601
1602 /*
1603  * Support for TCP Keepalive parameters
1604  */
1605
/*
 * pq_setkeepaliveswin32 - apply keepalive idle time and interval on Windows
 *
 * On Windows, we need to set both idle and interval at the same time.
 * We also cannot reset them to the default (setting to zero will
 * actually set them to zero, not default), therefore we fall back to
 * the out-of-the-box default instead.
 *
 * idle and interval are given in seconds; a value <= 0 selects the
 * built-in default (2 hours and 1 second respectively).
 *
 * Returns STATUS_OK after recording the values actually applied in *port,
 * or STATUS_ERROR (after logging) if WSAIoctl fails.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
	struct tcp_keepalive ka;
	DWORD		retsize;

	if (idle <= 0)
		idle = 2 * 60 * 60;		/* default = 2 hours */
	if (interval <= 0)
		interval = 1;			/* default = 1 second */

	ka.onoff = 1;

	/*
	 * Convert seconds to milliseconds.  Do the multiplication in unsigned
	 * arithmetic so that large settings cannot trigger signed overflow.
	 */
	ka.keepalivetime = (unsigned long) idle * 1000;
	ka.keepaliveinterval = (unsigned long) interval * 1000;

	if (WSAIoctl(port->sock,
				 SIO_KEEPALIVE_VALS,
				 (LPVOID) &ka,
				 sizeof(ka),
				 NULL,
				 0,
				 &retsize,
				 NULL,
				 NULL)
		!= 0)
	{
		/* WSAGetLastError() returns an int error code */
		elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %d",
			 WSAGetLastError());
		return STATUS_ERROR;
	}

	/* Remember what is now in effect, including substituted defaults. */
	port->keepalives_idle = idle;
	port->keepalives_interval = interval;
	return STATUS_OK;
}
#endif
1650
1651 int
1652 pq_getkeepalivesidle(Port *port)
1653 {
1654 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(WIN32)
1655         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1656                 return 0;
1657
1658         if (port->keepalives_idle != 0)
1659                 return port->keepalives_idle;
1660
1661         if (port->default_keepalives_idle == 0)
1662         {
1663 #ifndef WIN32
1664                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
1665
1666 #ifdef TCP_KEEPIDLE
1667                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
1668                                            (char *) &port->default_keepalives_idle,
1669                                            &size) < 0)
1670                 {
1671                         elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
1672                         port->default_keepalives_idle = -1; /* don't know */
1673                 }
1674 #else
1675                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
1676                                            (char *) &port->default_keepalives_idle,
1677                                            &size) < 0)
1678                 {
1679                         elog(LOG, "getsockopt(TCP_KEEPALIVE) failed: %m");
1680                         port->default_keepalives_idle = -1; /* don't know */
1681                 }
1682 #endif   /* TCP_KEEPIDLE */
1683 #else                                                   /* WIN32 */
1684                 /* We can't get the defaults on Windows, so return "don't know" */
1685                 port->default_keepalives_idle = -1;
1686 #endif   /* WIN32 */
1687         }
1688
1689         return port->default_keepalives_idle;
1690 #else
1691         return 0;
1692 #endif
1693 }
1694
1695 int
1696 pq_setkeepalivesidle(int idle, Port *port)
1697 {
1698         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1699                 return STATUS_OK;
1700
1701 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(SIO_KEEPALIVE_VALS)
1702         if (idle == port->keepalives_idle)
1703                 return STATUS_OK;
1704
1705 #ifndef WIN32
1706         if (port->default_keepalives_idle <= 0)
1707         {
1708                 if (pq_getkeepalivesidle(port) < 0)
1709                 {
1710                         if (idle == 0)
1711                                 return STATUS_OK;               /* default is set but unknown */
1712                         else
1713                                 return STATUS_ERROR;
1714                 }
1715         }
1716
1717         if (idle == 0)
1718                 idle = port->default_keepalives_idle;
1719
1720 #ifdef TCP_KEEPIDLE
1721         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
1722                                    (char *) &idle, sizeof(idle)) < 0)
1723         {
1724                 elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
1725                 return STATUS_ERROR;
1726         }
1727 #else
1728         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
1729                                    (char *) &idle, sizeof(idle)) < 0)
1730         {
1731                 elog(LOG, "setsockopt(TCP_KEEPALIVE) failed: %m");
1732                 return STATUS_ERROR;
1733         }
1734 #endif
1735
1736         port->keepalives_idle = idle;
1737 #else                                                   /* WIN32 */
1738         return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
1739 #endif
1740 #else                                                   /* TCP_KEEPIDLE || SIO_KEEPALIVE_VALS */
1741         if (idle != 0)
1742         {
1743                 elog(LOG, "setting the keepalive idle time is not supported");
1744                 return STATUS_ERROR;
1745         }
1746 #endif
1747         return STATUS_OK;
1748 }
1749
1750 int
1751 pq_getkeepalivesinterval(Port *port)
1752 {
1753 #if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1754         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1755                 return 0;
1756
1757         if (port->keepalives_interval != 0)
1758                 return port->keepalives_interval;
1759
1760         if (port->default_keepalives_interval == 0)
1761         {
1762 #ifndef WIN32
1763                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
1764
1765                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1766                                            (char *) &port->default_keepalives_interval,
1767                                            &size) < 0)
1768                 {
1769                         elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
1770                         port->default_keepalives_interval = -1;         /* don't know */
1771                 }
1772 #else
1773                 /* We can't get the defaults on Windows, so return "don't know" */
1774                 port->default_keepalives_interval = -1;
1775 #endif   /* WIN32 */
1776         }
1777
1778         return port->default_keepalives_interval;
1779 #else
1780         return 0;
1781 #endif
1782 }
1783
1784 int
1785 pq_setkeepalivesinterval(int interval, Port *port)
1786 {
1787         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1788                 return STATUS_OK;
1789
1790 #if defined(TCP_KEEPINTVL) || defined (SIO_KEEPALIVE_VALS)
1791         if (interval == port->keepalives_interval)
1792                 return STATUS_OK;
1793
1794 #ifndef WIN32
1795         if (port->default_keepalives_interval <= 0)
1796         {
1797                 if (pq_getkeepalivesinterval(port) < 0)
1798                 {
1799                         if (interval == 0)
1800                                 return STATUS_OK;               /* default is set but unknown */
1801                         else
1802                                 return STATUS_ERROR;
1803                 }
1804         }
1805
1806         if (interval == 0)
1807                 interval = port->default_keepalives_interval;
1808
1809         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1810                                    (char *) &interval, sizeof(interval)) < 0)
1811         {
1812                 elog(LOG, "setsockopt(TCP_KEEPINTVL) failed: %m");
1813                 return STATUS_ERROR;
1814         }
1815
1816         port->keepalives_interval = interval;
1817 #else                                                   /* WIN32 */
1818         return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
1819 #endif
1820 #else
1821         if (interval != 0)
1822         {
1823                 elog(LOG, "setsockopt(TCP_KEEPINTVL) not supported");
1824                 return STATUS_ERROR;
1825         }
1826 #endif
1827
1828         return STATUS_OK;
1829 }
1830
1831 int
1832 pq_getkeepalivescount(Port *port)
1833 {
1834 #ifdef TCP_KEEPCNT
1835         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1836                 return 0;
1837
1838         if (port->keepalives_count != 0)
1839                 return port->keepalives_count;
1840
1841         if (port->default_keepalives_count == 0)
1842         {
1843                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
1844
1845                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1846                                            (char *) &port->default_keepalives_count,
1847                                            &size) < 0)
1848                 {
1849                         elog(LOG, "getsockopt(TCP_KEEPCNT) failed: %m");
1850                         port->default_keepalives_count = -1;            /* don't know */
1851                 }
1852         }
1853
1854         return port->default_keepalives_count;
1855 #else
1856         return 0;
1857 #endif
1858 }
1859
1860 int
1861 pq_setkeepalivescount(int count, Port *port)
1862 {
1863         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1864                 return STATUS_OK;
1865
1866 #ifdef TCP_KEEPCNT
1867         if (count == port->keepalives_count)
1868                 return STATUS_OK;
1869
1870         if (port->default_keepalives_count <= 0)
1871         {
1872                 if (pq_getkeepalivescount(port) < 0)
1873                 {
1874                         if (count == 0)
1875                                 return STATUS_OK;               /* default is set but unknown */
1876                         else
1877                                 return STATUS_ERROR;
1878                 }
1879         }
1880
1881         if (count == 0)
1882                 count = port->default_keepalives_count;
1883
1884         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1885                                    (char *) &count, sizeof(count)) < 0)
1886         {
1887                 elog(LOG, "setsockopt(TCP_KEEPCNT) failed: %m");
1888                 return STATUS_ERROR;
1889         }
1890
1891         port->keepalives_count = count;
1892 #else
1893         if (count != 0)
1894         {
1895                 elog(LOG, "setsockopt(TCP_KEEPCNT) not supported");
1896                 return STATUS_ERROR;
1897         }
1898 #endif
1899
1900         return STATUS_OK;
1901 }