granicus.if.org Git - postgresql/commitdiff
Implement a chunking protocol for writes to the syslogger pipe, with messages
author Andrew Dunstan <andrew@dunslane.net>
Thu, 14 Jun 2007 01:48:51 +0000 (01:48 +0000)
committer Andrew Dunstan <andrew@dunslane.net>
Thu, 14 Jun 2007 01:48:51 +0000 (01:48 +0000)
reassembled in the syslogger before writing to the log file. This prevents
partial messages from being written, which mucks up log rotation, and
messages from different backends being interleaved, which causes garbled
logs. Backport as far as 8.0, where the syslogger was introduced.

Tom Lane and Andrew Dunstan

src/backend/postmaster/syslogger.c
src/backend/utils/error/elog.c
src/include/postmaster/syslogger.h

index e92cd73031f7645cfe0015e83fd27c19a5486fb2..0862b81dd7b4764f1f1af57a665fd1006241e2bf 100644 (file)
@@ -18,7 +18,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.31 2007/06/04 22:21:42 adunstan Exp $
+ *       $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.32 2007/06/14 01:48:51 adunstan Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -31,6 +31,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 
+#include "lib/stringinfo.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgtime.h"
 #define LBF_MODE       _IOLBF
 #endif
 
+/* 
+ * We read() into a temp buffer twice as big as a chunk, so that any fragment
+ * left after processing can be moved down to the front and we'll still have
+ * room to read a full chunk.
+ */
+#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE)
+
 
 /*
  * GUC parameters.     Redirect_stderr cannot be changed after postmaster
@@ -75,15 +83,28 @@ bool                am_syslogger = false;
  * Private state
  */
 static pg_time_t next_rotation_time;
-
 static bool redirection_done = false;
-
 static bool pipe_eof_seen = false;
-
 static FILE *syslogFile = NULL;
-
 static char *last_file_name = NULL;
 
+/* 
+ * Buffers for saving partial messages from different backends. We don't expect
+ * that there will be very many outstanding at one time, so 20 seems plenty of
+ * leeway. If this array gets full we won't lose messages, but we will lose
+ * the protocol protection against them being partially written or interleaved.
+ *
+ * An inactive buffer has pid == 0 and undefined contents of data.
+ */
+typedef struct
+{
+       int32   pid;                            /* PID of source process; 0 marks the slot inactive */
+       StringInfoData data;            /* accumulated data, as a StringInfo; contents undefined while pid == 0 */
+} save_buffer;
+
+#define CHUNK_SLOTS 20
+static save_buffer saved_chunks[CHUNK_SLOTS];
+
 /* These must be exported for EXEC_BACKEND case ... annoying */
 #ifndef WIN32
 int                    syslogPipe[2] = {-1, -1};
@@ -108,6 +129,8 @@ static volatile sig_atomic_t rotation_requested = false;
 static pid_t syslogger_forkexec(void);
 static void syslogger_parseArgs(int argc, char *argv[]);
 #endif
+static void process_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
+static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
 
 #ifdef WIN32
 static unsigned int __stdcall pipeThread(void *arg);
@@ -126,6 +149,10 @@ static void sigUsr1Handler(SIGNAL_ARGS);
 NON_EXEC_STATIC void
 SysLoggerMain(int argc, char *argv[])
 {
+#ifndef WIN32
+       char            logbuffer[READ_BUF_SIZE];
+       int                     bytes_in_logbuffer = 0;
+#endif
        char       *currentLogDir;
        char       *currentLogFilename;
        int                     currentLogRotationAge;
@@ -244,7 +271,6 @@ SysLoggerMain(int argc, char *argv[])
                bool            time_based_rotation = false;
 
 #ifndef WIN32
-               char            logbuffer[1024];
                int                     bytesRead;
                int                     rc;
                fd_set          rfds;
@@ -326,8 +352,8 @@ SysLoggerMain(int argc, char *argv[])
                else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
                {
                        bytesRead = piperead(syslogPipe[0],
-                                                                logbuffer, sizeof(logbuffer));
-
+                                                                logbuffer + bytes_in_logbuffer,
+                                                                sizeof(logbuffer) - bytes_in_logbuffer);
                        if (bytesRead < 0)
                        {
                                if (errno != EINTR)
@@ -337,7 +363,8 @@ SysLoggerMain(int argc, char *argv[])
                        }
                        else if (bytesRead > 0)
                        {
-                               write_syslogger_file(logbuffer, bytesRead);
+                               bytes_in_logbuffer += bytesRead;
+                               process_pipe_input(logbuffer, &bytes_in_logbuffer);
                                continue;
                        }
                        else
@@ -349,6 +376,9 @@ SysLoggerMain(int argc, char *argv[])
                                 * and all backends are shut down, and we are done.
                                 */
                                pipe_eof_seen = true;
+
+                               /* if there's any data left then force it out now */
+                               flush_pipe_input(logbuffer, &bytes_in_logbuffer);
                        }
                }
 #else                                                  /* WIN32 */
@@ -611,6 +641,207 @@ syslogger_parseArgs(int argc, char *argv[])
 #endif   /* EXEC_BACKEND */
 
 
+/* --------------------------------
+ *             pipe protocol handling
+ * --------------------------------
+ */
+
+/*
+ * Process data received through the syslogger pipe.
+ *
+ * This routine interprets the log pipe protocol which sends log messages as
+ * (hopefully atomic) chunks - such chunks are detected and reassembled here. 
+ *
+ * The protocol has a header that starts with two nul bytes, then has a 16 bit
+ * length, the pid of the sending process, and a flag to indicate if it is 
+ * the last chunk in a message. Incomplete chunks are saved until we read some
+ * more, and non-final chunks are accumulated until we get the final chunk.
+ *
+ * All of this is to avoid 2 problems:
+ * . partial messages being written to logfiles (messes rotation), and
+ * . messages from different backends being interleaved (messages garbled).
+ *
+ * Any non-protocol messages are written out directly. These should only come
+ * from non-PostgreSQL sources, however (e.g. third party libraries writing to
+ * stderr).
+ *
+ * logbuffer is the data input buffer, and *bytes_in_logbuffer is the number
+ * of bytes present.  On exit, any not-yet-eaten data is left-justified in
+ * logbuffer, and *bytes_in_logbuffer is updated.
+ */
+static void
+process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
+{
+       char   *cursor = logbuffer;
+       int             count = *bytes_in_logbuffer;
+
+       /*
+        * While we have enough for a minimal chunk, process data.  The smallest
+        * valid chunk is the header proper plus one payload byte (p.len > 0 is
+        * required below).  Do not test against sizeof(PipeProtoHeader): that
+        * counts the data[1] placeholder plus whatever trailing padding the
+        * compiler adds, so on ABIs where the struct is padded it is larger than
+        * a complete chunk carrying only a byte or two of payload.  Such a chunk
+        * would sit unrecognized in the buffer until more input arrived, or be
+        * dumped with its header still attached at EOF.
+        */
+       while (count >= (int) (PIPE_HEADER_SIZE + 1))
+       {
+               PipeProtoHeader p;
+               int             chunklen;
+
+               /*
+                * Do we have a valid header?  Copy only the header fields; p.data
+                * is never examined, the payload is always read from the buffer.
+                */
+               memcpy(&p, cursor, PIPE_HEADER_SIZE);
+               if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
+                       p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
+                       p.pid != 0 &&
+                       (p.is_last == 't' || p.is_last == 'f'))
+               {
+                       chunklen = PIPE_HEADER_SIZE + p.len;
+
+                       /* Fall out of loop if we don't have the whole chunk yet */
+                       if (count < chunklen)
+                               break;
+
+                       if (p.is_last == 'f')
+                       {
+                               /*
+                                * Save a complete non-final chunk in the per-pid buffer
+                                * if possible - if not just write it out.
+                                */
+                               int free_slot = -1, existing_slot = -1;
+                               int i;
+                               StringInfo str;
+
+                               /*
+                                * One pass finds both the slot already owned by this pid
+                                * (if any) and the first inactive slot we could claim.
+                                */
+                               for (i = 0; i < CHUNK_SLOTS; i++)
+                               {
+                                       if (saved_chunks[i].pid == p.pid)
+                                       {
+                                               existing_slot = i;
+                                               break;
+                                       }
+                                       if (free_slot < 0 && saved_chunks[i].pid == 0)
+                                               free_slot = i;
+                               }
+                               if (existing_slot >= 0)
+                               {
+                                       str = &(saved_chunks[existing_slot].data);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE,
+                                                                                  p.len);
+                               }
+                               else if (free_slot >= 0)
+                               {
+                                       /* Claim the slot; its StringInfo must be set up afresh */
+                                       saved_chunks[free_slot].pid = p.pid;
+                                       str = &(saved_chunks[free_slot].data);
+                                       initStringInfo(str);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE,
+                                                                                  p.len);
+                               }
+                               else
+                               {
+                                       /*
+                                        * If there is no free slot we'll just have to take our
+                                        * chances and write out a partial message and hope that
+                                        * it's not followed by something from another pid.
+                                        */
+                                       write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
+                               }
+                       }
+                       else
+                       {
+                               /*
+                                * Final chunk --- add it to anything saved for that pid, and
+                                * either way write the whole thing out.
+                                */
+                               int existing_slot = -1;
+                               int i;
+                               StringInfo str;
+
+                               for (i = 0; i < CHUNK_SLOTS; i++)
+                               {
+                                       if (saved_chunks[i].pid == p.pid)
+                                       {
+                                               existing_slot = i;
+                                               break;
+                                       }
+                               }
+                               if (existing_slot >= 0)
+                               {
+                                       str = &(saved_chunks[existing_slot].data);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE,
+                                                                                  p.len);
+                                       write_syslogger_file(str->data, str->len);
+                                       /* Release the slot and the storage it accumulated */
+                                       saved_chunks[existing_slot].pid = 0;
+                                       pfree(str->data);
+                               }
+                               else
+                               {
+                                       /* The whole message was one chunk, evidently. */
+                                       write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
+                               }
+                       }
+
+                       /* Finished processing this chunk */
+                       cursor += chunklen;
+                       count -= chunklen;
+               }
+               else
+               {
+                       /* Process non-protocol data */
+
+                       /*
+                        * Look for the start of a protocol header.  If found, dump data
+                        * up to there and repeat the loop.  Otherwise, dump it all and
+                        * fall out of the loop.  (Note: we want to dump it all if
+                        * at all possible, so as to avoid dividing non-protocol messages
+                        * across logfiles.  We expect that in many scenarios, a
+                        * non-protocol message will arrive all in one read(), and we
+                        * want to respect the read() boundary if possible.)
+                        *
+                        * The scan starts at offset 1 so that at least one byte is
+                        * always consumed, which guarantees the outer loop advances.
+                        */
+                       for (chunklen = 1; chunklen < count; chunklen++)
+                       {
+                               if (cursor[chunklen] == '\0')
+                                       break;
+                       }
+                       write_syslogger_file(cursor, chunklen);
+                       cursor += chunklen;
+                       count -= chunklen;
+               }
+       }
+
+       /* We don't have a full chunk, so left-align what remains in the buffer */
+       if (count > 0 && cursor != logbuffer)
+               memmove(logbuffer, cursor, count);
+       *bytes_in_logbuffer = count;
+}
+
+/*
+ * Force out any buffered data
+ *
+ * This is currently used only at syslogger shutdown, but could perhaps be
+ * useful at other times, so it is careful to leave things in a clean state.
+ */
+static void
+flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
+{
+       save_buffer *slot;
+       int             leftover;
+
+       /*
+        * First push out every partially assembled protocol message.  Each
+        * active slot is written as-is, marked inactive again, and its
+        * accumulated storage released.
+        */
+       for (slot = saved_chunks; slot < saved_chunks + CHUNK_SLOTS; slot++)
+       {
+               if (slot->pid == 0)
+                       continue;                       /* inactive slot, nothing saved */
+
+               write_syslogger_file(slot->data.data, slot->data.len);
+               slot->pid = 0;
+               pfree(slot->data.data);
+       }
+
+       /*
+        * Then emit whatever raw bytes are still sitting in the read buffer.
+        * No attempt is made to strip protocol headers out of them.
+        */
+       leftover = *bytes_in_logbuffer;
+       if (leftover > 0)
+               write_syslogger_file(logbuffer, leftover);
+
+       /* Either way the caller's buffer is now empty */
+       *bytes_in_logbuffer = 0;
+}
+
+
 /* --------------------------------
  *             logfile routines
  * --------------------------------
@@ -653,12 +884,16 @@ write_syslogger_file(const char *buffer, int count)
 static unsigned int __stdcall
 pipeThread(void *arg)
 {
-       DWORD           bytesRead;
-       char            logbuffer[1024];
+       char            logbuffer[READ_BUF_SIZE];
+       int                     bytes_in_logbuffer = 0;
 
        for (;;)
        {
-               if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer),
+               DWORD           bytesRead;
+
+               if (!ReadFile(syslogPipe[0],
+                                         logbuffer + bytes_in_logbuffer,
+                                         sizeof(logbuffer) - bytes_in_logbuffer,
                                          &bytesRead, 0))
                {
                        DWORD           error = GetLastError();
@@ -672,11 +907,18 @@ pipeThread(void *arg)
                                         errmsg("could not read from logger pipe: %m")));
                }
                else if (bytesRead > 0)
-                       write_syslogger_file(logbuffer, bytesRead);
+               {
+                       bytes_in_logbuffer += bytesRead;
+                       process_pipe_input(logbuffer, &bytes_in_logbuffer);
+               }
        }
 
        /* We exit the above loop only upon detecting pipe EOF */
        pipe_eof_seen = true;
+
+       /* if there's any data left then force it out now */
+       flush_pipe_input(logbuffer, &bytes_in_logbuffer);
+
        _endthread();
        return 0;
 }
index c6952ef20e85e45e6fc266b436ffba025d4673e2..c762475d65a4a41c2008f6300b7ed3f02b890b2b 100644 (file)
@@ -42,7 +42,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.186 2007/06/07 21:45:59 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.187 2007/06/14 01:48:51 adunstan Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -124,6 +124,7 @@ static const char *useful_strerror(int errnum);
 static const char *error_severity(int elevel);
 static void append_with_tabs(StringInfo buf, const char *str);
 static bool is_log_level_output(int elevel, int log_min_level);
+static void write_pipe_chunks(int fd, char *data, int len);
 
 
 /*
@@ -1783,7 +1784,10 @@ send_message_to_server_log(ErrorData *edata)
                        write_eventlog(edata->elevel, buf.data);
                else
 #endif
-                       fprintf(stderr, "%s", buf.data);
+                       if (Redirect_stderr)
+                               write_pipe_chunks(fileno(stderr), buf.data, buf.len);
+                       else
+                               write(fileno(stderr), buf.data, buf.len);
        }
 
        /* If in the syslogger process, try to write messages direct to file */
@@ -1793,6 +1797,37 @@ send_message_to_server_log(ErrorData *edata)
        pfree(buf.data);
 }
 
+/*
+ * Send data to the syslogger using the chunked protocol
+ */
+static void
+write_pipe_chunks(int fd, char *data, int len)
+{
+       PipeProtoChunk chunk;
+       char   *src = data;
+       int             remaining = len;
+
+       Assert(len > 0);
+
+       /* These header fields are the same for every chunk of the message */
+       chunk.proto.nuls[0] = '\0';
+       chunk.proto.nuls[1] = '\0';
+       chunk.proto.pid = MyProcPid;
+
+       /*
+        * Emit full-size, non-final chunks for as long as more than one chunk's
+        * worth of data is left.  Copying the payload does not touch the header
+        * fields, so is_last and len need to be set only once for this phase.
+        */
+       chunk.proto.is_last = 'f';
+       chunk.proto.len = PIPE_MAX_PAYLOAD;
+       for (; remaining > PIPE_MAX_PAYLOAD; remaining -= PIPE_MAX_PAYLOAD)
+       {
+               memcpy(chunk.proto.data, src, PIPE_MAX_PAYLOAD);
+               write(fd, &chunk, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD);
+               src += PIPE_MAX_PAYLOAD;
+       }
+
+       /* Whatever is left goes out as the final chunk */
+       chunk.proto.is_last = 't';
+       chunk.proto.len = remaining;
+       memcpy(chunk.proto.data, src, remaining);
+       write(fd, &chunk, PIPE_HEADER_SIZE + remaining);
+}
+
 
 /*
  * Write error report to client
@@ -2115,6 +2150,7 @@ write_stderr(const char *fmt,...)
 #ifndef WIN32
        /* On Unix, we just fprintf to stderr */
        vfprintf(stderr, fmt, ap);
+       fflush(stderr);
 #else
 
        /*
@@ -2130,8 +2166,11 @@ write_stderr(const char *fmt,...)
                write_eventlog(ERROR, errbuf);
        }
        else
+       {
                /* Not running as service, write to stderr */
                vfprintf(stderr, fmt, ap);
+               fflush(stderr);
+       }
 #endif
        va_end(ap);
 }
index 3e8b59dbacf2e28dda1573c3fe2a1f34c626389a..72c14c99cdde314665d548d492ce7c2ee069f355 100644 (file)
@@ -5,13 +5,61 @@
  *
  * Copyright (c) 2004-2007, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/include/postmaster/syslogger.h,v 1.8 2007/01/05 22:19:57 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/postmaster/syslogger.h,v 1.9 2007/06/14 01:48:51 adunstan Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef _SYSLOGGER_H
 #define _SYSLOGGER_H
 
+#include <limits.h>                            /* for PIPE_BUF */
+
+
+/* 
+ * Primitive protocol structure for writing to syslogger pipe(s).  The idea
+ * here is to divide long messages into chunks that are not more than
+ * PIPE_BUF bytes long, which according to POSIX spec must be written into
+ * the pipe atomically.  The pipe reader then uses the protocol headers to
+ * reassemble the parts of a message into a single string.  The reader can
+ * also cope with non-protocol data coming down the pipe, though we cannot
+ * guarantee long strings won't get split apart.
+ *
+ * We use 't' or 'f' instead of a bool for is_last to make the protocol a tiny
+ * bit more robust against finding a false double nul byte prologue.  But we
+ * still might find it in the len and/or pid bytes unless we're careful.
+ */
+
+#ifdef PIPE_BUF
+/* Are there any systems with PIPE_BUF > 64K?  Unlikely, but ... */
+#if PIPE_BUF > 65536
+#define PIPE_CHUNK_SIZE  65536
+#else
+#define PIPE_CHUNK_SIZE  ((int) PIPE_BUF)
+#endif
+#else  /* not defined */
+/* POSIX says the value of PIPE_BUF must be at least 512, so use that */
+#define PIPE_CHUNK_SIZE  512
+#endif
+
+typedef struct 
+{
+       char            nuls[2];                /* always \0\0; marks the start of a protocol chunk */
+       uint16          len;                    /* size of this chunk (counts data only, not the header) */
+       int32           pid;                    /* writer's pid; the reader rejects headers with pid 0 */
+       char            is_last;                /* last chunk of message? 't' or 'f' */
+       char            data[1];                /* data payload starts here; actual length is len */
+} PipeProtoHeader;
+
+typedef union
+{
+       PipeProtoHeader proto;          /* header view; payload is stored starting at proto.data */
+       char            filler[PIPE_CHUNK_SIZE];        /* makes the union at least one full chunk in size */
+} PipeProtoChunk;
+
+#define PIPE_HEADER_SIZE  offsetof(PipeProtoHeader, data)	/* bytes preceding the payload */
+#define PIPE_MAX_PAYLOAD  ((int) (PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE))	/* largest payload one chunk can carry */
+
+
 /* GUC options */
 extern bool Redirect_stderr;
 extern int     Log_RotationAge;