]> granicus.if.org Git - postgresql/blob - src/backend/libpq/pqcomm.c
libpq can now talk to either 3.0 or 2.0 protocol servers. It first tries
[postgresql] / src / backend / libpq / pqcomm.c
1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *        Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the old COPY OUT protocol.
10  * Unfortunately, COPY OUT was designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an elog(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
16  * set the DoingCopyOut flag.)
17  *
18  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19  * pq_putbytes(), especially if the message would require multiple calls
20  * to send.  Instead, use the routines in pqformat.c to construct the message
21  * in a buffer and then emit it in one call to pq_putmessage.  This ensures
22  * that the channel will not be clogged by an incomplete message if execution
23  * is aborted by elog(ERROR) partway through the message.  The only non-libpq
24  * code that should call pq_putbytes directly is old-style COPY OUT.
25  *
26  * At one time, libpq was shared between frontend and backend, but now
27  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28  * All that remains is similarities of names to trap the unwary...
29  *
30  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
31  * Portions Copyright (c) 1994, Regents of the University of California
32  *
33  *      $Header: /cvsroot/pgsql/src/backend/libpq/pqcomm.c,v 1.155 2003/06/08 17:43:00 tgl Exp $
34  *
35  *-------------------------------------------------------------------------
36  */
37
38 /*------------------------
39  * INTERFACE ROUTINES
40  *
41  * setup/teardown:
42  *              StreamServerPort        - Open postmaster's server port
43  *              StreamConnection        - Create new connection with client
44  *              StreamClose                     - Close a client/backend connection
45  *              TouchSocketFile         - Protect socket file against /tmp cleaners
46  *              pq_init                 - initialize libpq at backend startup
47  *              pq_close                - shutdown libpq at backend exit
48  *
49  * low-level I/O:
50  *              pq_getbytes             - get a known number of bytes from connection
51  *              pq_getstring    - get a null terminated string from connection
52  *              pq_getmessage   - get a message with length word from connection
53  *              pq_getbyte              - get next byte from connection
54  *              pq_peekbyte             - peek at next byte from connection
55  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
56  *              pq_flush                - flush pending output
57  *
58  * message-level I/O (and old-style-COPY-OUT cruft):
59  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
60  *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
61  *              pq_endcopyout   - end a COPY OUT transfer
62  *
63  *------------------------
64  */
65 #include "postgres.h"
66
67 #include <signal.h>
68 #include <errno.h>
69 #include <fcntl.h>
70 #include <grp.h>
71 #include <unistd.h>
72 #include <sys/file.h>
73 #include <sys/socket.h>
74 #include <sys/stat.h>
75 #include <sys/time.h>
76 #include <netdb.h>
77 #include <netinet/in.h>
78 #ifdef HAVE_NETINET_TCP_H
79 #include <netinet/tcp.h>
80 #endif
81 #include <arpa/inet.h>
82 #ifdef HAVE_UTIME_H
83 #include <utime.h>
84 #endif
85
86 #include "libpq/libpq.h"
87 #include "miscadmin.h"
88 #include "storage/ipc.h"
89
90
91 static void pq_close(void);
92
93 #ifdef HAVE_UNIX_SOCKETS
94 static int      Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
95 static int      Setup_AF_UNIX(void);
96 #endif   /* HAVE_UNIX_SOCKETS */
97
98
99 /*
100  * Configuration options
101  */
102 int                     Unix_socket_permissions;
103 char       *Unix_socket_group;
104
105
106 /*
107  * Buffers for low-level I/O
108  */
109
110 #define PQ_BUFFER_SIZE 8192
111
112 static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
113 static int      PqSendPointer;          /* Next index to store a byte in
114                                                                  * PqSendBuffer */
115
116 static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
117 static int      PqRecvPointer;          /* Next index to read a byte from
118                                                                  * PqRecvBuffer */
119 static int      PqRecvLength;           /* End of data available in PqRecvBuffer */
120
121 /*
122  * Message status
123  */
124 static bool DoingCopyOut;
125
126
127 /* --------------------------------
128  *              pq_init - initialize libpq at backend startup
129  * --------------------------------
130  */
131 void
132 pq_init(void)
133 {
134         PqSendPointer = PqRecvPointer = PqRecvLength = 0;
135         DoingCopyOut = false;
136         on_proc_exit(pq_close, 0);
137 }
138
139
140 /* --------------------------------
141  *              pq_close - shutdown libpq at backend exit
142  *
143  * Note: in a standalone backend MyProcPort will be null,
144  * don't crash during exit...
145  * --------------------------------
146  */
147 static void
148 pq_close(void)
149 {
150         if (MyProcPort != NULL)
151         {
152                 /* Cleanly shut down SSL layer */
153                 secure_close(MyProcPort);
154                 /*
155                  * Formerly we did an explicit close() here, but it seems better
156                  * to leave the socket open until the process dies.  This allows
157                  * clients to perform a "synchronous close" if they care --- wait
158                  * till the transport layer reports connection closure, and you
159                  * can be sure the backend has exited.
160                  *
161                  * We do set sock to -1 to prevent any further I/O, though.
162                  */
163                 MyProcPort->sock = -1;
164         }
165 }
166
167
168
169 /*
170  * Streams -- wrapper around Unix socket system calls
171  *
172  *
173  *              Stream functions are used for vanilla TCP connection protocol.
174  */
175
176 static char sock_path[MAXPGPATH];
177
178
179 /* StreamDoUnlink()
180  * Shutdown routine for backend connection
181  * If a Unix socket is used for communication, explicitly close it.
182  */
183 #ifdef HAVE_UNIX_SOCKETS
184 static void
185 StreamDoUnlink(void)
186 {
187         Assert(sock_path[0]);
188         unlink(sock_path);
189 }
190 #endif   /* HAVE_UNIX_SOCKETS */
191
192 /*
193  * StreamServerPort -- open a sock stream "listening" port.
194  *
195  * This initializes the Postmaster's connection-accepting port *fdP.
196  *
197  * RETURNS: STATUS_OK or STATUS_ERROR
198  */
199
200 int
201 StreamServerPort(int family, char *hostName, unsigned short portNumber,
202                                  char *unixSocketName, int *fdP)
203 {
204         int                     fd,
205                                 err;
206         int                     maxconn;
207         int                     one = 1;
208         int                     ret;
209         char            portNumberStr[64];
210         char       *service;
211         struct addrinfo *addrs = NULL;
212         struct addrinfo hint;
213
214 #ifdef HAVE_UNIX_SOCKETS
215         Assert(family == AF_UNIX || isAF_INETx(family));
216 #else
217         Assert(isAF_INETx(family));
218 #endif
219
220         /* Initialize hint structure */
221         MemSet(&hint, 0, sizeof(hint));
222         hint.ai_family = family;
223         hint.ai_flags = AI_PASSIVE;
224         hint.ai_socktype = SOCK_STREAM;
225
226 #ifdef HAVE_UNIX_SOCKETS
227         if (family == AF_UNIX)
228         {
229                 if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
230                         return STATUS_ERROR;
231                 service = sock_path;
232         }
233         else
234 #endif   /* HAVE_UNIX_SOCKETS */
235         {
236                 snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
237                 service = portNumberStr;
238         }
239
240         ret = getaddrinfo2(hostName, service, &hint, &addrs);
241         if (ret || addrs == NULL)
242         {
243                 elog(LOG, "server socket failure: getaddrinfo2(): %s",
244                          gai_strerror(ret));
245                 freeaddrinfo2(addrs);
246                 return STATUS_ERROR;
247         }
248
249         if ((fd = socket(family, SOCK_STREAM, 0)) < 0)
250         {
251                 elog(LOG, "server socket failure: socket(): %s",
252                          strerror(errno));
253                 freeaddrinfo2(addrs);
254                 return STATUS_ERROR;
255         }
256
257         if (isAF_INETx(family))
258         {
259                 if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one,
260                                                 sizeof(one))) == -1)
261                 {
262                         elog(LOG, "server socket failure: setsockopt(SO_REUSEADDR): %s",
263                                  strerror(errno));
264                         freeaddrinfo2(addrs);
265                         return STATUS_ERROR;
266                 }
267         }
268
269         Assert(addrs->ai_next == NULL && addrs->ai_family == family);
270         err = bind(fd, addrs->ai_addr, addrs->ai_addrlen);
271         if (err < 0)
272         {
273                 elog(LOG, "server socket failure: bind(): %s\n"
274                          "\tIs another postmaster already running on port %d?",
275                          strerror(errno), (int) portNumber);
276                 if (family == AF_UNIX)
277                         elog(LOG, "\tIf not, remove socket node (%s) and retry.",
278                                  sock_path);
279                 else
280                         elog(LOG, "\tIf not, wait a few seconds and retry.");
281                 freeaddrinfo2(addrs);
282                 return STATUS_ERROR;
283         }
284
285 #ifdef HAVE_UNIX_SOCKETS
286         if (family == AF_UNIX)
287         {
288                 if (Setup_AF_UNIX() != STATUS_OK)
289                 {
290                         freeaddrinfo2(addrs);
291                         return STATUS_ERROR;
292                 }
293         }
294 #endif
295
296         /*
297          * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
298          * intended to provide a clamp on the request on platforms where an
299          * overly large request provokes a kernel error (are there any?).
300          */
301         maxconn = MaxBackends * 2;
302         if (maxconn > PG_SOMAXCONN)
303                 maxconn = PG_SOMAXCONN;
304
305         err = listen(fd, maxconn);
306         if (err < 0)
307         {
308                 elog(LOG, "server socket failure: listen(): %s",
309                          strerror(errno));
310                 freeaddrinfo2(addrs);
311                 return STATUS_ERROR;
312         }
313
314         *fdP = fd;
315         freeaddrinfo2(addrs);
316         return STATUS_OK;
317
318 }
319
320
321 #ifdef HAVE_UNIX_SOCKETS
322
323 /*
324  * Lock_AF_UNIX -- configure unix socket file path
325  */
326 static int
327 Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
328 {
329         SockAddr        saddr;  /* just used to get socket path */
330
331         UNIXSOCK_PATH(saddr.un, portNumber, unixSocketName);
332         strcpy(sock_path, saddr.un.sun_path);
333
334         /*
335          * Grab an interlock file associated with the socket file.
336          */
337         if (!CreateSocketLockFile(sock_path, true))
338                 return STATUS_ERROR;
339
340         /*
341          * Once we have the interlock, we can safely delete any pre-existing
342          * socket file to avoid failure at bind() time.
343          */
344         unlink(sock_path);
345
346         return STATUS_OK;
347 }
348
349
350 /*
351  * Setup_AF_UNIX -- configure unix socket permissions
352  */
353 static int
354 Setup_AF_UNIX(void)
355 {
356         /* Arrange to unlink the socket file at exit */
357         on_proc_exit(StreamDoUnlink, 0);
358
359         /*
360          * Fix socket ownership/permission if requested.  Note we must do this
361          * before we listen() to avoid a window where unwanted connections
362          * could get accepted.
363          */
364         Assert(Unix_socket_group);
365         if (Unix_socket_group[0] != '\0')
366         {
367 #ifdef WIN32
368                 elog(FATAL, "Config value 'unix_socket_group' not supported on this platform");
369 #else
370                 char       *endptr;
371                 unsigned long int val;
372                 gid_t           gid;
373
374                 val = strtoul(Unix_socket_group, &endptr, 10);
375                 if (*endptr == '\0')
376                 {                                               /* numeric group id */
377                         gid = val;
378                 }
379                 else
380                 {                                               /* convert group name to id */
381                         struct group *gr;
382
383                         gr = getgrnam(Unix_socket_group);
384                         if (!gr)
385                         {
386                                 elog(LOG, "server socket failure: no such group '%s'",
387                                          Unix_socket_group);
388                                 return STATUS_ERROR;
389                         }
390                         gid = gr->gr_gid;
391                 }
392                 if (chown(sock_path, -1, gid) == -1)
393                 {
394                         elog(LOG, "server socket failure: could not set group of %s: %s",
395                                  sock_path, strerror(errno));
396                         return STATUS_ERROR;
397                 }
398 #endif
399         }
400
401         if (chmod(sock_path, Unix_socket_permissions) == -1)
402         {
403                 elog(LOG, "server socket failure: could not set permissions on %s: %s",
404                          sock_path, strerror(errno));
405                 return STATUS_ERROR;
406         }
407         return STATUS_OK;
408 }
409
410 #endif   /* HAVE_UNIX_SOCKETS */
411
412
413 /*
414  * StreamConnection -- create a new connection with client using
415  *              server port.
416  *
417  * ASSUME: that this doesn't need to be non-blocking because
418  *              the Postmaster uses select() to tell when the server master
419  *              socket is ready for accept().
420  *
421  * RETURNS: STATUS_OK or STATUS_ERROR
422  */
423 int
424 StreamConnection(int server_fd, Port *port)
425 {
426         ACCEPT_TYPE_ARG3 addrlen;
427
428         /* accept connection (and fill in the client (remote) address) */
429         addrlen = sizeof(port->raddr);
430         if ((port->sock = accept(server_fd,
431                                                          (struct sockaddr *) &port->raddr,
432                                                          &addrlen)) < 0)
433         {
434                 elog(LOG, "StreamConnection: accept() failed: %m");
435                 return STATUS_ERROR;
436         }
437
438 #ifdef SCO_ACCEPT_BUG
439         /*
440          * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
441          * shouldn't hurt to catch it for all versions of those platforms.
442          */
443         if (port->raddr.sa.sa_family == 0)
444                 port->raddr.sa.sa_family = AF_UNIX;
445 #endif
446
447         /* fill in the server (local) address */
448         addrlen = sizeof(port->laddr);
449         if (getsockname(port->sock, (struct sockaddr *) & port->laddr,
450                                         &addrlen) < 0)
451         {
452                 elog(LOG, "StreamConnection: getsockname() failed: %m");
453                 return STATUS_ERROR;
454         }
455
456         /* select NODELAY and KEEPALIVE options if it's a TCP connection */
457         if (isAF_INETx(port->laddr.sa.sa_family))
458         {
459                 int                     on = 1;
460
461                 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
462                                            (char *) &on, sizeof(on)) < 0)
463                 {
464                         elog(LOG, "StreamConnection: setsockopt(TCP_NODELAY) failed: %m");
465                         return STATUS_ERROR;
466                 }
467                 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
468                                            (char *) &on, sizeof(on)) < 0)
469                 {
470                         elog(LOG, "StreamConnection: setsockopt(SO_KEEPALIVE) failed: %m");
471                         return STATUS_ERROR;
472                 }
473         }
474
475         return STATUS_OK;
476 }
477
478 /*
479  * StreamClose -- close a client/backend connection
480  *
481  * NOTE: this is NOT used to terminate a session; it is just used to release
482  * the file descriptor in a process that should no longer have the socket
483  * open.  (For example, the postmaster calls this after passing ownership
484  * of the connection to a child process.)  It is expected that someone else
485  * still has the socket open.  So, we only want to close the descriptor,
486  * we do NOT want to send anything to the far end.
487  */
488 void
489 StreamClose(int sock)
490 {
491         closesocket(sock);
492 }
493
494 /*
495  * TouchSocketFile -- mark socket file as recently accessed
496  *
497  * This routine should be called every so often to ensure that the socket
498  * file has a recent mod date (ordinary operations on sockets usually won't
499  * change the mod date).  That saves it from being removed by
500  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
501  * never have put the socket file in /tmp...)
502  */
503 void
504 TouchSocketFile(void)
505 {
506         /* Do nothing if we did not create a socket... */
507         if (sock_path[0] != '\0')
508         {
509                 /*
510                  * utime() is POSIX standard, utimes() is a common alternative.
511                  * If we have neither, there's no way to affect the mod or access
512                  * time of the socket :-(
513                  *
514                  * In either path, we ignore errors; there's no point in complaining.
515                  */
516 #ifdef HAVE_UTIME
517                 utime(sock_path, NULL);
518 #else /* !HAVE_UTIME */
519 #ifdef HAVE_UTIMES
520                 utimes(sock_path, NULL);
521 #endif /* HAVE_UTIMES */
522 #endif /* HAVE_UTIME */
523         }
524 }
525
526
527 /* --------------------------------
528  * Low-level I/O routines begin here.
529  *
530  * These routines communicate with a frontend client across a connection
531  * already established by the preceding routines.
532  * --------------------------------
533  */
534
535
536 /* --------------------------------
537  *              pq_recvbuf - load some bytes into the input buffer
538  *
539  *              returns 0 if OK, EOF if trouble
540  * --------------------------------
541  */
542 static int
543 pq_recvbuf(void)
544 {
545         if (PqRecvPointer > 0)
546         {
547                 if (PqRecvLength > PqRecvPointer)
548                 {
549                         /* still some unread data, left-justify it in the buffer */
550                         memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
551                                         PqRecvLength - PqRecvPointer);
552                         PqRecvLength -= PqRecvPointer;
553                         PqRecvPointer = 0;
554                 }
555                 else
556                         PqRecvLength = PqRecvPointer = 0;
557         }
558
559         /* Can fill buffer from PqRecvLength and upwards */
560         for (;;)
561         {
562                 int                     r;
563
564                 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
565                                                 PQ_BUFFER_SIZE - PqRecvLength);
566
567                 if (r < 0)
568                 {
569                         if (errno == EINTR)
570                                 continue;               /* Ok if interrupted */
571
572                         /*
573                          * Careful: an elog() that tries to write to the client would
574                          * cause recursion to here, leading to stack overflow and core
575                          * dump!  This message must go *only* to the postmaster log.
576                          */
577                         elog(COMMERROR, "pq_recvbuf: recv() failed: %m");
578                         return EOF;
579                 }
580                 if (r == 0)
581                 {
582                         /*
583                          * EOF detected.  We used to write a log message here, but it's
584                          * better to expect the ultimate caller to do that.
585                          */
586                         return EOF;
587                 }
588                 /* r contains number of bytes read, so just incr length */
589                 PqRecvLength += r;
590                 return 0;
591         }
592 }
593
594 /* --------------------------------
595  *              pq_getbyte      - get a single byte from connection, or return EOF
596  * --------------------------------
597  */
598 int
599 pq_getbyte(void)
600 {
601         while (PqRecvPointer >= PqRecvLength)
602         {
603                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
604                         return EOF;                     /* Failed to recv data */
605         }
606         return PqRecvBuffer[PqRecvPointer++];
607 }
608
609 /* --------------------------------
610  *              pq_peekbyte             - peek at next byte from connection
611  *
612  *       Same as pq_getbyte() except we don't advance the pointer.
613  * --------------------------------
614  */
615 int
616 pq_peekbyte(void)
617 {
618         while (PqRecvPointer >= PqRecvLength)
619         {
620                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
621                         return EOF;                     /* Failed to recv data */
622         }
623         return PqRecvBuffer[PqRecvPointer];
624 }
625
626 /* --------------------------------
627  *              pq_getbytes             - get a known number of bytes from connection
628  *
629  *              returns 0 if OK, EOF if trouble
630  * --------------------------------
631  */
632 int
633 pq_getbytes(char *s, size_t len)
634 {
635         size_t          amount;
636
637         while (len > 0)
638         {
639                 while (PqRecvPointer >= PqRecvLength)
640                 {
641                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
642                                 return EOF;             /* Failed to recv data */
643                 }
644                 amount = PqRecvLength - PqRecvPointer;
645                 if (amount > len)
646                         amount = len;
647                 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
648                 PqRecvPointer += amount;
649                 s += amount;
650                 len -= amount;
651         }
652         return 0;
653 }
654
655 /* --------------------------------
656  *              pq_getstring    - get a null terminated string from connection
657  *
658  *              The return value is placed in an expansible StringInfo, which has
659  *              already been initialized by the caller.
660  *
661  *              This is used only for dealing with old-protocol clients.  The idea
662  *              is to produce a StringInfo that looks the same as we would get from
663  *              pq_getmessage() with a newer client; we will then process it with
664  *              pq_getmsgstring.  Therefore, no character set conversion is done here,
665  *              even though this is presumably useful only for text.
666  *
667  *              returns 0 if OK, EOF if trouble
668  * --------------------------------
669  */
670 int
671 pq_getstring(StringInfo s)
672 {
673         int                     i;
674
675         /* Reset string to empty */
676         s->len = 0;
677         s->data[0] = '\0';
678         s->cursor = 0;
679
680         /* Read until we get the terminating '\0' */
681         for (;;)
682         {
683                 while (PqRecvPointer >= PqRecvLength)
684                 {
685                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
686                                 return EOF;             /* Failed to recv data */
687                 }
688
689                 for (i = PqRecvPointer; i < PqRecvLength; i++)
690                 {
691                         if (PqRecvBuffer[i] == '\0')
692                         {
693                                 /* include the '\0' in the copy */
694                                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
695                                                                            i - PqRecvPointer + 1);
696                                 PqRecvPointer = i + 1;  /* advance past \0 */
697                                 return 0;
698                         }
699                 }
700
701                 /* If we're here we haven't got the \0 in the buffer yet. */
702                 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
703                                                            PqRecvLength - PqRecvPointer);
704                 PqRecvPointer = PqRecvLength;
705         }
706 }
707
708
709 /* --------------------------------
710  *              pq_getmessage   - get a message with length word from connection
711  *
712  *              The return value is placed in an expansible StringInfo, which has
713  *              already been initialized by the caller.
714  *              Only the message body is placed in the StringInfo; the length word
715  *              is removed.  Also, s->cursor is initialized to zero for convenience
716  *              in scanning the message contents.
717  *
718  *              If maxlen is not zero, it is an upper limit on the length of the
719  *              message we are willing to accept.  We abort the connection (by
720  *              returning EOF) if client tries to send more than that.
721  *
722  *              returns 0 if OK, EOF if trouble
723  * --------------------------------
724  */
725 int
726 pq_getmessage(StringInfo s, int maxlen)
727 {
728         int32           len;
729
730         /* Reset message buffer to empty */
731         s->len = 0;
732         s->data[0] = '\0';
733         s->cursor = 0;
734
735         /* Read message length word */
736         if (pq_getbytes((char *) &len, 4) == EOF)
737         {
738                 elog(COMMERROR, "unexpected EOF within message length word");
739                 return EOF;
740         }
741
742         len = ntohl(len);
743         len -= 4;                                       /* discount length itself */
744
745         if (len < 0 ||
746                 (maxlen > 0 && len > maxlen))
747         {
748                 elog(COMMERROR, "invalid message length");
749                 return EOF;
750         }
751
752         if (len > 0)
753         {
754                 /* Allocate space for message */
755                 enlargeStringInfo(s, len);
756
757                 /* And grab the message */
758                 if (pq_getbytes(s->data, len) == EOF)
759                 {
760                         elog(COMMERROR, "incomplete client message");
761                         return EOF;
762                 }
763                 s->len = len;
764                 /* Place a trailing null per StringInfo convention */
765                 s->data[len] = '\0';
766         }
767
768         return 0;
769 }
770
771
772 /* --------------------------------
773  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
774  *
775  *              returns 0 if OK, EOF if trouble
776  * --------------------------------
777  */
778 int
779 pq_putbytes(const char *s, size_t len)
780 {
781         size_t          amount;
782
783         while (len > 0)
784         {
785                 if (PqSendPointer >= PQ_BUFFER_SIZE)
786                         if (pq_flush())         /* If buffer is full, then flush it out */
787                                 return EOF;
788                 amount = PQ_BUFFER_SIZE - PqSendPointer;
789                 if (amount > len)
790                         amount = len;
791                 memcpy(PqSendBuffer + PqSendPointer, s, amount);
792                 PqSendPointer += amount;
793                 s += amount;
794                 len -= amount;
795         }
796         return 0;
797 }
798
799 /* --------------------------------
800  *              pq_flush                - flush pending output
801  *
802  *              returns 0 if OK, EOF if trouble
803  * --------------------------------
804  */
805 int
806 pq_flush(void)
807 {
808         static int      last_reported_send_errno = 0;
809
810         unsigned char *bufptr = PqSendBuffer;
811         unsigned char *bufend = PqSendBuffer + PqSendPointer;
812
813         while (bufptr < bufend)
814         {
815                 int                     r;
816
817                 r = secure_write(MyProcPort, bufptr, bufend - bufptr);
818
819                 if (r <= 0)
820                 {
821                         if (errno == EINTR)
822                                 continue;               /* Ok if we were interrupted */
823
824                         /*
825                          * Careful: an elog() that tries to write to the client would
826                          * cause recursion to here, leading to stack overflow and core
827                          * dump!  This message must go *only* to the postmaster log.
828                          *
829                          * If a client disconnects while we're in the midst of output, we
830                          * might write quite a bit of data before we get to a safe
831                          * query abort point.  So, suppress duplicate log messages.
832                          */
833                         if (errno != last_reported_send_errno)
834                         {
835                                 last_reported_send_errno = errno;
836                                 elog(COMMERROR, "pq_flush: send() failed: %m");
837                         }
838
839                         /*
840                          * We drop the buffered data anyway so that processing can
841                          * continue, even though we'll probably quit soon.
842                          */
843                         PqSendPointer = 0;
844                         return EOF;
845                 }
846
847                 last_reported_send_errno = 0;   /* reset after any successful send */
848                 bufptr += r;
849         }
850
851         PqSendPointer = 0;
852         return 0;
853 }
854
855
856 /* --------------------------------
857  * Message-level I/O routines begin here.
858  *
859  * These routines understand about the old-style COPY OUT protocol.
860  * --------------------------------
861  */
862
863
864 /* --------------------------------
865  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
866  *
867  *              If msgtype is not '\0', it is a message type code to place before
868  *              the message body.  If msgtype is '\0', then the message has no type
869  *              code (this is only valid in pre-3.0 protocols).
870  *
871  *              len is the length of the message body data at *s.  In protocol 3.0
872  *              and later, a message length word (equal to len+4 because it counts
873  *              itself too) is inserted by this routine.
874  *
875  *              All normal messages are suppressed while old-style COPY OUT is in
876  *              progress.  (In practice only a few notice messages might get emitted
877  *              then; dropping them is annoying, but at least they will still appear
878  *              in the postmaster log.)
879  *
880  *              returns 0 if OK, EOF if trouble
881  * --------------------------------
882  */
883 int
884 pq_putmessage(char msgtype, const char *s, size_t len)
885 {
886         if (DoingCopyOut)
887                 return 0;
888         if (msgtype)
889                 if (pq_putbytes(&msgtype, 1))
890                         return EOF;
891         if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
892         {
893                 uint32          n32;
894
895                 n32 = htonl((uint32) (len + 4));
896                 if (pq_putbytes((char *) &n32, 4))
897                         return EOF;
898         }
899         return pq_putbytes(s, len);
900 }
901
902 /* --------------------------------
903  *              pq_startcopyout - inform libpq that an old-style COPY OUT transfer
904  *                      is beginning
905  * --------------------------------
906  */
907 void
908 pq_startcopyout(void)
909 {
910         DoingCopyOut = true;
911 }
912
913 /* --------------------------------
914  *              pq_endcopyout   - end an old-style COPY OUT transfer
915  *
916  *              If errorAbort is indicated, we are aborting a COPY OUT due to an error,
917  *              and must send a terminator line.  Since a partial data line might have
918  *              been emitted, send a couple of newlines first (the first one could
919  *              get absorbed by a backslash...)  Note that old-style COPY OUT does
920  *              not allow binary transfers, so a textual terminator is always correct.
921  * --------------------------------
922  */
923 void
924 pq_endcopyout(bool errorAbort)
925 {
926         if (!DoingCopyOut)
927                 return;
928         DoingCopyOut = false;
929         if (errorAbort)
930                 pq_putbytes("\n\n\\.\n", 5);
931         /* in non-error case, copy.c will have emitted the terminator line */
932 }