]> granicus.if.org Git - postgresql/blob - src/backend/libpq/pqcomm.c
d1cc38beb2b25d6e38417a30e0651db7673c2c34
[postgresql] / src / backend / libpq / pqcomm.c
1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *        Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the old COPY OUT protocol.
10  * Unfortunately, COPY OUT was designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an ereport(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
16  * set the DoingCopyOut flag.)
17  *
18  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19  * pq_putbytes(), especially if the message would require multiple calls
20  * to send.  Instead, use the routines in pqformat.c to construct the message
21  * in a buffer and then emit it in one call to pq_putmessage.  This ensures
22  * that the channel will not be clogged by an incomplete message if execution
23  * is aborted by ereport(ERROR) partway through the message.  The only
24  * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
25  *
26  * At one time, libpq was shared between frontend and backend, but now
27  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28  * All that remains is similarities of names to trap the unwary...
29  *
30  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
31  * Portions Copyright (c) 1994, Regents of the University of California
32  *
33  *      src/backend/libpq/pqcomm.c
34  *
35  *-------------------------------------------------------------------------
36  */
37
38 /*------------------------
39  * INTERFACE ROUTINES
40  *
41  * setup/teardown:
42  *              StreamServerPort        - Open postmaster's server port
43  *              StreamConnection        - Create new connection with client
44  *              StreamClose                     - Close a client/backend connection
45  *              TouchSocketFiles        - Protect socket files against /tmp cleaners
46  *              pq_init                 - initialize libpq at backend startup
47  *              pq_comm_reset   - reset libpq during error recovery
48  *              pq_close                - shutdown libpq at backend exit
49  *
50  * low-level I/O:
51  *              pq_getbytes             - get a known number of bytes from connection
52  *              pq_getstring    - get a null terminated string from connection
53  *              pq_getmessage   - get a message with length word from connection
54  *              pq_getbyte              - get next byte from connection
55  *              pq_peekbyte             - peek at next byte from connection
56  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
57  *              pq_flush                - flush pending output
58  *              pq_flush_if_writable - flush pending output if writable without blocking
59  *              pq_getbyte_if_available - get a byte if available without blocking
60  *
61  * message-level I/O (and old-style-COPY-OUT cruft):
62  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
63  *              pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
64  *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
65  *              pq_endcopyout   - end a COPY OUT transfer
66  *
67  *------------------------
68  */
69 #include "postgres.h"
70
71 #include <signal.h>
72 #include <fcntl.h>
73 #include <grp.h>
74 #include <unistd.h>
75 #include <sys/file.h>
76 #include <sys/socket.h>
77 #include <sys/stat.h>
78 #include <sys/time.h>
79 #include <netdb.h>
80 #include <netinet/in.h>
81 #ifdef HAVE_NETINET_TCP_H
82 #include <netinet/tcp.h>
83 #endif
84 #include <arpa/inet.h>
85 #ifdef HAVE_UTIME_H
86 #include <utime.h>
87 #endif
88 #ifdef _MSC_VER                                 /* mstcpip.h is missing on mingw */
89 #include <mstcpip.h>
90 #endif
91
92 #include "common/ip.h"
93 #include "libpq/libpq.h"
94 #include "miscadmin.h"
95 #include "storage/ipc.h"
96 #include "utils/guc.h"
97 #include "utils/memutils.h"
98
99 /*
100  * Configuration options
101  */
/* File mode applied to each Unix socket file with chmod() in Setup_AF_UNIX */
int                     Unix_socket_permissions;
/*
 * Group to give each Unix socket file: a decimal group ID or a group name.
 * An empty string means the group is left unchanged (see Setup_AF_UNIX).
 */
char       *Unix_socket_group;

/*
 * Where the Unix socket files are (list of palloc'd strings).
 * Lock_AF_UNIX appends one entry per socket path it locks.
 */
static List *sock_paths = NIL;
107
108 /*
109  * Buffers for low-level I/O.
110  *
111  * The receive buffer is fixed size. Send buffer is usually 8k, but can be
112  * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
113  */
114
/* Initial send buffer size and fixed receive buffer size, in bytes */
#define PQ_SEND_BUFFER_SIZE 8192
#define PQ_RECV_BUFFER_SIZE 8192

/* Send buffer; allocated in TopMemoryContext by pq_init() */
static char *PqSendBuffer;
static int      PqSendBufferSize;       /* Current size of the send buffer */
static int      PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
static int      PqSendStart;            /* Next index to send a byte in PqSendBuffer */

/* Receive buffer; statically sized, never reallocated */
static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int      PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
static int      PqRecvLength;           /* End of data available in PqRecvBuffer */
126
127 /*
128  * Message status
129  */
/* All three flags are cleared by pq_init(). */
static bool PqCommBusy;                 /* busy sending data to the client */
static bool PqCommReadingMsg;   /* in the middle of reading a message */
static bool DoingCopyOut;               /* in old-protocol COPY OUT processing */
133
134
135 /* Internal functions */
/*
 * socket_comm_reset through socket_endcopyout (except socket_close and
 * socket_set_nonblocking) are the entries of PqCommSocketMethods below.
 * socket_close is registered as a process-exit callback by pq_init().
 */
static void socket_comm_reset(void);
static void socket_close(int code, Datum arg);
static void socket_set_nonblocking(bool nonblocking);
static int      socket_flush(void);
static int      socket_flush_if_writable(void);
static bool socket_is_send_pending(void);
static int      socket_putmessage(char msgtype, const char *s, size_t len);
static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
static void socket_startcopyout(void);
static void socket_endcopyout(bool errorAbort);
/* Lower-level helpers shared by the routines above */
static int      internal_putbytes(const char *s, size_t len);
static int      internal_flush(void);

#ifdef HAVE_UNIX_SOCKETS
/* Helpers used by StreamServerPort when opening an AF_UNIX listen socket */
static int      Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
static int      Setup_AF_UNIX(char *sock_path);
#endif   /* HAVE_UNIX_SOCKETS */
153
/*
 * Method table for socket-based client communication.
 *
 * This is a positional initializer, so the entries must stay in the same
 * order as the members of PQcommMethods (declared outside this file;
 * NOTE(review): presumably in libpq/libpq.h -- confirm before reordering).
 */
static PQcommMethods PqCommSocketMethods = {
        socket_comm_reset,
        socket_flush,
        socket_flush_if_writable,
        socket_is_send_pending,
        socket_putmessage,
        socket_putmessage_noblock,
        socket_startcopyout,
        socket_endcopyout
};

/* Currently active method table; initially the socket implementation */
PQcommMethods *PqCommMethods = &PqCommSocketMethods;

/*
 * Wait-event set built by pq_init(): it holds the client socket, the
 * process latch and postmaster death, added in that order.
 */
WaitEventSet *FeBeWaitSet;
168
169
170 /* --------------------------------
171  *              pq_init - initialize libpq at backend startup
172  * --------------------------------
173  */
174 void
175 pq_init(void)
176 {
177         /* initialize state variables */
178         PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
179         PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
180         PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
181         PqCommBusy = false;
182         PqCommReadingMsg = false;
183         DoingCopyOut = false;
184
185         /* set up process-exit hook to close the socket */
186         on_proc_exit(socket_close, 0);
187
188         /*
189          * In backends (as soon as forked) we operate the underlying socket in
190          * nonblocking mode and use latches to implement blocking semantics if
191          * needed. That allows us to provide safely interruptible reads and
192          * writes.
193          *
194          * Use COMMERROR on failure, because ERROR would try to send the error to
195          * the client, which might require changing the mode again, leading to
196          * infinite recursion.
197          */
198 #ifndef WIN32
199         if (!pg_set_noblock(MyProcPort->sock))
200                 ereport(COMMERROR,
201                                 (errmsg("could not set socket to nonblocking mode: %m")));
202 #endif
203
204         FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
205         AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
206                                           NULL, NULL);
207         AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
208         AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
209 }
210
/* --------------------------------
 *              socket_comm_reset - reset libpq during error recovery
 *
 * This is called from error recovery at the outer idle loop.  It's
 * just to get us out of trouble if we somehow manage to elog() from
 * inside a pqcomm.c routine (which ideally will never happen, but...)
 *
 * Takes no arguments and returns nothing.  Buffered output is left
 * untouched; only the busy flag and any old-style COPY OUT state are reset.
 * --------------------------------
 */
static void
socket_comm_reset(void)
{
        /* Do not throw away pending data, but do reset the busy flag */
        PqCommBusy = false;
        /*
         * We can abort any old-style COPY OUT, too.  "true" asks for the
         * error-abort form (NOTE(review): presumably this dispatches to
         * socket_endcopyout via PqCommMethods -- confirm).
         */
        pq_endcopyout(true);
}
227
228 /* --------------------------------
229  *              socket_close - shutdown libpq at backend exit
230  *
231  * This is the one pg_on_exit_callback in place during BackendInitialize().
232  * That function's unusual signal handling constrains that this callback be
233  * safe to run at any instant.
234  * --------------------------------
235  */
236 static void
237 socket_close(int code, Datum arg)
238 {
239         /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
240         if (MyProcPort != NULL)
241         {
242 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
243 #ifdef ENABLE_GSS
244                 OM_uint32       min_s;
245
246                 /*
247                  * Shutdown GSSAPI layer.  This section does nothing when interrupting
248                  * BackendInitialize(), because pg_GSS_recvauth() makes first use of
249                  * "ctx" and "cred".
250                  */
251                 if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
252                         gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
253
254                 if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
255                         gss_release_cred(&min_s, &MyProcPort->gss->cred);
256 #endif   /* ENABLE_GSS */
257
258                 /*
259                  * GSS and SSPI share the port->gss struct.  Since nowhere else does a
260                  * postmaster child free this, doing so is safe when interrupting
261                  * BackendInitialize().
262                  */
263                 free(MyProcPort->gss);
264 #endif   /* ENABLE_GSS || ENABLE_SSPI */
265
266                 /*
267                  * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
268                  * call this, so this is safe when interrupting BackendInitialize().
269                  */
270                 secure_close(MyProcPort);
271
272                 /*
273                  * Formerly we did an explicit close() here, but it seems better to
274                  * leave the socket open until the process dies.  This allows clients
275                  * to perform a "synchronous close" if they care --- wait till the
276                  * transport layer reports connection closure, and you can be sure the
277                  * backend has exited.
278                  *
279                  * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
280                  * though.
281                  */
282                 MyProcPort->sock = PGINVALID_SOCKET;
283         }
284 }
285
286
287
288 /*
289  * Streams -- wrapper around Unix socket system calls
290  *
291  *
292  *              Stream functions are used for vanilla TCP connection protocol.
293  */
294
295
/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
 * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
 * specified.  For TCP ports, hostName is either NULL for all interfaces or
 * the interface to listen on, and unixSocketDir is ignored (can be NULL).
 *
 * Successfully opened sockets are added to the ListenSocket[] array (of
 * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
 *
 * Each address returned by the name lookup is tried in turn.  A failure to
 * create, configure, bind or listen on one address is logged and that
 * address is skipped; running out of ListenSocket[] slots or a failure in
 * Setup_AF_UNIX stops the loop.
 *
 * RETURNS: STATUS_OK if at least one socket was added, else STATUS_ERROR
 */

int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
                                 char *unixSocketDir,
                                 pgsocket ListenSocket[], int MaxListen)
{
        pgsocket        fd;
        int                     err;
        int                     maxconn;
        int                     ret;
        char            portNumberStr[32];
        const char *familyDesc;
        char            familyDescBuf[64];
        const char *addrDesc;
        char            addrBuf[NI_MAXHOST];
        char       *service;
        struct addrinfo *addrs = NULL,
                           *addr;
        struct addrinfo hint;
        int                     listen_index = 0;       /* next ListenSocket[] slot to examine */
        int                     added = 0;                      /* number of sockets opened by this call */

#ifdef HAVE_UNIX_SOCKETS
        char            unixSocketPath[MAXPGPATH];
#endif
#if !defined(WIN32) || defined(IPV6_V6ONLY)
        int                     one = 1;                        /* "enable" value for setsockopt() */
#endif

        /* Initialize hint structure */
        MemSet(&hint, 0, sizeof(hint));
        hint.ai_family = family;
        hint.ai_flags = AI_PASSIVE;
        hint.ai_socktype = SOCK_STREAM;

#ifdef HAVE_UNIX_SOCKETS
        if (family == AF_UNIX)
        {
                /*
                 * Create unixSocketPath from portNumber and unixSocketDir and lock
                 * that file path
                 */
                UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
                if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
                {
                        ereport(LOG,
                                        (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
                                                        unixSocketPath,
                                                        (int) (UNIXSOCK_PATH_BUFLEN - 1))));
                        return STATUS_ERROR;
                }
                if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
                        return STATUS_ERROR;
                /* for AF_UNIX the "service" handed to the lookup is the socket path */
                service = unixSocketPath;
        }
        else
#endif   /* HAVE_UNIX_SOCKETS */
        {
                /* for TCP the "service" is the port number in decimal text form */
                snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
                service = portNumberStr;
        }

        ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
        if (ret || !addrs)
        {
                if (hostName)
                        ereport(LOG,
                                        (errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
                                                        hostName, service, gai_strerror(ret))));
                else
                        ereport(LOG,
                                 (errmsg("could not translate service \"%s\" to address: %s",
                                                 service, gai_strerror(ret))));
                if (addrs)
                        pg_freeaddrinfo_all(hint.ai_family, addrs);
                return STATUS_ERROR;
        }

        for (addr = addrs; addr; addr = addr->ai_next)
        {
                if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
                {
                        /*
                         * Only set up a unix domain socket when they really asked for it.
                         * The service/port is different in that case.
                         */
                        continue;
                }

                /* See if there is still room to add 1 more socket. */
                for (; listen_index < MaxListen; listen_index++)
                {
                        if (ListenSocket[listen_index] == PGINVALID_SOCKET)
                                break;
                }
                if (listen_index >= MaxListen)
                {
                        ereport(LOG,
                                        (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
                                                        MaxListen)));
                        break;
                }

                /* set up address family name for log messages */
                switch (addr->ai_family)
                {
                        case AF_INET:
                                familyDesc = _("IPv4");
                                break;
#ifdef HAVE_IPV6
                        case AF_INET6:
                                familyDesc = _("IPv6");
                                break;
#endif
#ifdef HAVE_UNIX_SOCKETS
                        case AF_UNIX:
                                familyDesc = _("Unix");
                                break;
#endif
                        default:
                                snprintf(familyDescBuf, sizeof(familyDescBuf),
                                                 _("unrecognized address family %d"),
                                                 addr->ai_family);
                                familyDesc = familyDescBuf;
                                break;
                }

                /* set up text form of address for log messages */
#ifdef HAVE_UNIX_SOCKETS
                if (addr->ai_family == AF_UNIX)
                        addrDesc = unixSocketPath;
                else
#endif
                {
                        pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
                                                           addr->ai_addrlen,
                                                           addrBuf, sizeof(addrBuf),
                                                           NULL, 0,
                                                           NI_NUMERICHOST);
                        addrDesc = addrBuf;
                }

                if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
                        /* translator: first %s is IPv4, IPv6, or Unix */
                                  errmsg("could not create %s socket for address \"%s\": %m",
                                                 familyDesc, addrDesc)));
                        continue;
                }

#ifndef WIN32

                /*
                 * Without the SO_REUSEADDR flag, a new postmaster can't be started
                 * right away after a stop or crash, giving "address already in use"
                 * error on TCP ports.
                 *
                 * On win32, however, this behavior only happens if the
                 * SO_EXCLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
                 * servers to listen on the same address, resulting in unpredictable
                 * behavior. With no flags at all, win32 behaves as Unix with
                 * SO_REUSEADDR.
                 */
                if (!IS_AF_UNIX(addr->ai_family))
                {
                        if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
                                                        (char *) &one, sizeof(one))) == -1)
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
                                /* translator: first %s is IPv4, IPv6, or Unix */
                                                 errmsg("setsockopt(SO_REUSEADDR) failed for %s address \"%s\": %m",
                                                                familyDesc, addrDesc)));
                                closesocket(fd);
                                continue;
                        }
                }
#endif

#ifdef IPV6_V6ONLY
                if (addr->ai_family == AF_INET6)
                {
                        if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
                                                   (char *) &one, sizeof(one)) == -1)
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
                                /* translator: first %s is IPv4, IPv6, or Unix */
                                                 errmsg("setsockopt(IPV6_V6ONLY) failed for %s address \"%s\": %m",
                                                                familyDesc, addrDesc)));
                                closesocket(fd);
                                continue;
                        }
                }
#endif

                /*
                 * Note: This might fail on some OS's, like Linux older than
                 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
                 * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
                 * connections.
                 */
                err = bind(fd, addr->ai_addr, addr->ai_addrlen);
                if (err < 0)
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
                        /* translator: first %s is IPv4, IPv6, or Unix */
                                         errmsg("could not bind %s address \"%s\": %m",
                                                        familyDesc, addrDesc),
                                         (IS_AF_UNIX(addr->ai_family)) ?
                                  errhint("Is another postmaster already running on port %d?"
                                                  " If not, remove socket file \"%s\" and retry.",
                                                  (int) portNumber, service) :
                                  errhint("Is another postmaster already running on port %d?"
                                                  " If not, wait a few seconds and retry.",
                                                  (int) portNumber)));
                        closesocket(fd);
                        continue;
                }

#ifdef HAVE_UNIX_SOCKETS
                if (addr->ai_family == AF_UNIX)
                {
                        /* fix up ownership/permissions of the socket file before listen() */
                        if (Setup_AF_UNIX(service) != STATUS_OK)
                        {
                                closesocket(fd);
                                /* unlike the other failures, this one ends the address loop */
                                break;
                        }
                }
#endif

                /*
                 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
                 * intended to provide a clamp on the request on platforms where an
                 * overly large request provokes a kernel error (are there any?).
                 */
                maxconn = MaxBackends * 2;
                if (maxconn > PG_SOMAXCONN)
                        maxconn = PG_SOMAXCONN;

                err = listen(fd, maxconn);
                if (err < 0)
                {
                        ereport(LOG,
                                        (errcode_for_socket_access(),
                        /* translator: first %s is IPv4, IPv6, or Unix */
                                         errmsg("could not listen on %s address \"%s\": %m",
                                                        familyDesc, addrDesc)));
                        closesocket(fd);
                        continue;
                }

#ifdef HAVE_UNIX_SOCKETS
                if (addr->ai_family == AF_UNIX)
                        ereport(LOG,
                                        (errmsg("listening on Unix socket \"%s\"",
                                                        addrDesc)));
                else
#endif
                        ereport(LOG,
                        /* translator: first %s is IPv4 or IPv6 */
                                        (errmsg("listening on %s address \"%s\", port %d",
                                                        familyDesc, addrDesc, (int) portNumber)));

                /*
                 * Record the socket.  listen_index is not advanced here; the slot
                 * scan at the top of the loop will step past this now-occupied
                 * entry on the next iteration.
                 */
                ListenSocket[listen_index] = fd;
                added++;
        }

        pg_freeaddrinfo_all(hint.ai_family, addrs);

        /* report failure only if not even one address could be opened */
        if (!added)
                return STATUS_ERROR;

        return STATUS_OK;
}
587
588
589 #ifdef HAVE_UNIX_SOCKETS
590
591 /*
592  * Lock_AF_UNIX -- configure unix socket file path
593  */
594 static int
595 Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
596 {
597         /*
598          * Grab an interlock file associated with the socket file.
599          *
600          * Note: there are two reasons for using a socket lock file, rather than
601          * trying to interlock directly on the socket itself.  First, it's a lot
602          * more portable, and second, it lets us remove any pre-existing socket
603          * file without race conditions.
604          */
605         CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
606
607         /*
608          * Once we have the interlock, we can safely delete any pre-existing
609          * socket file to avoid failure at bind() time.
610          */
611         (void) unlink(unixSocketPath);
612
613         /*
614          * Remember socket file pathnames for later maintenance.
615          */
616         sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
617
618         return STATUS_OK;
619 }
620
621
622 /*
623  * Setup_AF_UNIX -- configure unix socket permissions
624  */
625 static int
626 Setup_AF_UNIX(char *sock_path)
627 {
628         /*
629          * Fix socket ownership/permission if requested.  Note we must do this
630          * before we listen() to avoid a window where unwanted connections could
631          * get accepted.
632          */
633         Assert(Unix_socket_group);
634         if (Unix_socket_group[0] != '\0')
635         {
636 #ifdef WIN32
637                 elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
638 #else
639                 char       *endptr;
640                 unsigned long val;
641                 gid_t           gid;
642
643                 val = strtoul(Unix_socket_group, &endptr, 10);
644                 if (*endptr == '\0')
645                 {                                               /* numeric group id */
646                         gid = val;
647                 }
648                 else
649                 {                                               /* convert group name to id */
650                         struct group *gr;
651
652                         gr = getgrnam(Unix_socket_group);
653                         if (!gr)
654                         {
655                                 ereport(LOG,
656                                                 (errmsg("group \"%s\" does not exist",
657                                                                 Unix_socket_group)));
658                                 return STATUS_ERROR;
659                         }
660                         gid = gr->gr_gid;
661                 }
662                 if (chown(sock_path, -1, gid) == -1)
663                 {
664                         ereport(LOG,
665                                         (errcode_for_file_access(),
666                                          errmsg("could not set group of file \"%s\": %m",
667                                                         sock_path)));
668                         return STATUS_ERROR;
669                 }
670 #endif
671         }
672
673         if (chmod(sock_path, Unix_socket_permissions) == -1)
674         {
675                 ereport(LOG,
676                                 (errcode_for_file_access(),
677                                  errmsg("could not set permissions of file \"%s\": %m",
678                                                 sock_path)));
679                 return STATUS_ERROR;
680         }
681         return STATUS_OK;
682 }
683 #endif   /* HAVE_UNIX_SOCKETS */
684
685
686 /*
687  * StreamConnection -- create a new connection with client using
688  *              server port.  Set port->sock to the FD of the new connection.
689  *
690  * ASSUME: that this doesn't need to be non-blocking because
691  *              the Postmaster uses select() to tell when the server master
692  *              socket is ready for accept().
693  *
694  * RETURNS: STATUS_OK or STATUS_ERROR
695  */
696 int
697 StreamConnection(pgsocket server_fd, Port *port)
698 {
699         /* accept connection and fill in the client (remote) address */
700         port->raddr.salen = sizeof(port->raddr.addr);
701         if ((port->sock = accept(server_fd,
702                                                          (struct sockaddr *) & port->raddr.addr,
703                                                          &port->raddr.salen)) == PGINVALID_SOCKET)
704         {
705                 ereport(LOG,
706                                 (errcode_for_socket_access(),
707                                  errmsg("could not accept new connection: %m")));
708
709                 /*
710                  * If accept() fails then postmaster.c will still see the server
711                  * socket as read-ready, and will immediately try again.  To avoid
712                  * uselessly sucking lots of CPU, delay a bit before trying again.
713                  * (The most likely reason for failure is being out of kernel file
714                  * table slots; we can do little except hope some will get freed up.)
715                  */
716                 pg_usleep(100000L);             /* wait 0.1 sec */
717                 return STATUS_ERROR;
718         }
719
720         /* fill in the server (local) address */
721         port->laddr.salen = sizeof(port->laddr.addr);
722         if (getsockname(port->sock,
723                                         (struct sockaddr *) & port->laddr.addr,
724                                         &port->laddr.salen) < 0)
725         {
726                 elog(LOG, "getsockname() failed: %m");
727                 return STATUS_ERROR;
728         }
729
730         /* select NODELAY and KEEPALIVE options if it's a TCP connection */
731         if (!IS_AF_UNIX(port->laddr.addr.ss_family))
732         {
733                 int                     on;
734 #ifdef WIN32
735                 int                     oldopt;
736                 int                     optlen;
737                 int                     newopt;
738 #endif
739
740 #ifdef  TCP_NODELAY
741                 on = 1;
742                 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
743                                            (char *) &on, sizeof(on)) < 0)
744                 {
745                         elog(LOG, "setsockopt(TCP_NODELAY) failed: %m");
746                         return STATUS_ERROR;
747                 }
748 #endif
749                 on = 1;
750                 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
751                                            (char *) &on, sizeof(on)) < 0)
752                 {
753                         elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
754                         return STATUS_ERROR;
755                 }
756
757 #ifdef WIN32
758
759                 /*
760                  * This is a Win32 socket optimization.  The OS send buffer should be
761                  * large enough to send the whole Postgres send buffer in one go, or
762                  * performance suffers.  The Postgres send buffer can be enlarged if a
763                  * very large message needs to be sent, but we won't attempt to
764                  * enlarge the OS buffer if that happens, so somewhat arbitrarily
765                  * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
766                  * (That's 32kB with the current default).
767                  *
768                  * The default OS buffer size used to be 8kB in earlier Windows
769                  * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
770                  * be necessary to change it in later versions anymore.  Changing it
771                  * unnecessarily can even reduce performance, because setting
772                  * SO_SNDBUF in the application disables the "dynamic send buffering"
773                  * feature that was introduced in Windows 7.  So before fiddling with
774                  * SO_SNDBUF, check if the current buffer size is already large enough
775                  * and only increase it if necessary.
776                  *
777                  * See https://support.microsoft.com/kb/823764/EN-US/ and
778                  * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
779                  */
780                 optlen = sizeof(oldopt);
781                 if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
782                                            &optlen) < 0)
783                 {
784                         elog(LOG, "getsockopt(SO_SNDBUF) failed: %m");
785                         return STATUS_ERROR;
786                 }
787                 newopt = PQ_SEND_BUFFER_SIZE * 4;
788                 if (oldopt < newopt)
789                 {
790                         if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
791                                                    sizeof(newopt)) < 0)
792                         {
793                                 elog(LOG, "setsockopt(SO_SNDBUF) failed: %m");
794                                 return STATUS_ERROR;
795                         }
796                 }
797 #endif
798
799                 /*
800                  * Also apply the current keepalive parameters.  If we fail to set a
801                  * parameter, don't error out, because these aren't universally
802                  * supported.  (Note: you might think we need to reset the GUC
803                  * variables to 0 in such a case, but it's not necessary because the
804                  * show hooks for these variables report the truth anyway.)
805                  */
806                 (void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
807                 (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
808                 (void) pq_setkeepalivescount(tcp_keepalives_count, port);
809         }
810
811         return STATUS_OK;
812 }
813
814 /*
815  * StreamClose -- close a client/backend connection
816  *
817  * NOTE: this is NOT used to terminate a session; it is just used to release
818  * the file descriptor in a process that should no longer have the socket
819  * open.  (For example, the postmaster calls this after passing ownership
820  * of the connection to a child process.)  It is expected that someone else
821  * still has the socket open.  So, we only want to close the descriptor,
822  * we do NOT want to send anything to the far end.
823  */
824 void
825 StreamClose(pgsocket sock)
826 {
827         closesocket(sock);
828 }
829
830 /*
831  * TouchSocketFiles -- mark socket files as recently accessed
832  *
833  * This routine should be called every so often to ensure that the socket
834  * files have a recent mod date (ordinary operations on sockets usually won't
835  * change the mod date).  That saves them from being removed by
836  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
837  * never have put the socket file in /tmp...)
838  */
839 void
840 TouchSocketFiles(void)
841 {
842         ListCell   *l;
843
844         /* Loop through all created sockets... */
845         foreach(l, sock_paths)
846         {
847                 char       *sock_path = (char *) lfirst(l);
848
849                 /*
850                  * utime() is POSIX standard, utimes() is a common alternative. If we
851                  * have neither, there's no way to affect the mod or access time of
852                  * the socket :-(
853                  *
854                  * In either path, we ignore errors; there's no point in complaining.
855                  */
856 #ifdef HAVE_UTIME
857                 utime(sock_path, NULL);
858 #else                                                   /* !HAVE_UTIME */
859 #ifdef HAVE_UTIMES
860                 utimes(sock_path, NULL);
861 #endif   /* HAVE_UTIMES */
862 #endif   /* HAVE_UTIME */
863         }
864 }
865
866 /*
867  * RemoveSocketFiles -- unlink socket files at postmaster shutdown
868  */
869 void
870 RemoveSocketFiles(void)
871 {
872         ListCell   *l;
873
874         /* Loop through all created sockets... */
875         foreach(l, sock_paths)
876         {
877                 char       *sock_path = (char *) lfirst(l);
878
879                 /* Ignore any error. */
880                 (void) unlink(sock_path);
881         }
882         /* Since we're about to exit, no need to reclaim storage */
883         sock_paths = NIL;
884 }
885
886
887 /* --------------------------------
888  * Low-level I/O routines begin here.
889  *
890  * These routines communicate with a frontend client across a connection
891  * already established by the preceding routines.
892  * --------------------------------
893  */
894
895 /* --------------------------------
896  *                        socket_set_nonblocking - set socket blocking/non-blocking
897  *
898  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
899  * blocking otherwise.
900  * --------------------------------
901  */
902 static void
903 socket_set_nonblocking(bool nonblocking)
904 {
905         if (MyProcPort == NULL)
906                 ereport(ERROR,
907                                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
908                                  errmsg("there is no client connection")));
909
910         MyProcPort->noblock = nonblocking;
911 }
912
913 /* --------------------------------
914  *              pq_recvbuf - load some bytes into the input buffer
915  *
916  *              returns 0 if OK, EOF if trouble
917  * --------------------------------
918  */
919 static int
920 pq_recvbuf(void)
921 {
922         if (PqRecvPointer > 0)
923         {
924                 if (PqRecvLength > PqRecvPointer)
925                 {
926                         /* still some unread data, left-justify it in the buffer */
927                         memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
928                                         PqRecvLength - PqRecvPointer);
929                         PqRecvLength -= PqRecvPointer;
930                         PqRecvPointer = 0;
931                 }
932                 else
933                         PqRecvLength = PqRecvPointer = 0;
934         }
935
936         /* Ensure that we're in blocking mode */
937         socket_set_nonblocking(false);
938
939         /* Can fill buffer from PqRecvLength and upwards */
940         for (;;)
941         {
942                 int                     r;
943
944                 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
945                                                 PQ_RECV_BUFFER_SIZE - PqRecvLength);
946
947                 if (r < 0)
948                 {
949                         if (errno == EINTR)
950                                 continue;               /* Ok if interrupted */
951
952                         /*
953                          * Careful: an ereport() that tries to write to the client would
954                          * cause recursion to here, leading to stack overflow and core
955                          * dump!  This message must go *only* to the postmaster log.
956                          */
957                         ereport(COMMERROR,
958                                         (errcode_for_socket_access(),
959                                          errmsg("could not receive data from client: %m")));
960                         return EOF;
961                 }
962                 if (r == 0)
963                 {
964                         /*
965                          * EOF detected.  We used to write a log message here, but it's
966                          * better to expect the ultimate caller to do that.
967                          */
968                         return EOF;
969                 }
970                 /* r contains number of bytes read, so just incr length */
971                 PqRecvLength += r;
972                 return 0;
973         }
974 }
975
976 /* --------------------------------
977  *              pq_getbyte      - get a single byte from connection, or return EOF
978  * --------------------------------
979  */
980 int
981 pq_getbyte(void)
982 {
983         Assert(PqCommReadingMsg);
984
985         while (PqRecvPointer >= PqRecvLength)
986         {
987                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
988                         return EOF;                     /* Failed to recv data */
989         }
990         return (unsigned char) PqRecvBuffer[PqRecvPointer++];
991 }
992
993 /* --------------------------------
994  *              pq_peekbyte             - peek at next byte from connection
995  *
996  *       Same as pq_getbyte() except we don't advance the pointer.
997  * --------------------------------
998  */
999 int
1000 pq_peekbyte(void)
1001 {
1002         Assert(PqCommReadingMsg);
1003
1004         while (PqRecvPointer >= PqRecvLength)
1005         {
1006                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
1007                         return EOF;                     /* Failed to recv data */
1008         }
1009         return (unsigned char) PqRecvBuffer[PqRecvPointer];
1010 }
1011
1012 /* --------------------------------
1013  *              pq_getbyte_if_available - get a single byte from connection,
1014  *                      if available
1015  *
1016  * The received byte is stored in *c. Returns 1 if a byte was read,
1017  * 0 if no data was available, or EOF if trouble.
1018  * --------------------------------
1019  */
1020 int
1021 pq_getbyte_if_available(unsigned char *c)
1022 {
1023         int                     r;
1024
1025         Assert(PqCommReadingMsg);
1026
1027         if (PqRecvPointer < PqRecvLength)
1028         {
1029                 *c = PqRecvBuffer[PqRecvPointer++];
1030                 return 1;
1031         }
1032
1033         /* Put the socket into non-blocking mode */
1034         socket_set_nonblocking(true);
1035
1036         r = secure_read(MyProcPort, c, 1);
1037         if (r < 0)
1038         {
1039                 /*
1040                  * Ok if no data available without blocking or interrupted (though
1041                  * EINTR really shouldn't happen with a non-blocking socket). Report
1042                  * other errors.
1043                  */
1044                 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1045                         r = 0;
1046                 else
1047                 {
1048                         /*
1049                          * Careful: an ereport() that tries to write to the client would
1050                          * cause recursion to here, leading to stack overflow and core
1051                          * dump!  This message must go *only* to the postmaster log.
1052                          */
1053                         ereport(COMMERROR,
1054                                         (errcode_for_socket_access(),
1055                                          errmsg("could not receive data from client: %m")));
1056                         r = EOF;
1057                 }
1058         }
1059         else if (r == 0)
1060         {
1061                 /* EOF detected */
1062                 r = EOF;
1063         }
1064
1065         return r;
1066 }
1067
1068 /* --------------------------------
1069  *              pq_getbytes             - get a known number of bytes from connection
1070  *
1071  *              returns 0 if OK, EOF if trouble
1072  * --------------------------------
1073  */
1074 int
1075 pq_getbytes(char *s, size_t len)
1076 {
1077         size_t          amount;
1078
1079         Assert(PqCommReadingMsg);
1080
1081         while (len > 0)
1082         {
1083                 while (PqRecvPointer >= PqRecvLength)
1084                 {
1085                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1086                                 return EOF;             /* Failed to recv data */
1087                 }
1088                 amount = PqRecvLength - PqRecvPointer;
1089                 if (amount > len)
1090                         amount = len;
1091                 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1092                 PqRecvPointer += amount;
1093                 s += amount;
1094                 len -= amount;
1095         }
1096         return 0;
1097 }
1098
1099 /* --------------------------------
1100  *              pq_discardbytes         - throw away a known number of bytes
1101  *
1102  *              same as pq_getbytes except we do not copy the data to anyplace.
1103  *              this is used for resynchronizing after read errors.
1104  *
1105  *              returns 0 if OK, EOF if trouble
1106  * --------------------------------
1107  */
1108 static int
1109 pq_discardbytes(size_t len)
1110 {
1111         size_t          amount;
1112
1113         Assert(PqCommReadingMsg);
1114
1115         while (len > 0)
1116         {
1117                 while (PqRecvPointer >= PqRecvLength)
1118                 {
1119                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1120                                 return EOF;             /* Failed to recv data */
1121                 }
1122                 amount = PqRecvLength - PqRecvPointer;
1123                 if (amount > len)
1124                         amount = len;
1125                 PqRecvPointer += amount;
1126                 len -= amount;
1127         }
1128         return 0;
1129 }
1130
1131 /* --------------------------------
1132  *              pq_getstring    - get a null terminated string from connection
1133  *
1134  *              The return value is placed in an expansible StringInfo, which has
1135  *              already been initialized by the caller.
1136  *
1137  *              This is used only for dealing with old-protocol clients.  The idea
1138  *              is to produce a StringInfo that looks the same as we would get from
1139  *              pq_getmessage() with a newer client; we will then process it with
1140  *              pq_getmsgstring.  Therefore, no character set conversion is done here,
1141  *              even though this is presumably useful only for text.
1142  *
1143  *              returns 0 if OK, EOF if trouble
1144  * --------------------------------
1145  */
1146 int
1147 pq_getstring(StringInfo s)
1148 {
1149         int                     i;
1150
1151         Assert(PqCommReadingMsg);
1152
1153         resetStringInfo(s);
1154
1155         /* Read until we get the terminating '\0' */
1156         for (;;)
1157         {
1158                 while (PqRecvPointer >= PqRecvLength)
1159                 {
1160                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
1161                                 return EOF;             /* Failed to recv data */
1162                 }
1163
1164                 for (i = PqRecvPointer; i < PqRecvLength; i++)
1165                 {
1166                         if (PqRecvBuffer[i] == '\0')
1167                         {
1168                                 /* include the '\0' in the copy */
1169                                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1170                                                                            i - PqRecvPointer + 1);
1171                                 PqRecvPointer = i + 1;  /* advance past \0 */
1172                                 return 0;
1173                         }
1174                 }
1175
1176                 /* If we're here we haven't got the \0 in the buffer yet. */
1177                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1178                                                            PqRecvLength - PqRecvPointer);
1179                 PqRecvPointer = PqRecvLength;
1180         }
1181 }
1182
1183
1184 /* --------------------------------
1185  *              pq_startmsgread - begin reading a message from the client.
1186  *
1187  *              This must be called before any of the pq_get* functions.
1188  * --------------------------------
1189  */
1190 void
1191 pq_startmsgread(void)
1192 {
1193         /*
1194          * There shouldn't be a read active already, but let's check just to be
1195          * sure.
1196          */
1197         if (PqCommReadingMsg)
1198                 ereport(FATAL,
1199                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1200                                  errmsg("terminating connection because protocol synchronization was lost")));
1201
1202         PqCommReadingMsg = true;
1203 }
1204
1205
1206 /* --------------------------------
1207  *              pq_endmsgread   - finish reading message.
1208  *
1209  *              This must be called after reading a V2 protocol message with
1210  *              pq_getstring() and friends, to indicate that we have read the whole
1211  *              message. In V3 protocol, pq_getmessage() does this implicitly.
1212  * --------------------------------
1213  */
1214 void
1215 pq_endmsgread(void)
1216 {
1217         Assert(PqCommReadingMsg);
1218
1219         PqCommReadingMsg = false;
1220 }
1221
1222 /* --------------------------------
1223  *              pq_is_reading_msg - are we currently reading a message?
1224  *
1225  * This is used in error recovery at the outer idle loop to detect if we have
1226  * lost protocol sync, and need to terminate the connection. pq_startmsgread()
1227  * will check for that too, but it's nicer to detect it earlier.
1228  * --------------------------------
1229  */
1230 bool
1231 pq_is_reading_msg(void)
1232 {
1233         return PqCommReadingMsg;
1234 }
1235
1236 /* --------------------------------
1237  *              pq_getmessage   - get a message with length word from connection
1238  *
1239  *              The return value is placed in an expansible StringInfo, which has
1240  *              already been initialized by the caller.
1241  *              Only the message body is placed in the StringInfo; the length word
1242  *              is removed.  Also, s->cursor is initialized to zero for convenience
1243  *              in scanning the message contents.
1244  *
1245  *              If maxlen is not zero, it is an upper limit on the length of the
1246  *              message we are willing to accept.  We abort the connection (by
1247  *              returning EOF) if client tries to send more than that.
1248  *
1249  *              returns 0 if OK, EOF if trouble
1250  * --------------------------------
1251  */
1252 int
1253 pq_getmessage(StringInfo s, int maxlen)
1254 {
1255         int32           len;
1256
1257         Assert(PqCommReadingMsg);
1258
1259         resetStringInfo(s);
1260
1261         /* Read message length word */
1262         if (pq_getbytes((char *) &len, 4) == EOF)
1263         {
1264                 ereport(COMMERROR,
1265                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1266                                  errmsg("unexpected EOF within message length word")));
1267                 return EOF;
1268         }
1269
1270         len = ntohl(len);
1271
1272         if (len < 4 ||
1273                 (maxlen > 0 && len > maxlen))
1274         {
1275                 ereport(COMMERROR,
1276                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1277                                  errmsg("invalid message length")));
1278                 return EOF;
1279         }
1280
1281         len -= 4;                                       /* discount length itself */
1282
1283         if (len > 0)
1284         {
1285                 /*
1286                  * Allocate space for message.  If we run out of room (ridiculously
1287                  * large message), we will elog(ERROR), but we want to discard the
1288                  * message body so as not to lose communication sync.
1289                  */
1290                 PG_TRY();
1291                 {
1292                         enlargeStringInfo(s, len);
1293                 }
1294                 PG_CATCH();
1295                 {
1296                         if (pq_discardbytes(len) == EOF)
1297                                 ereport(COMMERROR,
1298                                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1299                                                  errmsg("incomplete message from client")));
1300
1301                         /* we discarded the rest of the message so we're back in sync. */
1302                         PqCommReadingMsg = false;
1303                         PG_RE_THROW();
1304                 }
1305                 PG_END_TRY();
1306
1307                 /* And grab the message */
1308                 if (pq_getbytes(s->data, len) == EOF)
1309                 {
1310                         ereport(COMMERROR,
1311                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
1312                                          errmsg("incomplete message from client")));
1313                         return EOF;
1314                 }
1315                 s->len = len;
1316                 /* Place a trailing null per StringInfo convention */
1317                 s->data[len] = '\0';
1318         }
1319
1320         /* finished reading the message. */
1321         PqCommReadingMsg = false;
1322
1323         return 0;
1324 }
1325
1326
1327 /* --------------------------------
1328  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
1329  *
1330  *              returns 0 if OK, EOF if trouble
1331  * --------------------------------
1332  */
1333 int
1334 pq_putbytes(const char *s, size_t len)
1335 {
1336         int                     res;
1337
1338         /* Should only be called by old-style COPY OUT */
1339         Assert(DoingCopyOut);
1340         /* No-op if reentrant call */
1341         if (PqCommBusy)
1342                 return 0;
1343         PqCommBusy = true;
1344         res = internal_putbytes(s, len);
1345         PqCommBusy = false;
1346         return res;
1347 }
1348
1349 static int
1350 internal_putbytes(const char *s, size_t len)
1351 {
1352         size_t          amount;
1353
1354         while (len > 0)
1355         {
1356                 /* If buffer is full, then flush it out */
1357                 if (PqSendPointer >= PqSendBufferSize)
1358                 {
1359                         socket_set_nonblocking(false);
1360                         if (internal_flush())
1361                                 return EOF;
1362                 }
1363                 amount = PqSendBufferSize - PqSendPointer;
1364                 if (amount > len)
1365                         amount = len;
1366                 memcpy(PqSendBuffer + PqSendPointer, s, amount);
1367                 PqSendPointer += amount;
1368                 s += amount;
1369                 len -= amount;
1370         }
1371         return 0;
1372 }
1373
1374 /* --------------------------------
1375  *              socket_flush            - flush pending output
1376  *
1377  *              returns 0 if OK, EOF if trouble
1378  * --------------------------------
1379  */
1380 static int
1381 socket_flush(void)
1382 {
1383         int                     res;
1384
1385         /* No-op if reentrant call */
1386         if (PqCommBusy)
1387                 return 0;
1388         PqCommBusy = true;
1389         socket_set_nonblocking(false);
1390         res = internal_flush();
1391         PqCommBusy = false;
1392         return res;
1393 }
1394
1395 /* --------------------------------
1396  *              internal_flush - flush pending output
1397  *
1398  * Returns 0 if OK (meaning everything was sent, or operation would block
1399  * and the socket is in non-blocking mode), or EOF if trouble.
1400  * --------------------------------
1401  */
1402 static int
1403 internal_flush(void)
1404 {
1405         static int      last_reported_send_errno = 0;
1406
1407         char       *bufptr = PqSendBuffer + PqSendStart;
1408         char       *bufend = PqSendBuffer + PqSendPointer;
1409
1410         while (bufptr < bufend)
1411         {
1412                 int                     r;
1413
1414                 r = secure_write(MyProcPort, bufptr, bufend - bufptr);
1415
1416                 if (r <= 0)
1417                 {
1418                         if (errno == EINTR)
1419                                 continue;               /* Ok if we were interrupted */
1420
1421                         /*
1422                          * Ok if no data writable without blocking, and the socket is in
1423                          * non-blocking mode.
1424                          */
1425                         if (errno == EAGAIN ||
1426                                 errno == EWOULDBLOCK)
1427                         {
1428                                 return 0;
1429                         }
1430
1431                         /*
1432                          * Careful: an ereport() that tries to write to the client would
1433                          * cause recursion to here, leading to stack overflow and core
1434                          * dump!  This message must go *only* to the postmaster log.
1435                          *
1436                          * If a client disconnects while we're in the midst of output, we
1437                          * might write quite a bit of data before we get to a safe query
1438                          * abort point.  So, suppress duplicate log messages.
1439                          */
1440                         if (errno != last_reported_send_errno)
1441                         {
1442                                 last_reported_send_errno = errno;
1443                                 ereport(COMMERROR,
1444                                                 (errcode_for_socket_access(),
1445                                                  errmsg("could not send data to client: %m")));
1446                         }
1447
1448                         /*
1449                          * We drop the buffered data anyway so that processing can
1450                          * continue, even though we'll probably quit soon. We also set a
1451                          * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
1452                          * the connection.
1453                          */
1454                         PqSendStart = PqSendPointer = 0;
1455                         ClientConnectionLost = 1;
1456                         InterruptPending = 1;
1457                         return EOF;
1458                 }
1459
1460                 last_reported_send_errno = 0;   /* reset after any successful send */
1461                 bufptr += r;
1462                 PqSendStart += r;
1463         }
1464
1465         PqSendStart = PqSendPointer = 0;
1466         return 0;
1467 }
1468
1469 /* --------------------------------
1470  *              pq_flush_if_writable - flush pending output if writable without blocking
1471  *
1472  * Returns 0 if OK, or EOF if trouble.
1473  * --------------------------------
1474  */
1475 static int
1476 socket_flush_if_writable(void)
1477 {
1478         int                     res;
1479
1480         /* Quick exit if nothing to do */
1481         if (PqSendPointer == PqSendStart)
1482                 return 0;
1483
1484         /* No-op if reentrant call */
1485         if (PqCommBusy)
1486                 return 0;
1487
1488         /* Temporarily put the socket into non-blocking mode */
1489         socket_set_nonblocking(true);
1490
1491         PqCommBusy = true;
1492         res = internal_flush();
1493         PqCommBusy = false;
1494         return res;
1495 }
1496
1497 /* --------------------------------
1498  *      socket_is_send_pending  - is there any pending data in the output buffer?
1499  * --------------------------------
1500  */
1501 static bool
1502 socket_is_send_pending(void)
1503 {
1504         return (PqSendStart < PqSendPointer);
1505 }
1506
1507 /* --------------------------------
1508  * Message-level I/O routines begin here.
1509  *
1510  * These routines understand about the old-style COPY OUT protocol.
1511  * --------------------------------
1512  */
1513
1514
1515 /* --------------------------------
1516  *              socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1517  *
1518  *              If msgtype is not '\0', it is a message type code to place before
1519  *              the message body.  If msgtype is '\0', then the message has no type
1520  *              code (this is only valid in pre-3.0 protocols).
1521  *
1522  *              len is the length of the message body data at *s.  In protocol 3.0
1523  *              and later, a message length word (equal to len+4 because it counts
1524  *              itself too) is inserted by this routine.
1525  *
1526  *              All normal messages are suppressed while old-style COPY OUT is in
1527  *              progress.  (In practice only a few notice messages might get emitted
1528  *              then; dropping them is annoying, but at least they will still appear
1529  *              in the postmaster log.)
1530  *
1531  *              We also suppress messages generated while pqcomm.c is busy.  This
1532  *              avoids any possibility of messages being inserted within other
1533  *              messages.  The only known trouble case arises if SIGQUIT occurs
1534  *              during a pqcomm.c routine --- quickdie() will try to send a warning
1535  *              message, and the most reasonable approach seems to be to drop it.
1536  *
1537  *              returns 0 if OK, EOF if trouble
1538  * --------------------------------
1539  */
1540 static int
1541 socket_putmessage(char msgtype, const char *s, size_t len)
1542 {
1543         if (DoingCopyOut || PqCommBusy)
1544                 return 0;
1545         PqCommBusy = true;
1546         if (msgtype)
1547                 if (internal_putbytes(&msgtype, 1))
1548                         goto fail;
1549         if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1550         {
1551                 uint32          n32;
1552
1553                 n32 = htonl((uint32) (len + 4));
1554                 if (internal_putbytes((char *) &n32, 4))
1555                         goto fail;
1556         }
1557         if (internal_putbytes(s, len))
1558                 goto fail;
1559         PqCommBusy = false;
1560         return 0;
1561
1562 fail:
1563         PqCommBusy = false;
1564         return EOF;
1565 }
1566
1567 /* --------------------------------
1568  *              pq_putmessage_noblock   - like pq_putmessage, but never blocks
1569  *
1570  *              If the output buffer is too small to hold the message, the buffer
1571  *              is enlarged.
1572  */
1573 static void
1574 socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1575 {
1576         int res         PG_USED_FOR_ASSERTS_ONLY;
1577         int                     required;
1578
1579         /*
1580          * Ensure we have enough space in the output buffer for the message header
1581          * as well as the message itself.
1582          */
1583         required = PqSendPointer + 1 + 4 + len;
1584         if (required > PqSendBufferSize)
1585         {
1586                 PqSendBuffer = repalloc(PqSendBuffer, required);
1587                 PqSendBufferSize = required;
1588         }
1589         res = pq_putmessage(msgtype, s, len);
1590         Assert(res == 0);                       /* should not fail when the message fits in
1591                                                                  * buffer */
1592 }
1593
1594
1595 /* --------------------------------
1596  *              socket_startcopyout - inform libpq that an old-style COPY OUT transfer
1597  *                      is beginning
1598  * --------------------------------
1599  */
1600 static void
1601 socket_startcopyout(void)
1602 {
1603         DoingCopyOut = true;
1604 }
1605
1606 /* --------------------------------
1607  *              socket_endcopyout       - end an old-style COPY OUT transfer
1608  *
1609  *              If errorAbort is indicated, we are aborting a COPY OUT due to an error,
1610  *              and must send a terminator line.  Since a partial data line might have
1611  *              been emitted, send a couple of newlines first (the first one could
1612  *              get absorbed by a backslash...)  Note that old-style COPY OUT does
1613  *              not allow binary transfers, so a textual terminator is always correct.
1614  * --------------------------------
1615  */
1616 static void
1617 socket_endcopyout(bool errorAbort)
1618 {
1619         if (!DoingCopyOut)
1620                 return;
1621         if (errorAbort)
1622                 pq_putbytes("\n\n\\.\n", 5);
1623         /* in non-error case, copy.c will have emitted the terminator line */
1624         DoingCopyOut = false;
1625 }
1626
1627 /*
1628  * Support for TCP Keepalive parameters
1629  */
1630
/*
 * pq_setkeepaliveswin32 - set keepalive idle time and interval on Windows
 *
 * On Windows, we need to set both idle and interval at the same time.
 * We also cannot reset them to the default (setting to zero will
 * actually set them to zero, not default), therefore we fall back to
 * the out-of-the-box default instead: 2 hours idle, 1 second interval.
 *
 * idle and interval are given in seconds; values <= 0 select the fallback
 * defaults above.  They are scaled by 1000 before being stored in
 * struct tcp_keepalive.
 *
 * On success the values actually applied are recorded in the Port and
 * STATUS_OK is returned.  On failure the error is logged, the Port is left
 * unchanged, and STATUS_ERROR is returned.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
	struct tcp_keepalive ka;
	DWORD		retsize;

	if (idle <= 0)
		idle = 2 * 60 * 60;		/* default = 2 hours */
	if (interval <= 0)
		interval = 1;			/* default = 1 second */

	ka.onoff = 1;
	ka.keepalivetime = idle * 1000;
	ka.keepaliveinterval = interval * 1000;

	if (WSAIoctl(port->sock,
				 SIO_KEEPALIVE_VALS,
				 (LPVOID) &ka,
				 sizeof(ka),
				 NULL,
				 0,
				 &retsize,
				 NULL,
				 NULL)
		!= 0)
	{
		/*
		 * WSAGetLastError() returns an int.  The previous "%ui" format
		 * printed it as unsigned followed by a stray literal 'i'.
		 */
		elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: error code %d",
			 WSAGetLastError());
		return STATUS_ERROR;
	}

	/* Remember what is now in effect on the socket. */
	port->keepalives_idle = idle;
	port->keepalives_interval = interval;
	return STATUS_OK;
}
#endif
1675
1676 int
1677 pq_getkeepalivesidle(Port *port)
1678 {
1679 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(WIN32)
1680         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1681                 return 0;
1682
1683         if (port->keepalives_idle != 0)
1684                 return port->keepalives_idle;
1685
1686         if (port->default_keepalives_idle == 0)
1687         {
1688 #ifndef WIN32
1689                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
1690
1691 #ifdef TCP_KEEPIDLE
1692                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
1693                                            (char *) &port->default_keepalives_idle,
1694                                            &size) < 0)
1695                 {
1696                         elog(LOG, "getsockopt(TCP_KEEPIDLE) failed: %m");
1697                         port->default_keepalives_idle = -1; /* don't know */
1698                 }
1699 #else
1700                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
1701                                            (char *) &port->default_keepalives_idle,
1702                                            &size) < 0)
1703                 {
1704                         elog(LOG, "getsockopt(TCP_KEEPALIVE) failed: %m");
1705                         port->default_keepalives_idle = -1; /* don't know */
1706                 }
1707 #endif   /* TCP_KEEPIDLE */
1708 #else                                                   /* WIN32 */
1709                 /* We can't get the defaults on Windows, so return "don't know" */
1710                 port->default_keepalives_idle = -1;
1711 #endif   /* WIN32 */
1712         }
1713
1714         return port->default_keepalives_idle;
1715 #else
1716         return 0;
1717 #endif
1718 }
1719
1720 int
1721 pq_setkeepalivesidle(int idle, Port *port)
1722 {
1723         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1724                 return STATUS_OK;
1725
1726 #if defined(TCP_KEEPIDLE) || defined(TCP_KEEPALIVE) || defined(SIO_KEEPALIVE_VALS)
1727         if (idle == port->keepalives_idle)
1728                 return STATUS_OK;
1729
1730 #ifndef WIN32
1731         if (port->default_keepalives_idle <= 0)
1732         {
1733                 if (pq_getkeepalivesidle(port) < 0)
1734                 {
1735                         if (idle == 0)
1736                                 return STATUS_OK;               /* default is set but unknown */
1737                         else
1738                                 return STATUS_ERROR;
1739                 }
1740         }
1741
1742         if (idle == 0)
1743                 idle = port->default_keepalives_idle;
1744
1745 #ifdef TCP_KEEPIDLE
1746         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPIDLE,
1747                                    (char *) &idle, sizeof(idle)) < 0)
1748         {
1749                 elog(LOG, "setsockopt(TCP_KEEPIDLE) failed: %m");
1750                 return STATUS_ERROR;
1751         }
1752 #else
1753         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPALIVE,
1754                                    (char *) &idle, sizeof(idle)) < 0)
1755         {
1756                 elog(LOG, "setsockopt(TCP_KEEPALIVE) failed: %m");
1757                 return STATUS_ERROR;
1758         }
1759 #endif
1760
1761         port->keepalives_idle = idle;
1762 #else                                                   /* WIN32 */
1763         return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
1764 #endif
1765 #else                                                   /* TCP_KEEPIDLE || SIO_KEEPALIVE_VALS */
1766         if (idle != 0)
1767         {
1768                 elog(LOG, "setting the keepalive idle time is not supported");
1769                 return STATUS_ERROR;
1770         }
1771 #endif
1772         return STATUS_OK;
1773 }
1774
1775 int
1776 pq_getkeepalivesinterval(Port *port)
1777 {
1778 #if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1779         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1780                 return 0;
1781
1782         if (port->keepalives_interval != 0)
1783                 return port->keepalives_interval;
1784
1785         if (port->default_keepalives_interval == 0)
1786         {
1787 #ifndef WIN32
1788                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
1789
1790                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1791                                            (char *) &port->default_keepalives_interval,
1792                                            &size) < 0)
1793                 {
1794                         elog(LOG, "getsockopt(TCP_KEEPINTVL) failed: %m");
1795                         port->default_keepalives_interval = -1;         /* don't know */
1796                 }
1797 #else
1798                 /* We can't get the defaults on Windows, so return "don't know" */
1799                 port->default_keepalives_interval = -1;
1800 #endif   /* WIN32 */
1801         }
1802
1803         return port->default_keepalives_interval;
1804 #else
1805         return 0;
1806 #endif
1807 }
1808
1809 int
1810 pq_setkeepalivesinterval(int interval, Port *port)
1811 {
1812         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1813                 return STATUS_OK;
1814
1815 #if defined(TCP_KEEPINTVL) || defined (SIO_KEEPALIVE_VALS)
1816         if (interval == port->keepalives_interval)
1817                 return STATUS_OK;
1818
1819 #ifndef WIN32
1820         if (port->default_keepalives_interval <= 0)
1821         {
1822                 if (pq_getkeepalivesinterval(port) < 0)
1823                 {
1824                         if (interval == 0)
1825                                 return STATUS_OK;               /* default is set but unknown */
1826                         else
1827                                 return STATUS_ERROR;
1828                 }
1829         }
1830
1831         if (interval == 0)
1832                 interval = port->default_keepalives_interval;
1833
1834         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1835                                    (char *) &interval, sizeof(interval)) < 0)
1836         {
1837                 elog(LOG, "setsockopt(TCP_KEEPINTVL) failed: %m");
1838                 return STATUS_ERROR;
1839         }
1840
1841         port->keepalives_interval = interval;
1842 #else                                                   /* WIN32 */
1843         return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
1844 #endif
1845 #else
1846         if (interval != 0)
1847         {
1848                 elog(LOG, "setsockopt(TCP_KEEPINTVL) not supported");
1849                 return STATUS_ERROR;
1850         }
1851 #endif
1852
1853         return STATUS_OK;
1854 }
1855
1856 int
1857 pq_getkeepalivescount(Port *port)
1858 {
1859 #ifdef TCP_KEEPCNT
1860         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1861                 return 0;
1862
1863         if (port->keepalives_count != 0)
1864                 return port->keepalives_count;
1865
1866         if (port->default_keepalives_count == 0)
1867         {
1868                 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
1869
1870                 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1871                                            (char *) &port->default_keepalives_count,
1872                                            &size) < 0)
1873                 {
1874                         elog(LOG, "getsockopt(TCP_KEEPCNT) failed: %m");
1875                         port->default_keepalives_count = -1;            /* don't know */
1876                 }
1877         }
1878
1879         return port->default_keepalives_count;
1880 #else
1881         return 0;
1882 #endif
1883 }
1884
1885 int
1886 pq_setkeepalivescount(int count, Port *port)
1887 {
1888         if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1889                 return STATUS_OK;
1890
1891 #ifdef TCP_KEEPCNT
1892         if (count == port->keepalives_count)
1893                 return STATUS_OK;
1894
1895         if (port->default_keepalives_count <= 0)
1896         {
1897                 if (pq_getkeepalivescount(port) < 0)
1898                 {
1899                         if (count == 0)
1900                                 return STATUS_OK;               /* default is set but unknown */
1901                         else
1902                                 return STATUS_ERROR;
1903                 }
1904         }
1905
1906         if (count == 0)
1907                 count = port->default_keepalives_count;
1908
1909         if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1910                                    (char *) &count, sizeof(count)) < 0)
1911         {
1912                 elog(LOG, "setsockopt(TCP_KEEPCNT) failed: %m");
1913                 return STATUS_ERROR;
1914         }
1915
1916         port->keepalives_count = count;
1917 #else
1918         if (count != 0)
1919         {
1920                 elog(LOG, "setsockopt(TCP_KEEPCNT) not supported");
1921                 return STATUS_ERROR;
1922         }
1923 #endif
1924
1925         return STATUS_OK;
1926 }