#include "lib/stringinfo.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "nodes/pg_list.h"
#include "pgtime.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
static char *last_csv_file_name = NULL;
/*
- * Buffers for saving partial messages from different backends. We don't expect
- * that there will be very many outstanding at one time, so 20 seems plenty of
- * leeway. If this array gets full we won't lose messages, but we will lose
- * the protocol protection against them being partially written or interleaved.
+ * Buffers for saving partial messages from different backends.
*
+ * Keep NBUFFER_LISTS lists of these, with the entry for a given source pid
+ * being in the list numbered (pid % NBUFFER_LISTS), so as to cut down on
+ * the number of entries we have to examine for any one incoming message.
+ * There must never be more than one entry for the same source pid.
+ *
+ * An inactive buffer is not removed from its list, just held for re-use.
* An inactive buffer has pid == 0 and undefined contents of data.
*/
typedef struct
StringInfoData data; /* accumulated data, as a StringInfo */
} save_buffer;
-#define CHUNK_SLOTS 20
-static save_buffer saved_chunks[CHUNK_SLOTS];
+#define NBUFFER_LISTS 256
+static List *buffer_lists[NBUFFER_LISTS];
/* These must be exported for EXEC_BACKEND case ... annoying */
#ifndef WIN32
* Now we are done with the write end of the pipe.
* CloseHandle() must not be called because the preceding
* close() closes the underlying handle.
- */
+ */
syslogPipe[1] = 0;
#endif
redirection_done = true;
(p.is_last == 't' || p.is_last == 'f' ||
p.is_last == 'T' || p.is_last == 'F'))
{
+ List *buffer_list;
+ ListCell *cell;
+ save_buffer *existing_slot = NULL,
+ *free_slot = NULL;
+ StringInfo str;
+
chunklen = PIPE_HEADER_SIZE + p.len;
/* Fall out of loop if we don't have the whole chunk yet */
dest = (p.is_last == 'T' || p.is_last == 'F') ?
LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
- if (p.is_last == 'f' || p.is_last == 'F')
+ /* Locate any existing buffer for this source pid */
+ buffer_list = buffer_lists[p.pid % NBUFFER_LISTS];
+ foreach(cell, buffer_list)
{
- /*
- * Save a complete non-final chunk in the per-pid buffer if
- * possible - if not just write it out.
- */
- int free_slot = -1,
- existing_slot = -1;
- int i;
- StringInfo str;
+ save_buffer *buf = (save_buffer *) lfirst(cell);
- for (i = 0; i < CHUNK_SLOTS; i++)
+ if (buf->pid == p.pid)
{
- if (saved_chunks[i].pid == p.pid)
- {
- existing_slot = i;
- break;
- }
- if (free_slot < 0 && saved_chunks[i].pid == 0)
- free_slot = i;
+ existing_slot = buf;
+ break;
}
- if (existing_slot >= 0)
+ if (buf->pid == 0 && free_slot == NULL)
+ free_slot = buf;
+ }
+
+ if (p.is_last == 'f' || p.is_last == 'F')
+ {
+ /*
+ * Save a complete non-final chunk in a per-pid buffer
+ */
+ if (existing_slot != NULL)
{
- str = &(saved_chunks[existing_slot].data);
+ /* Add chunk to data from preceding chunks */
+ str = &(existing_slot->data);
appendBinaryStringInfo(str,
cursor + PIPE_HEADER_SIZE,
p.len);
}
- else if (free_slot >= 0)
+ else
{
- saved_chunks[free_slot].pid = p.pid;
- str = &(saved_chunks[free_slot].data);
+ /* First chunk of message, save in a new buffer */
+ if (free_slot == NULL)
+ {
+ /*
+ * Need a free slot, but there isn't one in the list,
+ * so create a new one and extend the list with it.
+ */
+ free_slot = palloc(sizeof(save_buffer));
+ buffer_list = lappend(buffer_list, free_slot);
+ buffer_lists[p.pid % NBUFFER_LISTS] = buffer_list;
+ }
+ free_slot->pid = p.pid;
+ str = &(free_slot->data);
initStringInfo(str);
appendBinaryStringInfo(str,
cursor + PIPE_HEADER_SIZE,
p.len);
}
- else
- {
- /*
- * If there is no free slot we'll just have to take our
- * chances and write out a partial message and hope that
- * it's not followed by something from another pid.
- */
- write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len,
- dest);
- }
}
else
{
* Final chunk --- add it to anything saved for that pid, and
* either way write the whole thing out.
*/
- int existing_slot = -1;
- int i;
- StringInfo str;
-
- for (i = 0; i < CHUNK_SLOTS; i++)
- {
- if (saved_chunks[i].pid == p.pid)
- {
- existing_slot = i;
- break;
- }
- }
- if (existing_slot >= 0)
+ if (existing_slot != NULL)
{
- str = &(saved_chunks[existing_slot].data);
+ str = &(existing_slot->data);
appendBinaryStringInfo(str,
cursor + PIPE_HEADER_SIZE,
p.len);
write_syslogger_file(str->data, str->len, dest);
- saved_chunks[existing_slot].pid = 0;
+ /* Mark the buffer unused, and reclaim string storage */
+ existing_slot->pid = 0;
pfree(str->data);
}
else
flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
{
int i;
- StringInfo str;
/* Dump any incomplete protocol messages */
- for (i = 0; i < CHUNK_SLOTS; i++)
+ for (i = 0; i < NBUFFER_LISTS; i++)
{
- if (saved_chunks[i].pid != 0)
+ List *list = buffer_lists[i];
+ ListCell *cell;
+
+ foreach(cell, list)
{
- str = &(saved_chunks[i].data);
- write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
- saved_chunks[i].pid = 0;
- pfree(str->data);
+ save_buffer *buf = (save_buffer *) lfirst(cell);
+
+ if (buf->pid != 0)
+ {
+ StringInfo str = &(buf->data);
+
+ write_syslogger_file(str->data, str->len,
+ LOG_DESTINATION_STDERR);
+ /* Mark the buffer unused, and reclaim string storage */
+ buf->pid = 0;
+ pfree(str->data);
+ }
}
}