--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * sharedtuplestore.c
+ * Simple mechanism for sharing tuples between backends.
+ *
+ * This module contains a shared temporary tuple storage mechanism providing
+ * a parallel-aware subset of the features of tuplestore.c. Multiple backends
+ * can write to a SharedTuplestore, and then multiple backends can later scan
+ * the stored tuples. Currently, the only scan type supported is a parallel
+ * scan where each backend reads an arbitrary subset of the tuples that were
+ * written.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/utils/sort/sharedtuplestore.c
+ *
+ *-------------------------------------------------------------------------
+ */
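+
+/*
+ * Typical usage, as a rough sketch.  Allocation of the SharedTuplestore in
+ * shared memory (sts_estimate(nparticipants) bytes), SharedFileSet setup and
+ * error handling are the caller's responsibility and are omitted here;
+ * Parallel Hash is the real consumer of this interface.
+ *
+ * one backend:     accessor = sts_initialize(sts, nparticipants, my_number,
+ *                                            0, 0, fileset, "name");
+ * other backends:  accessor = sts_attach(sts, my_number, fileset);
+ * all backends:    sts_puttuple(accessor, NULL, tuple);   (repeatedly)
+ *                  sts_end_write(accessor);
+ *                  (wait until every writer has called sts_end_write)
+ *                  sts_begin_parallel_scan(accessor);
+ *                  while ((tuple = sts_parallel_scan_next(accessor, NULL)))
+ *                      (process tuple);
+ *                  sts_end_parallel_scan(accessor);
+ */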
+
+#include "postgres.h"
+
+#include <limits.h>
+
+#include "access/htup.h"
+#include "access/htup_details.h"
+#include "miscadmin.h"
+#include "storage/buffile.h"
+#include "storage/lwlock.h"
+#include "storage/sharedfileset.h"
+#include "utils/sharedtuplestore.h"
+
+/*
+ * The size of chunks, in pages. This is somewhat arbitrarily set to match
+ * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
+ * at approximately the same rate as it allocates new chunks of memory to
+ * insert them into.
+ */
+#define STS_CHUNK_PAGES 4
+#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
+#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
+
+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+ int ntuples; /* Number of tuples in this chunk. */
+ int overflow; /* If this is an overflow chunk, how many remain, counting this one?  Zero for a normal chunk. */
+ char data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;
+
+/* Per-participant shared state. */
+typedef struct SharedTuplestoreParticipant
+{
+ LWLock lock;
+ BlockNumber read_page; /* Page number for next read. */
+ BlockNumber npages; /* Number of pages written. */
+ bool writing; /* Used only for assertions. */
+} SharedTuplestoreParticipant;
+
+/* The control object that lives in shared memory. */
+struct SharedTuplestore
+{
+ int nparticipants; /* Number of participants that can write. */
+ int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */
+ size_t meta_data_size; /* Size of per-tuple header. */
+ char name[NAMEDATALEN]; /* A name for this tuplestore. */
+
+ /* Followed by per-participant shared state. */
+ SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/* Per-participant state that lives in backend-local memory. */
+struct SharedTuplestoreAccessor
+{
+ int participant; /* My participant number. */
+ SharedTuplestore *sts; /* The shared state. */
+ SharedFileSet *fileset; /* The SharedFileSet holding files. */
+ MemoryContext context; /* Memory context for buffers. */
+
+ /* State for reading. */
+ int read_participant; /* The current participant to read from. */
+ BufFile *read_file; /* The current file to read from. */
+ int read_ntuples_available; /* Number of tuples in the current chunk. */
+ int read_ntuples; /* How many tuples have we read from the current chunk? */
+ size_t read_bytes; /* How many bytes have we read from the current chunk? */
+ char *read_buffer; /* A buffer for loading tuples. */
+ size_t read_buffer_size;
+ BlockNumber read_next_page; /* Lowest block we'll consider reading. */
+
+ /* State for writing. */
+ SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
+ BufFile *write_file; /* The current file to write to. */
+ BlockNumber write_page; /* The next page to write to. */
+ char *write_pointer; /* Current write pointer within chunk. */
+ char *write_end; /* One past the end of the current chunk. */
+};
+
+static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
+ int participant);
+
+/*
+ * Return the amount of shared memory required to hold SharedTuplestore for a
+ * given number of participants.
+ */
+size_t
+sts_estimate(int participants)
+{
+ return offsetof(SharedTuplestore, participants) +
+ sizeof(SharedTuplestoreParticipant) * participants;
+}
+
+/*
+ * Initialize a SharedTuplestore in existing shared memory. There must be
+ * space for sts_estimate(participants) bytes. If flags includes the value
+ * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
+ * eagerly (but this isn't yet implemented).
+ *
+ * Tuples that are stored may optionally carry a piece of fixed-size
+ * meta-data which will be retrieved along with the tuple. This is useful for
+ * the hash values used in multi-batch hash joins, but could have other
+ * applications.
+ *
+ * The caller must supply a SharedFileSet, which is essentially a directory
+ * that will be cleaned up automatically, and a name which must be unique
+ * across all SharedTuplestores created in the same SharedFileSet.
+ */
+SharedTuplestoreAccessor *
+sts_initialize(SharedTuplestore *sts, int participants,
+ int my_participant_number,
+ size_t meta_data_size,
+ int flags,
+ SharedFileSet *fileset,
+ const char *name)
+{
+ SharedTuplestoreAccessor *accessor;
+ int i;
+
+ Assert(my_participant_number < participants);
+
+ sts->nparticipants = participants;
+ sts->meta_data_size = meta_data_size;
+ sts->flags = flags;
+
+ if (strlen(name) > sizeof(sts->name) - 1)
+ elog(ERROR, "SharedTuplestore name too long");
+ strcpy(sts->name, name);
+
+ /*
+ * Limit the meta-data size so that it, plus the tuple's length word, always
+ * fits into a single chunk.  sts_puttuple() and sts_read_tuple() could be
+ * made to support scenarios where that's not the case, but it's not
+ * currently required.  If that ever changes, the meta-data size probably
+ * should be made variable too.
+ */
+ if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
+ elog(ERROR, "meta-data too long");
+
+ for (i = 0; i < participants; ++i)
+ {
+ LWLockInitialize(&sts->participants[i].lock,
+ LWTRANCHE_SHARED_TUPLESTORE);
+ sts->participants[i].read_page = 0;
+ sts->participants[i].npages = 0;
+ sts->participants[i].writing = false;
+ }
+
+ accessor = palloc0(sizeof(SharedTuplestoreAccessor));
+ accessor->participant = my_participant_number;
+ accessor->sts = sts;
+ accessor->fileset = fileset;
+ accessor->context = CurrentMemoryContext;
+
+ return accessor;
+}
+
+/*
+ * Attach to a SharedTuplestore that has been initialized by another backend,
+ * so that this backend can read and write tuples.
+ */
+SharedTuplestoreAccessor *
+sts_attach(SharedTuplestore *sts,
+ int my_participant_number,
+ SharedFileSet *fileset)
+{
+ SharedTuplestoreAccessor *accessor;
+
+ Assert(my_participant_number < sts->nparticipants);
+
+ accessor = palloc0(sizeof(SharedTuplestoreAccessor));
+ accessor->participant = my_participant_number;
+ accessor->sts = sts;
+ accessor->fileset = fileset;
+ accessor->context = CurrentMemoryContext;
+
+ return accessor;
+}
+
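+/*
+ * Flush the accessor's current write chunk to its file and reset the chunk
+ * buffer so that more tuples can be appended to it.
+ */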
+static void
+sts_flush_chunk(SharedTuplestoreAccessor *accessor)
+{
+ size_t size;
+ size_t written;
+
+ size = STS_CHUNK_PAGES * BLCKSZ;
+ written = BufFileWrite(accessor->write_file, accessor->write_chunk, size);
+ if (written != size)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to shared tuplestore temporary file: %m")));
+ memset(accessor->write_chunk, 0, size);
+ accessor->write_pointer = &accessor->write_chunk->data[0];
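+ /* Let readers know about the newly written pages. */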
+ accessor->sts->participants[accessor->participant].npages +=
+ STS_CHUNK_PAGES;
+}
+
+/*
+ * Finish writing tuples. This must be called by all backends that have
+ * written data before any backend begins reading it.
+ */
+void
+sts_end_write(SharedTuplestoreAccessor *accessor)
+{
+ if (accessor->write_file != NULL)
+ {
+ sts_flush_chunk(accessor);
+ BufFileClose(accessor->write_file);
+ pfree(accessor->write_chunk);
+ accessor->write_chunk = NULL;
+ accessor->write_file = NULL;
+ accessor->sts->participants[accessor->participant].writing = false;
+ }
+}
+
+/*
+ * Prepare to rescan. Only one participant must call this. After it returns,
+ * all participants may call sts_begin_parallel_scan() and then loop over
+ * sts_parallel_scan_next(). This function must not be called concurrently
+ * with a scan, and synchronization to avoid that is the caller's
+ * responsibility.
+ */
+void
+sts_reinitialize(SharedTuplestoreAccessor *accessor)
+{
+ int i;
+
+ /*
+ * Reset the shared read heads for all participants' files, so that a new
+ * scan will start from the beginning of each file.
+ */
+ for (i = 0; i < accessor->sts->nparticipants; ++i)
+ {
+ accessor->sts->participants[i].read_page = 0;
+ }
+}
+
+/*
+ * Begin scanning the contents in parallel.
+ */
+void
+sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
+{
+ int i PG_USED_FOR_ASSERTS_ONLY;
+
+ /* End any existing scan that was in progress. */
+ sts_end_parallel_scan(accessor);
+
+ /*
+ * Any backend that might have written into this shared tuplestore must
+ * have called sts_end_write(), so that all buffers are flushed and the
+ * files have stopped growing.
+ */
+ for (i = 0; i < accessor->sts->nparticipants; ++i)
+ Assert(!accessor->sts->participants[i].writing);
+
+ /*
+ * We will start out reading the file that THIS backend wrote. There may
+ * be some caching locality advantage to that.
+ */
+ accessor->read_participant = accessor->participant;
+ accessor->read_file = NULL;
+ accessor->read_next_page = 0;
+}
+
+/*
+ * Finish a parallel scan, freeing associated backend-local resources.
+ */
+void
+sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
+{
+ /*
+ * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
+ * we'd probably need a reference count of current parallel scanners so we
+ * could safely do it only when the reference count reaches zero.
+ */
+ if (accessor->read_file != NULL)
+ {
+ BufFileClose(accessor->read_file);
+ accessor->read_file = NULL;
+ }
+}
+
+/*
+ * Write a tuple.  If a meta-data size was provided to sts_initialize(), then
+ * a pointer to meta-data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+ MinimalTuple tuple)
+{
+ size_t size;
+
+ /* Do we have our own file yet? */
+ if (accessor->write_file == NULL)
+ {
+ SharedTuplestoreParticipant *participant;
+ char name[MAXPGPATH];
+
+ /* Create one. Only this backend will write into it. */
+ sts_filename(name, accessor, accessor->participant);
+ accessor->write_file = BufFileCreateShared(accessor->fileset, name);
+
+ /* Set up the shared state for this backend's file. */
+ participant = &accessor->sts->participants[accessor->participant];
+ participant->writing = true; /* for assertions only */
+ }
+
+ /* Do we have space? */
+ size = accessor->sts->meta_data_size + tuple->t_len;
+ if (accessor->write_pointer + size >= accessor->write_end)
+ {
+ if (accessor->write_chunk == NULL)
+ {
+ /* First time through. Allocate chunk. */
+ accessor->write_chunk = (SharedTuplestoreChunk *)
+ MemoryContextAllocZero(accessor->context,
+ STS_CHUNK_PAGES * BLCKSZ);
+ accessor->write_chunk->ntuples = 0;
+ accessor->write_pointer = &accessor->write_chunk->data[0];
+ accessor->write_end = (char *)
+ accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
+ }
+ else
+ {
+ /* See if flushing helps. */
+ sts_flush_chunk(accessor);
+ }
+
+ /* It may still not be enough in the case of a gigantic tuple. */
+ if (accessor->write_pointer + size >= accessor->write_end)
+ {
+ size_t written;
+
+ /*
+ * We'll write the beginning of the oversized tuple, and then
+ * write the rest in some number of 'overflow' chunks.
+ *
+ * sts_initialize() verifies that the meta-data plus the tuple's length
+ * word always fit into a chunk.  Because the chunk has just been flushed
+ * above, all of a chunk's usable space is available, so the meta-data and
+ * at least the beginning of the tuple can be written here.
+ */
+ Assert(accessor->write_pointer + accessor->sts->meta_data_size +
+ sizeof(uint32) < accessor->write_end);
+
+ /* Write the meta-data in one piece; it is known to fit in the chunk. */
+ if (accessor->sts->meta_data_size > 0)
+ memcpy(accessor->write_pointer, meta_data,
+ accessor->sts->meta_data_size);
+
+ /*
+ * Write as much of the tuple as we can fit. This includes the
+ * tuple's size at the start.
+ */
+ written = accessor->write_end - accessor->write_pointer -
+ accessor->sts->meta_data_size;
+ memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
+ tuple, written);
+ ++accessor->write_chunk->ntuples;
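+ /* 'size' now holds the number of tuple bytes still to be written. */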
+ size -= accessor->sts->meta_data_size;
+ size -= written;
+ /* Now write as many overflow chunks as we need for the rest. */
+ while (size > 0)
+ {
+ size_t written_this_chunk;
+
+ sts_flush_chunk(accessor);
+
+ /*
+ * How many overflow chunks to go?  This will allow readers to
+ * skip all of them at once instead of reading each one.
+ */
+ accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
+ STS_CHUNK_DATA_SIZE;
+ written_this_chunk =
+ Min(accessor->write_end - accessor->write_pointer, size);
+ memcpy(accessor->write_pointer, (char *) tuple + written,
+ written_this_chunk);
+ accessor->write_pointer += written_this_chunk;
+ size -= written_this_chunk;
+ written += written_this_chunk;
+ }
+ return;
+ }
+ }
+
+ /* Copy meta-data and tuple into buffer. */
+ if (accessor->sts->meta_data_size > 0)
+ memcpy(accessor->write_pointer, meta_data,
+ accessor->sts->meta_data_size);
+ memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
+ tuple->t_len);
+ accessor->write_pointer += size;
+ ++accessor->write_chunk->ntuples;
+}
+
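+/*
+ * Read the next tuple from the current chunk of the current read file,
+ * copying its meta-data, if any, into *meta_data.  If the tuple spilled
+ * into overflow chunks, continue reading from those as well.
+ */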
+static MinimalTuple
+sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+ MinimalTuple tuple;
+ uint32 size;
+ size_t remaining_size;
+ size_t this_chunk_size;
+ char *destination;
+
+ /*
+ * We'll keep track of bytes read from this chunk so that we can detect an
+ * overflowing tuple and switch to reading overflow pages.
+ */
+ if (accessor->sts->meta_data_size > 0)
+ {
+ if (BufFileRead(accessor->read_file,
+ meta_data,
+ accessor->sts->meta_data_size) !=
+ accessor->sts->meta_data_size)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading meta-data.")));
+ accessor->read_bytes += accessor->sts->meta_data_size;
+ }
+ if (BufFileRead(accessor->read_file,
+ &size,
+ sizeof(size)) != sizeof(size))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading size.")));
+ accessor->read_bytes += sizeof(size);
+ if (size > accessor->read_buffer_size)
+ {
+ size_t new_read_buffer_size;
+
+ if (accessor->read_buffer != NULL)
+ pfree(accessor->read_buffer);
+ new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
+ accessor->read_buffer =
+ MemoryContextAlloc(accessor->context, new_read_buffer_size);
+ accessor->read_buffer_size = new_read_buffer_size;
+ }
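+
+ /*
+ * Read as much of the tuple as is present in the current chunk; the length
+ * word has already been consumed above, and any remainder is in overflow
+ * chunks handled below.
+ */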
+ remaining_size = size - sizeof(uint32);
+ this_chunk_size = Min(remaining_size,
+ BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
+ destination = accessor->read_buffer + sizeof(uint32);
+ if (BufFileRead(accessor->read_file,
+ destination,
+ this_chunk_size) != this_chunk_size)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading tuple.")));
+ accessor->read_bytes += this_chunk_size;
+ remaining_size -= this_chunk_size;
+ destination += this_chunk_size;
+ ++accessor->read_ntuples;
+
+ /* Check if we need to read any overflow chunks. */
+ while (remaining_size > 0)
+ {
+ /* We are now positioned at the start of an overflow chunk. */
+ SharedTuplestoreChunk chunk_header;
+
+ if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
+ STS_CHUNK_HEADER_SIZE)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading overflow chunk header.")));
+ accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
+ if (chunk_header.overflow == 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected chunk in shared tuplestore temporary file"),
+ errdetail_internal("Expected overflow chunk.")));
+ accessor->read_next_page += STS_CHUNK_PAGES;
+ this_chunk_size = Min(remaining_size,
+ BLCKSZ * STS_CHUNK_PAGES -
+ STS_CHUNK_HEADER_SIZE);
+ if (BufFileRead(accessor->read_file,
+ destination,
+ this_chunk_size) != this_chunk_size)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading tuple.")));
+ accessor->read_bytes += this_chunk_size;
+ remaining_size -= this_chunk_size;
+ destination += this_chunk_size;
+
+ /*
+ * These will be used to count regular tuples following the oversized
+ * tuple that spilled into this overflow chunk.
+ */
+ accessor->read_ntuples = 0;
+ accessor->read_ntuples_available = chunk_header.ntuples;
+ }
+
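+ /*
+ * The tuple's length word was read separately above and never copied into
+ * the buffer, so restore it where MinimalTuple expects to find it.
+ */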
+ tuple = (MinimalTuple) accessor->read_buffer;
+ tuple->t_len = size;
+
+ return tuple;
+}
+
+/*
+ * Get the next tuple in the current parallel scan.
+ */
+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+ SharedTuplestoreParticipant *p;
+ BlockNumber read_page;
+ bool eof;
+
+ for (;;)
+ {
+ /* Can we read more tuples from the current chunk? */
+ if (accessor->read_ntuples < accessor->read_ntuples_available)
+ return sts_read_tuple(accessor, meta_data);
+
+ /* Find the location of a new chunk to read. */
+ p = &accessor->sts->participants[accessor->read_participant];
+
+ LWLockAcquire(&p->lock, LW_EXCLUSIVE);
+ /* We can skip directly past overflow pages we know about. */
+ if (p->read_page < accessor->read_next_page)
+ p->read_page = accessor->read_next_page;
+ eof = p->read_page >= p->npages;
+ if (!eof)
+ {
+ /* Claim the next chunk. */
+ read_page = p->read_page;
+ /* Advance the read head for the next reader. */
+ p->read_page += STS_CHUNK_PAGES;
+ accessor->read_next_page = p->read_page;
+ }
+ LWLockRelease(&p->lock);
+
+ if (!eof)
+ {
+ SharedTuplestoreChunk chunk_header;
+
+ /* Make sure we have the file open. */
+ if (accessor->read_file == NULL)
+ {
+ char name[MAXPGPATH];
+
+ sts_filename(name, accessor, accessor->read_participant);
+ accessor->read_file =
+ BufFileOpenShared(accessor->fileset, name);
+ }
+
+ /* Seek and load the chunk header. */
+ if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Could not seek to next block.")));
+ if (BufFileRead(accessor->read_file, &chunk_header,
+ STS_CHUNK_HEADER_SIZE) != STS_CHUNK_HEADER_SIZE)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading chunk header.")));
+
+ /*
+ * If this is an overflow chunk, we skip it and any following
+ * overflow chunks all at once.
+ */
+ if (chunk_header.overflow > 0)
+ {
+ accessor->read_next_page = read_page +
+ chunk_header.overflow * STS_CHUNK_PAGES;
+ continue;
+ }
+
+ accessor->read_ntuples = 0;
+ accessor->read_ntuples_available = chunk_header.ntuples;
+ accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
+
+ /* Go around again, so we can get a tuple from this chunk. */
+ }
+ else
+ {
+ if (accessor->read_file != NULL)
+ {
+ BufFileClose(accessor->read_file);
+ accessor->read_file = NULL;
+ }
+
+ /*
+ * Try the next participant's file. If we've gone full circle,
+ * we're done.
+ */
+ accessor->read_participant = (accessor->read_participant + 1) %
+ accessor->sts->nparticipants;
+ if (accessor->read_participant == accessor->participant)
+ break;
+ accessor->read_next_page = 0;
+
+ /* Go around again, so we can get a chunk from this file. */
+ }
+ }
+
+ return NULL;
+}
+
+/*
+ * Create the name used for the BufFile that a given participant will write.
+ */
+static void
+sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
+{
+ snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
+}