Add shared tuplestores.
author     Andres Freund <andres@anarazel.de>
           Mon, 18 Dec 2017 22:23:19 +0000 (14:23 -0800)
committer  Andres Freund <andres@anarazel.de>
           Mon, 18 Dec 2017 22:23:19 +0000 (14:23 -0800)
SharedTuplestore allows multiple participants to write into it and
then read the tuples back from it in parallel.  Each reader receives
partial results.

For now it always uses disk files, but other buffering policies and
other kinds of scans (i.e., where each reader receives complete
results) may be useful in the future.

The upcoming parallel hash join feature will use this facility.

Author: Thomas Munro
Reviewed-By: Peter Geoghegan, Andres Freund, Robert Haas
Discussion: https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
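
The intended lifecycle: each writing participant calls sts_puttuple() some
number of times and then sts_end_write(); only after all writers have done
so may any participant scan.  Below is a minimal usage sketch, not part of
the commit, which assumes the caller has placed sts_estimate(nparticipants)
bytes plus a SharedFileSet in shared memory; next_input_tuple(),
process_tuple() and wait_for_all_writers() are hypothetical stand-ins.

#include "postgres.h"

#include "access/htup.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"

/* Hypothetical helpers standing in for whatever the caller provides. */
extern MinimalTuple next_input_tuple(uint32 *hash);
extern void process_tuple(MinimalTuple tuple, uint32 hash);
extern void wait_for_all_writers(void);

/* One backend initializes the store; the others call sts_attach(). */
SharedTuplestoreAccessor *
example_initialize(void *shmem, int nparticipants, SharedFileSet *fileset)
{
	return sts_initialize((SharedTuplestore *) shmem, nparticipants,
						  0,			/* this backend's participant number */
						  sizeof(uint32),	/* per-tuple meta-data: a hash */
						  0,			/* flags */
						  fileset, "example");
}

/* Every participant writes its tuples, then all scan in parallel. */
void
example_participant(SharedTuplestoreAccessor *accessor)
{
	MinimalTuple tuple;
	uint32		hash;

	while ((tuple = next_input_tuple(&hash)) != NULL)
		sts_puttuple(accessor, &hash, tuple);
	sts_end_write(accessor);	/* required before anyone begins scanning */

	wait_for_all_writers();		/* e.g. a Barrier; not shown here */

	sts_begin_parallel_scan(accessor);
	while ((tuple = sts_parallel_scan_next(accessor, &hash)) != NULL)
		process_tuple(tuple, hash); /* each reader gets a partial subset */
	sts_end_parallel_scan(accessor);
}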

src/backend/storage/lmgr/lwlock.c
src/backend/utils/sort/Makefile
src/backend/utils/sort/sharedtuplestore.c [new file with mode: 0644]
src/include/storage/lwlock.h
src/include/utils/sharedtuplestore.h [new file with mode: 0644]
src/tools/pgindent/typedefs.list

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 46f5c4277d4acdffc8a3d2468cb6a45a0f33e35b..eab98b0760fe7327b98668804cab722387e2bf74 100644 (file)
@@ -516,6 +516,8 @@ RegisterLWLockTranches(void)
                                                  "session_record_table");
        LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
                                                  "session_typmod_table");
+       LWLockRegisterTranche(LWTRANCHE_SHARED_TUPLESTORE,
+                                                 "shared_tuplestore");
        LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
        LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
 
diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile
index 370b12cee6cc97c6496387b7a436ac63774eeb2e..d8c08ac25e360530eaf5ac1c570fc64074240598 100644 (file)
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
 
-OBJS = logtape.o sortsupport.o tuplesort.o tuplestore.o
+OBJS = logtape.o sharedtuplestore.o sortsupport.o tuplesort.o tuplestore.o
 
 tuplesort.o: qsort_tuple.c
 
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
new file mode 100644 (file)
index 0000000..2c85056
--- /dev/null
@@ -0,0 +1,634 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedtuplestore.c
+ *       Simple mechanism for sharing tuples between backends.
+ *
+ * This module contains a shared temporary tuple storage mechanism providing
+ * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
+ * can write to a SharedTuplestore, and then multiple backends can later scan
+ * the stored tuples.  Currently, the only scan type supported is a parallel
+ * scan where each backend reads an arbitrary subset of the tuples that were
+ * written.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/utils/sort/sharedtuplestore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup.h"
+#include "access/htup_details.h"
+#include "miscadmin.h"
+#include "storage/buffile.h"
+#include "storage/lwlock.h"
+#include "storage/sharedfileset.h"
+#include "utils/sharedtuplestore.h"
+
+#include <limits.h>
+
+/*
+ * The size of chunks, in pages.  This is somewhat arbitrarily set to match
+ * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
+ * at approximately the same rate as it allocates new chunks of memory to
+ * insert them into.
+ */
+#define STS_CHUNK_PAGES 4
+#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
+#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
+
+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+       int                     ntuples;                /* Number of tuples in this chunk. */
+       int                     overflow;               /* If overflow, how many including this one? */
+       char            data[FLEXIBLE_ARRAY_MEMBER];
+}                      SharedTuplestoreChunk;
+
+/* Per-participant shared state. */
+typedef struct SharedTuplestoreParticipant
+{
+       LWLock          lock;
+       BlockNumber read_page;          /* Page number for next read. */
+       BlockNumber npages;                     /* Number of pages written. */
+       bool            writing;                /* Used only for assertions. */
+}                      SharedTuplestoreParticipant;
+
+/* The control object that lives in shared memory. */
+struct SharedTuplestore
+{
+       int                     nparticipants;  /* Number of participants that can write. */
+       int                     flags;                  /* Flag bits from SHARED_TUPLESTORE_XXX */
+       size_t          meta_data_size; /* Size of per-tuple header. */
+       char            name[NAMEDATALEN];      /* A name for this tuplestore. */
+
+       /* Followed by per-participant shared state. */
+       SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/* Per-participant state that lives in backend-local memory. */
+struct SharedTuplestoreAccessor
+{
+       int                     participant;    /* My participant number. */
+       SharedTuplestore *sts;          /* The shared state. */
+       SharedFileSet *fileset;         /* The SharedFileSet holding files. */
+       MemoryContext context;          /* Memory context for buffers. */
+
+       /* State for reading. */
+       int                     read_participant;       /* The current participant to read from. */
+       BufFile    *read_file;          /* The current file to read from. */
+       int                     read_ntuples_available; /* The number of tuples in chunk. */
+       int                     read_ntuples;   /* How many tuples have we read from chunk? */
+       size_t          read_bytes;             /* How many bytes have we read from chunk? */
+       char       *read_buffer;        /* A buffer for loading tuples. */
+       size_t          read_buffer_size;
+       BlockNumber read_next_page; /* Lowest block we'll consider reading. */
+
+       /* State for writing. */
+       SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
+       BufFile    *write_file;         /* The current file to write to. */
+       BlockNumber write_page;         /* The next page to write to. */
+       char       *write_pointer;      /* Current write pointer within chunk. */
+       char       *write_end;          /* One past the end of the current chunk. */
+};
+
+static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
+                        int participant);
+
+/*
+ * Return the amount of shared memory required to hold a SharedTuplestore
+ * for a given number of participants.
+ */
+size_t
+sts_estimate(int participants)
+{
+       return offsetof(SharedTuplestore, participants) +
+               sizeof(SharedTuplestoreParticipant) * participants;
+}
+
+/*
+ * Initialize a SharedTuplestore in existing shared memory.  There must be
+ * space for sts_estimate(participants) bytes.  If flags includes the value
+ * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
+ * eagerly (but this isn't yet implemented).
+ *
+ * Tuples that are stored may optionally carry a piece of fixed-size
+ * meta-data which will be retrieved along with the tuple.  This is useful
+ * for the hash values used in multi-batch hash joins, but could have other
+ * applications.
+ *
+ * The caller must supply a SharedFileSet, which is essentially a directory
+ * that will be cleaned up automatically, and a name which must be unique
+ * across all SharedTuplestores created in the same SharedFileSet.
+ */
+SharedTuplestoreAccessor *
+sts_initialize(SharedTuplestore *sts, int participants,
+                          int my_participant_number,
+                          size_t meta_data_size,
+                          int flags,
+                          SharedFileSet *fileset,
+                          const char *name)
+{
+       SharedTuplestoreAccessor *accessor;
+       int                     i;
+
+       Assert(my_participant_number < participants);
+
+       sts->nparticipants = participants;
+       sts->meta_data_size = meta_data_size;
+       sts->flags = flags;
+
+       if (strlen(name) > sizeof(sts->name) - 1)
+               elog(ERROR, "SharedTuplestore name too long");
+       strcpy(sts->name, name);
+
+       /*
+        * Limit meta-data so it + tuple size always fits into a single chunk.
+        * sts_puttuple() and sts_read_tuple() could be made to support scenarios
+        * where that's not the case, but it's not currently required. If so,
+        * meta-data size probably should be made variable, too.
+        */
+       if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
+               elog(ERROR, "meta-data too long");
+
+       for (i = 0; i < participants; ++i)
+       {
+               LWLockInitialize(&sts->participants[i].lock,
+                                                LWTRANCHE_SHARED_TUPLESTORE);
+               sts->participants[i].read_page = 0;
+               sts->participants[i].npages = 0;        /* nothing written yet */
+               sts->participants[i].writing = false;
+       }
+
+       accessor = palloc0(sizeof(SharedTuplestoreAccessor));
+       accessor->participant = my_participant_number;
+       accessor->sts = sts;
+       accessor->fileset = fileset;
+       accessor->context = CurrentMemoryContext;
+
+       return accessor;
+}
+
+/*
+ * Attach to a SharedTuplestore that has been initialized by another backend,
+ * so that this backend can read and write tuples.
+ */
+SharedTuplestoreAccessor *
+sts_attach(SharedTuplestore *sts,
+                  int my_participant_number,
+                  SharedFileSet *fileset)
+{
+       SharedTuplestoreAccessor *accessor;
+
+       Assert(my_participant_number < sts->nparticipants);
+
+       accessor = palloc0(sizeof(SharedTuplestoreAccessor));
+       accessor->participant = my_participant_number;
+       accessor->sts = sts;
+       accessor->fileset = fileset;
+       accessor->context = CurrentMemoryContext;
+
+       return accessor;
+}
+
+static void
+sts_flush_chunk(SharedTuplestoreAccessor *accessor)
+{
+       size_t          size;
+       size_t          written;
+
+       size = STS_CHUNK_PAGES * BLCKSZ;
+       written = BufFileWrite(accessor->write_file, accessor->write_chunk, size);
+       if (written != size)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not write to temporary file: %m")));
+       memset(accessor->write_chunk, 0, size);
+       accessor->write_pointer = &accessor->write_chunk->data[0];
+       accessor->sts->participants[accessor->participant].npages +=
+               STS_CHUNK_PAGES;
+}
+
+/*
+ * Finish writing tuples.  This must be called by all backends that have
+ * written data before any backend begins reading it.
+ */
+void
+sts_end_write(SharedTuplestoreAccessor *accessor)
+{
+       if (accessor->write_file != NULL)
+       {
+               sts_flush_chunk(accessor);
+               BufFileClose(accessor->write_file);
+               pfree(accessor->write_chunk);
+               accessor->write_chunk = NULL;
+               accessor->write_file = NULL;
+               accessor->sts->participants[accessor->participant].writing = false;
+       }
+}
+
+/*
+ * Prepare to rescan.  Exactly one participant should call this.  After it
+ * returns, all participants may call sts_begin_parallel_scan() and then
+ * loop over sts_parallel_scan_next().  This function must not be called
+ * concurrently with a scan, and synchronization to avoid that is the
+ * caller's responsibility.
+ */
+void
+sts_reinitialize(SharedTuplestoreAccessor *accessor)
+{
+       int                     i;
+
+       /*
+        * Reset the shared read head for all participants' files, so that a
+        * new scan of this tuplestore will start again from the beginning of
+        * every participant's file.
+        */
+       for (i = 0; i < accessor->sts->nparticipants; ++i)
+       {
+               accessor->sts->participants[i].read_page = 0;
+       }
+}
+
+/*
+ * Begin scanning the contents in parallel.
+ */
+void
+sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
+{
+       int                     i PG_USED_FOR_ASSERTS_ONLY;
+
+       /* End any existing scan that was in progress. */
+       sts_end_parallel_scan(accessor);
+
+       /*
+        * Any backend that might have written into this shared tuplestore must
+        * have called sts_end_write(), so that all buffers are flushed and the
+        * files have stopped growing.
+        */
+       for (i = 0; i < accessor->sts->nparticipants; ++i)
+               Assert(!accessor->sts->participants[i].writing);
+
+       /*
+        * We will start out reading the file that THIS backend wrote.  There may
+        * be some caching locality advantage to that.
+        */
+       accessor->read_participant = accessor->participant;
+       accessor->read_file = NULL;
+       accessor->read_next_page = 0;
+}
+
+/*
+ * Finish a parallel scan, freeing associated backend-local resources.
+ */
+void
+sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
+{
+       /*
+        * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
+        * we'd probably need a reference count of current parallel scanners so we
+        * could safely do it only when the reference count reaches zero.
+        */
+       if (accessor->read_file != NULL)
+       {
+               BufFileClose(accessor->read_file);
+               accessor->read_file = NULL;
+       }
+}
+
+/*
+ * Write a tuple.  If a meta-data size was provided to sts_initialize(), then
+ * a pointer to meta-data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+                        MinimalTuple tuple)
+{
+       size_t          size;
+
+       /* Do we have our own file yet? */
+       if (accessor->write_file == NULL)
+       {
+               SharedTuplestoreParticipant *participant;
+               char            name[MAXPGPATH];
+
+               /* Create one.  Only this backend will write into it. */
+               sts_filename(name, accessor, accessor->participant);
+               accessor->write_file = BufFileCreateShared(accessor->fileset, name);
+
+               /* Set up the shared state for this backend's file. */
+               participant = &accessor->sts->participants[accessor->participant];
+               participant->writing = true;    /* for assertions only */
+       }
+
+       /* Do we have space? */
+       size = accessor->sts->meta_data_size + tuple->t_len;
+       if (accessor->write_pointer + size >= accessor->write_end)
+       {
+               if (accessor->write_chunk == NULL)
+               {
+                       /* First time through.  Allocate chunk. */
+                       accessor->write_chunk = (SharedTuplestoreChunk *)
+                               MemoryContextAllocZero(accessor->context,
+                                                                          STS_CHUNK_PAGES * BLCKSZ);
+                       accessor->write_chunk->ntuples = 0;
+                       accessor->write_pointer = &accessor->write_chunk->data[0];
+                       accessor->write_end = (char *)
+                               accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
+               }
+               else
+               {
+                       /* See if flushing helps. */
+                       sts_flush_chunk(accessor);
+               }
+
+               /* It may still not be enough in the case of a gigantic tuple. */
+               if (accessor->write_pointer + size >= accessor->write_end)
+               {
+                       size_t          written;
+
+                       /*
+                        * We'll write the beginning of the oversized tuple, and then
+                        * write the rest in some number of 'overflow' chunks.
+                        *
+                        * sts_initialize() verifies that the size of the tuple +
+                        * meta-data always fits into a chunk. Because the chunk has been
+                        * flushed above, we can be sure to have all of a chunk's usable
+                        * space available.
+                        */
+                       Assert(accessor->write_pointer + accessor->sts->meta_data_size +
+                                  sizeof(uint32) < accessor->write_end);
+
+                       /* Write the meta-data as one chunk. */
+                       if (accessor->sts->meta_data_size > 0)
+                               memcpy(accessor->write_pointer, meta_data,
+                                          accessor->sts->meta_data_size);
+
+                       /*
+                        * Write as much of the tuple as we can fit. This includes the
+                        * tuple's size at the start.
+                        */
+                       written = accessor->write_end - accessor->write_pointer -
+                               accessor->sts->meta_data_size;
+                       memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
+                                  tuple, written);
+                       ++accessor->write_chunk->ntuples;
+                       size -= accessor->sts->meta_data_size;
+                       size -= written;
+                       /* Now write as many overflow chunks as we need for the rest. */
+                       while (size > 0)
+                       {
+                               size_t          written_this_chunk;
+
+                               sts_flush_chunk(accessor);
+
+                               /*
+                                * How many overflow chunks to go?  This will allow readers to
+                                * skip all of them at once instead of reading each one.
+                                */
+                               accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
+                                       STS_CHUNK_DATA_SIZE;
+                               written_this_chunk =
+                                       Min(accessor->write_end - accessor->write_pointer, size);
+                               memcpy(accessor->write_pointer, (char *) tuple + written,
+                                          written_this_chunk);
+                               accessor->write_pointer += written_this_chunk;
+                               size -= written_this_chunk;
+                               written += written_this_chunk;
+                       }
+                       return;
+               }
+       }
+
+       /* Copy meta-data and tuple into buffer. */
+       if (accessor->sts->meta_data_size > 0)
+               memcpy(accessor->write_pointer, meta_data,
+                          accessor->sts->meta_data_size);
+       memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
+                  tuple->t_len);
+       accessor->write_pointer += size;
+       ++accessor->write_chunk->ntuples;
+}
+
+static MinimalTuple
+sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+       MinimalTuple tuple;
+       uint32          size;
+       size_t          remaining_size;
+       size_t          this_chunk_size;
+       char       *destination;
+
+       /*
+        * We'll keep track of bytes read from this chunk so that we can detect an
+        * overflowing tuple and switch to reading overflow pages.
+        */
+       if (accessor->sts->meta_data_size > 0)
+       {
+               if (BufFileRead(accessor->read_file,
+                                               meta_data,
+                                               accessor->sts->meta_data_size) !=
+                       accessor->sts->meta_data_size)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read from shared tuplestore temporary file"),
+                                        errdetail_internal("Short read while reading meta-data.")));
+               accessor->read_bytes += accessor->sts->meta_data_size;
+       }
+       if (BufFileRead(accessor->read_file,
+                                       &size,
+                                       sizeof(size)) != sizeof(size))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not read from shared tuplestore temporary file"),
+                                errdetail_internal("Short read while reading size.")));
+       accessor->read_bytes += sizeof(size);
+       if (size > accessor->read_buffer_size)
+       {
+               size_t          new_read_buffer_size;
+
+               if (accessor->read_buffer != NULL)
+                       pfree(accessor->read_buffer);
+               new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
+               accessor->read_buffer =
+                       MemoryContextAlloc(accessor->context, new_read_buffer_size);
+               accessor->read_buffer_size = new_read_buffer_size;
+       }
+       remaining_size = size - sizeof(uint32);
+       this_chunk_size = Min(remaining_size,
+                                                 BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
+       destination = accessor->read_buffer + sizeof(uint32);
+       if (BufFileRead(accessor->read_file,
+                                       destination,
+                                       this_chunk_size) != this_chunk_size)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not read from shared tuplestore temporary file"),
+                                errdetail_internal("Short read while reading tuple.")));
+       accessor->read_bytes += this_chunk_size;
+       remaining_size -= this_chunk_size;
+       destination += this_chunk_size;
+       ++accessor->read_ntuples;
+
+       /* Check if we need to read any overflow chunks. */
+       while (remaining_size > 0)
+       {
+               /* We are now positioned at the start of an overflow chunk. */
+               SharedTuplestoreChunk chunk_header;
+
+               if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
+                       STS_CHUNK_HEADER_SIZE)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read from shared tuplestore temporary file"),
+                                        errdetail_internal("Short read while reading overflow chunk header.")));
+               accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
+               if (chunk_header.overflow == 0)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("unexpected chunk in shared tuplestore temporary file"),
+                                        errdetail_internal("Expected overflow chunk.")));
+               accessor->read_next_page += STS_CHUNK_PAGES;
+               this_chunk_size = Min(remaining_size,
+                                                         BLCKSZ * STS_CHUNK_PAGES -
+                                                         STS_CHUNK_HEADER_SIZE);
+               if (BufFileRead(accessor->read_file,
+                                               destination,
+                                               this_chunk_size) != this_chunk_size)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read from shared tuplestore temporary file"),
+                                        errdetail_internal("Short read while reading tuple.")));
+               accessor->read_bytes += this_chunk_size;
+               remaining_size -= this_chunk_size;
+               destination += this_chunk_size;
+
+               /*
+                * These will be used to count regular tuples following the oversized
+                * tuple that spilled into this overflow chunk.
+                */
+               accessor->read_ntuples = 0;
+               accessor->read_ntuples_available = chunk_header.ntuples;
+       }
+
+       tuple = (MinimalTuple) accessor->read_buffer;
+       tuple->t_len = size;
+
+       return tuple;
+}
+
+/*
+ * Get the next tuple in the current parallel scan.
+ */
+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+       SharedTuplestoreParticipant *p;
+       BlockNumber read_page;
+       bool            eof;
+
+       for (;;)
+       {
+               /* Can we read more tuples from the current chunk? */
+               if (accessor->read_ntuples < accessor->read_ntuples_available)
+                       return sts_read_tuple(accessor, meta_data);
+
+               /* Find the location of a new chunk to read. */
+               p = &accessor->sts->participants[accessor->read_participant];
+
+               LWLockAcquire(&p->lock, LW_EXCLUSIVE);
+               /* We can skip directly past overflow pages we know about. */
+               if (p->read_page < accessor->read_next_page)
+                       p->read_page = accessor->read_next_page;
+               eof = p->read_page >= p->npages;
+               if (!eof)
+               {
+                       /* Claim the next chunk. */
+                       read_page = p->read_page;
+                       /* Advance the read head for the next reader. */
+                       p->read_page += STS_CHUNK_PAGES;
+                       accessor->read_next_page = p->read_page;
+               }
+               LWLockRelease(&p->lock);
+
+               if (!eof)
+               {
+                       SharedTuplestoreChunk chunk_header;
+
+                       /* Make sure we have the file open. */
+                       if (accessor->read_file == NULL)
+                       {
+                               char            name[MAXPGPATH];
+
+                               sts_filename(name, accessor, accessor->read_participant);
+                               accessor->read_file =
+                                       BufFileOpenShared(accessor->fileset, name);
+                       }
+
+                       /* Seek and load the chunk header. */
+                       if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not read from shared tuplestore temporary file"),
+                                                errdetail_internal("Could not seek to next block.")));
+                       if (BufFileRead(accessor->read_file, &chunk_header,
+                                                       STS_CHUNK_HEADER_SIZE) != STS_CHUNK_HEADER_SIZE)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not read from shared tuplestore temporary file"),
+                                                errdetail_internal("Short read while reading chunk header.")));
+
+                       /*
+                        * If this is an overflow chunk, we skip it and any following
+                        * overflow chunks all at once.
+                        */
+                       if (chunk_header.overflow > 0)
+                       {
+                               accessor->read_next_page = read_page +
+                                       chunk_header.overflow * STS_CHUNK_PAGES;
+                               continue;
+                       }
+
+                       accessor->read_ntuples = 0;
+                       accessor->read_ntuples_available = chunk_header.ntuples;
+                       accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
+
+                       /* Go around again, so we can get a tuple from this chunk. */
+               }
+               else
+               {
+                       if (accessor->read_file != NULL)
+                       {
+                               BufFileClose(accessor->read_file);
+                               accessor->read_file = NULL;
+                       }
+
+                       /*
+                        * Try the next participant's file.  If we've gone full circle,
+                        * we're done.
+                        */
+                       accessor->read_participant = (accessor->read_participant + 1) %
+                               accessor->sts->nparticipants;
+                       if (accessor->read_participant == accessor->participant)
+                               break;
+                       accessor->read_next_page = 0;
+
+                       /* Go around again, so we can get a chunk from this file. */
+               }
+       }
+
+       return NULL;
+}
+
+/*
+ * Create the name used for the BufFile that a given participant will write.
+ */
+static void
+sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
+{
+       snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
+}
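
To make the overflow arithmetic in sts_puttuple() concrete: with the default
BLCKSZ of 8192, and assuming the chunk header is 8 bytes (two 4-byte ints
with no trailing padding), each chunk holds 32760 usable bytes.  The
standalone sketch below, not part of the commit, reproduces the bookkeeping
for a 100000-byte tuple carrying a 4-byte hash: it lands in one ordinary
chunk followed by three overflow chunks.

#include <stdio.h>

#define BLCKSZ 8192				/* the default PostgreSQL block size */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE 8	/* offsetof(SharedTuplestoreChunk, data) */
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)

int
main(void)
{
	size_t		meta = 4;		/* e.g. a hash value */
	size_t		t_len = 100000; /* oversized tuple, length word included */
	size_t		first = STS_CHUNK_DATA_SIZE - meta; /* part in first chunk */
	size_t		rest = t_len - first;
	size_t		overflow = (rest + STS_CHUNK_DATA_SIZE - 1) / STS_CHUNK_DATA_SIZE;

	printf("usable bytes per chunk:     %d\n", STS_CHUNK_DATA_SIZE);	/* 32760 */
	printf("tuple bytes in first chunk: %zu\n", first);		/* 32756 */
	printf("overflow chunks needed:     %zu\n", overflow);	/* 3 */
	return 0;
}
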
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 460843d73e2d25c7a2efb1b31fd0afbc786cfc81..a347ee4d7de34e1229d283baabab748e752ab9af 100644 (file)
@@ -215,6 +215,7 @@ typedef enum BuiltinTrancheIds
        LWTRANCHE_SESSION_DSA,
        LWTRANCHE_SESSION_RECORD_TABLE,
        LWTRANCHE_SESSION_TYPMOD_TABLE,
+       LWTRANCHE_SHARED_TUPLESTORE,
        LWTRANCHE_TBM,
        LWTRANCHE_PARALLEL_APPEND,
        LWTRANCHE_FIRST_USER_DEFINED
diff --git a/src/include/utils/sharedtuplestore.h b/src/include/utils/sharedtuplestore.h
new file mode 100644 (file)
index 0000000..49490ec
--- /dev/null
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedtuplestore.h
+ *       Simple mechanism for sharing tuples between backends.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/utils/sharedtuplestore.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SHAREDTUPLESTORE_H
+#define SHAREDTUPLESTORE_H
+
+#include "storage/fd.h"
+#include "storage/sharedfileset.h"
+
+struct SharedTuplestore;
+typedef struct SharedTuplestore SharedTuplestore;
+
+struct SharedTuplestoreAccessor;
+typedef struct SharedTuplestoreAccessor SharedTuplestoreAccessor;
+
+/*
+ * A flag indicating that the tuplestore will only be scanned once, so backing
+ * files can be unlinked early.
+ */
+#define SHARED_TUPLESTORE_SINGLE_PASS 0x01
+
+extern size_t sts_estimate(int participants);
+
+extern SharedTuplestoreAccessor *sts_initialize(SharedTuplestore *sts,
+                          int participants,
+                          int my_participant_number,
+                          size_t meta_data_size,
+                          int flags,
+                          SharedFileSet *fileset,
+                          const char *name);
+
+extern SharedTuplestoreAccessor *sts_attach(SharedTuplestore *sts,
+                  int my_participant_number,
+                  SharedFileSet *fileset);
+
+extern void sts_end_write(SharedTuplestoreAccessor *accessor);
+
+extern void sts_reinitialize(SharedTuplestoreAccessor *accessor);
+
+extern void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor);
+
+extern void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor);
+
+extern void sts_puttuple(SharedTuplestoreAccessor *accessor,
+                        void *meta_data,
+                        MinimalTuple tuple);
+
+extern MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor,
+                                          void *meta_data);
+
+#endif                                                 /* SHAREDTUPLESTORE_H */
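
Multi-pass use goes through sts_reinitialize(): SHARED_TUPLESTORE_SINGLE_PASS
merely reserves the option of removing files early and does not yet change
behavior.  Below is a hedged sketch of a second pass, not part of the commit,
assuming the caller elects one backend to reset and supplies a barrier;
wait_for_rescan_reset() and process_tuple() are hypothetical stand-ins.

#include "postgres.h"

#include "access/htup.h"
#include "utils/sharedtuplestore.h"

extern void wait_for_rescan_reset(void);	/* hypothetical barrier */
extern void process_tuple(MinimalTuple tuple, uint32 hash); /* hypothetical */

/* Rescan: exactly one participant resets, then everyone scans again. */
void
example_rescan(SharedTuplestoreAccessor *accessor, bool elected)
{
	MinimalTuple tuple;
	uint32		hash;

	if (elected)
		sts_reinitialize(accessor); /* no scan may be in progress here */
	wait_for_rescan_reset();		/* hypothetical synchronization point */

	sts_begin_parallel_scan(accessor);
	while ((tuple = sts_parallel_scan_next(accessor, &hash)) != NULL)
		process_tuple(tuple, hash);
	sts_end_parallel_scan(accessor);
}
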
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 72eb9fd390cd07f7130cc1ae2e50f21d5ce5c316..e308e201847c99054a8224f943e7f6ec38332ba5 100644 (file)
@@ -2038,6 +2038,8 @@ SharedRecordTableEntry
 SharedRecordTableKey
 SharedRecordTypmodRegistry
 SharedSortInfo
+SharedTuplestore
+SharedTuplestoreAccessor
 SharedTypmodTableEntry
 ShellTypeInfo
 ShippableCacheEntry