1 /*-------------------------------------------------------------------------
4 * Communication functions between the Frontend and the Backend
6 * These routines handle the low-level details of communication between
7 * frontend and backend. They just shove data across the communication
8 * channel, and are ignorant of the semantics of the data --- or would be,
9 * except for major brain damage in the design of the COPY OUT protocol.
10 * Unfortunately, COPY OUT is designed to commandeer the communication
11 * channel (it just transfers data without wrapping it into messages).
12 * No other messages can be sent while COPY OUT is in progress; and if the
13 * copy is aborted by an elog(ERROR), we need to close out the copy so that
14 * the frontend gets back into sync. Therefore, these routines have to be
15 * aware of COPY OUT state.
17 * NOTE: generally, it's a bad idea to emit outgoing messages directly with
18 * pq_putbytes(), especially if the message would require multiple calls
19 * to send. Instead, use the routines in pqformat.c to construct the message
20 * in a buffer and then emit it in one call to pq_putmessage. This helps
21 * ensure that the channel will not be clogged by an incomplete message
22 * if execution is aborted by elog(ERROR) partway through the message.
23 * The only non-libpq code that should call pq_putbytes directly is COPY OUT.
25 * At one time, libpq was shared between frontend and backend, but now
26 * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
27 * All that remains are similarities of names to trap the unwary...
29 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
30 * Portions Copyright (c) 1994, Regents of the University of California
32 * $Id: pqcomm.c,v 1.126 2001/12/04 20:57:22 tgl Exp $
34 *-------------------------------------------------------------------------
37 /*------------------------
41 * StreamServerPort - Open postmaster's server port
42 * StreamConnection - Create new connection with client
43 * StreamClose - Close a client/backend connection
44 * pq_init - initialize libpq at backend startup
45 * pq_close - shutdown libpq at backend exit
48 * pq_getbytes - get a known number of bytes from connection
49 * pq_getstring - get a null-terminated string from connection
50 * pq_getbyte - get next byte from connection
51 * pq_peekbyte - peek at next byte from connection
52 * pq_putbytes - send bytes to connection (not flushed until pq_flush)
53 * pq_flush - flush pending output
55 * message-level I/O (and COPY OUT cruft):
56 * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
57 * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
58 * pq_endcopyout - end a COPY OUT transfer
60 *------------------------
69 #include <sys/types.h>
71 #include <sys/socket.h>
73 #include <netinet/in.h>
74 #ifdef HAVE_NETINET_TCP_H
75 #include <netinet/tcp.h>
77 #include <arpa/inet.h>
80 #include "libpq/libpq.h"
81 #include "miscadmin.h"
84 static void pq_close(void);
// Forward declaration: pq_init registers pq_close with on_proc_exit().
88 * Configuration options
// Applied by StreamServerPort to the Unix socket file: the mode via chmod()
// and the group (name or number) via chown(). Where these are assigned is not
// visible in this listing -- presumably from configuration; confirm.
90 int Unix_socket_permissions;
91 char *Unix_socket_group;
95 * Buffers for low-level I/O
// Fixed-size send and receive buffers shared by all low-level I/O routines.
// Elements are unsigned char, so a byte returned by pq_getbyte is never
// negative and cannot be confused with EOF.
98 #define PQ_BUFFER_SIZE 8192
100 static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
101 static int PqSendPointer; /* Next index to store a byte in
104 static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
105 static int PqRecvPointer; /* Next index to read a byte from
107 static int PqRecvLength; /* End of data available in PqRecvBuffer */
// True while a COPY OUT transfer owns the channel. Cleared by pq_init and
// pq_endcopyout; presumably set by pq_startcopyout (its body is not visible).
112 static bool DoingCopyOut;
115 /* --------------------------------
116 * pq_init - initialize libpq at backend startup
117 * --------------------------------
// Empties both I/O buffers, leaves COPY OUT mode, and arranges for pq_close
// to run when the process exits.
122 PqSendPointer = PqRecvPointer = PqRecvLength = 0;
123 DoingCopyOut = false;
124 on_proc_exit(pq_close, 0);
128 /* --------------------------------
129 * pq_close - shutdown libpq at backend exit
131 * Note: in a standalone backend MyProcPort will be null,
132 * don't crash during exit...
133 * --------------------------------
// Exit callback registered by pq_init. It closes the client socket when there
// is one; the result of close() is not checked (best effort at exit).
138 if (MyProcPort != NULL)
140 close(MyProcPort->sock);
141 /* make sure any subsequent attempts to do I/O fail cleanly */
142 MyProcPort->sock = -1;
149 * Streams -- wrapper around Unix socket system calls
152 * Stream functions are used for vanilla TCP connection protocol.
// Path of the Unix-domain socket file. StreamServerPort fills it in (strcpy
// from the socket address) when it creates an AF_UNIX socket. Being a static
// array, it is an empty string until then.
155 static char sock_path[MAXPGPATH];
159 * Shutdown routine for backend connection
160 * If a Unix socket is used for communication, explicitly close it.
// NOTE(review): presumably this is StreamDoUnlink. StreamServerPort registers
// it with on_proc_exit to unlink the socket file, so the "close" wording above
// looks inaccurate. The function name and the unlink call itself are not
// visible in this listing -- confirm.
// The assertion requires sock_path to have been filled in before this runs.
165 Assert(sock_path[0]);
170 * StreamServerPort -- open a sock stream "listening" port.
172 * This initializes the Postmaster's connection-accepting port *fdP.
174 * RETURNS: STATUS_OK or STATUS_ERROR
// What this does:
// - creates a SOCK_STREAM socket of the given family;
// - binds it: TCP uses hostName (or INADDR_ANY when empty) plus portNumber,
//   while Unix uses a path built from portNumber and unixSocketName;
// - for a Unix socket, sets the socket file's group and mode;
// - calls listen().
// Every failure path visible here formats PQerrormsg and writes it to stderr
// and pqdebug. The return statements and the store into fdP are not visible
// in this listing.
178 StreamServerPort(int family, char *hostName, unsigned short portNumber,
179 char *unixSocketName, int *fdP)
// Only TCP (AF_INET) and Unix-domain (AF_UNIX) sockets are supported.
188 Assert(family == AF_INET || family == AF_UNIX);
190 if ((fd = socket(family, SOCK_STREAM, 0)) < 0)
192 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
193 "FATAL: StreamServerPort: socket() failed: %s\n",
195 fputs(PQerrormsg, stderr);
196 pqdebug("%s", PQerrormsg);
// SO_REUSEADDR is requested only for TCP sockets.
200 if (family == AF_INET)
202 if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one,
205 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
206 "FATAL: StreamServerPort: setsockopt(SO_REUSEADDR) failed: %s\n",
208 fputs(PQerrormsg, stderr);
209 pqdebug("%s", PQerrormsg);
214 MemSet((char *) &saddr, 0, sizeof(saddr));
215 saddr.sa.sa_family = family;
217 #ifdef HAVE_UNIX_SOCKETS
// Unix socket: build the path, remember it in sock_path, and take the
// interlock file before touching any existing socket file.
218 if (family == AF_UNIX)
220 UNIXSOCK_PATH(saddr.un, portNumber, unixSocketName);
221 len = UNIXSOCK_LEN(saddr.un);
// NOTE(review): strcpy assumes sun_path always fits in MAXPGPATH bytes;
// presumably true since sun_path is a small fixed array -- confirm.
222 strcpy(sock_path, saddr.un.sun_path);
225 * Grab an interlock file associated with the socket file.
227 if (!CreateSocketLockFile(sock_path, true))
231 * Once we have the interlock, we can safely delete any
232 * pre-existing socket file to avoid failure at bind() time.
236 #endif /* HAVE_UNIX_SOCKETS */
// TCP: an empty hostName binds all interfaces. Otherwise the address from
// gethostbyname (h_addr) is used, and anything other than AF_INET is rejected.
238 if (family == AF_INET)
241 if (hostName[0] == '\0')
242 saddr.in.sin_addr.s_addr = htonl(INADDR_ANY);
247 hp = gethostbyname(hostName);
248 if ((hp == NULL) || (hp->h_addrtype != AF_INET))
250 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
251 "FATAL: StreamServerPort: gethostbyname(%s) failed\n",
253 fputs(PQerrormsg, stderr);
254 pqdebug("%s", PQerrormsg);
257 memmove((char *) &(saddr.in.sin_addr), (char *) hp->h_addr,
261 saddr.in.sin_port = htons(portNumber);
262 len = sizeof(struct sockaddr_in);
// A bind failure is reported as a probable second postmaster. The follow-up
// hint differs for Unix sockets (remove the socket file) and TCP (wait and retry).
265 err = bind(fd, (struct sockaddr *) & saddr.sa, len);
268 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
269 "FATAL: StreamServerPort: bind() failed: %s\n"
270 "\tIs another postmaster already running on port %d?\n",
271 strerror(errno), (int) portNumber);
272 if (family == AF_UNIX)
273 snprintf(PQerrormsg + strlen(PQerrormsg),
274 PQERRORMSG_LENGTH - strlen(PQerrormsg),
275 "\tIf not, remove socket node (%s) and retry.\n",
278 snprintf(PQerrormsg + strlen(PQerrormsg),
279 PQERRORMSG_LENGTH - strlen(PQerrormsg),
280 "\tIf not, wait a few seconds and retry.\n");
281 fputs(PQerrormsg, stderr);
282 pqdebug("%s", PQerrormsg);
// Post-bind Unix socket setup: register removal of the socket file at exit,
// then fix group and mode before listen().
286 #ifdef HAVE_UNIX_SOCKETS
287 if (family == AF_UNIX)
289 /* Arrange to unlink the socket file at exit */
290 on_proc_exit(StreamDoUnlink, 0);
293 * Fix socket ownership/permission if requested. Note we must do
294 * this before we listen() to avoid a window where unwanted
295 * connections could get accepted.
297 Assert(Unix_socket_group);
298 if (Unix_socket_group[0] != '\0')
301 unsigned long int val;
// A group string that parses as a decimal number is presumably used as the
// gid directly (the endptr test is not visible here). Otherwise getgrnam()
// resolves it as a group name.
304 val = strtoul(Unix_socket_group, &endptr, 10);
307 /* numeric group id */
312 /* convert group name to id */
315 gr = getgrnam(Unix_socket_group);
318 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
319 "FATAL: no such group '%s'\n",
321 fputs(PQerrormsg, stderr);
322 pqdebug("%s", PQerrormsg);
// An owner argument of -1 leaves the file owner unchanged; only the group changes.
327 if (chown(sock_path, -1, gid) == -1)
329 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
330 "FATAL: could not set group of %s: %s\n",
331 sock_path, strerror(errno));
332 fputs(PQerrormsg, stderr);
333 pqdebug("%s", PQerrormsg);
338 if (chmod(sock_path, Unix_socket_permissions) == -1)
340 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
341 "FATAL: could not set permissions on %s: %s\n",
342 sock_path, strerror(errno));
343 fputs(PQerrormsg, stderr);
344 pqdebug("%s", PQerrormsg);
348 #endif /* HAVE_UNIX_SOCKETS */
351 * Select appropriate accept-queue length limit. PG_SOMAXCONN is only
352 * intended to provide a clamp on the request on platforms where an
353 * overly large request provokes a kernel error (are there any?).
// Backlog: twice MaxBackends, clamped to PG_SOMAXCONN.
355 maxconn = MaxBackends * 2;
356 if (maxconn > PG_SOMAXCONN)
357 maxconn = PG_SOMAXCONN;
359 err = listen(fd, maxconn);
362 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
363 "FATAL: StreamServerPort: listen() failed: %s\n",
365 fputs(PQerrormsg, stderr);
366 pqdebug("%s", PQerrormsg);
376 * StreamConnection -- create a new connection with client using
379 * ASSUME: that this doesn't need to be non-blocking because
380 * the Postmaster uses select() to tell when the server master
381 * socket is ready for accept().
383 * NB: this can NOT call elog() because it is invoked in the postmaster,
384 * not in standard backend context. If we get an error, the best we can do
385 * is log it to stderr.
387 * RETURNS: STATUS_OK or STATUS_ERROR
// Accepts one pending connection on server_fd into port->sock. It records the
// peer address in port->raddr and the local address in port->laddr, and for
// TCP it also enables TCP_NODELAY and SO_KEEPALIVE. Failures are reported with
// perror(); what is returned after each failure is not visible in this listing.
390 StreamConnection(int server_fd, Port *port)
392 ACCEPT_TYPE_ARG3 addrlen;
394 /* accept connection (and fill in the client (remote) address) */
395 addrlen = sizeof(port->raddr);
396 if ((port->sock = accept(server_fd,
397 (struct sockaddr *) & port->raddr,
400 perror("postmaster: StreamConnection: accept");
// Platform workaround: a zero address family from accept() is rewritten to
// AF_UNIX (presumably these platforms leave it unset for Unix-domain peers).
404 #ifdef SCO_ACCEPT_BUG
407 * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
408 * shouldn't hurt to catch it for all versions of those platforms.
410 if (port->raddr.sa.sa_family == 0)
411 port->raddr.sa.sa_family = AF_UNIX;
414 /* fill in the server (local) address */
415 addrlen = sizeof(port->laddr);
416 if (getsockname(port->sock, (struct sockaddr *) & port->laddr,
419 perror("postmaster: StreamConnection: getsockname");
// The local address family (not the peer's) decides whether TCP options apply.
423 /* select NODELAY and KEEPALIVE options if it's a TCP connection */
424 if (port->laddr.sa.sa_family == AF_INET)
428 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
429 (char *) &on, sizeof(on)) < 0)
431 perror("postmaster: StreamConnection: setsockopt(TCP_NODELAY)");
434 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
435 (char *) &on, sizeof(on)) < 0)
437 perror("postmaster: StreamConnection: setsockopt(SO_KEEPALIVE)");
446 * StreamClose -- close a client/backend connection
// NOTE(review): the body is not visible in this listing; presumably it simply
// closes sock -- confirm.
449 StreamClose(int sock)
455 /* --------------------------------
456 * Low-level I/O routines begin here.
458 * These routines communicate with a frontend client across a connection
459 * already established by the preceding routines.
460 * --------------------------------
464 /* --------------------------------
465 * pq_recvbuf - load some bytes into the input buffer
467 * returns 0 if OK, EOF if trouble
468 * --------------------------------
// Step 1: discard consumed bytes by moving any unread tail to the front of
// PqRecvBuffer. memmove is required because the ranges can overlap.
473 if (PqRecvPointer > 0)
475 if (PqRecvLength > PqRecvPointer)
477 /* still some unread data, left-justify it in the buffer */
478 memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
479 PqRecvLength - PqRecvPointer);
480 PqRecvLength -= PqRecvPointer;
484 PqRecvLength = PqRecvPointer = 0;
// Step 2: read into the free space at the end of the buffer, with SSL_read
// for SSL connections or recv() otherwise. The condition that picks between
// them is not visible in this listing.
487 /* Can fill buffer from PqRecvLength and upwards */
494 r = SSL_read(MyProcPort->ssl, PqRecvBuffer + PqRecvLength,
495 PQ_BUFFER_SIZE - PqRecvLength);
498 r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength,
499 PQ_BUFFER_SIZE - PqRecvLength, 0);
504 continue; /* Ok if interrupted */
507 * Careful: an elog() that tries to write to the client
508 * would cause recursion to here, leading to stack overflow
509 * and core dump! This message must go *only* to the postmaster
510 * log. elog(DEBUG) is presently safe.
// NOTE(review): if the SSL_read path reaches this report, errno (and so the
// %m text) may not describe the real SSL failure; SSL_get_error() would be
// needed for that -- confirm how SSL errors are routed here.
512 elog(DEBUG, "pq_recvbuf: recv() failed: %m");
// Reached when the peer closed the connection (presumably r == 0; the test is
// not visible here).
517 /* as above, only write to postmaster log */
518 elog(DEBUG, "pq_recvbuf: unexpected EOF on client connection");
521 /* r contains number of bytes read, so just incr length */
527 /* --------------------------------
528 * pq_getbyte - get a single byte from connection, or return EOF
529 * --------------------------------
// Refills the buffer with pq_recvbuf() until at least one unread byte exists,
// then consumes it. PqRecvBuffer holds unsigned char, so the returned data
// byte is never negative and cannot collide with EOF.
534 while (PqRecvPointer >= PqRecvLength)
536 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
537 return EOF; /* Failed to recv data */
539 return PqRecvBuffer[PqRecvPointer++];
542 /* --------------------------------
543 * pq_peekbyte - peek at next byte from connection
545 * Same as pq_getbyte() except we don't advance the pointer.
546 * --------------------------------
// PqRecvPointer is left unchanged, so the next pq_getbyte() returns this same
// byte. A refill via pq_recvbuf() may still happen if the buffer is empty.
551 while (PqRecvPointer >= PqRecvLength)
553 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
554 return EOF; /* Failed to recv data */
556 return PqRecvBuffer[PqRecvPointer];
559 /* --------------------------------
560 * pq_getbytes - get a known number of bytes from connection
562 * returns 0 if OK, EOF if trouble
563 * --------------------------------
// Copies len bytes into s, refilling the buffer with pq_recvbuf() as needed.
// NOTE(review): the outer loop over len, the clamp of amount to the remaining
// len, and the advance of s and len are not visible in this listing. As shown,
// memcpy copies every buffered byte, which would overrun s when more than len
// bytes are buffered -- confirm the clamp is present.
566 pq_getbytes(char *s, size_t len)
572 while (PqRecvPointer >= PqRecvLength)
574 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
575 return EOF; /* Failed to recv data */
577 amount = PqRecvLength - PqRecvPointer;
580 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
581 PqRecvPointer += amount;
588 /* --------------------------------
589 * pq_getstring - get a null terminated string from connection
591 * The return value is placed in an expansible StringInfo.
592 * Note that space allocation comes from the current memory context!
594 * NOTE: this routine does not do any MULTIBYTE conversion,
595 * even though it is presumably useful only for text, because
596 * no code in this module should depend on MULTIBYTE mode.
597 * See pq_getstr in pqformat.c for that.
599 * returns 0 if OK, EOF if trouble
600 * --------------------------------
// Reads one byte at a time with pq_getbyte(), appending to s until a NUL byte
// or EOF. The loop shown does not append the NUL itself. How the result code
// is derived after the loop is not visible in this listing.
603 pq_getstring(StringInfo s)
607 /* Reset string to empty */
611 /* Read until we get the terminating '\0' */
612 while ((c = pq_getbyte()) != EOF && c != '\0')
613 appendStringInfoCharMacro(s, c);
622 /* --------------------------------
623 * pq_putbytes - send bytes to connection (not flushed until pq_flush)
625 * returns 0 if OK, EOF if trouble
626 * --------------------------------
// Appends the bytes of s to PqSendBuffer, calling pq_flush() whenever the
// buffer is full.
// NOTE(review): the loop over len and the clamp of amount to the remaining len
// are not visible in this listing. As shown, memcpy would read as many bytes
// as there is free space, regardless of len -- confirm the clamp is present.
629 pq_putbytes(const char *s, size_t len)
635 if (PqSendPointer >= PQ_BUFFER_SIZE)
636 if (pq_flush()) /* If buffer is full, then flush it out */
638 amount = PQ_BUFFER_SIZE - PqSendPointer;
641 memcpy(PqSendBuffer + PqSendPointer, s, amount);
642 PqSendPointer += amount;
649 /* --------------------------------
650 * pq_flush - flush pending output
652 * returns 0 if OK, EOF if trouble
653 * --------------------------------
// Persists across calls so that a run of identical send failures is logged
// only once. It is reset after a successful send.
658 static int last_reported_send_errno = 0;
660 unsigned char *bufptr = PqSendBuffer;
661 unsigned char *bufend = PqSendBuffer + PqSendPointer;
// Loops while unsent bytes remain, using SSL_write for SSL connections or
// send() otherwise. The selecting condition and the advance of bufptr are not
// visible in this listing.
663 while (bufptr < bufend)
669 r = SSL_write(MyProcPort->ssl, bufptr, bufend - bufptr);
672 r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0);
677 continue; /* Ok if we were interrupted */
680 * Careful: an elog() that tries to write to the client
681 * would cause recursion to here, leading to stack overflow
682 * and core dump! This message must go *only* to the postmaster
683 * log. elog(DEBUG) is presently safe.
685 * If a client disconnects while we're in the midst of output,
686 * we might write quite a bit of data before we get to a safe
687 * query abort point. So, suppress duplicate log messages.
// NOTE(review): on the SSL_write path, errno may not reflect the SSL failure,
// which weakens both the %m text and this duplicate check -- confirm.
689 if (errno != last_reported_send_errno)
691 last_reported_send_errno = errno;
692 elog(DEBUG, "pq_flush: send() failed: %m");
696 * We drop the buffered data anyway so that processing can
697 * continue, even though we'll probably quit soon.
703 last_reported_send_errno = 0; /* reset after any successful send */
713 * Return EOF if the connection has been broken, else 0.
// Presumably pq_eof (its signature is not visible in this listing). It peeks
// one byte with MSG_PEEK, so no data is consumed from the socket.
// NOTE(review): as shown, this reads the raw socket directly. It bypasses both
// SSL (an SSL connection would expose encrypted bytes) and any bytes already
// held in PqRecvBuffer -- confirm callers tolerate that.
721 res = recv(MyProcPort->sock, &x, 1, MSG_PEEK);
725 /* can log to postmaster log only */
726 elog(DEBUG, "pq_eof: recv() failed: %m");
736 /* --------------------------------
737 * Message-level I/O routines begin here.
739 * These routines understand about COPY OUT protocol.
740 * --------------------------------
744 /* --------------------------------
745 * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
747 * If msgtype is not '\0', it is a message type code to place before
748 * the message body (len counts only the body size!).
749 * If msgtype is '\0', then the buffer already includes the type code.
751 * All normal messages are suppressed while COPY OUT is in progress.
752 * (In practice only NOTICE messages might get emitted then; dropping
753 * them is annoying, but at least they will still appear in the
756 * returns 0 if OK, EOF if trouble
757 * --------------------------------
// Writes the optional type byte and then the body through pq_putbytes();
// nothing is flushed here.
// NOTE(review): the DoingCopyOut suppression check and the msgtype guard on
// the first pq_putbytes call are described above but not visible in this
// listing -- confirm they are present.
760 pq_putmessage(char msgtype, const char *s, size_t len)
765 if (pq_putbytes(&msgtype, 1))
767 return pq_putbytes(s, len);
770 /* --------------------------------
771 * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
772 * --------------------------------
// NOTE(review): the body is not visible in this listing. Presumably it sets
// DoingCopyOut, which pq_endcopyout clears -- confirm.
775 pq_startcopyout(void)
780 /* --------------------------------
781 * pq_endcopyout - end a COPY OUT transfer
783 * If errorAbort is indicated, we are aborting a COPY OUT due to an error,
784 * and must send a terminator line. Since a partial data line might have
785 * been emitted, send a couple of newlines first (the first one could
786 * get absorbed by a backslash...)
787 * --------------------------------
790 pq_endcopyout(bool errorAbort)
// The 5-byte literal is two newlines followed by the backslash-period
// terminator line and its newline.
// NOTE(review): the guards on DoingCopyOut and errorAbort around this write
// are not visible in this listing -- confirm.
795 pq_putbytes("\n\n\\.\n", 5);
796 /* in non-error case, copy.c will have emitted the terminator line */
// Leaving COPY OUT mode presumably lets normal messages through again.
797 DoingCopyOut = false;