]> granicus.if.org Git - postgresql/commitdiff
Add explicit buffering in backend libpq, to compensate for
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 23 Jan 1999 22:27:29 +0000 (22:27 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 23 Jan 1999 22:27:29 +0000 (22:27 +0000)
buffering lost by not going through stdio anymore for client I/O.

src/backend/commands/copy.c
src/backend/libpq/pqcomm.c
src/backend/libpq/pqcomprim.c
src/backend/utils/error/elog.c
src/include/libpq/libpq.h

index 5a8b3fc96504c5de086f30a8ae8bd435cdf86aca..3126e82dfc37f1cbb0897855014e772a08674118 100644 (file)
@@ -6,7 +6,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.67 1999/01/17 06:18:15 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/copy.c,v 1.68 1999/01/23 22:27:26 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -303,9 +303,7 @@ DoCopy(char *relname, bool binary, bool oids, bool from, bool pipe,
                }
                else if (!from && !binary)
                {
-                       CopySendData("\\.\n",3,fp);
-                       if (IsUnderPostmaster)
-                               pq_flush();
+                       CopySendData("\\.\n",3,fp);
                }
        }
 }
index 8f9c14fee9fcca8d56f13912182289dd2e3ac842..386643fe95c3fdfb52b87cd64a2c893e81d19df7 100644 (file)
@@ -5,39 +5,40 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- *  $Id: pqcomm.c,v 1.63 1999/01/17 06:18:26 momjian Exp $
+ *  $Id: pqcomm.c,v 1.64 1999/01/23 22:27:28 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 /*
  * INTERFACE ROUTINES
- *              pq_init                 - initialize libpq
+ *             pq_init                 - initialize libpq
  *             pq_getport              - return the PGPORT setting
  *             pq_close                - close input / output connections
  *             pq_flush                - flush pending output
+ *             pq_recvbuf              - load some bytes into the input buffer
  *             pq_getstr               - get a null terminated string from connection
- *              pq_getchar              - get 1 character from connection
- *              pq_peekchar             - peek at first character in connection
+ *             pq_getchar              - get 1 character from connection
+ *             pq_peekchar             - peek at next character from connection
  *             pq_getnchar             - get n characters from connection, and null-terminate
  *             pq_getint               - get an integer from connection
- *              pq_putchar              - send 1 character to connection
+ *             pq_putchar              - send 1 character to connection
  *             pq_putstr               - send a null terminated string to connection
  *             pq_putnchar             - send n characters to connection
  *             pq_putint               - send an integer to connection
- *             pq_putncharlen          - send n characters to connection
+ *             pq_putncharlen  - send n characters to connection
  *                                       (also send an int header indicating
  *                                        the length)
  *             pq_getinaddr    - initialize address from host and port number
  *             pq_getinserv    - initialize address from host and service name
  *
- *              StreamDoUnlink          - Shutdown UNIX socket connectioin
- *              StreamServerPort        - Open sock stream
- *              StreamConnection        - Create new connection with client
- *              StreamClose             - Close a client/backend connection
+ *             StreamDoUnlink          - Shutdown UNIX socket connection
+ *             StreamServerPort        - Open socket stream
+ *             StreamConnection        - Create new connection with client
+ *             StreamClose                     - Close a client/backend connection
  * 
  * NOTES
- *              Frontend is now completey in interfaces/libpq, and no 
- *              functions from this file is used.
+ *              Frontend is now completely in interfaces/libpq, and no 
+ *              functions from this file are used there.
  *
  */
 #include "postgres.h"
 
 extern FILE * debug_port; /* in util.c */
 
+/*
+ * Buffers 
+ */
+char PqSendBuffer[PQ_BUFFER_SIZE];
+char PqRecvBuffer[PQ_BUFFER_SIZE];
+int PqSendPointer,PqRecvPointer,PqRecvLength;
+
+
 /* --------------------------------
  *             pq_init - open portal file descriptors
  * --------------------------------
@@ -86,6 +95,7 @@ extern FILE * debug_port; /* in util.c */
 void
 pq_init(int fd)
 {
+       PqSendPointer = PqRecvPointer = PqRecvLength = 0;
        PQnotifies_init();
        if (getenv("LIBPQ_DEBUG"))
          debug_port = stderr;
@@ -94,40 +104,40 @@ pq_init(int fd)
 /* -------------------------
  *      pq_getchar()
  *
- *      get a character from the input file,
- *
+ *      get a character from the input file, or EOF if trouble
+ * --------------------------------
  */
 
 int
 pq_getchar(void)
 {
-       char c;
-
-       while (recv(MyProcPort->sock, &c, 1, 0) != 1) {
-           if (errno != EINTR)
-                       return EOF; /* Not interrupted, so something went wrong */
+       while (PqRecvPointer >= PqRecvLength)
+       {
+               if (pq_recvbuf())               /* If nothing in buffer, then recv some */
+                       return EOF;                     /* Failed to recv data */
        }
-         
-       return c;
+       return PqRecvBuffer[PqRecvPointer++];
 }
 
-/*
+/* -------------------------
+ *      pq_peekchar()
+ *
+ *      get a character from the connection, but leave it in the buffer
+ *      to be read again
  * --------------------------------
- *              pq_peekchar - get 1 character from connection, but leave it in the stream
  */
-int
-pq_peekchar(void) {
-       char c;
 
-       while (recv(MyProcPort->sock, &c, 1, MSG_PEEK) != 1) {
-           if (errno != EINTR)
-                       return EOF; /* Not interrupted, so something went wrong */
+int
+pq_peekchar(void)
+{
+       while (PqRecvPointer >= PqRecvLength)
+       {
+               if (pq_recvbuf())               /* If nothing in buffer, then recv some */
+                       return EOF;                     /* Failed to recv data */
        }
-
-       return c;
+       /* Note we don't bump the pointer... */
+       return PqRecvBuffer[PqRecvPointer];
 }
-  
-
 
 /* --------------------------------
  *             pq_getport - return the PGPORT setting
@@ -150,18 +160,91 @@ pq_getport()
 void
 pq_close()
 {
-        close(MyProcPort->sock);
+       close(MyProcPort->sock);
        PQnotifies_init();
 }
 
 /* --------------------------------
  *             pq_flush - flush pending output
+ *
+ *             returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-void
+int
 pq_flush()
 {
-  /* Not supported/required? */
+       char *bufptr = PqSendBuffer;
+       char *bufend = PqSendBuffer + PqSendPointer;
+
+       while (bufptr < bufend)
+       {
+               int r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0);
+               if (r <= 0)
+               {
+                       if (errno == EINTR)
+                               continue;               /* Ok if we were interrupted */
+                       /* We would like to use elog() here, but cannot because elog
+                        * tries to write to the client, which would cause a recursive
+                        * flush attempt!  So just write it out to the postmaster log.
+                        */
+                       fprintf(stderr, "pq_flush: send() failed, errno %d\n", errno);
+                       /* We drop the buffered data anyway so that processing
+                        * can continue, even though we'll probably quit soon.
+                        */
+                       PqSendPointer = 0;
+                       return EOF;
+               }
+               bufptr += r;
+       }
+       PqSendPointer = 0;
+       return 0;
+}
+
+/* --------------------------------
+ *             pq_recvbuf - load some bytes into the input buffer
+ *
+ *             returns 0 if OK, EOF if trouble
+ * --------------------------------
+ */
+
+int
+pq_recvbuf()
+{
+       if (PqRecvPointer > 0)
+       {
+               if (PqRecvLength > PqRecvPointer)
+               {
+                       /* still some unread data, left-justify it in the buffer */
+                       memmove(PqRecvBuffer, PqRecvBuffer+PqRecvPointer,
+                                       PqRecvLength-PqRecvPointer);
+                       PqRecvLength -= PqRecvPointer;
+                       PqRecvPointer = 0;
+               }
+               else
+                       PqRecvLength = PqRecvPointer = 0;
+       }
+
+       /* Can fill buffer from PqRecvLength and upwards */
+       for (;;)
+       {
+               int r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength,
+                                        PQ_BUFFER_SIZE - PqRecvLength, 0);
+               if (r <= 0)
+               {
+                       if (errno == EINTR)
+                               continue;               /* Ok if interrupted */
+                       /* We would like to use elog() here, but dare not because elog
+                        * tries to write to the client, which will cause problems
+                        * if we have a hard communications failure ...
+                        * So just write the message to the postmaster log.
+                        */
+                       fprintf(stderr, "pq_recvbuf: recv() failed, errno %d\n", errno);
+                       return EOF;
+               }
+               /* r contains number of bytes read, so just incr length */
+               PqRecvLength += r;
+               return 0;
+       }
 }
 
 /* --------------------------------
@@ -194,7 +277,7 @@ pq_getstr(char *s, int maxlen)
 int
 pq_getnchar(char *s, int off, int maxlen)
 {
-        int r = pqGetNBytes(s + off, maxlen);
+       int r = pqGetNBytes(s + off, maxlen);
        s[off+maxlen] = '\0';
        return r;
 }
@@ -602,7 +685,7 @@ StreamConnection(int server_fd, Port *port)
                if (setsockopt(port->sock, pe->p_proto, TCP_NODELAY,
                                           &on, sizeof(on)) < 0)
                {
-                       elog(ERROR, "postmaster: setsockopt failed");
+                       elog(ERROR, "postmaster: setsockopt failed: %m");
                        return STATUS_ERROR;
                }
        }
@@ -644,18 +727,9 @@ pq_putncharlen(char *s, int n)
  */
 int pq_putchar(char c) 
 {
-  char isDone = 0;
-
-  do {
-    if (send(MyProcPort->sock, &c, 1, 0) != 1) {
-      if (errno != EINTR) 
-       return EOF; /* Anything other than interrupt is error! */
-    }
-    else
-      isDone = 1; /* Done if we sent one char */
-  } while (!isDone);
-  return c;
+       if (PqSendPointer >= PQ_BUFFER_SIZE)
+               if (pq_flush())                 /* If buffer is full, then flush it out */
+                       return EOF;
+       PqSendBuffer[PqSendPointer++] = c; /* Put in buffer */
+       return c;
 }
-
-
-
index 23ecfd4e19fe7a13ef0fbb0552ae6091348c096d..e48a1c16888600ee82c27fbb3143abf3cb5bb56b 100644 (file)
@@ -98,7 +98,7 @@ pqPutLong(int integer)
        n = ((PG_PROTOCOL_MAJOR(FrontendProtocol) == 0) ? hton_l(integer) : htonl((uint32) integer));
 #endif
 
-        return pqPutNBytes((char *)&n,4);
+       return pqPutNBytes((char *)&n, 4);
 }
 
 /* --------------------------------------------------------------------- */
@@ -107,7 +107,7 @@ pqGetShort(int *result)
 {
        uint16          n;
 
-       if (pqGetNBytes((char *)&n,2) != 0)
+       if (pqGetNBytes((char *)&n, 2) != 0)
          return EOF;
 
 #ifdef FRONTEND
@@ -138,28 +138,29 @@ pqGetLong(int *result)
 }
 
 /* --------------------------------------------------------------------- */
-/* pqGetNBytes: Read a chunk of exactly len bytes in buffer s (which must be 1
-               byte longer) and terminate it with a '\0'.
-               Return 0 if ok.
-*/
+/* pqGetNBytes: Read a chunk of exactly len bytes into buffer s.
+ *     Return 0 if ok, EOF if trouble.
+ */
 int
 pqGetNBytes(char *s, size_t len)
 {
-       int bytesDone = 0;
-
-       do {
-         int r = recv(MyProcPort->sock, s+bytesDone, len-bytesDone, 0);
-         if (r == 0 || r == -1) {
-           if (errno != EINTR)
-             return EOF; /* All other than signal-interrupted is error */
-           continue; /* Otherwise, try again */
-         }
-         
-         /* r contains number of bytes received */
-         bytesDone += r;
-
-       } while (bytesDone < len);
-       /* Zero-termination now in pq_getnchar() instead */
+       size_t amount;
+
+       while (len > 0)
+       {
+               while (PqRecvPointer >= PqRecvLength)
+               {
+                       if (pq_recvbuf())       /* If nothing in buffer, then recv some */
+                               return EOF;             /* Failed to recv data */
+               }
+               amount = PqRecvLength - PqRecvPointer;
+               if (amount > len)
+                       amount = len;
+               memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
+               PqRecvPointer += amount;
+               s += amount;
+               len -= amount;
+       }
        return 0;
 }
 
@@ -167,20 +168,21 @@ pqGetNBytes(char *s, size_t len)
 int
 pqPutNBytes(const char *s, size_t len)
 {
-        int bytesDone = 0;
-
-       do {
-         int r = send(MyProcPort->sock, s+bytesDone, len-bytesDone, 0);
-         if (r == 0 || r == -1) {
-           if (errno != EINTR)
-             return EOF; /* Only signal interruption allowed */
-           continue; /* If interruped and read nothing, just try again */
-         }
-         
-         /* r contains number of bytes sent so far */
-         bytesDone += r;
-       } while (bytesDone < len);
-
+       size_t amount;
+
+       while (len > 0)
+       {
+               if (PqSendPointer >= PQ_BUFFER_SIZE)
+                       if (pq_flush())         /* If buffer is full, then flush it out */
+                               return EOF;
+               amount = PQ_BUFFER_SIZE - PqSendPointer;
+               if (amount > len)
+                       amount = len;
+               memcpy(PqSendBuffer + PqSendPointer, s, amount);
+               PqSendPointer += amount;
+               s += amount;
+               len -= amount;
+       }
        return 0;
 }
 
@@ -191,8 +193,8 @@ pqGetString(char *s, size_t len)
        int                     c;
 
        /*
-        * Keep on reading until we get the terminating '\0' and discard those
-        * bytes we don't have room for.
+        * Keep on reading until we get the terminating '\0',
+        * discarding any bytes we don't have room for.
         */
 
        while ((c = pq_getchar()) != EOF && c != '\0')
@@ -216,4 +218,3 @@ pqPutString(const char *s)
 {
   return pqPutNBytes(s,strlen(s)+1);
 }
-
index 4e68c1e24a8177e2438f9e3c97bccdad7f45a345..473fc06c3e11cfb667024c9646b9a7bfb8312532 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.37 1999/01/11 03:56:07 scrappy Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/utils/error/elog.c,v 1.38 1999/01/23 22:27:29 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -192,8 +192,15 @@ elog(int lev, const char *fmt,...)
                        pq_putnchar("N", 1);
                else
                        pq_putnchar("E", 1);
-               /* pq_putint(-101, 4); *//* should be query id */
                pq_putstr(line + TIMESTAMP_SIZE);               /* don't show timestamps */
+               /*
+                * This flush is normally not necessary, since postgres.c will
+                * flush out waiting data when control returns to the main loop.
+                * But it seems best to leave it here, so that the client has some
+                * clue what happened if the backend dies before getting back to the
+                * main loop ... error/notice messages should not be a performance-
+                * critical path anyway, so an extra flush won't hurt much ...
+                */
                pq_flush();
        }
        if (!IsUnderPostmaster)
index a315521eb3594ce5d901208a77b4c0e04bb50db7..c1cdd8ac5d97cc79e13a1b4ca0dfa16f4d16e4d4 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: libpq.h,v 1.23 1999/01/12 12:49:52 scrappy Exp $
+ * $Id: libpq.h,v 1.24 1999/01/23 22:27:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -254,12 +254,13 @@ extern void pq_init(int fd);
 extern void pq_gettty(char *tp);
 extern int     pq_getport(void);
 extern void pq_close(void);
-extern void pq_flush(void);
+extern int     pq_flush(void);
+extern int     pq_recvbuf(void);
 extern int     pq_getstr(char *s, int maxlen);
 extern int     PQgetline(char *s, int maxlen);
 extern int     PQputline(char *s);
-extern int      pq_getchar(void);
-extern int      pq_peekchar(void);
+extern int     pq_getchar(void);
+extern int     pq_peekchar(void);
 extern int     pq_getnchar(char *s, int off, int maxlen);
 extern int     pq_getint(int b);
 extern int  pq_putchar(char c);
@@ -282,4 +283,18 @@ extern int StreamServerPort(char *hostName, short portName, int *fdP);
 extern int     StreamConnection(int server_fd, Port *port);
 extern void StreamClose(int sock);
 
+/*
+ * Internal send/receive buffers in libpq.
+ * These probably shouldn't be global at all, but unless we merge
+ * pqcomm.c and pqcomprim.c they have to be...
+ */
+
+#define PQ_BUFFER_SIZE 8192
+
+extern char PqSendBuffer[PQ_BUFFER_SIZE];
+extern int PqSendPointer;      /* Next index to store a byte in PqSendBuffer */
+extern char PqRecvBuffer[PQ_BUFFER_SIZE];
+extern int PqRecvPointer;      /* Next index to read a byte from PqRecvBuffer */
+extern int PqRecvLength;       /* End of data available in PqRecvBuffer */
+
 #endif  /* LIBPQ_H */