*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.31 2007/06/04 22:21:42 adunstan Exp $
+ * $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.32 2007/06/14 01:48:51 adunstan Exp $
*
*-------------------------------------------------------------------------
*/
#include <sys/stat.h>
#include <sys/time.h>
+#include "lib/stringinfo.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgtime.h"
#define LBF_MODE _IOLBF
#endif
+/*
+ * We read() into a temp buffer twice as big as a chunk, so that any fragment
+ * left after processing can be moved down to the front and we'll still have
+ * room to read a full chunk.
+ */
+#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE)
+
/*
* GUC parameters. Redirect_stderr cannot be changed after postmaster
* Private state
*/
static pg_time_t next_rotation_time;
-
static bool redirection_done = false;
-
static bool pipe_eof_seen = false;
-
static FILE *syslogFile = NULL;
-
static char *last_file_name = NULL;
+/*
+ * Buffers for saving partial messages from different backends. We don't expect
+ * that there will be very many outstanding at one time, so 20 seems plenty of
+ * leeway. If this array gets full we won't lose messages, but we will lose
+ * the protocol protection against them being partially written or interleaved.
+ *
+ * An inactive buffer has pid == 0 and undefined contents of data.
+ *
+ * Slot lifecycle, as implemented in this file: process_pipe_input() claims an
+ * inactive slot (sets pid and calls initStringInfo on data) when it sees a
+ * non-final chunk from a pid that has no slot yet; the slot is released
+ * (pid reset to 0, data buffer pfree'd) when that pid's final chunk has been
+ * written, or when flush_pipe_input() dumps it.
+ *
+ * The array has static storage duration, so every slot starts out with
+ * pid == 0, i.e. inactive.
+ */
+typedef struct
+{
+ int32 pid; /* PID of source process */
+ StringInfoData data; /* accumulated data, as a StringInfo */
+} save_buffer;
+
+#define CHUNK_SLOTS 20 /* number of per-pid reassembly buffers */
+static save_buffer saved_chunks[CHUNK_SLOTS];
+
/* These must be exported for EXEC_BACKEND case ... annoying */
#ifndef WIN32
int syslogPipe[2] = {-1, -1};
static pid_t syslogger_forkexec(void);
static void syslogger_parseArgs(int argc, char *argv[]);
#endif
+static void process_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
+static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
#ifdef WIN32
static unsigned int __stdcall pipeThread(void *arg);
NON_EXEC_STATIC void
SysLoggerMain(int argc, char *argv[])
{
+#ifndef WIN32
+ char logbuffer[READ_BUF_SIZE];
+ int bytes_in_logbuffer = 0;
+#endif
char *currentLogDir;
char *currentLogFilename;
int currentLogRotationAge;
bool time_based_rotation = false;
#ifndef WIN32
- char logbuffer[1024];
int bytesRead;
int rc;
fd_set rfds;
else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
{
bytesRead = piperead(syslogPipe[0],
- logbuffer, sizeof(logbuffer));
-
+ logbuffer + bytes_in_logbuffer,
+ sizeof(logbuffer) - bytes_in_logbuffer);
if (bytesRead < 0)
{
if (errno != EINTR)
}
else if (bytesRead > 0)
{
- write_syslogger_file(logbuffer, bytesRead);
+ bytes_in_logbuffer += bytesRead;
+ process_pipe_input(logbuffer, &bytes_in_logbuffer);
continue;
}
else
* and all backends are shut down, and we are done.
*/
pipe_eof_seen = true;
+
+ /* if there's any data left then force it out now */
+ flush_pipe_input(logbuffer, &bytes_in_logbuffer);
}
}
#else /* WIN32 */
#endif /* EXEC_BACKEND */
+/* --------------------------------
+ * pipe protocol handling
+ * --------------------------------
+ */
+
+/*
+ * Process data received through the syslogger pipe.
+ *
+ * This routine interprets the log pipe protocol which sends log messages as
+ * (hopefully atomic) chunks - such chunks are detected and reassembled here.
+ *
+ * The protocol has a header that starts with two nul bytes, then has a 16 bit
+ * length, the pid of the sending process, and a flag to indicate if it is
+ * the last chunk in a message. Incomplete chunks are saved until we read some
+ * more, and non-final chunks are accumulated until we get the final chunk.
+ *
+ * All of this is to avoid 2 problems:
+ * . partial messages being written to logfiles (messes rotation), and
+ * . messages from different backends being interleaved (messages garbled).
+ *
+ * Any non-protocol messages are written out directly. These should only come
+ * from non-PostgreSQL sources, however (e.g. third party libraries writing to
+ * stderr).
+ *
+ * logbuffer is the data input buffer, and *bytes_in_logbuffer is the number
+ * of bytes present. On exit, any not-yet-eaten data is left-justified in
+ * logbuffer, and *bytes_in_logbuffer is updated.
+ *
+ * A header is accepted as valid only if both nul bytes are present, len is
+ * in the range 1..PIPE_MAX_PAYLOAD, pid is nonzero, and is_last is 't' or
+ * 'f'. Anything else at the cursor is treated as non-protocol data.
+ *
+ * Output always goes through write_syslogger_file(); only the payload that
+ * follows the header is written for protocol chunks.
+ *
+ * NOTE(review): the loop below needs sizeof(PipeProtoHeader) bytes before it
+ * will look at a header, whereas chunk lengths are computed from
+ * PIPE_HEADER_SIZE. If sizeof(PipeProtoHeader) is larger than
+ * PIPE_HEADER_SIZE + 1 (for instance because of trailing struct padding),
+ * a complete chunk with a very short payload would stay in logbuffer until
+ * more pipe data arrives. The struct definition is not visible here, so
+ * this needs to be confirmed against it.
+ */
+static void
+process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
+{
+ char *cursor = logbuffer; /* next unprocessed byte */
+ int count = *bytes_in_logbuffer; /* unprocessed bytes at cursor */
+
+ /* While we have enough for a header, process data... */
+ while (count >= (int) sizeof(PipeProtoHeader))
+ {
+ PipeProtoHeader p;
+ int chunklen;
+
+ /* Do we have a valid header? */
+ /* (copied out with memcpy rather than read in place through cursor) */
+ memcpy(&p, cursor, sizeof(PipeProtoHeader));
+ if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
+ p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
+ p.pid != 0 &&
+ (p.is_last == 't' || p.is_last == 'f'))
+ {
+ chunklen = PIPE_HEADER_SIZE + p.len; /* header plus payload */
+
+ /* Fall out of loop if we don't have the whole chunk yet */
+ if (count < chunklen)
+ break;
+
+ if (p.is_last == 'f')
+ {
+ /*
+ * Save a complete non-final chunk in the per-pid buffer
+ * if possible - if not just write it out.
+ */
+ int free_slot = -1, existing_slot = -1;
+ int i;
+ StringInfo str;
+
+ /*
+ * One pass finds both this pid's slot (if any) and the first
+ * inactive slot seen before it; the scan stops as soon as the
+ * pid's own slot is found.
+ */
+ for (i = 0; i < CHUNK_SLOTS; i++)
+ {
+ if (saved_chunks[i].pid == p.pid)
+ {
+ existing_slot = i;
+ break;
+ }
+ if (free_slot < 0 && saved_chunks[i].pid == 0)
+ free_slot = i;
+ }
+ if (existing_slot >= 0)
+ {
+ /* Continue a message already being accumulated */
+ str = &(saved_chunks[existing_slot].data);
+ appendBinaryStringInfo(str,
+ cursor + PIPE_HEADER_SIZE,
+ p.len);
+ }
+ else if (free_slot >= 0)
+ {
+ /* First chunk seen from this pid: claim the free slot */
+ saved_chunks[free_slot].pid = p.pid;
+ str = &(saved_chunks[free_slot].data);
+ initStringInfo(str);
+ appendBinaryStringInfo(str,
+ cursor + PIPE_HEADER_SIZE,
+ p.len);
+ }
+ else
+ {
+ /*
+ * If there is no free slot we'll just have to take our
+ * chances and write out a partial message and hope that
+ * it's not followed by something from another pid.
+ */
+ write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
+ }
+ }
+ else
+ {
+ /*
+ * Final chunk --- add it to anything saved for that pid, and
+ * either way write the whole thing out.
+ */
+ int existing_slot = -1;
+ int i;
+ StringInfo str;
+
+ for (i = 0; i < CHUNK_SLOTS; i++)
+ {
+ if (saved_chunks[i].pid == p.pid)
+ {
+ existing_slot = i;
+ break;
+ }
+ }
+ if (existing_slot >= 0)
+ {
+ str = &(saved_chunks[existing_slot].data);
+ appendBinaryStringInfo(str,
+ cursor + PIPE_HEADER_SIZE,
+ p.len);
+ write_syslogger_file(str->data, str->len);
+ saved_chunks[existing_slot].pid = 0; /* mark slot inactive */
+ pfree(str->data); /* release the accumulated message buffer */
+ }
+ else
+ {
+ /* The whole message was one chunk, evidently. */
+ write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
+ }
+ }
+
+ /* Finished processing this chunk */
+ cursor += chunklen;
+ count -= chunklen;
+ }
+ else
+ {
+ /* Process non-protocol data */
+
+ /*
+ * Look for the start of a protocol header. If found, dump data
+ * up to there and repeat the loop. Otherwise, dump it all and
+ * fall out of the loop. (Note: we want to dump it all if
+ * at all possible, so as to avoid dividing non-protocol messages
+ * across logfiles. We expect that in many scenarios, a
+ * non-protocol message will arrive all in one read(), and we
+ * want to respect the read() boundary if possible.)
+ *
+ * The scan starts at offset 1, so at least one byte is consumed
+ * on every pass and the outer loop always makes progress. It
+ * stops at the first single nul byte; whether that byte really
+ * begins a valid header is decided by the next iteration of the
+ * outer loop.
+ */
+ for (chunklen = 1; chunklen < count; chunklen++)
+ {
+ if (cursor[chunklen] == '\0')
+ break;
+ }
+ write_syslogger_file(cursor, chunklen);
+ cursor += chunklen;
+ count -= chunklen;
+ }
+ }
+
+ /* We don't have a full chunk, so left-align what remains in the buffer */
+ /* (memmove, since source and destination lie in the same buffer) */
+ if (count > 0 && cursor != logbuffer)
+ memmove(logbuffer, cursor, count);
+ *bytes_in_logbuffer = count;
+}
+
+/*
+ * Force out any buffered data
+ *
+ * This is currently used only at syslogger shutdown, but could perhaps be
+ * useful at other times, so it is careful to leave things in a clean state.
+ *
+ * Two kinds of pending data are written, in this order:
+ * 1. every active slot of saved_chunks[] (a message whose final chunk has
+ * not been seen), in slot order; each slot is then marked inactive and
+ * its data buffer is pfree'd;
+ * 2. whatever raw bytes remain in logbuffer.
+ *
+ * logbuffer is the pipe input buffer and *bytes_in_logbuffer the number of
+ * bytes in it; *bytes_in_logbuffer is set to 0 on return.
+ */
+static void
+flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
+{
+ int i;
+ StringInfo str;
+
+ /* Dump any incomplete protocol messages */
+ for (i = 0; i < CHUNK_SLOTS; i++)
+ {
+ if (saved_chunks[i].pid != 0)
+ {
+ str = &(saved_chunks[i].data);
+ write_syslogger_file(str->data, str->len);
+ saved_chunks[i].pid = 0; /* slot is inactive again */
+ pfree(str->data);
+ }
+ }
+ /*
+ * Force out any remaining pipe data as-is; we don't bother trying to
+ * remove any protocol headers that may exist in it.
+ */
+ if (*bytes_in_logbuffer > 0)
+ write_syslogger_file(logbuffer, *bytes_in_logbuffer);
+ *bytes_in_logbuffer = 0;
+}
+
+
/* --------------------------------
* logfile routines
* --------------------------------
static unsigned int __stdcall
pipeThread(void *arg)
{
- DWORD bytesRead;
- char logbuffer[1024];
+ char logbuffer[READ_BUF_SIZE];
+ int bytes_in_logbuffer = 0;
for (;;)
{
- if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer),
+ DWORD bytesRead;
+
+ if (!ReadFile(syslogPipe[0],
+ logbuffer + bytes_in_logbuffer,
+ sizeof(logbuffer) - bytes_in_logbuffer,
&bytesRead, 0))
{
DWORD error = GetLastError();
errmsg("could not read from logger pipe: %m")));
}
else if (bytesRead > 0)
- write_syslogger_file(logbuffer, bytesRead);
+ {
+ bytes_in_logbuffer += bytesRead;
+ process_pipe_input(logbuffer, &bytes_in_logbuffer);
+ }
}
/* We exit the above loop only upon detecting pipe EOF */
pipe_eof_seen = true;
+
+ /* if there's any data left then force it out now */
+ flush_pipe_input(logbuffer, &bytes_in_logbuffer);
+
_endthread();
return 0;
}
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.186 2007/06/07 21:45:59 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.187 2007/06/14 01:48:51 adunstan Exp $
*
*-------------------------------------------------------------------------
*/
static const char *error_severity(int elevel);
static void append_with_tabs(StringInfo buf, const char *str);
static bool is_log_level_output(int elevel, int log_min_level);
+static void write_pipe_chunks(int fd, char *data, int len);
/*
write_eventlog(edata->elevel, buf.data);
else
#endif
- fprintf(stderr, "%s", buf.data);
+ if (Redirect_stderr)
+ write_pipe_chunks(fileno(stderr), buf.data, buf.len);
+ else
+ write(fileno(stderr), buf.data, buf.len);
}
/* If in the syslogger process, try to write messages direct to file */
pfree(buf.data);
}
+/*
+ * Send data to the syslogger using the chunked protocol
+ *
+ * fd is the descriptor to write to, data/len the message bytes. len must be
+ * greater than zero (asserted).
+ *
+ * The message is split into chunks carrying at most PIPE_MAX_PAYLOAD payload
+ * bytes. Every chunk starts with a header holding two nul bytes, the payload
+ * length, MyProcPid, and an is_last flag: 'f' on all chunks but the last,
+ * 't' on the last. Each chunk is issued with a single write() call of
+ * PIPE_HEADER_SIZE plus the payload length.
+ *
+ * The return values of write() are not checked, so short or failed writes
+ * go unnoticed here.
+ */
+static void
+write_pipe_chunks(int fd, char *data, int len)
+{
+ PipeProtoChunk p;
+
+ Assert(len > 0);
+
+ /* Header fields that are the same for every chunk of this message */
+ p.proto.nuls[0] = p.proto.nuls[1] = '\0';
+ p.proto.pid = MyProcPid;
+
+ /* write all but the last chunk */
+ while (len > PIPE_MAX_PAYLOAD)
+ {
+ p.proto.is_last = 'f';
+ p.proto.len = PIPE_MAX_PAYLOAD;
+ memcpy(p.proto.data, data, PIPE_MAX_PAYLOAD);
+ write(fd, &p, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD);
+ data += PIPE_MAX_PAYLOAD;
+ len -= PIPE_MAX_PAYLOAD;
+ }
+
+ /* write the last chunk */
+ /* (here 1 <= len <= PIPE_MAX_PAYLOAD, given the Assert and loop above) */
+ p.proto.is_last = 't';
+ p.proto.len = len;
+ memcpy(p.proto.data, data, len);
+ write(fd, &p, PIPE_HEADER_SIZE + len);
+}
+
/*
* Write error report to client
#ifndef WIN32
/* On Unix, we just fprintf to stderr */
vfprintf(stderr, fmt, ap);
+ fflush(stderr);
#else
/*
write_eventlog(ERROR, errbuf);
}
else
+ {
/* Not running as service, write to stderr */
vfprintf(stderr, fmt, ap);
+ fflush(stderr);
+ }
#endif
va_end(ap);
}