1 /*-------------------------------------------------------------------------
4 * Communication functions between the Frontend and the Backend
6 * These routines handle the low-level details of communication between
7 * frontend and backend. They just shove data across the communication
8 * channel, and are ignorant of the semantics of the data --- or would be,
9 * except for major brain damage in the design of the old COPY OUT protocol.
10 * Unfortunately, COPY OUT was designed to commandeer the communication
11 * channel (it just transfers data without wrapping it into messages).
12 * No other messages can be sent while COPY OUT is in progress; and if the
13 * copy is aborted by an elog(ERROR), we need to close out the copy so that
14 * the frontend gets back into sync. Therefore, these routines have to be
15 * aware of COPY OUT state. (New COPY-OUT is message-based and does *not*
16 * set the DoingCopyOut flag.)
18 * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19 * pq_putbytes(), especially if the message would require multiple calls
20 * to send. Instead, use the routines in pqformat.c to construct the message
21 * in a buffer and then emit it in one call to pq_putmessage. This ensures
22 * that the channel will not be clogged by an incomplete message if execution
23 * is aborted by elog(ERROR) partway through the message. The only non-libpq
24 * code that should call pq_putbytes directly is old-style COPY OUT.
26 * At one time, libpq was shared between frontend and backend, but now
27 * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28 * All that remains is similarities of names to trap the unwary...
30 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
31 * Portions Copyright (c) 1994, Regents of the University of California
33 * $Header: /cvsroot/pgsql/src/backend/libpq/pqcomm.c,v 1.155 2003/06/08 17:43:00 tgl Exp $
35 *-------------------------------------------------------------------------
38 /*------------------------
42 * StreamServerPort - Open postmaster's server port
43 * StreamConnection - Create new connection with client
44 * StreamClose - Close a client/backend connection
45 * TouchSocketFile - Protect socket file against /tmp cleaners
46 * pq_init - initialize libpq at backend startup
47 * pq_close - shutdown libpq at backend exit
50 * pq_getbytes - get a known number of bytes from connection
51 * pq_getstring - get a null terminated string from connection
52 * pq_getmessage - get a message with length word from connection
53 * pq_getbyte - get next byte from connection
54 * pq_peekbyte - peek at next byte from connection
55 * pq_putbytes - send bytes to connection (not flushed until pq_flush)
56 * pq_flush - flush pending output
58 * message-level I/O (and old-style-COPY-OUT cruft):
59 * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
60 * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
61 * pq_endcopyout - end a COPY OUT transfer
63 *------------------------
73 #include <sys/socket.h>
77 #include <netinet/in.h>
78 #ifdef HAVE_NETINET_TCP_H
79 #include <netinet/tcp.h>
81 #include <arpa/inet.h>
86 #include "libpq/libpq.h"
87 #include "miscadmin.h"
88 #include "storage/ipc.h"
// Forward declarations for file-local helpers defined later in this file.
// NOTE(review): this copy is a lossy excerpt -- each line is prefixed with its
// original source line number, and many lines (return types, braces, comment
// terminators, some statements) are missing, so it does not compile as-is.
91 static void pq_close(void);
93 #ifdef HAVE_UNIX_SOCKETS
// Unix-domain socket helpers used by StreamServerPort: Lock_AF_UNIX runs
// before bind(), Setup_AF_UNIX after it.
94 static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
95 static int Setup_AF_UNIX(void);
96 #endif /* HAVE_UNIX_SOCKETS */
100 * Configuration options
// Unix-socket group and permission settings; both are applied to the socket
// file by Setup_AF_UNIX (an empty group string means "leave the group alone").
102 int Unix_socket_permissions;
103 char *Unix_socket_group;
107 * Buffers for low-level I/O
// The send and receive buffers are both PQ_BUFFER_SIZE bytes.
110 #define PQ_BUFFER_SIZE 8192
// Pending output occupies PqSendBuffer[0 .. PqSendPointer); pq_flush drains it.
112 static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
113 static int PqSendPointer; /* Next index to store a byte in
// Unread input occupies PqRecvBuffer[PqRecvPointer .. PqRecvLength); the
// buffer is empty whenever PqRecvPointer >= PqRecvLength.
116 static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
117 static int PqRecvPointer; /* Next index to read a byte from
119 static int PqRecvLength; /* End of data available in PqRecvBuffer */
// Set only for old-style (unframed) COPY OUT, during which pq_putmessage
// suppresses normal messages; reset by pq_init, cleared by pq_endcopyout.
124 static bool DoingCopyOut;
127 /* --------------------------------
128 * pq_init - initialize libpq at backend startup
129 * --------------------------------
// Empties both I/O buffers, clears the old-style COPY OUT flag, and registers
// pq_close to run when the process exits.
134 PqSendPointer = PqRecvPointer = PqRecvLength = 0;
135 DoingCopyOut = false;
136 on_proc_exit(pq_close, 0);
140 /* --------------------------------
141 * pq_close - shutdown libpq at backend exit
143 * Note: in a standalone backend MyProcPort will be null,
144 * don't crash during exit...
145 * --------------------------------
// Exit callback registered by pq_init. When a client port exists it shuts
// down the SSL layer and sets sock to -1. It deliberately does NOT close the
// descriptor, so the client sees the connection drop only at process death.
150 if (MyProcPort != NULL)
152 /* Cleanly shut down SSL layer */
153 secure_close(MyProcPort);
155 * Formerly we did an explicit close() here, but it seems better
156 * to leave the socket open until the process dies. This allows
157 * clients to perform a "synchronous close" if they care --- wait
158 * till the transport layer reports connection closure, and you
159 * can be sure the backend has exited.
161 * We do set sock to -1 to prevent any further I/O, though.
163 MyProcPort->sock = -1;
170 * Streams -- wrapper around Unix socket system calls
173 * Stream functions are used for vanilla TCP connection protocol.
// Path of the Unix-domain socket file, filled in by Lock_AF_UNIX. It stays an
// empty string when no socket file was created, which TouchSocketFile checks.
176 static char sock_path[MAXPGPATH];
180 * Shutdown routine for backend connection
181 * If a Unix socket is used for communication, explicitly close it.
// NOTE(review): this is presumably StreamDoUnlink (its definition line is not
// in this copy). Setup_AF_UNIX registers it with on_proc_exit "to unlink the
// socket file at exit", so the header's "close" likely means "unlink".
183 #ifdef HAVE_UNIX_SOCKETS
// sock_path must already have been set by Lock_AF_UNIX.
187 Assert(sock_path[0]);
190 #endif /* HAVE_UNIX_SOCKETS */
193 * StreamServerPort -- open a sock stream "listening" port.
195 * This initializes the Postmaster's connection-accepting port *fdP.
197 * RETURNS: STATUS_OK or STATUS_ERROR
201 StreamServerPort(int family, char *hostName, unsigned short portNumber,
202 char *unixSocketName, int *fdP)
// Resolves hostName/portNumber, then creates, binds and listens on one
// server socket of the given family (AF_UNIX or an AF_INET* family).
// For AF_UNIX, Lock_AF_UNIX runs before bind() and Setup_AF_UNIX after it,
// so socket ownership/permissions are fixed before listen() is called.
// Every failure is logged at LOG level and returns STATUS_ERROR.
209 char portNumberStr[64];
211 struct addrinfo *addrs = NULL;
212 struct addrinfo hint;
214 #ifdef HAVE_UNIX_SOCKETS
215 Assert(family == AF_UNIX || isAF_INETx(family));
217 Assert(isAF_INETx(family));
220 /* Initialize hint structure */
221 MemSet(&hint, 0, sizeof(hint));
222 hint.ai_family = family;
223 hint.ai_flags = AI_PASSIVE;
224 hint.ai_socktype = SOCK_STREAM;
226 #ifdef HAVE_UNIX_SOCKETS
227 if (family == AF_UNIX)
229 if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
234 #endif /* HAVE_UNIX_SOCKETS */
// The port number is passed to getaddrinfo2 as a decimal service string.
236 snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
237 service = portNumberStr;
240 ret = getaddrinfo2(hostName, service, &hint, &addrs);
241 if (ret || addrs == NULL)
243 elog(LOG, "server socket failure: getaddrinfo2(): %s",
245 freeaddrinfo2(addrs);
// NOTE(review): once socket() succeeds, the error paths below (setsockopt,
// bind, Setup_AF_UNIX, listen) free addrs, but no close of fd is visible in
// this copy, so the descriptor appears to leak on those returns -- confirm
// against the full source.
249 if ((fd = socket(family, SOCK_STREAM, 0)) < 0)
251 elog(LOG, "server socket failure: socket(): %s",
253 freeaddrinfo2(addrs);
257 if (isAF_INETx(family))
259 if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one,
262 elog(LOG, "server socket failure: setsockopt(SO_REUSEADDR): %s",
264 freeaddrinfo2(addrs);
// Only the first resolved address is bound. The Assert documents the
// assumption that resolution yields exactly one entry of the requested family.
269 Assert(addrs->ai_next == NULL && addrs->ai_family == family);
270 err = bind(fd, addrs->ai_addr, addrs->ai_addrlen);
273 elog(LOG, "server socket failure: bind(): %s\n"
274 "\tIs another postmaster already running on port %d?",
275 strerror(errno), (int) portNumber);
276 if (family == AF_UNIX)
277 elog(LOG, "\tIf not, remove socket node (%s) and retry.",
280 elog(LOG, "\tIf not, wait a few seconds and retry.");
281 freeaddrinfo2(addrs);
285 #ifdef HAVE_UNIX_SOCKETS
286 if (family == AF_UNIX)
288 if (Setup_AF_UNIX() != STATUS_OK)
290 freeaddrinfo2(addrs);
297 * Select appropriate accept-queue length limit. PG_SOMAXCONN is only
298 * intended to provide a clamp on the request on platforms where an
299 * overly large request provokes a kernel error (are there any?).
// Backlog = twice MaxBackends, capped at PG_SOMAXCONN.
301 maxconn = MaxBackends * 2;
302 if (maxconn > PG_SOMAXCONN)
303 maxconn = PG_SOMAXCONN;
305 err = listen(fd, maxconn);
308 elog(LOG, "server socket failure: listen(): %s",
310 freeaddrinfo2(addrs);
315 freeaddrinfo2(addrs);
321 #ifdef HAVE_UNIX_SOCKETS
324 * Lock_AF_UNIX -- configure unix socket file path
// Computes the socket file path into sock_path, takes the interlock file via
// CreateSocketLockFile, and then (per the comment below) removes any stale
// socket file so bind() will not fail. StreamServerPort treats any result
// other than STATUS_OK as failure.
327 Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
329 SockAddr saddr; /* just used to get socket path */
331 UNIXSOCK_PATH(saddr.un, portNumber, unixSocketName);
// NOTE(review): strcpy assumes sun_path always fits in sock_path (MAXPGPATH
// bytes); presumably true because sun_path is a small fixed array -- confirm.
332 strcpy(sock_path, saddr.un.sun_path);
335 * Grab an interlock file associated with the socket file.
337 if (!CreateSocketLockFile(sock_path, true))
341 * Once we have the interlock, we can safely delete any pre-existing
342 * socket file to avoid failure at bind() time.
351 * Setup_AF_UNIX -- configure unix socket permissions
// Called by StreamServerPort after bind() for AF_UNIX.
// 1. Registers StreamDoUnlink as an exit callback.
// 2. If Unix_socket_group is non-empty, chown()s sock_path to that group
//    (numeric gid or group name).
// 3. chmod()s sock_path to Unix_socket_permissions.
// Failures are logged at LOG level.
356 /* Arrange to unlink the socket file at exit */
357 on_proc_exit(StreamDoUnlink, 0);
360 * Fix socket ownership/permission if requested. Note we must do this
361 * before we listen() to avoid a window where unwanted connections
362 * could get accepted.
364 Assert(Unix_socket_group);
365 if (Unix_socket_group[0] != '\0')
// NOTE(review): the preprocessor test that selects this FATAL branch
// (presumably a platform without group lookup support) is not in this copy.
368 elog(FATAL, "Config value 'unix_socket_group' not supported on this platform");
371 unsigned long int val;
// The group setting is first parsed as a decimal id; otherwise it is looked
// up by name with getgrnam().
374 val = strtoul(Unix_socket_group, &endptr, 10);
376 { /* numeric group id */
380 { /* convert group name to id */
383 gr = getgrnam(Unix_socket_group);
386 elog(LOG, "server socket failure: no such group '%s'",
// An owner argument of -1 leaves the file's owner unchanged; only the group
// is set.
392 if (chown(sock_path, -1, gid) == -1)
394 elog(LOG, "server socket failure: could not set group of %s: %s",
395 sock_path, strerror(errno));
401 if (chmod(sock_path, Unix_socket_permissions) == -1)
403 elog(LOG, "server socket failure: could not set permissions on %s: %s",
404 sock_path, strerror(errno));
410 #endif /* HAVE_UNIX_SOCKETS */
414 * StreamConnection -- create a new connection with client using
417 * ASSUME: that this doesn't need to be non-blocking because
418 * the Postmaster uses select() to tell when the server master
419 * socket is ready for accept().
421 * RETURNS: STATUS_OK or STATUS_ERROR
424 StreamConnection(int server_fd, Port *port)
426 ACCEPT_TYPE_ARG3 addrlen;
// Fills port->sock (accepted descriptor), port->raddr (peer address) and
// port->laddr (local address). TCP sockets also get TCP_NODELAY and
// SO_KEEPALIVE.
// NOTE(review): errors here are logged with %m, whereas StreamServerPort
// formats strerror(errno) explicitly.
428 /* accept connection (and fill in the client (remote) address) */
429 addrlen = sizeof(port->raddr);
430 if ((port->sock = accept(server_fd,
431 (struct sockaddr *) &port->raddr,
434 elog(LOG, "StreamConnection: accept() failed: %m");
// Workaround: on the affected platforms accept() reports address family 0 for
// the peer; such connections are treated as AF_UNIX.
438 #ifdef SCO_ACCEPT_BUG
440 * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
441 * shouldn't hurt to catch it for all versions of those platforms.
443 if (port->raddr.sa.sa_family == 0)
444 port->raddr.sa.sa_family = AF_UNIX;
447 /* fill in the server (local) address */
448 addrlen = sizeof(port->laddr);
449 if (getsockname(port->sock, (struct sockaddr *) & port->laddr,
452 elog(LOG, "StreamConnection: getsockname() failed: %m");
456 /* select NODELAY and KEEPALIVE options if it's a TCP connection */
457 if (isAF_INETx(port->laddr.sa.sa_family))
461 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
462 (char *) &on, sizeof(on)) < 0)
464 elog(LOG, "StreamConnection: setsockopt(TCP_NODELAY) failed: %m");
467 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
468 (char *) &on, sizeof(on)) < 0)
470 elog(LOG, "StreamConnection: setsockopt(SO_KEEPALIVE) failed: %m");
479 * StreamClose -- close a client/backend connection
481 * NOTE: this is NOT used to terminate a session; it is just used to release
482 * the file descriptor in a process that should no longer have the socket
483 * open. (For example, the postmaster calls this after passing ownership
484 * of the connection to a child process.) It is expected that someone else
485 * still has the socket open. So, we only want to close the descriptor,
486 * we do NOT want to send anything to the far end.
// NOTE(review): the function body (source lines 490-492) is not present in
// this copy; per the comment above it should only release the descriptor.
489 StreamClose(int sock)
495 * TouchSocketFile -- mark socket file as recently accessed
497 * This routine should be called every so often to ensure that the socket
498 * file has a recent mod date (ordinary operations on sockets usually won't
499 * change the mod date). That saves it from being removed by
500 * overenthusiastic /tmp-directory-cleaner daemons. (Another reason we should
501 * never have put the socket file in /tmp...)
504 TouchSocketFile(void)
506 /* Do nothing if we did not create a socket... */
507 if (sock_path[0] != '\0')
510 * utime() is POSIX standard, utimes() is a common alternative.
511 * If we have neither, there's no way to affect the mod or access
512 * time of the socket :-(
514 * In either path, we ignore errors; there's no point in complaining.
// A NULL times argument asks utime()/utimes() to use the current time.
// NOTE(review): the opening #ifdef HAVE_UTIME and #ifdef HAVE_UTIMES lines are
// not in this copy; only the #else and the two #endif lines remain.
517 utime(sock_path, NULL);
518 #else /* !HAVE_UTIME */
520 utimes(sock_path, NULL);
521 #endif /* HAVE_UTIMES */
522 #endif /* HAVE_UTIME */
527 /* --------------------------------
528 * Low-level I/O routines begin here.
530 * These routines communicate with a frontend client across a connection
531 * already established by the preceding routines.
532 * --------------------------------
536 /* --------------------------------
537 * pq_recvbuf - load some bytes into the input buffer
539 * returns 0 if OK, EOF if trouble
540 * --------------------------------
// 1. Slides any unread bytes to the front of PqRecvBuffer (or empties it if
//    nothing is unread).
// 2. Reads into the free tail of the buffer, retrying on interruption.
// A read error is logged with COMMERROR; end-of-file is not logged here, and
// reporting it is left to the caller.
545 if (PqRecvPointer > 0)
547 if (PqRecvLength > PqRecvPointer)
549 /* still some unread data, left-justify it in the buffer */
550 memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
551 PqRecvLength - PqRecvPointer);
552 PqRecvLength -= PqRecvPointer;
556 PqRecvLength = PqRecvPointer = 0;
559 /* Can fill buffer from PqRecvLength and upwards */
564 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
565 PQ_BUFFER_SIZE - PqRecvLength);
570 continue; /* Ok if interrupted */
573 * Careful: an elog() that tries to write to the client would
574 * cause recursion to here, leading to stack overflow and core
575 * dump! This message must go *only* to the postmaster log.
577 elog(COMMERROR, "pq_recvbuf: recv() failed: %m");
583 * EOF detected. We used to write a log message here, but it's
584 * better to expect the ultimate caller to do that.
588 /* r contains number of bytes read, so just incr length */
594 /* --------------------------------
595 * pq_getbyte - get a single byte from connection, or return EOF
596 * --------------------------------
// Calls pq_recvbuf until at least one unread byte is buffered, then consumes
// it. PqRecvBuffer is unsigned char, so a successful result is in 0..255 and
// cannot be confused with EOF.
601 while (PqRecvPointer >= PqRecvLength)
603 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
604 return EOF; /* Failed to recv data */
606 return PqRecvBuffer[PqRecvPointer++];
609 /* --------------------------------
610 * pq_peekbyte - peek at next byte from connection
612 * Same as pq_getbyte() except we don't advance the pointer.
613 * --------------------------------
// Calls pq_recvbuf until at least one unread byte is buffered and returns it
// without consuming it. A following pq_getbyte returns the same byte.
618 while (PqRecvPointer >= PqRecvLength)
620 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
621 return EOF; /* Failed to recv data */
623 return PqRecvBuffer[PqRecvPointer];
626 /* --------------------------------
627 * pq_getbytes - get a known number of bytes from connection
629 * returns 0 if OK, EOF if trouble
630 * --------------------------------
633 pq_getbytes(char *s, size_t len)
// Copies len bytes into s, one buffer's worth at a time, refilling via
// pq_recvbuf whenever the receive buffer is empty.
// NOTE(review): the loop over len, the clamp of amount to len, and the
// advance of s/len (source lines 636-652) are not all present in this copy.
// On EOF, bytes already copied into s are left there; the caller only sees EOF.
639 while (PqRecvPointer >= PqRecvLength)
641 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
642 return EOF; /* Failed to recv data */
644 amount = PqRecvLength - PqRecvPointer;
647 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
648 PqRecvPointer += amount;
655 /* --------------------------------
656 * pq_getstring - get a null terminated string from connection
658 * The return value is placed in an expansible StringInfo, which has
659 * already been initialized by the caller.
661 * This is used only for dealing with old-protocol clients. The idea
662 * is to produce a StringInfo that looks the same as we would get from
663 * pq_getmessage() with a newer client; we will then process it with
664 * pq_getmsgstring. Therefore, no character set conversion is done here,
665 * even though this is presumably useful only for text.
667 * returns 0 if OK, EOF if trouble
668 * --------------------------------
671 pq_getstring(StringInfo s)
// Appends received bytes to s until the first '\0' (copied as well), spanning
// as many buffer refills as needed. Bytes after the '\0' stay buffered for the
// next read.
// NOTE(review): unlike pq_getmessage's maxlen, there is no length limit here,
// so a client that never sends '\0' can make s grow without bound.
675 /* Reset string to empty */
680 /* Read until we get the terminating '\0' */
683 while (PqRecvPointer >= PqRecvLength)
685 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
686 return EOF; /* Failed to recv data */
689 for (i = PqRecvPointer; i < PqRecvLength; i++)
691 if (PqRecvBuffer[i] == '\0')
693 /* include the '\0' in the copy */
694 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
695 i - PqRecvPointer + 1);
696 PqRecvPointer = i + 1; /* advance past \0 */
701 /* If we're here we haven't got the \0 in the buffer yet. */
702 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
703 PqRecvLength - PqRecvPointer);
704 PqRecvPointer = PqRecvLength;
709 /* --------------------------------
710 * pq_getmessage - get a message with length word from connection
712 * The return value is placed in an expansible StringInfo, which has
713 * already been initialized by the caller.
714 * Only the message body is placed in the StringInfo; the length word
715 * is removed. Also, s->cursor is initialized to zero for convenience
716 * in scanning the message contents.
718 * If maxlen is not zero, it is an upper limit on the length of the
719 * message we are willing to accept. We abort the connection (by
720 * returning EOF) if client tries to send more than that.
722 * returns 0 if OK, EOF if trouble
723 * --------------------------------
726 pq_getmessage(StringInfo s, int maxlen)
// Wire format read here: a 4-byte length word that counts itself, then
// (length - 4) body bytes. On the send side pq_putmessage writes this word
// with htonl(), so a network-to-host conversion of len presumably sits between
// the read and "len -= 4" (source lines 739-742 are not in this copy).
// Each failure is logged with COMMERROR.
// NOTE(review): only the maxlen half of the length validation is visible; the
// preceding condition on source line 745 is missing from this copy.
730 /* Reset message buffer to empty */
735 /* Read message length word */
736 if (pq_getbytes((char *) &len, 4) == EOF)
738 elog(COMMERROR, "unexpected EOF within message length word");
743 len -= 4; /* discount length itself */
746 (maxlen > 0 && len > maxlen))
748 elog(COMMERROR, "invalid message length");
754 /* Allocate space for message */
755 enlargeStringInfo(s, len);
757 /* And grab the message */
758 if (pq_getbytes(s->data, len) == EOF)
760 elog(COMMERROR, "incomplete client message");
764 /* Place a trailing null per StringInfo convention */
772 /* --------------------------------
773 * pq_putbytes - send bytes to connection (not flushed until pq_flush)
775 * returns 0 if OK, EOF if trouble
776 * --------------------------------
779 pq_putbytes(const char *s, size_t len)
// Copies len bytes from s into PqSendBuffer. pq_flush is called only when the
// buffer is completely full; otherwise data waits for an explicit pq_flush.
// NOTE(review): the statement run when pq_flush fails (source line 787,
// presumably returning EOF) and the clamp of amount to len (lines 789-790)
// are not present in this copy.
785 if (PqSendPointer >= PQ_BUFFER_SIZE)
786 if (pq_flush()) /* If buffer is full, then flush it out */
788 amount = PQ_BUFFER_SIZE - PqSendPointer;
791 memcpy(PqSendBuffer + PqSendPointer, s, amount);
792 PqSendPointer += amount;
799 /* --------------------------------
800 * pq_flush - flush pending output
802 * returns 0 if OK, EOF if trouble
803 * --------------------------------
// Writes PqSendBuffer[0 .. PqSendPointer) to the client, looping until the
// whole range is written and retrying after interruptions.
// On a write error:
// - the failure is logged once per distinct errno (tracked in the static
//   below, which is reset after any successful write);
// - the buffered data is discarded.
808 static int last_reported_send_errno = 0;
810 unsigned char *bufptr = PqSendBuffer;
811 unsigned char *bufend = PqSendBuffer + PqSendPointer;
813 while (bufptr < bufend)
817 r = secure_write(MyProcPort, bufptr, bufend - bufptr);
822 continue; /* Ok if we were interrupted */
825 * Careful: an elog() that tries to write to the client would
826 * cause recursion to here, leading to stack overflow and core
827 * dump! This message must go *only* to the postmaster log.
829 * If a client disconnects while we're in the midst of output, we
830 * might write quite a bit of data before we get to a safe
831 * query abort point. So, suppress duplicate log messages.
833 if (errno != last_reported_send_errno)
835 last_reported_send_errno = errno;
836 elog(COMMERROR, "pq_flush: send() failed: %m");
840 * We drop the buffered data anyway so that processing can
841 * continue, even though we'll probably quit soon.
847 last_reported_send_errno = 0; /* reset after any successful send */
856 /* --------------------------------
857 * Message-level I/O routines begin here.
859 * These routines understand about the old-style COPY OUT protocol.
860 * --------------------------------
864 /* --------------------------------
865 * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
867 * If msgtype is not '\0', it is a message type code to place before
868 * the message body. If msgtype is '\0', then the message has no type
869 * code (this is only valid in pre-3.0 protocols).
871 * len is the length of the message body data at *s. In protocol 3.0
872 * and later, a message length word (equal to len+4 because it counts
873 * itself too) is inserted by this routine.
875 * All normal messages are suppressed while old-style COPY OUT is in
876 * progress. (In practice only a few notice messages might get emitted
877 * then; dropping them is annoying, but at least they will still appear
878 * in the postmaster log.)
880 * returns 0 if OK, EOF if trouble
881 * --------------------------------
884 pq_putmessage(char msgtype, const char *s, size_t len)
// Bytes are appended with pq_putbytes in this order:
// 1. the type byte, when msgtype is nonzero;
// 2. the length word len + 4 in network byte order (protocol 3.0+ only);
// 3. the body.
// They are only buffered; pq_flush sends them.
// NOTE(review): the COPY OUT suppression test and the msgtype guard (source
// lines 886-888) are not present in this copy.
// NOTE(review): (uint32) (len + 4) would silently truncate a body near or
// above 4 GB; callers presumably never send such messages -- confirm.
889 if (pq_putbytes(&msgtype, 1))
891 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
895 n32 = htonl((uint32) (len + 4));
896 if (pq_putbytes((char *) &n32, 4))
899 return pq_putbytes(s, len);
902 /* --------------------------------
903 * pq_startcopyout - inform libpq that an old-style COPY OUT transfer
905 * --------------------------------
// Marks the start of an old-style COPY OUT. Normal messages are then suppressed
// by pq_putmessage until pq_endcopyout clears DoingCopyOut.
// NOTE(review): the body (presumably setting DoingCopyOut to true) and the
// rest of the header sentence (source line 904) are not present in this copy.
908 pq_startcopyout(void)
913 /* --------------------------------
914 * pq_endcopyout - end an old-style COPY OUT transfer
916 * If errorAbort is indicated, we are aborting a COPY OUT due to an error,
917 * and must send a terminator line. Since a partial data line might have
918 * been emitted, send a couple of newlines first (the first one could
919 * get absorbed by a backslash...) Note that old-style COPY OUT does
920 * not allow binary transfers, so a textual terminator is always correct.
921 * --------------------------------
924 pq_endcopyout(bool errorAbort)
// Clears DoingCopyOut so normal messages flow again. On error abort it also
// buffers a 5-byte terminator: two newlines followed by the backslash-period
// end-of-data line.
// NOTE(review): the lines between the signature and these statements (source
// 925-927 and 929, presumably including the errorAbort test) are not present
// in this copy. The pq_putbytes result is not checked.
928 DoingCopyOut = false;
930 pq_putbytes("\n\n\\.\n", 5);
931 /* in non-error case, copy.c will have emitted the terminator line */