]> granicus.if.org Git - postgresql/blob - src/backend/libpq/pqcomm.c
Add code to allow profiling of backends on Linux: save and restore the
[postgresql] / src / backend / libpq / pqcomm.c
1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *        Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the COPY OUT protocol.
10  * Unfortunately, COPY OUT is designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an elog(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.
16  *
17  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
18  * pq_putbytes(), especially if the message would require multiple calls
19  * to send.  Instead, use the routines in pqformat.c to construct the message
20  * in a buffer and then emit it in one call to pq_putmessage.  This helps
21  * ensure that the channel will not be clogged by an incomplete message
22  * if execution is aborted by elog(ERROR) partway through the message.
23  * The only non-libpq code that should call pq_putbytes directly is COPY OUT.
24  *
25  * At one time, libpq was shared between frontend and backend, but now
26  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
27  * All that remains is similarities of names to trap the unwary...
28  *
29  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
30  * Portions Copyright (c) 1994, Regents of the University of California
31  *
32  *      $Id: pqcomm.c,v 1.126 2001/12/04 20:57:22 tgl Exp $
33  *
34  *-------------------------------------------------------------------------
35  */
36
37 /*------------------------
38  * INTERFACE ROUTINES
39  *
40  * setup/teardown:
41  *              StreamServerPort        - Open postmaster's server port
42  *              StreamConnection        - Create new connection with client
43  *              StreamClose                     - Close a client/backend connection
44  *              pq_init                 - initialize libpq at backend startup
45  *              pq_close                - shutdown libpq at backend exit
46  *
47  * low-level I/O:
48  *              pq_getbytes             - get a known number of bytes from connection
49  *              pq_getstring    - get a null terminated string from connection
50  *              pq_getbyte              - get next byte from connection
51  *              pq_peekbyte             - peek at next byte from connection
52  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
53  *              pq_flush                - flush pending output
54  *
55  * message-level I/O (and COPY OUT cruft):
56  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
57  *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
58  *              pq_endcopyout   - end a COPY OUT transfer
59  *
60  *------------------------
61  */
62 #include "postgres.h"
63
64 #include <signal.h>
65 #include <errno.h>
66 #include <fcntl.h>
67 #include <grp.h>
68 #include <unistd.h>
69 #include <sys/types.h>
70 #include <sys/stat.h>
71 #include <sys/socket.h>
72 #include <netdb.h>
73 #include <netinet/in.h>
74 #ifdef HAVE_NETINET_TCP_H
75 #include <netinet/tcp.h>
76 #endif
77 #include <arpa/inet.h>
78 #include <sys/file.h>
79
80 #include "libpq/libpq.h"
81 #include "miscadmin.h"
82
83
84 static void pq_close(void);
85
86
87 /*
88  * Configuration options
89  */
90 int                     Unix_socket_permissions;
91 char       *Unix_socket_group;
92
93
94 /*
95  * Buffers for low-level I/O
96  */
97
98 #define PQ_BUFFER_SIZE 8192
99
100 static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
101 static int      PqSendPointer;          /* Next index to store a byte in
102                                                                  * PqSendBuffer */
103
104 static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
105 static int      PqRecvPointer;          /* Next index to read a byte from
106                                                                  * PqRecvBuffer */
107 static int      PqRecvLength;           /* End of data available in PqRecvBuffer */
108
109 /*
110  * Message status
111  */
112 static bool DoingCopyOut;
113
114
115 /* --------------------------------
116  *              pq_init - initialize libpq at backend startup
117  * --------------------------------
118  */
119 void
120 pq_init(void)
121 {
122         PqSendPointer = PqRecvPointer = PqRecvLength = 0;
123         DoingCopyOut = false;
124         on_proc_exit(pq_close, 0);
125 }
126
127
128 /* --------------------------------
129  *              pq_close - shutdown libpq at backend exit
130  *
131  * Note: in a standalone backend MyProcPort will be null,
132  * don't crash during exit...
133  * --------------------------------
134  */
135 static void
136 pq_close(void)
137 {
138         if (MyProcPort != NULL)
139         {
140                 close(MyProcPort->sock);
141                 /* make sure any subsequent attempts to do I/O fail cleanly */
142                 MyProcPort->sock = -1;
143         }
144 }
145
146
147
148 /*
149  * Streams -- wrapper around Unix socket system calls
150  *
151  *
152  *              Stream functions are used for vanilla TCP connection protocol.
153  */
154
155 static char sock_path[MAXPGPATH];
156
157
158 /* StreamDoUnlink()
159  * Shutdown routine for backend connection
160  * If a Unix socket is used for communication, explicitly close it.
161  */
162 static void
163 StreamDoUnlink(void)
164 {
165         Assert(sock_path[0]);
166         unlink(sock_path);
167 }
168
169 /*
170  * StreamServerPort -- open a sock stream "listening" port.
171  *
172  * This initializes the Postmaster's connection-accepting port *fdP.
173  *
174  * RETURNS: STATUS_OK or STATUS_ERROR
175  */
176
177 int
178 StreamServerPort(int family, char *hostName, unsigned short portNumber,
179                                  char *unixSocketName, int *fdP)
180 {
181         SockAddr        saddr;
182         int                     fd,
183                                 err;
184         int                     maxconn;
185         size_t          len = 0;
186         int                     one = 1;
187
188         Assert(family == AF_INET || family == AF_UNIX);
189
190         if ((fd = socket(family, SOCK_STREAM, 0)) < 0)
191         {
192                 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
193                                  "FATAL: StreamServerPort: socket() failed: %s\n",
194                                  strerror(errno));
195                 fputs(PQerrormsg, stderr);
196                 pqdebug("%s", PQerrormsg);
197                 return STATUS_ERROR;
198         }
199
200         if (family == AF_INET)
201         {
202                 if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one,
203                                                 sizeof(one))) == -1)
204                 {
205                         snprintf(PQerrormsg, PQERRORMSG_LENGTH,
206                                          "FATAL: StreamServerPort: setsockopt(SO_REUSEADDR) failed: %s\n",
207                                          strerror(errno));
208                         fputs(PQerrormsg, stderr);
209                         pqdebug("%s", PQerrormsg);
210                         return STATUS_ERROR;
211                 }
212         }
213
214         MemSet((char *) &saddr, 0, sizeof(saddr));
215         saddr.sa.sa_family = family;
216
217 #ifdef HAVE_UNIX_SOCKETS
218         if (family == AF_UNIX)
219         {
220                 UNIXSOCK_PATH(saddr.un, portNumber, unixSocketName);
221                 len = UNIXSOCK_LEN(saddr.un);
222                 strcpy(sock_path, saddr.un.sun_path);
223
224                 /*
225                  * Grab an interlock file associated with the socket file.
226                  */
227                 if (!CreateSocketLockFile(sock_path, true))
228                         return STATUS_ERROR;
229
230                 /*
231                  * Once we have the interlock, we can safely delete any
232                  * pre-existing socket file to avoid failure at bind() time.
233                  */
234                 unlink(sock_path);
235         }
236 #endif   /* HAVE_UNIX_SOCKETS */
237
238         if (family == AF_INET)
239         {
240                 /* TCP/IP socket */
241                 if (hostName[0] == '\0')
242                         saddr.in.sin_addr.s_addr = htonl(INADDR_ANY);
243                 else
244                 {
245                         struct hostent *hp;
246
247                         hp = gethostbyname(hostName);
248                         if ((hp == NULL) || (hp->h_addrtype != AF_INET))
249                         {
250                                 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
251                                    "FATAL: StreamServerPort: gethostbyname(%s) failed\n",
252                                                  hostName);
253                                 fputs(PQerrormsg, stderr);
254                                 pqdebug("%s", PQerrormsg);
255                                 return STATUS_ERROR;
256                         }
257                         memmove((char *) &(saddr.in.sin_addr), (char *) hp->h_addr,
258                                         hp->h_length);
259                 }
260
261                 saddr.in.sin_port = htons(portNumber);
262                 len = sizeof(struct sockaddr_in);
263         }
264
265         err = bind(fd, (struct sockaddr *) & saddr.sa, len);
266         if (err < 0)
267         {
268                 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
269                                  "FATAL: StreamServerPort: bind() failed: %s\n"
270                                  "\tIs another postmaster already running on port %d?\n",
271                                  strerror(errno), (int) portNumber);
272                 if (family == AF_UNIX)
273                         snprintf(PQerrormsg + strlen(PQerrormsg),
274                                          PQERRORMSG_LENGTH - strlen(PQerrormsg),
275                                          "\tIf not, remove socket node (%s) and retry.\n",
276                                          sock_path);
277                 else
278                         snprintf(PQerrormsg + strlen(PQerrormsg),
279                                          PQERRORMSG_LENGTH - strlen(PQerrormsg),
280                                          "\tIf not, wait a few seconds and retry.\n");
281                 fputs(PQerrormsg, stderr);
282                 pqdebug("%s", PQerrormsg);
283                 return STATUS_ERROR;
284         }
285
286 #ifdef HAVE_UNIX_SOCKETS
287         if (family == AF_UNIX)
288         {
289                 /* Arrange to unlink the socket file at exit */
290                 on_proc_exit(StreamDoUnlink, 0);
291
292                 /*
293                  * Fix socket ownership/permission if requested.  Note we must do
294                  * this before we listen() to avoid a window where unwanted
295                  * connections could get accepted.
296                  */
297                 Assert(Unix_socket_group);
298                 if (Unix_socket_group[0] != '\0')
299                 {
300                         char       *endptr;
301                         unsigned long int val;
302                         gid_t           gid;
303
304                         val = strtoul(Unix_socket_group, &endptr, 10);
305                         if (*endptr == '\0')
306                         {
307                                 /* numeric group id */
308                                 gid = val;
309                         }
310                         else
311                         {
312                                 /* convert group name to id */
313                                 struct group *gr;
314
315                                 gr = getgrnam(Unix_socket_group);
316                                 if (!gr)
317                                 {
318                                         snprintf(PQerrormsg, PQERRORMSG_LENGTH,
319                                                          "FATAL:  no such group '%s'\n",
320                                                          Unix_socket_group);
321                                         fputs(PQerrormsg, stderr);
322                                         pqdebug("%s", PQerrormsg);
323                                         return STATUS_ERROR;
324                                 }
325                                 gid = gr->gr_gid;
326                         }
327                         if (chown(sock_path, -1, gid) == -1)
328                         {
329                                 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
330                                                  "FATAL:  could not set group of %s: %s\n",
331                                                  sock_path, strerror(errno));
332                                 fputs(PQerrormsg, stderr);
333                                 pqdebug("%s", PQerrormsg);
334                                 return STATUS_ERROR;
335                         }
336                 }
337
338                 if (chmod(sock_path, Unix_socket_permissions) == -1)
339                 {
340                         snprintf(PQerrormsg, PQERRORMSG_LENGTH,
341                                          "FATAL:  could not set permissions on %s: %s\n",
342                                          sock_path, strerror(errno));
343                         fputs(PQerrormsg, stderr);
344                         pqdebug("%s", PQerrormsg);
345                         return STATUS_ERROR;
346                 }
347         }
348 #endif   /* HAVE_UNIX_SOCKETS */
349
350         /*
351          * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
352          * intended to provide a clamp on the request on platforms where an
353          * overly large request provokes a kernel error (are there any?).
354          */
355         maxconn = MaxBackends * 2;
356         if (maxconn > PG_SOMAXCONN)
357                 maxconn = PG_SOMAXCONN;
358
359         err = listen(fd, maxconn);
360         if (err < 0)
361         {
362                 snprintf(PQerrormsg, PQERRORMSG_LENGTH,
363                                  "FATAL: StreamServerPort: listen() failed: %s\n",
364                                  strerror(errno));
365                 fputs(PQerrormsg, stderr);
366                 pqdebug("%s", PQerrormsg);
367                 return STATUS_ERROR;
368         }
369
370         *fdP = fd;
371
372         return STATUS_OK;
373 }
374
375 /*
376  * StreamConnection -- create a new connection with client using
377  *              server port.
378  *
379  * ASSUME: that this doesn't need to be non-blocking because
380  *              the Postmaster uses select() to tell when the server master
381  *              socket is ready for accept().
382  *
383  * NB: this can NOT call elog() because it is invoked in the postmaster,
384  * not in standard backend context.  If we get an error, the best we can do
385  * is log it to stderr.
386  *
387  * RETURNS: STATUS_OK or STATUS_ERROR
388  */
389 int
390 StreamConnection(int server_fd, Port *port)
391 {
392         ACCEPT_TYPE_ARG3 addrlen;
393
394         /* accept connection (and fill in the client (remote) address) */
395         addrlen = sizeof(port->raddr);
396         if ((port->sock = accept(server_fd,
397                                                          (struct sockaddr *) & port->raddr,
398                                                          &addrlen)) < 0)
399         {
400                 perror("postmaster: StreamConnection: accept");
401                 return STATUS_ERROR;
402         }
403
404 #ifdef SCO_ACCEPT_BUG
405
406         /*
407          * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
408          * shouldn't hurt to catch it for all versions of those platforms.
409          */
410         if (port->raddr.sa.sa_family == 0)
411                 port->raddr.sa.sa_family = AF_UNIX;
412 #endif
413
414         /* fill in the server (local) address */
415         addrlen = sizeof(port->laddr);
416         if (getsockname(port->sock, (struct sockaddr *) & port->laddr,
417                                         &addrlen) < 0)
418         {
419                 perror("postmaster: StreamConnection: getsockname");
420                 return STATUS_ERROR;
421         }
422
423         /* select NODELAY and KEEPALIVE options if it's a TCP connection */
424         if (port->laddr.sa.sa_family == AF_INET)
425         {
426                 int                     on = 1;
427
428                 if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
429                                            (char *) &on, sizeof(on)) < 0)
430                 {
431                         perror("postmaster: StreamConnection: setsockopt(TCP_NODELAY)");
432                         return STATUS_ERROR;
433                 }
434                 if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
435                                            (char *) &on, sizeof(on)) < 0)
436                 {
437                         perror("postmaster: StreamConnection: setsockopt(SO_KEEPALIVE)");
438                         return STATUS_ERROR;
439                 }
440         }
441
442         return STATUS_OK;
443 }
444
445 /*
446  * StreamClose -- close a client/backend connection
447  */
448 void
449 StreamClose(int sock)
450 {
451         close(sock);
452 }
453
454
455 /* --------------------------------
456  * Low-level I/O routines begin here.
457  *
458  * These routines communicate with a frontend client across a connection
459  * already established by the preceding routines.
460  * --------------------------------
461  */
462
463
464 /* --------------------------------
465  *              pq_recvbuf - load some bytes into the input buffer
466  *
467  *              returns 0 if OK, EOF if trouble
468  * --------------------------------
469  */
470 static int
471 pq_recvbuf(void)
472 {
473         if (PqRecvPointer > 0)
474         {
475                 if (PqRecvLength > PqRecvPointer)
476                 {
477                         /* still some unread data, left-justify it in the buffer */
478                         memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
479                                         PqRecvLength - PqRecvPointer);
480                         PqRecvLength -= PqRecvPointer;
481                         PqRecvPointer = 0;
482                 }
483                 else
484                         PqRecvLength = PqRecvPointer = 0;
485         }
486
487         /* Can fill buffer from PqRecvLength and upwards */
488         for (;;)
489         {
490                 int                     r;
491
492 #ifdef USE_SSL
493                 if (MyProcPort->ssl)
494                         r = SSL_read(MyProcPort->ssl, PqRecvBuffer + PqRecvLength,
495                                                  PQ_BUFFER_SIZE - PqRecvLength);
496                 else
497 #endif
498                         r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength,
499                                          PQ_BUFFER_SIZE - PqRecvLength, 0);
500
501                 if (r < 0)
502                 {
503                         if (errno == EINTR)
504                                 continue;               /* Ok if interrupted */
505
506                         /*
507                          * Careful: an elog() that tries to write to the client
508                          * would cause recursion to here, leading to stack overflow
509                          * and core dump!  This message must go *only* to the postmaster
510                          * log.  elog(DEBUG) is presently safe.
511                          */
512                         elog(DEBUG, "pq_recvbuf: recv() failed: %m");
513                         return EOF;
514                 }
515                 if (r == 0)
516                 {
517                         /* as above, only write to postmaster log */
518                         elog(DEBUG, "pq_recvbuf: unexpected EOF on client connection");
519                         return EOF;
520                 }
521                 /* r contains number of bytes read, so just incr length */
522                 PqRecvLength += r;
523                 return 0;
524         }
525 }
526
527 /* --------------------------------
528  *              pq_getbyte      - get a single byte from connection, or return EOF
529  * --------------------------------
530  */
531 int
532 pq_getbyte(void)
533 {
534         while (PqRecvPointer >= PqRecvLength)
535         {
536                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
537                         return EOF;                     /* Failed to recv data */
538         }
539         return PqRecvBuffer[PqRecvPointer++];
540 }
541
542 /* --------------------------------
543  *              pq_peekbyte             - peek at next byte from connection
544  *
545  *       Same as pq_getbyte() except we don't advance the pointer.
546  * --------------------------------
547  */
548 int
549 pq_peekbyte(void)
550 {
551         while (PqRecvPointer >= PqRecvLength)
552         {
553                 if (pq_recvbuf())               /* If nothing in buffer, then recv some */
554                         return EOF;                     /* Failed to recv data */
555         }
556         return PqRecvBuffer[PqRecvPointer];
557 }
558
559 /* --------------------------------
560  *              pq_getbytes             - get a known number of bytes from connection
561  *
562  *              returns 0 if OK, EOF if trouble
563  * --------------------------------
564  */
565 int
566 pq_getbytes(char *s, size_t len)
567 {
568         size_t          amount;
569
570         while (len > 0)
571         {
572                 while (PqRecvPointer >= PqRecvLength)
573                 {
574                         if (pq_recvbuf())       /* If nothing in buffer, then recv some */
575                                 return EOF;             /* Failed to recv data */
576                 }
577                 amount = PqRecvLength - PqRecvPointer;
578                 if (amount > len)
579                         amount = len;
580                 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
581                 PqRecvPointer += amount;
582                 s += amount;
583                 len -= amount;
584         }
585         return 0;
586 }
587
588 /* --------------------------------
589  *              pq_getstring    - get a null terminated string from connection
590  *
591  *              The return value is placed in an expansible StringInfo.
592  *              Note that space allocation comes from the current memory context!
593  *
594  *              NOTE: this routine does not do any MULTIBYTE conversion,
595  *              even though it is presumably useful only for text, because
596  *              no code in this module should depend on MULTIBYTE mode.
597  *              See pq_getstr in pqformat.c for that.
598  *
599  *              returns 0 if OK, EOF if trouble
600  * --------------------------------
601  */
602 int
603 pq_getstring(StringInfo s)
604 {
605         int                     c;
606
607         /* Reset string to empty */
608         s->len = 0;
609         s->data[0] = '\0';
610
611         /* Read until we get the terminating '\0' */
612         while ((c = pq_getbyte()) != EOF && c != '\0')
613                 appendStringInfoCharMacro(s, c);
614
615         if (c == EOF)
616                 return EOF;
617
618         return 0;
619 }
620
621
622 /* --------------------------------
623  *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
624  *
625  *              returns 0 if OK, EOF if trouble
626  * --------------------------------
627  */
628 int
629 pq_putbytes(const char *s, size_t len)
630 {
631         size_t          amount;
632
633         while (len > 0)
634         {
635                 if (PqSendPointer >= PQ_BUFFER_SIZE)
636                         if (pq_flush())         /* If buffer is full, then flush it out */
637                                 return EOF;
638                 amount = PQ_BUFFER_SIZE - PqSendPointer;
639                 if (amount > len)
640                         amount = len;
641                 memcpy(PqSendBuffer + PqSendPointer, s, amount);
642                 PqSendPointer += amount;
643                 s += amount;
644                 len -= amount;
645         }
646         return 0;
647 }
648
649 /* --------------------------------
650  *              pq_flush                - flush pending output
651  *
652  *              returns 0 if OK, EOF if trouble
653  * --------------------------------
654  */
655 int
656 pq_flush(void)
657 {
658         static int last_reported_send_errno = 0;
659
660         unsigned char *bufptr = PqSendBuffer;
661         unsigned char *bufend = PqSendBuffer + PqSendPointer;
662
663         while (bufptr < bufend)
664         {
665                 int                     r;
666
667 #ifdef USE_SSL
668                 if (MyProcPort->ssl)
669                         r = SSL_write(MyProcPort->ssl, bufptr, bufend - bufptr);
670                 else
671 #endif
672                         r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0);
673
674                 if (r <= 0)
675                 {
676                         if (errno == EINTR)
677                                 continue;               /* Ok if we were interrupted */
678
679                         /*
680                          * Careful: an elog() that tries to write to the client
681                          * would cause recursion to here, leading to stack overflow
682                          * and core dump!  This message must go *only* to the postmaster
683                          * log.  elog(DEBUG) is presently safe.
684                          *
685                          * If a client disconnects while we're in the midst of output,
686                          * we might write quite a bit of data before we get to a safe
687                          * query abort point.  So, suppress duplicate log messages.
688                          */
689                         if (errno != last_reported_send_errno)
690                         {
691                                 last_reported_send_errno = errno;
692                                 elog(DEBUG, "pq_flush: send() failed: %m");
693                         }
694
695                         /*
696                          * We drop the buffered data anyway so that processing can
697                          * continue, even though we'll probably quit soon.
698                          */
699                         PqSendPointer = 0;
700                         return EOF;
701                 }
702
703                 last_reported_send_errno = 0; /* reset after any successful send */
704                 bufptr += r;
705         }
706
707         PqSendPointer = 0;
708         return 0;
709 }
710
711
712 /*
713  * Return EOF if the connection has been broken, else 0.
714  */
715 int
716 pq_eof(void)
717 {
718         char            x;
719         int                     res;
720
721         res = recv(MyProcPort->sock, &x, 1, MSG_PEEK);
722
723         if (res < 0)
724         {
725                 /* can log to postmaster log only */
726                 elog(DEBUG, "pq_eof: recv() failed: %m");
727                 return EOF;
728         }
729         if (res == 0)
730                 return EOF;
731         else
732                 return 0;
733 }
734
735
736 /* --------------------------------
737  * Message-level I/O routines begin here.
738  *
739  * These routines understand about COPY OUT protocol.
740  * --------------------------------
741  */
742
743
744 /* --------------------------------
745  *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
746  *
747  *              If msgtype is not '\0', it is a message type code to place before
748  *              the message body (len counts only the body size!).
749  *              If msgtype is '\0', then the buffer already includes the type code.
750  *
751  *              All normal messages are suppressed while COPY OUT is in progress.
752  *              (In practice only NOTICE messages might get emitted then; dropping
753  *              them is annoying, but at least they will still appear in the
754  *              postmaster log.)
755  *
756  *              returns 0 if OK, EOF if trouble
757  * --------------------------------
758  */
759 int
760 pq_putmessage(char msgtype, const char *s, size_t len)
761 {
762         if (DoingCopyOut)
763                 return 0;
764         if (msgtype)
765                 if (pq_putbytes(&msgtype, 1))
766                         return EOF;
767         return pq_putbytes(s, len);
768 }
769
770 /* --------------------------------
771  *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
772  * --------------------------------
773  */
774 void
775 pq_startcopyout(void)
776 {
777         DoingCopyOut = true;
778 }
779
780 /* --------------------------------
781  *              pq_endcopyout   - end a COPY OUT transfer
782  *
783  *              If errorAbort is indicated, we are aborting a COPY OUT due to an error,
784  *              and must send a terminator line.  Since a partial data line might have
785  *              been emitted, send a couple of newlines first (the first one could
786  *              get absorbed by a backslash...)
787  * --------------------------------
788  */
789 void
790 pq_endcopyout(bool errorAbort)
791 {
792         if (!DoingCopyOut)
793                 return;
794         if (errorAbort)
795                 pq_putbytes("\n\n\\.\n", 5);
796         /* in non-error case, copy.c will have emitted the terminator line */
797         DoingCopyOut = false;
798 }