]> granicus.if.org Git - postgresql/blob - src/backend/libpq/pqcomm.c
freeaddrinfo2() does need two parameters after all, per comment by
[postgresql] / src / backend / libpq / pqcomm.c
1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *        Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the old COPY OUT protocol.
10  * Unfortunately, COPY OUT was designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an elog(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
16  * set the DoingCopyOut flag.)
17  *
18  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19  * pq_putbytes(), especially if the message would require multiple calls
20  * to send.  Instead, use the routines in pqformat.c to construct the message
21  * in a buffer and then emit it in one call to pq_putmessage.  This ensures
22  * that the channel will not be clogged by an incomplete message if execution
23  * is aborted by elog(ERROR) partway through the message.  The only non-libpq
24  * code that should call pq_putbytes directly is old-style COPY OUT.
25  *
26  * At one time, libpq was shared between frontend and backend, but now
27  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28  * All that remains is similarities of names to trap the unwary...
29  *
30  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
31  * Portions Copyright (c) 1994, Regents of the University of California
32  *
33  *      $Header: /cvsroot/pgsql/src/backend/libpq/pqcomm.c,v 1.156 2003/06/09 17:59:19 tgl Exp $
34  *
35  *-------------------------------------------------------------------------
36  */
37
38 /*------------------------
39  * INTERFACE ROUTINES
40  *
41  * setup/teardown:
42  *              StreamServerPort        - Open postmaster's server port
43  *              StreamConnection        - Create new connection with client
44  *              StreamClose                     - Close a client/backend connection
45  *              TouchSocketFile         - Protect socket file against /tmp cleaners
46  *              pq_init                 - initialize libpq at backend startup
47  *              pq_close                - shutdown libpq at backend exit
48  *
49  * low-level I/O:
50  *              pq_getbytes             - get a known number of bytes from connection
51  *              pq_getstring    - get a null terminated string from connection
52  *              pq_getmessage   - get a message with length word from connection
53  *              pq_getbyte              - get next byte from connection
54  *              pq_peekbyte             - peek at next byte from connection
55  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
56  *              pq_flush                - flush pending output
57  *
58  * message-level I/O (and old-style-COPY-OUT cruft):
59  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
60  *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
61  *              pq_endcopyout   - end a COPY OUT transfer
62  *
63  *------------------------
64  */
65 #include "postgres.h"
66
67 #include <signal.h>
68 #include <errno.h>
69 #include <fcntl.h>
70 #include <grp.h>
71 #include <unistd.h>
72 #include <sys/file.h>
73 #include <sys/socket.h>
74 #include <sys/stat.h>
75 #include <sys/time.h>
76 #include <netdb.h>
77 #include <netinet/in.h>
78 #ifdef HAVE_NETINET_TCP_H
79 #include <netinet/tcp.h>
80 #endif
81 #include <arpa/inet.h>
82 #ifdef HAVE_UTIME_H
83 #include <utime.h>
84 #endif
85
86 #include "libpq/libpq.h"
87 #include "miscadmin.h"
88 #include "storage/ipc.h"
89
90
91 static void pq_close(void);
92
93 #ifdef HAVE_UNIX_SOCKETS
94 static int      Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
95 static int      Setup_AF_UNIX(void);
96 #endif   /* HAVE_UNIX_SOCKETS */
97
98
99 /*
100  * Configuration options
101  */
102 int                     Unix_socket_permissions;
103 char       *Unix_socket_group;
104
105
106 /*
107  * Buffers for low-level I/O
108  */
109
110 #define PQ_BUFFER_SIZE 8192
111
112 static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
113 static int      PqSendPointer;          /* Next index to store a byte in
114                                                                  * PqSendBuffer */
115
116 static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
117 static int      PqRecvPointer;          /* Next index to read a byte from
118                                                                  * PqRecvBuffer */
119 static int      PqRecvLength;           /* End of data available in PqRecvBuffer */
120
121 /*
122  * Message status
123  */
124 static bool DoingCopyOut;
125
126
127 /* --------------------------------
128  *              pq_init - initialize libpq at backend startup
129  * --------------------------------
130  */
131 void
132 pq_init(void)
133 {
134         PqSendPointer = PqRecvPointer = PqRecvLength = 0;
135         DoingCopyOut = false;
136         on_proc_exit(pq_close, 0);
137 }
138
139
140 /* --------------------------------
141  *              pq_close - shutdown libpq at backend exit
142  *
143  * Note: in a standalone backend MyProcPort will be null,
144  * don't crash during exit...
145  * --------------------------------
146  */
147 static void
148 pq_close(void)
149 {
150         if (MyProcPort != NULL)
151         {
152                 /* Cleanly shut down SSL layer */
153                 secure_close(MyProcPort);
154                 /*
155                  * Formerly we did an explicit close() here, but it seems better
156                  * to leave the socket open until the process dies.  This allows
157                  * clients to perform a "synchronous close" if they care --- wait
158                  * till the transport layer reports connection closure, and you
159                  * can be sure the backend has exited.
160                  *
161                  * We do set sock to -1 to prevent any further I/O, though.
162                  */
163                 MyProcPort->sock = -1;
164         }
165 }
166
167
168
169 /*
170  * Streams -- wrapper around Unix socket system calls
171  *
172  *
173  *              Stream functions are used for vanilla TCP connection protocol.
174  */
175
176 static char sock_path[MAXPGPATH];
177
178
179 /* StreamDoUnlink()
180  * Shutdown routine for backend connection
181  * If a Unix socket is used for communication, explicitly close it.
182  */
183 #ifdef HAVE_UNIX_SOCKETS
184 static void
185 StreamDoUnlink(void)
186 {
187         Assert(sock_path[0]);
188         unlink(sock_path);
189 }
190 #endif   /* HAVE_UNIX_SOCKETS */
191
192 /*
193  * StreamServerPort -- open a sock stream "listening" port.
194  *
195  * This initializes the Postmaster's connection-accepting port *fdP.
196  *
197  * RETURNS: STATUS_OK or STATUS_ERROR
198  */
199
200 int
201 StreamServerPort(int family, char *hostName, unsigned short portNumber,
202                                  char *unixSocketName, int *fdP)
203 {
204         int                     fd,
205                                 err;
206         int                     maxconn;
207         int                     one = 1;
208         int                     ret;
209         char            portNumberStr[64];
210         char       *service;
211         struct addrinfo *addrs = NULL;
212         struct addrinfo hint;
213
214 #ifdef HAVE_UNIX_SOCKETS
215         Assert(family == AF_UNIX || isAF_INETx(family));
216 #else
217         Assert(isAF_INETx(family));
218 #endif
219
220         /* Initialize hint structure */
221         MemSet(&hint, 0, sizeof(hint));
222         hint.ai_family = family;
223         hint.ai_flags = AI_PASSIVE;
224         hint.ai_socktype = SOCK_STREAM;
225
226 #ifdef HAVE_UNIX_SOCKETS
227         if (family == AF_UNIX)
228         {
229                 if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
230                         return STATUS_ERROR;
231                 service = sock_path;
232         }
233         else
234 #endif   /* HAVE_UNIX_SOCKETS */
235         {
236                 snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
237                 service = portNumberStr;
238         }
239
240         ret = getaddrinfo2(hostName, service, &hint, &addrs);
241         if (ret || addrs == NULL)
242         {
243                 elog(LOG, "server socket failure: getaddrinfo2(): %s",
244                          gai_strerror(ret));
245                 freeaddrinfo2(hint.ai_family, addrs);
246                 return STATUS_ERROR;
247         }
248
249         if ((fd = socket(family, SOCK_STREAM, 0)) < 0)
250         {
251                 elog(LOG, "server socket failure: socket(): %s",
252                          strerror(errno));
253                 freeaddrinfo2(hint.ai_family, addrs);
254                 return STATUS_ERROR;
255         }
256
257         if (isAF_INETx(family))
258         {
259                 if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one,
260                                                 sizeof(one))) == -1)
261                 {
262                         elog(LOG, "server socket failure: setsockopt(SO_REUSEADDR): %s",
263                                  strerror(errno));
264                         freeaddrinfo2(hint.ai_family, addrs);
265                         return STATUS_ERROR;
266                 }
267         }
268
269         Assert(addrs->ai_next == NULL && addrs->ai_family == family);
270         err = bind(fd, addrs->ai_addr, addrs->ai_addrlen);
271         if (err < 0)
272         {
273                 elog(LOG, "server socket failure: bind(): %s\n"
274                          "\tIs another postmaster already running on port %d?",
275                          strerror(errno), (int) portNumber);
276                 if (family == AF_UNIX)
277                         elog(LOG, "\tIf not, remove socket node (%s) and retry.",
278                                  sock_path);
279                 else
280                         elog(LOG, "\tIf not, wait a few seconds and retry.");
281                 freeaddrinfo2(hint.ai_family, addrs);
282                 return STATUS_ERROR;
283         }
284
285 #ifdef HAVE_UNIX_SOCKETS
286         if (family == AF_UNIX)
287         {
288                 if (Setup_AF_UNIX() != STATUS_OK)
289                 {
290                         freeaddrinfo2(hint.ai_family, addrs);
291                         return STATUS_ERROR;
292                 }
293         }
294 #endif
295
296         /*
297          * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
298          * intended to provide a clamp on the request on platforms where an
299          * overly large request provokes a kernel error (are there any?).
300          */
301         maxconn = MaxBackends * 2;
302         if (maxconn > PG_SOMAXCONN)
303                 maxconn = PG_SOMAXCONN;
304
305         err = listen(fd, maxconn);
306         if (err < 0)
307         {
308                 elog(LOG, "server socket failure: listen(): %s",
309                          strerror(errno));
310                 freeaddrinfo2(hint.ai_family, addrs);
311                 return STATUS_ERROR;
312         }
313
314         *fdP = fd;
315         freeaddrinfo2(hint.ai_family, addrs);
316         return STATUS_OK;
317 }
318
319
320 #ifdef HAVE_UNIX_SOCKETS
321
322 /*
323  * Lock_AF_UNIX -- configure unix socket file path
324  */
325 static int
326 Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
327 {
328         SockAddr        saddr;  /* just used to get socket path */
329
330         UNIXSOCK_PATH(saddr.un, portNumber, unixSocketName);
331         strcpy(sock_path, saddr.un.sun_path);
332
333         /*
334          * Grab an interlock file associated with the socket file.
335          */
336         if (!CreateSocketLockFile(sock_path, true))
337                 return STATUS_ERROR;
338
339         /*
340          * Once we have the interlock, we can safely delete any pre-existing
341          * socket file to avoid failure at bind() time.
342          */
343         unlink(sock_path);
344
345         return STATUS_OK;
346 }
347
348
349 /*
350  * Setup_AF_UNIX -- configure unix socket permissions
351  */
352 static int
353 Setup_AF_UNIX(void)
354 {
355         /* Arrange to unlink the socket file at exit */
356         on_proc_exit(StreamDoUnlink, 0);
357
358         /*
359          * Fix socket ownership/permission if requested.  Note we must do this
360          * before we listen() to avoid a window where unwanted connections
361          * could get accepted.
362          */
363         Assert(Unix_socket_group);
364         if (Unix_socket_group[0] != '\0')
365         {
366 #ifdef WIN32
367                 elog(FATAL, "Config value 'unix_socket_group' not supported on this platform");
368 #else
369                 char       *endptr;
370                 unsigned long int val;
371                 gid_t           gid;
372
373                 val = strtoul(Unix_socket_group, &endptr, 10);
374                 if (*endptr == '\0')
375                 {                                               /* numeric group id */
376                         gid = val;
377                 }
378                 else
379                 {                                               /* convert group name to id */
380                         struct group *gr;
381
382                         gr = getgrnam(Unix_socket_group);
383                         if (!gr)
384                         {
385                                 elog(LOG, "server socket failure: no such group '%s'",
386                                          Unix_socket_group);
387                                 return STATUS_ERROR;
388                         }
389                         gid = gr->gr_gid;
390                 }
391                 if (chown(sock_path, -1, gid) == -1)
392                 {
393                         elog(LOG, "server socket failure: could not set group of %s: %s",
394                                  sock_path, strerror(errno));
395                         return STATUS_ERROR;
396                 }
397 #endif
398         }
399
400         if (chmod(sock_path, Unix_socket_permissions) == -1)
401         {
402                 elog(LOG, "server socket failure: could not set permissions on %s: %s",
403                          sock_path, strerror(errno));
404                 return STATUS_ERROR;
405         }
406         return STATUS_OK;
407 }
408
409 #endif   /* HAVE_UNIX_SOCKETS */
410
411
412 /*
413  * StreamConnection -- create a new connection with client using
414  *              server port.
415  *
416  * ASSUME: that this doesn't need to be non-blocking because
417  *              the Postmaster uses select() to tell when the server master
418  *              socket is ready for accept().
419  *
420  * RETURNS: STATUS_OK or STATUS_ERROR
421  */
422 int
423 StreamConnection(int server_fd, Port *port)
424 {
425         ACCEPT_TYPE_ARG3 addrlen;
426
427         /* accept connection (and fill in the client (remote) address) */
428         addrlen = sizeof(port->raddr);
429         if ((port->sock = accept(server_fd,
430                                                          (struct sockaddr *) &port->raddr,
431                                                          &addrlen)) < 0)
432         {
433                 elog(LOG, "StreamConnection: accept() failed: %m");
434                 return STATUS_ERROR;
435         }
436
437 #ifdef SCO_ACCEPT_BUG
438         /*
439          * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
440          * shouldn't hurt to catch it for all versions of those platforms.
441          */
442         if (port->raddr.sa.sa_family == 0)
443                 port->raddr.sa.sa_family = AF_UNIX;
444 #endif
445
446         /* fill in the server (local) address */
447         addrlen = sizeof(port->laddr);
448         if (getsockname(port->sock, (struct sockaddr *) & port->laddr,
449                                         &addrlen) < 0)
450         {
451                 elog(LOG, "StreamConnection: getsockname() failed: %m");
452                 return STATUS_ERROR;
453         }
454
455         /* select NODELAY and KEEPALIVE options if it's a TCP connection */
456         if (isAF_INETx(port->laddr.sa.sa_family))
457         {
458                 int                     on = 1;
459
460                 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
461                                            (char *) &on, sizeof(on)) < 0)
462                 {
463                         elog(LOG, "StreamConnection: setsockopt(TCP_NODELAY) failed: %m");
464                         return STATUS_ERROR;
465                 }
466                 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
467                                            (char *) &on, sizeof(on)) < 0)
468                 {
469                         elog(LOG, "StreamConnection: setsockopt(SO_KEEPALIVE) failed: %m");
470                         return STATUS_ERROR;
471                 }
472         }
473
474         return STATUS_OK;
475 }
476
477 /*
478  * StreamClose -- close a client/backend connection
479  *
480  * NOTE: this is NOT used to terminate a session; it is just used to release
481  * the file descriptor in a process that should no longer have the socket
482  * open.  (For example, the postmaster calls this after passing ownership
483  * of the connection to a child process.)  It is expected that someone else
484  * still has the socket open.  So, we only want to close the descriptor,
485  * we do NOT want to send anything to the far end.
486  */
487 void
488 StreamClose(int sock)
489 {
490         closesocket(sock);
491 }
492
493 /*
494  * TouchSocketFile -- mark socket file as recently accessed
495  *
496  * This routine should be called every so often to ensure that the socket
497  * file has a recent mod date (ordinary operations on sockets usually won't
498  * change the mod date).  That saves it from being removed by
499  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
500  * never have put the socket file in /tmp...)
501  */
502 void
503 TouchSocketFile(void)
504 {
505         /* Do nothing if we did not create a socket... */
506         if (sock_path[0] != '\0')
507         {
508                 /*
509                  * utime() is POSIX standard, utimes() is a common alternative.
510                  * If we have neither, there's no way to affect the mod or access
511                  * time of the socket :-(
512                  *
513                  * In either path, we ignore errors; there's no point in complaining.
514                  */
515 #ifdef HAVE_UTIME
516                 utime(sock_path, NULL);
517 #else /* !HAVE_UTIME */
518 #ifdef HAVE_UTIMES
519                 utimes(sock_path, NULL);
520 #endif /* HAVE_UTIMES */
521 #endif /* HAVE_UTIME */
522         }
523 }
524
525
526 /* --------------------------------
527  * Low-level I/O routines begin here.
528  *
529  * These routines communicate with a frontend client across a connection
530  * already established by the preceding routines.
531  * --------------------------------
532  */
533
534
535 /* --------------------------------
536  *              pq_recvbuf - load some bytes into the input buffer
537  *
538  *              returns 0 if OK, EOF if trouble
539  * --------------------------------
540  */
541 static int
542 pq_recvbuf(void)
543 {
544         if (PqRecvPointer > 0)
545         {
546                 if (PqRecvLength > PqRecvPointer)
547                 {
548                         /* still some unread data, left-justify it in the buffer */
549                         memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
550                                         PqRecvLength - PqRecvPointer);
551                         PqRecvLength -= PqRecvPointer;
552                         PqRecvPointer = 0;
553                 }
554                 else
555                         PqRecvLength = PqRecvPointer = 0;
556         }
557
558         /* Can fill buffer from PqRecvLength and upwards */
559         for (;;)
560         {
561                 int                     r;
562
563                 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
564                                                 PQ_BUFFER_SIZE - PqRecvLength);
565
566                 if (r < 0)
567                 {
568                         if (errno == EINTR)
569                                 continue;               /* Ok if interrupted */
570
571                         /*
572                          * Careful: an elog() that tries to write to the client would
573                          * cause recursion to here, leading to stack overflow and core
574                          * dump!  This message must go *only* to the postmaster log.
575                          */
576                         elog(COMMERROR, "pq_recvbuf: recv() failed: %m");
577                         return EOF;
578                 }
579                 if (r == 0)
580                 {
581                         /*
582                          * EOF detected.  We used to write a log message here, but it's
583                          * better to expect the ultimate caller to do that.
584                          */
585                         return EOF;
586                 }
587                 /* r contains number of bytes read, so just incr length */
588                 PqRecvLength += r;
589                 return 0;
590         }
591 }
592
593 /* --------------------------------
594  *              pq_getbyte      - get a single byte from connection, or return EOF
595  * --------------------------------
596  */
597 int
598 pq_getbyte(void)
599 {
600         while (PqRecvPointer >= PqRecvLength)
601         {
602                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
603                         return EOF;                     /* Failed to recv data */
604         }
605         return PqRecvBuffer[PqRecvPointer++];
606 }
607
608 /* --------------------------------
609  *              pq_peekbyte             - peek at next byte from connection
610  *
611  *       Same as pq_getbyte() except we don't advance the pointer.
612  * --------------------------------
613  */
614 int
615 pq_peekbyte(void)
616 {
617         while (PqRecvPointer >= PqRecvLength)
618         {
619                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
620                         return EOF;                     /* Failed to recv data */
621         }
622         return PqRecvBuffer[PqRecvPointer];
623 }
624
625 /* --------------------------------
626  *              pq_getbytes             - get a known number of bytes from connection
627  *
628  *              returns 0 if OK, EOF if trouble
629  * --------------------------------
630  */
631 int
632 pq_getbytes(char *s, size_t len)
633 {
634         size_t          amount;
635
636         while (len > 0)
637         {
638                 while (PqRecvPointer >= PqRecvLength)
639                 {
640                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
641                                 return EOF;             /* Failed to recv data */
642                 }
643                 amount = PqRecvLength - PqRecvPointer;
644                 if (amount > len)
645                         amount = len;
646                 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
647                 PqRecvPointer += amount;
648                 s += amount;
649                 len -= amount;
650         }
651         return 0;
652 }
653
654 /* --------------------------------
655  *              pq_getstring    - get a null terminated string from connection
656  *
657  *              The return value is placed in an expansible StringInfo, which has
658  *              already been initialized by the caller.
659  *
660  *              This is used only for dealing with old-protocol clients.  The idea
661  *              is to produce a StringInfo that looks the same as we would get from
662  *              pq_getmessage() with a newer client; we will then process it with
663  *              pq_getmsgstring.  Therefore, no character set conversion is done here,
664  *              even though this is presumably useful only for text.
665  *
666  *              returns 0 if OK, EOF if trouble
667  * --------------------------------
668  */
669 int
670 pq_getstring(StringInfo s)
671 {
672         int                     i;
673
674         /* Reset string to empty */
675         s->len = 0;
676         s->data[0] = '\0';
677         s->cursor = 0;
678
679         /* Read until we get the terminating '\0' */
680         for (;;)
681         {
682                 while (PqRecvPointer >= PqRecvLength)
683                 {
684                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
685                                 return EOF;             /* Failed to recv data */
686                 }
687
688                 for (i = PqRecvPointer; i < PqRecvLength; i++)
689                 {
690                         if (PqRecvBuffer[i] == '\0')
691                         {
692                                 /* include the '\0' in the copy */
693                                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
694                                                                            i - PqRecvPointer + 1);
695                                 PqRecvPointer = i + 1;  /* advance past \0 */
696                                 return 0;
697                         }
698                 }
699
700                 /* If we're here we haven't got the \0 in the buffer yet. */
701                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
702                                                            PqRecvLength - PqRecvPointer);
703                 PqRecvPointer = PqRecvLength;
704         }
705 }
706
707
708 /* --------------------------------
709  *              pq_getmessage   - get a message with length word from connection
710  *
711  *              The return value is placed in an expansible StringInfo, which has
712  *              already been initialized by the caller.
713  *              Only the message body is placed in the StringInfo; the length word
714  *              is removed.  Also, s->cursor is initialized to zero for convenience
715  *              in scanning the message contents.
716  *
717  *              If maxlen is not zero, it is an upper limit on the length of the
718  *              message we are willing to accept.  We abort the connection (by
719  *              returning EOF) if client tries to send more than that.
720  *
721  *              returns 0 if OK, EOF if trouble
722  * --------------------------------
723  */
724 int
725 pq_getmessage(StringInfo s, int maxlen)
726 {
727         int32           len;
728
729         /* Reset message buffer to empty */
730         s->len = 0;
731         s->data[0] = '\0';
732         s->cursor = 0;
733
734         /* Read message length word */
735         if (pq_getbytes((char *) &len, 4) == EOF)
736         {
737                 elog(COMMERROR, "unexpected EOF within message length word");
738                 return EOF;
739         }
740
741         len = ntohl(len);
742         len -= 4;                                       /* discount length itself */
743
744         if (len < 0 ||
745                 (maxlen > 0 && len > maxlen))
746         {
747                 elog(COMMERROR, "invalid message length");
748                 return EOF;
749         }
750
751         if (len > 0)
752         {
753                 /* Allocate space for message */
754                 enlargeStringInfo(s, len);
755
756                 /* And grab the message */
757                 if (pq_getbytes(s->data, len) == EOF)
758                 {
759                         elog(COMMERROR, "incomplete client message");
760                         return EOF;
761                 }
762                 s->len = len;
763                 /* Place a trailing null per StringInfo convention */
764                 s->data[len] = '\0';
765         }
766
767         return 0;
768 }
769
770
771 /* --------------------------------
772  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
773  *
774  *              returns 0 if OK, EOF if trouble
775  * --------------------------------
776  */
777 int
778 pq_putbytes(const char *s, size_t len)
779 {
780         size_t          amount;
781
782         while (len > 0)
783         {
784                 if (PqSendPointer >= PQ_BUFFER_SIZE)
785                         if (pq_flush())         /* If buffer is full, then flush it out */
786                                 return EOF;
787                 amount = PQ_BUFFER_SIZE - PqSendPointer;
788                 if (amount > len)
789                         amount = len;
790                 memcpy(PqSendBuffer + PqSendPointer, s, amount);
791                 PqSendPointer += amount;
792                 s += amount;
793                 len -= amount;
794         }
795         return 0;
796 }
797
798 /* --------------------------------
799  *              pq_flush                - flush pending output
800  *
801  *              returns 0 if OK, EOF if trouble
802  * --------------------------------
803  */
804 int
805 pq_flush(void)
806 {
807         static int      last_reported_send_errno = 0;
808
809         unsigned char *bufptr = PqSendBuffer;
810         unsigned char *bufend = PqSendBuffer + PqSendPointer;
811
812         while (bufptr < bufend)
813         {
814                 int                     r;
815
816                 r = secure_write(MyProcPort, bufptr, bufend - bufptr);
817
818                 if (r <= 0)
819                 {
820                         if (errno == EINTR)
821                                 continue;               /* Ok if we were interrupted */
822
823                         /*
824                          * Careful: an elog() that tries to write to the client would
825                          * cause recursion to here, leading to stack overflow and core
826                          * dump!  This message must go *only* to the postmaster log.
827                          *
828                          * If a client disconnects while we're in the midst of output, we
829                          * might write quite a bit of data before we get to a safe
830                          * query abort point.  So, suppress duplicate log messages.
831                          */
832                         if (errno != last_reported_send_errno)
833                         {
834                                 last_reported_send_errno = errno;
835                                 elog(COMMERROR, "pq_flush: send() failed: %m");
836                         }
837
838                         /*
839                          * We drop the buffered data anyway so that processing can
840                          * continue, even though we'll probably quit soon.
841                          */
842                         PqSendPointer = 0;
843                         return EOF;
844                 }
845
846                 last_reported_send_errno = 0;   /* reset after any successful send */
847                 bufptr += r;
848         }
849
850         PqSendPointer = 0;
851         return 0;
852 }
853
854
855 /* --------------------------------
856  * Message-level I/O routines begin here.
857  *
858  * These routines understand about the old-style COPY OUT protocol.
859  * --------------------------------
860  */
861
862
863 /* --------------------------------
864  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
865  *
866  *              If msgtype is not '\0', it is a message type code to place before
867  *              the message body.  If msgtype is '\0', then the message has no type
868  *              code (this is only valid in pre-3.0 protocols).
869  *
870  *              len is the length of the message body data at *s.  In protocol 3.0
871  *              and later, a message length word (equal to len+4 because it counts
872  *              itself too) is inserted by this routine.
873  *
874  *              All normal messages are suppressed while old-style COPY OUT is in
875  *              progress.  (In practice only a few notice messages might get emitted
876  *              then; dropping them is annoying, but at least they will still appear
877  *              in the postmaster log.)
878  *
879  *              returns 0 if OK, EOF if trouble
880  * --------------------------------
881  */
882 int
883 pq_putmessage(char msgtype, const char *s, size_t len)
884 {
885         if (DoingCopyOut)
886                 return 0;
887         if (msgtype)
888                 if (pq_putbytes(&msgtype, 1))
889                         return EOF;
890         if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
891         {
892                 uint32          n32;
893
894                 n32 = htonl((uint32) (len + 4));
895                 if (pq_putbytes((char *) &n32, 4))
896                         return EOF;
897         }
898         return pq_putbytes(s, len);
899 }
900
901 /* --------------------------------
902  *              pq_startcopyout - inform libpq that an old-style COPY OUT transfer
903  *                      is beginning
904  * --------------------------------
905  */
906 void
907 pq_startcopyout(void)
908 {
909         DoingCopyOut = true;
910 }
911
912 /* --------------------------------
913  *              pq_endcopyout   - end an old-style COPY OUT transfer
914  *
915  *              If errorAbort is indicated, we are aborting a COPY OUT due to an error,
916  *              and must send a terminator line.  Since a partial data line might have
917  *              been emitted, send a couple of newlines first (the first one could
918  *              get absorbed by a backslash...)  Note that old-style COPY OUT does
919  *              not allow binary transfers, so a textual terminator is always correct.
920  * --------------------------------
921  */
922 void
923 pq_endcopyout(bool errorAbort)
924 {
925         if (!DoingCopyOut)
926                 return;
927         DoingCopyOut = false;
928         if (errorAbort)
929                 pq_putbytes("\n\n\\.\n", 5);
930         /* in non-error case, copy.c will have emitted the terminator line */
931 }