/* Do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- bloomBuildCallback, (void *) &buildstate);
+ bloomBuildCallback, (void *) &buildstate,
+ NULL);
/*
 * There could be some items left in the cached page. Flush this page if
<para>
When changing this value, consider also adjusting
- <xref linkend="guc-max-parallel-workers"/> and
+ <xref linkend="guc-max-parallel-workers"/>,
+ <xref linkend="guc-max-parallel-workers-maintenance"/>, and
<xref linkend="guc-max-parallel-workers-per-gather"/>.
</para>
</listitem>
</listitem>
</varlistentry>
+ <varlistentry id="guc-max-parallel-workers-maintenance" xreflabel="max_parallel_maintenance_workers">
+ <term><varname>max_parallel_maintenance_workers</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_parallel_maintenance_workers</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the maximum number of parallel workers that can be
+ started by a single utility command. Currently, the only
+ parallel utility command that supports the use of parallel
+ workers is <command>CREATE INDEX</command>, and only when
+ building a B-tree index. Parallel workers are taken from the
+ pool of processes established by <xref
+ linkend="guc-max-worker-processes"/>, limited by <xref
+ linkend="guc-max-parallel-workers"/>. Note that the requested
+ number of workers may not actually be available at runtime.
+ If this occurs, the utility operation will run with fewer
+ workers than expected. The default value is 2. Setting this
+ value to 0 disables the use of parallel workers by utility
+ commands.
+ </para>
+
+ <para>
+ Note that parallel utility commands should not consume
+ substantially more memory than equivalent non-parallel
+ operations. This strategy differs from that of parallel
+ query, where resource limits generally apply per worker
+ process. Parallel utility commands treat the resource limit
+ <varname>maintenance_work_mem</varname> as a limit to be applied to
+ the entire utility command, regardless of the number of
+ parallel worker processes. However, parallel utility
+ commands may still consume substantially more CPU resources
+ and I/O bandwidth.
+ </para>
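+
+       <para>
+        As an illustration (the value shown is only an example, not a
+        recommendation), <literal>SET max_parallel_maintenance_workers = 4;</literal>
+        allows a single <command>CREATE INDEX</command> to request up to four
+        parallel workers, still subject to the limits imposed by
+        <xref linkend="guc-max-parallel-workers"/> and
+        <xref linkend="guc-max-worker-processes"/>.
+       </para>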
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-max-parallel-workers" xreflabel="max_parallel_workers">
<term><varname>max_parallel_workers</varname> (<type>integer</type>)
<indexterm>
<listitem>
<para>
Sets the maximum number of workers that the system can support for
- parallel queries. The default value is 8. When increasing or
+ parallel operations. The default value is 8. When increasing or
decreasing this value, consider also adjusting
+ <xref linkend="guc-max-parallel-workers-maintenance"/> and
<xref linkend="guc-max-parallel-workers-per-gather"/>.
Also, note that a setting for this value which is higher than
<xref linkend="guc-max-worker-processes"/> will have no effect,
<entry>Waiting in an extension.</entry>
</row>
<row>
- <entry morerows="32"><literal>IPC</literal></entry>
+ <entry morerows="33"><literal>IPC</literal></entry>
<entry><literal>BgWorkerShutdown</literal></entry>
<entry>Waiting for background worker to shut down.</entry>
</row>
<entry><literal>ParallelBitmapScan</literal></entry>
<entry>Waiting for parallel bitmap scan to become initialized.</entry>
</row>
+ <row>
+ <entry><literal>ParallelCreateIndexScan</literal></entry>
+ <entry>Waiting for parallel <command>CREATE INDEX</command> workers to finish heap scan.</entry>
+ </row>
<row>
<entry><literal>ProcArrayGroupUpdate</literal></entry>
<entry>Waiting for group leader to clear transaction id at transaction end.</entry>
</row>
<row>
<entry><literal>sort-start</literal></entry>
- <entry><literal>(int, bool, int, int, bool)</literal></entry>
+ <entry><literal>(int, bool, int, int, bool, int)</literal></entry>
<entry>Probe that fires when a sort operation is started.
arg0 indicates heap, index or datum sort.
arg1 is true for unique-value enforcement.
arg2 is the number of key columns.
arg3 is the number of kilobytes of work memory allowed.
- arg4 is true if random access to the sort result is required.</entry>
+ arg4 is true if random access to the sort result is required.
+ arg5 indicates serial when <literal>0</literal>, parallel worker when
+ <literal>1</literal>, or parallel leader when <literal>2</literal>.</entry>
</row>
<row>
<entry><literal>sort-done</literal></entry>
which would drive the machine into swapping.
</para>
+ <para>
+ <productname>PostgreSQL</productname> can build indexes while
+ leveraging multiple CPUs in order to process the table rows faster.
+ This feature is known as <firstterm>parallel index
+ build</firstterm>. For index methods that support building indexes
+ in parallel (currently, only B-tree),
+ <varname>maintenance_work_mem</varname> specifies the maximum
+ amount of memory that can be used by each index build operation as
+ a whole, regardless of how many worker processes were started.
+ Generally, a cost model automatically determines how many worker
+ processes should be requested, if any.
+ </para>
+
+ <para>
+ Parallel index builds may benefit from increasing
+ <varname>maintenance_work_mem</varname> where an equivalent serial
+ index build will see little or no benefit. Note that
+ <varname>maintenance_work_mem</varname> may influence the number of
+ worker processes requested, since parallel workers must have at
+ least a <literal>32MB</literal> share of the total
+ <varname>maintenance_work_mem</varname> budget. There must also be
+ a remaining <literal>32MB</literal> share for the leader process.
+ Increasing <xref linkend="guc-max-parallel-workers-maintenance"/>
+ may allow more workers to be used, which will reduce the time
+ needed for index creation, so long as the index build is not
+ already I/O bound. Of course, there should also be sufficient
+ CPU capacity that would otherwise lie idle.
+ </para>
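+
+   <para>
+    For example, with <varname>maintenance_work_mem</varname> set to
+    <literal>256MB</literal>, the <literal>32MB</literal>-per-participant rule
+    allows at most eight participants (the leader plus seven parallel workers)
+    to share the budget; the number of workers actually requested may still be
+    lower, depending on the cost model and the limits described above.
+   </para>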
+
+ <para>
+ Setting a value for <literal>parallel_workers</literal> via <xref
+ linkend="sql-altertable"/> directly controls how many parallel
+ worker processes will be requested by a <command>CREATE
+ INDEX</command> against the table. This bypasses the cost model
+ completely, and prevents <varname>maintenance_work_mem</varname>
+ from affecting how many parallel workers are requested. Setting
+ <literal>parallel_workers</literal> to 0 via <command>ALTER
+ TABLE</command> will disable parallel index builds on the table in
+ all cases.
+ </para>
+
+ <tip>
+ <para>
+ You might want to reset <literal>parallel_workers</literal> after
+ setting it as part of tuning an index build. This avoids
+ inadvertent changes to query plans, since
+ <literal>parallel_workers</literal> affects
+ <emphasis>all</emphasis> parallel table scans.
+ </para>
+ </tip>
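+
+  <para>
+   A minimal sketch of this tuning workflow, using a hypothetical
+   <structname>measurement</structname> table and
+   <structfield>logdate</structfield> column, might look like this:
+<programlisting>
+ALTER TABLE measurement SET (parallel_workers = 4);
+CREATE INDEX measurement_logdate_idx ON measurement (logdate);
+ALTER TABLE measurement RESET (parallel_workers);
+</programlisting>
+  </para>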
+
+ <para>
+ While <command>CREATE INDEX</command> with the
+ <literal>CONCURRENTLY</literal> option supports parallel builds
+ without special restrictions, only the first table scan is actually
+ performed in parallel.
+ </para>
+
<para>
Use <xref linkend="sql-dropindex"/>
to remove an index.
This sets the number of workers that should be used to assist a parallel
scan of this table. If not set, the system will determine a value based
on the relation size. The actual number of workers chosen by the planner
- may be less, for example due to
- the setting of <xref linkend="guc-max-worker-processes"/>.
+ or by utility statements that use parallel scans may be less, for example
+ due to the setting of <xref linkend="guc-max-worker-processes"/>.
</para>
</listitem>
</varlistentry>
* heap blocks in physical order.
*/
reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
- brinbuildCallback, (void *) state);
+ brinbuildCallback, (void *) state, NULL);
/* process the final batch */
form_and_insert_tuple(state);
state->bs_currRangeStart = heapBlk;
IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
heapBlk, scanNumBlks,
- brinbuildCallback, (void *) state);
+ brinbuildCallback, (void *) state, NULL);
/*
* Now we update the values obtained by the scan with the placeholder
* prefers to receive tuples in TID order.
*/
reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
- ginBuildCallback, (void *) &buildstate);
+ ginBuildCallback, (void *) &buildstate, NULL);
/* dump remaining entries to the index */
oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
* Do the heap scan.
*/
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- gistBuildCallback, (void *) &buildstate);
+ gistBuildCallback, (void *) &buildstate, NULL);
/*
* If buffering was used, flush out all the tuples that are still in the
/* do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- hashbuildCallback, (void *) &buildstate);
+ hashbuildCallback, (void *) &buildstate, NULL);
if (buildstate.spool)
{
hspool->low_mask,
hspool->max_buckets,
maintenance_work_mem,
+ NULL,
false);
return hspool;
SpinLockInit(&target->phs_mutex);
target->phs_startblock = InvalidBlockNumber;
pg_atomic_init_u64(&target->phs_nallocated, 0);
- SerializeSnapshot(snapshot, target->phs_snapshot_data);
+ if (IsMVCCSnapshot(snapshot))
+ {
+ SerializeSnapshot(snapshot, target->phs_snapshot_data);
+ target->phs_snapshot_any = false;
+ }
+ else
+ {
+ Assert(snapshot == SnapshotAny);
+ target->phs_snapshot_any = true;
+ }
}
/* ----------------
Snapshot snapshot;
Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
- snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
- RegisterSnapshot(snapshot);
+
+ if (!parallel_scan->phs_snapshot_any)
+ {
+ /* Snapshot was serialized -- restore it */
+ snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+ RegisterSnapshot(snapshot);
+ }
+ else
+ {
+ /* SnapshotAny passed by caller (not serialized) */
+ snapshot = SnapshotAny;
+ }
return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
- true, true, true, false, false, true);
+ true, true, true, false, false,
+ !parallel_scan->phs_snapshot_any);
}
/* ----------------
#include "access/nbtree.h"
#include "access/relscan.h"
#include "access/xlog.h"
-#include "catalog/index.h"
#include "commands/vacuum.h"
+#include "nodes/execnodes.h"
#include "pgstat.h"
#include "storage/condition_variable.h"
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
-#include "tcop/tcopprot.h" /* pgrminclude ignore */
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
-/* Working state for btbuild and its callback */
-typedef struct
-{
- bool isUnique;
- bool haveDead;
- Relation heapRel;
- BTSpool *spool;
-
- /*
- * spool2 is needed only when the index is a unique index. Dead tuples are
- * put into spool2 instead of spool in order to avoid uniqueness check.
- */
- BTSpool *spool2;
- double indtuples;
-} BTBuildState;
-
/* Working state needed by btvacuumpage */
typedef struct
{
typedef struct BTParallelScanDescData *BTParallelScanDesc;
-static void btbuildCallback(Relation index,
- HeapTuple htup,
- Datum *values,
- bool *isnull,
- bool tupleIsAlive,
- void *state);
static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state,
BTCycleId cycleid);
PG_RETURN_POINTER(amroutine);
}
-/*
- * btbuild() -- build a new btree index.
- */
-IndexBuildResult *
-btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
-{
- IndexBuildResult *result;
- double reltuples;
- BTBuildState buildstate;
-
- buildstate.isUnique = indexInfo->ii_Unique;
- buildstate.haveDead = false;
- buildstate.heapRel = heap;
- buildstate.spool = NULL;
- buildstate.spool2 = NULL;
- buildstate.indtuples = 0;
-
-#ifdef BTREE_BUILD_STATS
- if (log_btree_build_stats)
- ResetUsage();
-#endif /* BTREE_BUILD_STATS */
-
- /*
- * We expect to be called exactly once for any index relation. If that's
- * not the case, big trouble's what we have.
- */
- if (RelationGetNumberOfBlocks(index) != 0)
- elog(ERROR, "index \"%s\" already contains data",
- RelationGetRelationName(index));
-
- buildstate.spool = _bt_spoolinit(heap, index, indexInfo->ii_Unique, false);
-
- /*
- * If building a unique index, put dead tuples in a second spool to keep
- * them out of the uniqueness check.
- */
- if (indexInfo->ii_Unique)
- buildstate.spool2 = _bt_spoolinit(heap, index, false, true);
-
- /* do the heap scan */
- reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- btbuildCallback, (void *) &buildstate);
-
- /* okay, all heap tuples are indexed */
- if (buildstate.spool2 && !buildstate.haveDead)
- {
- /* spool2 turns out to be unnecessary */
- _bt_spooldestroy(buildstate.spool2);
- buildstate.spool2 = NULL;
- }
-
- /*
- * Finish the build by (1) completing the sort of the spool file, (2)
- * inserting the sorted tuples into btree pages and (3) building the upper
- * levels.
- */
- _bt_leafbuild(buildstate.spool, buildstate.spool2);
- _bt_spooldestroy(buildstate.spool);
- if (buildstate.spool2)
- _bt_spooldestroy(buildstate.spool2);
-
-#ifdef BTREE_BUILD_STATS
- if (log_btree_build_stats)
- {
- ShowUsage("BTREE BUILD STATS");
- ResetUsage();
- }
-#endif /* BTREE_BUILD_STATS */
-
- /*
- * Return statistics
- */
- result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
-
- result->heap_tuples = reltuples;
- result->index_tuples = buildstate.indtuples;
-
- return result;
-}
-
-/*
- * Per-tuple callback from IndexBuildHeapScan
- */
-static void
-btbuildCallback(Relation index,
- HeapTuple htup,
- Datum *values,
- bool *isnull,
- bool tupleIsAlive,
- void *state)
-{
- BTBuildState *buildstate = (BTBuildState *) state;
-
- /*
- * insert the index tuple into the appropriate spool file for subsequent
- * processing
- */
- if (tupleIsAlive || buildstate->spool2 == NULL)
- _bt_spool(buildstate->spool, &htup->t_self, values, isnull);
- else
- {
- /* dead tuples are put into spool2 */
- buildstate->haveDead = true;
- _bt_spool(buildstate->spool2, &htup->t_self, values, isnull);
- }
-
- buildstate->indtuples += 1;
-}
-
/*
* btbuildempty() -- build an empty btree index in the initialization fork
*/
#include "postgres.h"
#include "access/nbtree.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
+#include "catalog/index.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "storage/smgr.h"
-#include "tcop/tcopprot.h"
+#include "tcop/tcopprot.h" /* pgrminclude ignore */
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BTREE_SHARED UINT64CONST(0xA000000000000001)
+#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002)
+#define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003)
+
+/*
+ * DISABLE_LEADER_PARTICIPATION disables the leader's participation in
+ * parallel index builds. This may be useful as a debugging aid.
+#undef DISABLE_LEADER_PARTICIPATION
+ */
+
/*
* Status record for spooling/sorting phase. (Note we may have two of
* these due to the special requirements for uniqueness-checking with
* dead tuples.)
*/
-struct BTSpool
+typedef struct BTSpool
{
Tuplesortstate *sortstate; /* state data for tuplesort.c */
Relation heap;
Relation index;
bool isunique;
-};
+} BTSpool;
+
+/*
+ * Status for index builds performed in parallel. This is allocated in a
+ * dynamic shared memory segment. Note that there is a separate tuplesort TOC
+ * entry, private to tuplesort.c but allocated by this module on its behalf.
+ */
+typedef struct BTShared
+{
+ /*
+ * These fields are not modified during the sort. They primarily exist
+ * for the benefit of worker processes that need to create BTSpool state
+ * corresponding to that used by the leader.
+ */
+ Oid heaprelid;
+ Oid indexrelid;
+ bool isunique;
+ bool isconcurrent;
+ int scantuplesortstates;
+
+ /*
+ * workersdonecv is used to monitor the progress of workers. All parallel
+ * participants must indicate that they are done before leader can use
+ * mutable state that workers maintain during scan (and before leader can
+ * proceed to tuplesort_performsort()).
+ */
+ ConditionVariable workersdonecv;
+
+ /*
+ * mutex protects all fields before heapdesc.
+ *
+ * These fields contain status information of interest to B-Tree index
+ * builds that must work just the same when an index is built in parallel.
+ */
+ slock_t mutex;
+
+ /*
+ * Mutable state that is maintained by workers, and reported back to
+ * leader at end of parallel scan.
+ *
+ * nparticipantsdone is number of worker processes finished.
+ *
+ * reltuples is the total number of input heap tuples.
+ *
+ * havedead indicates if RECENTLY_DEAD tuples were encountered during
+ * build.
+ *
+ * indtuples is the total number of tuples that made it into the index.
+ *
+ * brokenhotchain indicates if any worker detected a broken HOT chain
+ * during build.
+ */
+ int nparticipantsdone;
+ double reltuples;
+ bool havedead;
+ double indtuples;
+ bool brokenhotchain;
+
+ /*
+ * This variable-sized field must come last.
+ *
+ * See _bt_parallel_estimate_shared().
+ */
+ ParallelHeapScanDescData heapdesc;
+} BTShared;
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BTLeader
+{
+ /* parallel context itself */
+ ParallelContext *pcxt;
+
+ /*
+ * nparticipanttuplesorts is the exact number of worker processes
+ * successfully launched, plus one leader process if it participates as a
+ * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
+ * participating as a worker).
+ */
+ int nparticipanttuplesorts;
+
+ /*
+ * Leader process convenience pointers to shared state (leader avoids TOC
+ * lookups).
+ *
+ * btshared is the shared state for entire build. sharedsort is the
+ * shared, tuplesort-managed state passed to each process tuplesort.
+ * sharedsort2 is the corresponding btspool2 shared state, used only when
+ * building unique indexes. snapshot is the snapshot used by the scan iff
+ * an MVCC snapshot is required.
+ */
+ BTShared *btshared;
+ Sharedsort *sharedsort;
+ Sharedsort *sharedsort2;
+ Snapshot snapshot;
+} BTLeader;
+
+/*
+ * Working state for btbuild and its callback.
+ *
+ * When parallel CREATE INDEX is used, there is a BTBuildState for each
+ * participant.
+ */
+typedef struct BTBuildState
+{
+ bool isunique;
+ bool havedead;
+ Relation heap;
+ BTSpool *spool;
+
+ /*
+ * spool2 is needed only when the index is a unique index. Dead tuples are
+ * put into spool2 instead of spool in order to avoid uniqueness check.
+ */
+ BTSpool *spool2;
+ double indtuples;
+
+ /*
+ * btleader is only present when a parallel index build is performed, and
+ * only in the leader process. (Actually, only the leader has a
+ * BTBuildState. Workers have their own spool and spool2, though.)
+ */
+ BTLeader *btleader;
+} BTBuildState;
/*
* Status record for a btree page being built. We have one of these
} BTWriteState;
+static double _bt_spools_heapscan(Relation heap, Relation index,
+ BTBuildState *buildstate, IndexInfo *indexInfo);
+static void _bt_spooldestroy(BTSpool *btspool);
+static void _bt_spool(BTSpool *btspool, ItemPointer self,
+ Datum *values, bool *isnull);
+static void _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2);
+static void _bt_build_callback(Relation index, HeapTuple htup, Datum *values,
+ bool *isnull, bool tupleIsAlive, void *state);
static Page _bt_blnewpage(uint32 level);
static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
static void _bt_slideleft(Page page);
static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state);
static void _bt_load(BTWriteState *wstate,
BTSpool *btspool, BTSpool *btspool2);
+static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent,
+ int request);
+static void _bt_end_parallel(BTLeader *btleader);
+static Size _bt_parallel_estimate_shared(Snapshot snapshot);
+static double _bt_parallel_heapscan(BTBuildState *buildstate,
+ bool *brokenhotchain);
+static void _bt_leader_participate_as_worker(BTBuildState *buildstate);
+static void _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
+ BTShared *btshared, Sharedsort *sharedsort,
+ Sharedsort *sharedsort2, int sortmem);
/*
- * Interface routines
+ * btbuild() -- build a new btree index.
*/
+IndexBuildResult *
+btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
+{
+ IndexBuildResult *result;
+ BTBuildState buildstate;
+ double reltuples;
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ ResetUsage();
+#endif /* BTREE_BUILD_STATS */
+
+ buildstate.isunique = indexInfo->ii_Unique;
+ buildstate.havedead = false;
+ buildstate.heap = heap;
+ buildstate.spool = NULL;
+ buildstate.spool2 = NULL;
+ buildstate.indtuples = 0;
+ buildstate.btleader = NULL;
+
+ /*
+ * We expect to be called exactly once for any index relation. If that's
+ * not the case, big trouble's what we have.
+ */
+ if (RelationGetNumberOfBlocks(index) != 0)
+ elog(ERROR, "index \"%s\" already contains data",
+ RelationGetRelationName(index));
+
+ reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo);
+
+ /*
+ * Finish the build by (1) completing the sort of the spool file, (2)
+ * inserting the sorted tuples into btree pages and (3) building the upper
+ * levels. Finally, it may also be necessary to end use of parallelism.
+ */
+ _bt_leafbuild(buildstate.spool, buildstate.spool2);
+ _bt_spooldestroy(buildstate.spool);
+ if (buildstate.spool2)
+ _bt_spooldestroy(buildstate.spool2);
+ if (buildstate.btleader)
+ _bt_end_parallel(buildstate.btleader);
+
+ result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
+
+ result->heap_tuples = reltuples;
+ result->index_tuples = buildstate.indtuples;
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ {
+ ShowUsage("BTREE BUILD STATS");
+ ResetUsage();
+ }
+#endif /* BTREE_BUILD_STATS */
+ return result;
+}
/*
- * create and initialize a spool structure
+ * Create and initialize one or two spool structures, and save them in caller's
+ * buildstate argument. May also fill in fields within indexInfo used by index
+ * builds.
+ *
+ * Scans the heap, possibly in parallel, filling spools with IndexTuples. This
+ * routine encapsulates all aspects of managing parallelism. Caller need only
+ * call _bt_end_parallel() in parallel case after it is done with spool/spool2.
+ *
+ * Returns the total number of heap tuples scanned.
*/
-BTSpool *
-_bt_spoolinit(Relation heap, Relation index, bool isunique, bool isdead)
+static double
+_bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate,
+ IndexInfo *indexInfo)
{
BTSpool *btspool = (BTSpool *) palloc0(sizeof(BTSpool));
- int btKbytes;
+ SortCoordinate coordinate = NULL;
+ double reltuples = 0;
+ /*
+ * We size the sort area as maintenance_work_mem rather than work_mem to
+ * speed index creation. This should be OK since a single backend can't
+ * run multiple index creations in parallel (see also: notes on
+ * parallelism and maintenance_work_mem below).
+ */
btspool->heap = heap;
btspool->index = index;
- btspool->isunique = isunique;
+ btspool->isunique = indexInfo->ii_Unique;
+
+ /* Save as primary spool */
+ buildstate->spool = btspool;
+
+ /* Attempt to launch parallel worker scan when required */
+ if (indexInfo->ii_ParallelWorkers > 0)
+ _bt_begin_parallel(buildstate, indexInfo->ii_Concurrent,
+ indexInfo->ii_ParallelWorkers);
/*
- * We size the sort area as maintenance_work_mem rather than work_mem to
- * speed index creation. This should be OK since a single backend can't
- * run multiple index creations in parallel. Note that creation of a
- * unique index actually requires two BTSpool objects. We expect that the
- * second one (for dead tuples) won't get very full, so we give it only
- * work_mem.
+ * If parallel build requested and at least one worker process was
+ * successfully launched, set up coordination state
*/
- btKbytes = isdead ? work_mem : maintenance_work_mem;
- btspool->sortstate = tuplesort_begin_index_btree(heap, index, isunique,
- btKbytes, false);
+ if (buildstate->btleader)
+ {
+ coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+ coordinate->isWorker = false;
+ coordinate->nParticipants =
+ buildstate->btleader->nparticipanttuplesorts;
+ coordinate->sharedsort = buildstate->btleader->sharedsort;
+ }
- return btspool;
+ /*
+ * Begin serial/leader tuplesort.
+ *
+ * In cases where parallelism is involved, the leader receives the same
+ * share of maintenance_work_mem as a serial sort (it is generally treated
+ * in the same way as a serial sort once we return). Parallel worker
+ * Tuplesortstates will have received only a fraction of
+ * maintenance_work_mem, though.
+ *
+ * We rely on the lifetime of the Leader Tuplesortstate almost not
+ * overlapping with any worker Tuplesortstate's lifetime. There may be
+ * some small overlap, but that's okay because we rely on leader
+ * Tuplesortstate only allocating a small, fixed amount of memory here.
+ * When its tuplesort_performsort() is called (by our caller), and
+ * significant amounts of memory are likely to be used, all workers must
+ * have already freed almost all memory held by their Tuplesortstates
+ * (they are about to go away completely, too). The overall effect is
+ * that maintenance_work_mem always represents an absolute high watermark
+ * on the amount of memory used by a CREATE INDEX operation, regardless of
+ * the use of parallelism or any other factor.
+ */
+ buildstate->spool->sortstate =
+ tuplesort_begin_index_btree(heap, index, buildstate->isunique,
+ maintenance_work_mem, coordinate,
+ false);
+
+ /*
+ * If building a unique index, put dead tuples in a second spool to keep
+ * them out of the uniqueness check. We expect that the second spool (for
+ * dead tuples) won't get very full, so we give it only work_mem.
+ */
+ if (indexInfo->ii_Unique)
+ {
+ BTSpool *btspool2 = (BTSpool *) palloc0(sizeof(BTSpool));
+ SortCoordinate coordinate2 = NULL;
+
+ /* Initialize secondary spool */
+ btspool2->heap = heap;
+ btspool2->index = index;
+ btspool2->isunique = false;
+ /* Save as secondary spool */
+ buildstate->spool2 = btspool2;
+
+ if (buildstate->btleader)
+ {
+ /*
+ * Set up non-private state that is passed to
+ * tuplesort_begin_index_btree() about the basic high level
+ * coordination of a parallel sort.
+ */
+ coordinate2 = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+ coordinate2->isWorker = false;
+ coordinate2->nParticipants =
+ buildstate->btleader->nparticipanttuplesorts;
+ coordinate2->sharedsort = buildstate->btleader->sharedsort2;
+ }
+
+ /*
+ * We expect that the second one (for dead tuples) won't get very
+ * full, so we give it only work_mem
+ */
+ buildstate->spool2->sortstate =
+ tuplesort_begin_index_btree(heap, index, false, work_mem,
+ coordinate2, false);
+ }
+
+ /* Fill spool using either serial or parallel heap scan */
+ if (!buildstate->btleader)
+ reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
+ _bt_build_callback, (void *) buildstate,
+ NULL);
+ else
+ reltuples = _bt_parallel_heapscan(buildstate,
+ &indexInfo->ii_BrokenHotChain);
+
+ /* okay, all heap tuples are spooled */
+ if (buildstate->spool2 && !buildstate->havedead)
+ {
+ /* spool2 turns out to be unnecessary */
+ _bt_spooldestroy(buildstate->spool2);
+ buildstate->spool2 = NULL;
+ }
+
+ return reltuples;
}
/*
* clean up a spool structure and its substructures.
*/
-void
+static void
_bt_spooldestroy(BTSpool *btspool)
{
tuplesort_end(btspool->sortstate);
/*
* spool an index entry into the sort file.
*/
-void
+static void
_bt_spool(BTSpool *btspool, ItemPointer self, Datum *values, bool *isnull)
{
tuplesort_putindextuplevalues(btspool->sortstate, btspool->index,
* given a spool loaded by successive calls to _bt_spool,
* create an entire btree.
*/
-void
+static void
_bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
{
BTWriteState wstate;
_bt_load(&wstate, btspool, btspool2);
}
-
/*
- * Internal routines.
+ * Per-tuple callback from IndexBuildHeapScan
*/
+static void
+_bt_build_callback(Relation index,
+ HeapTuple htup,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *state)
+{
+ BTBuildState *buildstate = (BTBuildState *) state;
+ /*
+ * insert the index tuple into the appropriate spool file for subsequent
+ * processing
+ */
+ if (tupleIsAlive || buildstate->spool2 == NULL)
+ _bt_spool(buildstate->spool, &htup->t_self, values, isnull);
+ else
+ {
+ /* dead tuples are put into spool2 */
+ buildstate->havedead = true;
+ _bt_spool(buildstate->spool2, &htup->t_self, values, isnull);
+ }
+
+ buildstate->indtuples += 1;
+}
/*
* allocate workspace for a new, clean btree page, not linked to any siblings.
smgrimmedsync(wstate->index->rd_smgr, MAIN_FORKNUM);
}
}
+
+/*
+ * Create parallel context, and launch workers for leader.
+ *
+ * buildstate argument should be initialized (with the exception of the
+ * tuplesort state in spools, which may later be created based on shared
+ * state initially set up here).
+ *
+ * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
+ *
+ * request is the target number of parallel worker processes to launch.
+ *
+ * Sets buildstate's BTLeader, which caller must use to shut down parallel
+ * mode by passing it to _bt_end_parallel() at the very end of its index
+ * build. If not even a single worker process can be launched, this is
+ * never set, and caller should proceed with a serial index build.
+ */
+static void
+_bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
+{
+ ParallelContext *pcxt;
+ int scantuplesortstates;
+ Snapshot snapshot;
+ Size estbtshared;
+ Size estsort;
+ BTShared *btshared;
+ Sharedsort *sharedsort;
+ Sharedsort *sharedsort2;
+ BTSpool *btspool = buildstate->spool;
+ BTLeader *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
+ bool leaderparticipates = true;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+ leaderparticipates = false;
+#endif
+
+ /*
+ * Enter parallel mode, and create context for parallel build of btree
+ * index
+ */
+ EnterParallelMode();
+ Assert(request > 0);
+ pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main",
+ request, true);
+ scantuplesortstates = leaderparticipates ? request + 1 : request;
+
+ /*
+ * Prepare for scan of the base relation. In a normal index build, we use
+ * SnapshotAny because we must retrieve all tuples and do our own time
+ * qual checks (because we have to index RECENTLY_DEAD tuples). In a
+ * concurrent build, we take a regular MVCC snapshot and index whatever's
+ * live according to that.
+ */
+ if (!isconcurrent)
+ snapshot = SnapshotAny;
+ else
+ snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+ /*
+ * Estimate size for at least two keys -- our own
+ * PARALLEL_KEY_BTREE_SHARED workspace, and PARALLEL_KEY_TUPLESORT
+ * tuplesort workspace
+ */
+ estbtshared = _bt_parallel_estimate_shared(snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, estbtshared);
+ estsort = tuplesort_estimate_shared(scantuplesortstates);
+ shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+
+ /*
+ * Unique case requires a second spool, and so we may have to account for
+ * a third shared workspace -- PARALLEL_KEY_TUPLESORT_SPOOL2
+ */
+ if (!btspool->isunique)
+ shm_toc_estimate_keys(&pcxt->estimator, 2);
+ else
+ {
+ shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+ shm_toc_estimate_keys(&pcxt->estimator, 3);
+ }
+
+ /* Everyone's had a chance to ask for space, so now create the DSM */
+ InitializeParallelDSM(pcxt);
+
+ /* Store shared build state, for which we reserved space */
+ btshared = (BTShared *) shm_toc_allocate(pcxt->toc, estbtshared);
+ /* Initialize immutable state */
+ btshared->heaprelid = RelationGetRelid(btspool->heap);
+ btshared->indexrelid = RelationGetRelid(btspool->index);
+ btshared->isunique = btspool->isunique;
+ btshared->isconcurrent = isconcurrent;
+ btshared->scantuplesortstates = scantuplesortstates;
+ ConditionVariableInit(&btshared->workersdonecv);
+ SpinLockInit(&btshared->mutex);
+ /* Initialize mutable state */
+ btshared->nparticipantsdone = 0;
+ btshared->reltuples = 0.0;
+ btshared->havedead = false;
+ btshared->indtuples = 0.0;
+ btshared->brokenhotchain = false;
+ heap_parallelscan_initialize(&btshared->heapdesc, btspool->heap, snapshot);
+
+ /*
+ * Store shared tuplesort-private state, for which we reserved space.
+ * Then, initialize opaque state using tuplesort routine.
+ */
+ sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+ tuplesort_initialize_shared(sharedsort, scantuplesortstates,
+ pcxt->seg);
+
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_BTREE_SHARED, btshared);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
+
+ /* Unique case requires a second spool, and associated shared state */
+ if (!btspool->isunique)
+ sharedsort2 = NULL;
+ else
+ {
+ /*
+ * Store additional shared tuplesort-private state, for which we
+ * reserved space. Then, initialize opaque state using tuplesort
+ * routine.
+ */
+ sharedsort2 = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+ tuplesort_initialize_shared(sharedsort2, scantuplesortstates,
+ pcxt->seg);
+
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT_SPOOL2, sharedsort2);
+ }
+
+ /* Launch workers, saving status for leader/caller */
+ LaunchParallelWorkers(pcxt);
+ btleader->pcxt = pcxt;
+ btleader->nparticipanttuplesorts = pcxt->nworkers_launched;
+ if (leaderparticipates)
+ btleader->nparticipanttuplesorts++;
+ btleader->btshared = btshared;
+ btleader->sharedsort = sharedsort;
+ btleader->sharedsort2 = sharedsort2;
+ btleader->snapshot = snapshot;
+
+ /* If no workers were successfully launched, back out (do serial build) */
+ if (pcxt->nworkers_launched == 0)
+ {
+ _bt_end_parallel(btleader);
+ return;
+ }
+
+ /* Save leader state now that it's clear build will be parallel */
+ buildstate->btleader = btleader;
+
+ /* Join heap scan ourselves */
+ if (leaderparticipates)
+ _bt_leader_participate_as_worker(buildstate);
+
+ /*
+ * Caller needs to wait for all launched workers when we return. Make
+ * sure that the failure-to-start case will not hang forever.
+ */
+ WaitForParallelWorkersToAttach(pcxt);
+}
+
+/*
+ * Shut down workers, destroy parallel context, and end parallel mode.
+ */
+static void
+_bt_end_parallel(BTLeader *btleader)
+{
+ /* Shutdown worker processes */
+ WaitForParallelWorkersToFinish(btleader->pcxt);
+ /* Free last reference to MVCC snapshot, if one was used */
+ if (IsMVCCSnapshot(btleader->snapshot))
+ UnregisterSnapshot(btleader->snapshot);
+ DestroyParallelContext(btleader->pcxt);
+ ExitParallelMode();
+}
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * btree index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_bt_parallel_estimate_shared(Snapshot snapshot)
+{
+ if (!IsMVCCSnapshot(snapshot))
+ {
+ Assert(snapshot == SnapshotAny);
+ return sizeof(BTShared);
+ }
+
+ return add_size(offsetof(BTShared, heapdesc) +
+ offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+ EstimateSnapshotSpace(snapshot));
+}
+
+/*
+ * Within leader, wait for end of heap scan.
+ *
+ * When called, parallel heap scan started by _bt_begin_parallel() will
+ * already be underway within worker processes (when leader participates
+ * as a worker, we should end up here just as workers are finishing).
+ *
+ * Fills in fields needed for ambuild statistics, and reports back to the
+ * caller (via *brokenhotchain) whether any worker encountered a broken HOT
+ * chain.
+ *
+ * Returns the total number of heap tuples scanned.
+ */
+static double
+_bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain)
+{
+ BTShared *btshared = buildstate->btleader->btshared;
+ int nparticipanttuplesorts;
+ double reltuples;
+
+ nparticipanttuplesorts = buildstate->btleader->nparticipanttuplesorts;
+ for (;;)
+ {
+ SpinLockAcquire(&btshared->mutex);
+ if (btshared->nparticipantsdone == nparticipanttuplesorts)
+ {
+ buildstate->havedead = btshared->havedead;
+ buildstate->indtuples = btshared->indtuples;
+ *brokenhotchain = btshared->brokenhotchain;
+ reltuples = btshared->reltuples;
+ SpinLockRelease(&btshared->mutex);
+ break;
+ }
+ SpinLockRelease(&btshared->mutex);
+
+ ConditionVariableSleep(&btshared->workersdonecv,
+ WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
+ }
+
+ ConditionVariableCancelSleep();
+
+ return reltuples;
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_bt_leader_participate_as_worker(BTBuildState *buildstate)
+{
+ BTLeader *btleader = buildstate->btleader;
+ BTSpool *leaderworker;
+ BTSpool *leaderworker2;
+ int sortmem;
+
+ /* Allocate memory and initialize private spool */
+ leaderworker = (BTSpool *) palloc0(sizeof(BTSpool));
+ leaderworker->heap = buildstate->spool->heap;
+ leaderworker->index = buildstate->spool->index;
+ leaderworker->isunique = buildstate->spool->isunique;
+
+ /* Initialize second spool, if required */
+ if (!btleader->btshared->isunique)
+ leaderworker2 = NULL;
+ else
+ {
+ /* Allocate memory for worker's own private secondary spool */
+ leaderworker2 = (BTSpool *) palloc0(sizeof(BTSpool));
+
+ /* Initialize worker's own secondary spool */
+ leaderworker2->heap = leaderworker->heap;
+ leaderworker2->index = leaderworker->index;
+ leaderworker2->isunique = false;
+ }
+
+ /*
+	 * Might as well use a reliable figure when doling out maintenance_work_mem
+	 * (when the requested number of workers was not launched, this will be
+	 * somewhat higher than it is for other workers).
+ */
+ sortmem = maintenance_work_mem / btleader->nparticipanttuplesorts;
+
+ /* Perform work common to all participants */
+ _bt_parallel_scan_and_sort(leaderworker, leaderworker2, btleader->btshared,
+ btleader->sharedsort, btleader->sharedsort2,
+ sortmem);
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ {
+ ShowUsage("BTREE BUILD (Leader Partial Spool) STATISTICS");
+ ResetUsage();
+ }
+#endif /* BTREE_BUILD_STATS */
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+ BTSpool *btspool;
+ BTSpool *btspool2;
+ BTShared *btshared;
+ Sharedsort *sharedsort;
+ Sharedsort *sharedsort2;
+ Relation heapRel;
+ Relation indexRel;
+ LOCKMODE heapLockmode;
+ LOCKMODE indexLockmode;
+ int sortmem;
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ ResetUsage();
+#endif /* BTREE_BUILD_STATS */
+
+ /* Look up shared state */
+ btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false);
+
+ /* Open relations using lock modes known to be obtained by index.c */
+ if (!btshared->isconcurrent)
+ {
+ heapLockmode = ShareLock;
+ indexLockmode = AccessExclusiveLock;
+ }
+ else
+ {
+ heapLockmode = ShareUpdateExclusiveLock;
+ indexLockmode = RowExclusiveLock;
+ }
+
+ /* Open relations within worker */
+ heapRel = heap_open(btshared->heaprelid, heapLockmode);
+ indexRel = index_open(btshared->indexrelid, indexLockmode);
+
+ /* Initialize worker's own spool */
+ btspool = (BTSpool *) palloc0(sizeof(BTSpool));
+ btspool->heap = heapRel;
+ btspool->index = indexRel;
+ btspool->isunique = btshared->isunique;
+
+ /* Look up shared state private to tuplesort.c */
+ sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+ tuplesort_attach_shared(sharedsort, seg);
+ if (!btshared->isunique)
+ {
+ btspool2 = NULL;
+ sharedsort2 = NULL;
+ }
+ else
+ {
+ /* Allocate memory for worker's own private secondary spool */
+ btspool2 = (BTSpool *) palloc0(sizeof(BTSpool));
+
+ /* Initialize worker's own secondary spool */
+ btspool2->heap = btspool->heap;
+ btspool2->index = btspool->index;
+ btspool2->isunique = false;
+ /* Look up shared state private to tuplesort.c */
+ sharedsort2 = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT_SPOOL2, false);
+ tuplesort_attach_shared(sharedsort2, seg);
+ }
+
+ /* Perform sorting of spool, and possibly a spool2 */
+ sortmem = maintenance_work_mem / btshared->scantuplesortstates;
+ _bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
+ sharedsort2, sortmem);
+
+#ifdef BTREE_BUILD_STATS
+ if (log_btree_build_stats)
+ {
+ ShowUsage("BTREE BUILD (Worker Partial Spool) STATISTICS");
+ ResetUsage();
+ }
+#endif /* BTREE_BUILD_STATS */
+
+ index_close(indexRel, indexLockmode);
+ heap_close(heapRel, heapLockmode);
+}
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for passed btspool, and a second tuplesort
+ * state if a second btspool is needed (i.e., for unique index builds). All
+ * other spool fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
+ BTShared *btshared, Sharedsort *sharedsort,
+ Sharedsort *sharedsort2, int sortmem)
+{
+ SortCoordinate coordinate;
+ BTBuildState buildstate;
+ HeapScanDesc scan;
+ double reltuples;
+ IndexInfo *indexInfo;
+
+ /* Initialize local tuplesort coordination state */
+ coordinate = palloc0(sizeof(SortCoordinateData));
+ coordinate->isWorker = true;
+ coordinate->nParticipants = -1;
+ coordinate->sharedsort = sharedsort;
+
+ /* Begin "partial" tuplesort */
+ btspool->sortstate = tuplesort_begin_index_btree(btspool->heap,
+ btspool->index,
+ btspool->isunique,
+ sortmem, coordinate,
+ false);
+
+ /*
+ * Just as with serial case, there may be a second spool. If so, a
+ * second, dedicated spool2 partial tuplesort is required.
+ */
+ if (btspool2)
+ {
+ SortCoordinate coordinate2;
+
+ /*
+ * We expect that the second one (for dead tuples) won't get very
+ * full, so we give it only work_mem (unless sortmem is less for
+ * worker). Worker processes are generally permitted to allocate
+ * work_mem independently.
+ */
+ coordinate2 = palloc0(sizeof(SortCoordinateData));
+ coordinate2->isWorker = true;
+ coordinate2->nParticipants = -1;
+ coordinate2->sharedsort = sharedsort2;
+ btspool2->sortstate =
+ tuplesort_begin_index_btree(btspool->heap, btspool->index, false,
+ Min(sortmem, work_mem), coordinate2,
+ false);
+ }
+
+ /* Fill in buildstate for _bt_build_callback() */
+ buildstate.isunique = btshared->isunique;
+ buildstate.havedead = false;
+ buildstate.heap = btspool->heap;
+ buildstate.spool = btspool;
+ buildstate.spool2 = btspool2;
+ buildstate.indtuples = 0;
+ buildstate.btleader = NULL;
+
+ /* Join parallel scan */
+ indexInfo = BuildIndexInfo(btspool->index);
+ indexInfo->ii_Concurrent = btshared->isconcurrent;
+ scan = heap_beginscan_parallel(btspool->heap, &btshared->heapdesc);
+ reltuples = IndexBuildHeapScan(btspool->heap, btspool->index, indexInfo,
+ true, _bt_build_callback,
+ (void *) &buildstate, scan);
+
+ /*
+ * Execute this worker's part of the sort.
+ *
+ * Unlike leader and serial cases, we cannot avoid calling
+ * tuplesort_performsort() for spool2 if it ends up containing no dead
+ * tuples (this is disallowed for workers by tuplesort).
+ */
+ tuplesort_performsort(btspool->sortstate);
+ if (btspool2)
+ tuplesort_performsort(btspool2->sortstate);
+
+ /*
+ * Done. Record ambuild statistics, and whether we encountered a broken
+ * HOT chain.
+ */
+ SpinLockAcquire(&btshared->mutex);
+ btshared->nparticipantsdone++;
+ btshared->reltuples += reltuples;
+ if (buildstate.havedead)
+ btshared->havedead = true;
+ btshared->indtuples += buildstate.indtuples;
+ if (indexInfo->ii_BrokenHotChain)
+ btshared->brokenhotchain = true;
+ SpinLockRelease(&btshared->mutex);
+
+ /* Notify leader */
+ ConditionVariableSignal(&btshared->workersdonecv);
+
+ /* We can end tuplesorts immediately */
+ tuplesort_end(btspool->sortstate);
+ if (btspool2)
+ tuplesort_end(btspool2->sortstate);
+}
ALLOCSET_DEFAULT_SIZES);
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
- spgistBuildCallback, (void *) &buildstate);
+ spgistBuildCallback, (void *) &buildstate,
+ NULL);
MemoryContextDelete(buildstate.tmpCtx);
#include "postgres.h"
+#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
{
{
"ParallelQueryMain", ParallelQueryMain
+ },
+ {
+ "_bt_parallel_build_main", _bt_parallel_build_main
}
};
*/
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
- int nworkers)
+ int nworkers, bool serializable_okay)
{
MemoryContext oldcontext;
ParallelContext *pcxt;
/*
* If we are running under serializable isolation, we can't use parallel
* workers, at least not until somebody enhances that mechanism to be
- * parallel-aware.
+ * parallel-aware. Utility statement callers may ask us to ignore this
+ * restriction because they're always able to safely ignore the fact that
+ * SIREAD locks do not work with parallelism.
*/
- if (IsolationIsSerializable())
+ if (IsolationIsSerializable() && !serializable_okay)
nworkers = 0;
/* We might be running in a short-lived memory context. */
heap = heap_open(ILHead->il_heap, NoLock);
ind = index_open(ILHead->il_ind, NoLock);
- index_build(heap, ind, ILHead->il_info, false, false);
+ index_build(heap, ind, ILHead->il_info, false, false, false);
index_close(ind, NoLock);
heap_close(heap, NoLock);
/* Initialize the index and rebuild */
/* Note: we do not need to re-establish pkey setting */
- index_build(heapRelation, currentIndex, indexInfo, false, true);
+ index_build(heapRelation, currentIndex, indexInfo, false, true, false);
/* We're done with this index */
index_close(currentIndex, NoLock);
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
+#include "optimizer/planner.h"
#include "parser/parser.h"
#include "rewrite/rewriteManip.h"
#include "storage/bufmgr.h"
Assert(indexRelationId == RelationGetRelid(indexRelation));
/*
- * Obtain exclusive lock on it. Although no other backends can see it
+ * Obtain exclusive lock on it. Although no other transactions can see it
* until we commit, this prevents deadlock-risk complaints from lock
* manager in cases such as CLUSTER.
*/
}
else
{
- index_build(heapRelation, indexRelation, indexInfo, isprimary, false);
+ index_build(heapRelation, indexRelation, indexInfo, isprimary, false,
+ true);
}
/*
/* initialize index-build state to default */
ii->ii_Concurrent = false;
ii->ii_BrokenHotChain = false;
+ ii->ii_ParallelWorkers = 0;
/* set up for possible use by index AM */
ii->ii_Am = index->rd_rel->relam;
*
* isprimary tells whether to mark the index as a primary-key index.
* isreindex indicates we are recreating a previously-existing index.
+ * parallel indicates if parallelism may be useful.
*
* Note: when reindexing an existing index, isprimary can be false even if
* the index is a PK; it's already properly marked and need not be re-marked.
Relation indexRelation,
IndexInfo *indexInfo,
bool isprimary,
- bool isreindex)
+ bool isreindex,
+ bool parallel)
{
IndexBuildResult *stats;
Oid save_userid;
Assert(PointerIsValid(indexRelation->rd_amroutine->ambuild));
Assert(PointerIsValid(indexRelation->rd_amroutine->ambuildempty));
- ereport(DEBUG1,
- (errmsg("building index \"%s\" on table \"%s\"",
- RelationGetRelationName(indexRelation),
- RelationGetRelationName(heapRelation))));
+ /*
+ * Determine worker process details for parallel CREATE INDEX. Currently,
+ * only btree has support for parallel builds.
+ *
+ * Note that planner considers parallel safety for us.
+ */
+ if (parallel && IsNormalProcessingMode() &&
+ indexRelation->rd_rel->relam == BTREE_AM_OID)
+ indexInfo->ii_ParallelWorkers =
+ plan_create_index_workers(RelationGetRelid(heapRelation),
+ RelationGetRelid(indexRelation));
+
+ if (indexInfo->ii_ParallelWorkers == 0)
+ ereport(DEBUG1,
+ (errmsg("building index \"%s\" on table \"%s\" serially",
+ RelationGetRelationName(indexRelation),
+ RelationGetRelationName(heapRelation))));
+ else
+ ereport(DEBUG1,
+ (errmsg_plural("building index \"%s\" on table \"%s\" with request for %d parallel worker",
+ "building index \"%s\" on table \"%s\" with request for %d parallel workers",
+ indexInfo->ii_ParallelWorkers,
+ RelationGetRelationName(indexRelation),
+ RelationGetRelationName(heapRelation),
+ indexInfo->ii_ParallelWorkers)));
/*
* Switch to the table owner's userid, so that any index functions are run
IndexInfo *indexInfo,
bool allow_sync,
IndexBuildCallback callback,
- void *callback_state)
+ void *callback_state,
+ HeapScanDesc scan)
{
return IndexBuildHeapRangeScan(heapRelation, indexRelation,
indexInfo, allow_sync,
false,
0, InvalidBlockNumber,
- callback, callback_state);
+ callback, callback_state, scan);
}
/*
BlockNumber start_blockno,
BlockNumber numblocks,
IndexBuildCallback callback,
- void *callback_state)
+ void *callback_state,
+ HeapScanDesc scan)
{
bool is_system_catalog;
bool checking_uniqueness;
- HeapScanDesc scan;
HeapTuple heapTuple;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
EState *estate;
ExprContext *econtext;
Snapshot snapshot;
+ bool need_unregister_snapshot = false;
TransactionId OldestXmin;
BlockNumber root_blkno = InvalidBlockNumber;
OffsetNumber root_offsets[MaxHeapTuplesPerPage];
* concurrent build, or during bootstrap, we take a regular MVCC snapshot
* and index whatever's live according to that.
*/
- if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent)
- {
- snapshot = RegisterSnapshot(GetTransactionSnapshot());
- OldestXmin = InvalidTransactionId; /* not used */
+ OldestXmin = InvalidTransactionId;
+
+ /* okay to ignore lazy VACUUMs here */
+ if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
+ OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
- /* "any visible" mode is not compatible with this */
- Assert(!anyvisible);
+ if (!scan)
+ {
+ /*
+ * Serial index build.
+ *
+ * Must begin our own heap scan in this case. We may also need to
+ * register a snapshot whose lifetime is under our direct control.
+ */
+ if (!TransactionIdIsValid(OldestXmin))
+ {
+ snapshot = RegisterSnapshot(GetTransactionSnapshot());
+ need_unregister_snapshot = true;
+ }
+ else
+ snapshot = SnapshotAny;
+
+ scan = heap_beginscan_strat(heapRelation, /* relation */
+ snapshot, /* snapshot */
+ 0, /* number of keys */
+ NULL, /* scan key */
+ true, /* buffer access strategy OK */
+ allow_sync); /* syncscan OK? */
}
else
{
- snapshot = SnapshotAny;
- /* okay to ignore lazy VACUUMs here */
- OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+ /*
+ * Parallel index build.
+ *
+ * Parallel case never registers/unregisters own snapshot. Snapshot
+ * is taken from parallel heap scan, and is SnapshotAny or an MVCC
+ * snapshot, based on same criteria as serial case.
+ */
+ Assert(!IsBootstrapProcessingMode());
+ Assert(allow_sync);
+ snapshot = scan->rs_snapshot;
}
- scan = heap_beginscan_strat(heapRelation, /* relation */
- snapshot, /* snapshot */
- 0, /* number of keys */
- NULL, /* scan key */
- true, /* buffer access strategy OK */
- allow_sync); /* syncscan OK? */
+ /*
+ * Must call GetOldestXmin() with SnapshotAny. Should never call
+ * GetOldestXmin() with MVCC snapshot. (It's especially worth checking
+ * this for parallel builds, since ambuild routines that support parallel
+ * builds must work these details out for themselves.)
+ */
+ Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot));
+ Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) :
+ !TransactionIdIsValid(OldestXmin));
+ Assert(snapshot == SnapshotAny || !anyvisible);
/* set our scan endpoints */
if (!allow_sync)
heap_endscan(scan);
- /* we can now forget our snapshot, if set */
- if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent)
+ /* we can now forget our snapshot, if set and registered by us */
+ if (need_unregister_snapshot)
UnregisterSnapshot(snapshot);
ExecDropSingleTupleTableSlot(slot);
state.tuplesort = tuplesort_begin_datum(INT8OID, Int8LessOperator,
InvalidOid, false,
maintenance_work_mem,
- false);
+ NULL, false);
state.htups = state.itups = state.tups_inserted = 0;
(void) index_bulk_delete(&ivinfo, NULL,
/* Initialize the index and rebuild */
/* Note: we do not need to re-establish pkey setting */
- index_build(heapRelation, iRel, indexInfo, false, true);
+ index_build(heapRelation, iRel, indexInfo, false, true, true);
}
PG_CATCH();
{
static void
ResetReindexProcessing(void)
{
- if (IsInParallelMode())
- elog(ERROR, "cannot modify reindex state during a parallel operation");
+ /* This may be called in leader error path */
currentlyReindexedHeap = InvalidOid;
currentlyReindexedIndex = InvalidOid;
}
indexInfo->ii_ReadyForInserts = true;
indexInfo->ii_Concurrent = false;
indexInfo->ii_BrokenHotChain = false;
+ indexInfo->ii_ParallelWorkers = 0;
indexInfo->ii_Am = BTREE_AM_OID;
indexInfo->ii_AmCache = NULL;
indexInfo->ii_Context = CurrentMemoryContext;
/* Set up sorting if wanted */
if (use_sort)
tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex,
- maintenance_work_mem, false);
+ maintenance_work_mem,
+ NULL, false);
else
tuplesort = NULL;
* this will typically require the caller to have already locked the
* relation. To avoid lock upgrade hazards, that lock should be at least
* as strong as the one we take here.
+ *
+ * NB: If the lock strength here ever changes, code that is run by
+ * parallel workers under the control of certain particular ambuild
+ * functions will need to be updated, too.
*/
lockmode = stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock;
rel = heap_open(relationId, lockmode);
indexInfo->ii_ReadyForInserts = !stmt->concurrent;
indexInfo->ii_Concurrent = stmt->concurrent;
indexInfo->ii_BrokenHotChain = false;
+ indexInfo->ii_ParallelWorkers = 0;
indexInfo->ii_Am = accessMethodId;
indexInfo->ii_AmCache = NULL;
indexInfo->ii_Context = CurrentMemoryContext;
indexInfo->ii_BrokenHotChain = false;
/* Now build the index */
- index_build(rel, indexRelation, indexInfo, stmt->primary, false);
+ index_build(rel, indexRelation, indexInfo, stmt->primary, false, true);
/* Close both the relations, but keep the locks */
heap_close(rel, NoLock);
pstmt_data = ExecSerializePlan(planstate->plan, estate);
/* Create a parallel context. */
- pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
+ pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
pei->pcxt = pcxt;
/*
sortnode->collations,
sortnode->nullsFirst,
work_mem,
- false);
+ NULL, false);
}
aggstate->current_phase = newphase;
pertrans->sortOperators[0],
pertrans->sortCollations[0],
pertrans->sortNullsFirst[0],
- work_mem, false);
+ work_mem, NULL, false);
}
else
pertrans->sortstates[aggstate->current_set] =
pertrans->sortOperators,
pertrans->sortCollations,
pertrans->sortNullsFirst,
- work_mem, false);
+ work_mem, NULL, false);
}
/*
plannode->collations,
plannode->nullsFirst,
work_mem,
- node->randomAccess);
+ NULL, node->randomAccess);
if (node->bounded)
tuplesort_set_bound(tuplesortstate, node->bound);
node->tuplesortstate = (void *) tuplesortstate;
{
int parallel_workers;
- parallel_workers = compute_parallel_worker(rel, rel->pages, -1);
+ parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
+ max_parallel_workers_per_gather);
/* If any limit was set to zero, the user doesn't want a parallel scan. */
if (parallel_workers <= 0)
pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0,
NULL, NULL);
- parallel_workers = compute_parallel_worker(rel, pages_fetched, -1);
+ parallel_workers = compute_parallel_worker(rel, pages_fetched, -1,
+ max_parallel_workers_per_gather);
if (parallel_workers <= 0)
return;
*
* "index_pages" is the number of pages from the index that we expect to scan, or
* -1 if we don't expect to scan any.
+ *
+ * "max_workers" is caller's limit on the number of workers. This typically
+ * comes from a GUC.
*/
int
-compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages)
+compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages,
+ int max_workers)
{
int parallel_workers = 0;
}
}
- /*
- * In no case use more than max_parallel_workers_per_gather workers.
- */
- parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);
+	/* In no case use more than the caller-supplied maximum number of workers */
+ parallel_workers = Min(parallel_workers, max_workers);
return parallel_workers;
}
* order.
*/
path->path.parallel_workers = compute_parallel_worker(baserel,
- rand_heap_pages, index_pages);
+ rand_heap_pages,
+ index_pages,
+ max_parallel_workers_per_gather);
/*
* Fall out if workers can't be assigned for parallel scan, because in
return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost);
}
+/*
+ * plan_create_index_workers
+ * Use the planner to decide how many parallel worker processes
+ * CREATE INDEX should request for use
+ *
+ * tableOid is the table on which the index is to be built. indexOid is the
+ * OID of an index to be created or reindexed (which must be a btree index).
+ *
+ * Return value is the number of parallel worker processes to request. It
+ * may be unsafe to proceed if this is 0. Note that this does not include the
+ * leader participating as a worker (value is always a number of parallel
+ * worker processes).
+ *
+ * Note: caller had better already hold some type of lock on the table and
+ * index.
+ */
+int
+plan_create_index_workers(Oid tableOid, Oid indexOid)
+{
+ PlannerInfo *root;
+ Query *query;
+ PlannerGlobal *glob;
+ RangeTblEntry *rte;
+ Relation heap;
+ Relation index;
+ RelOptInfo *rel;
+ int parallel_workers;
+ BlockNumber heap_blocks;
+ double reltuples;
+ double allvisfrac;
+
+ /* Return immediately when parallelism disabled */
+ if (max_parallel_maintenance_workers == 0)
+ return 0;
+
+ /* Set up largely-dummy planner state */
+ query = makeNode(Query);
+ query->commandType = CMD_SELECT;
+
+ glob = makeNode(PlannerGlobal);
+
+ root = makeNode(PlannerInfo);
+ root->parse = query;
+ root->glob = glob;
+ root->query_level = 1;
+ root->planner_cxt = CurrentMemoryContext;
+ root->wt_param_id = -1;
+
+ /*
+ * Build a minimal RTE.
+ *
+ * Set the target's table to be an inheritance parent. This is a kludge
+ * that prevents problems within get_relation_info(), which does not
+ * expect that any IndexOptInfo is currently undergoing REINDEX.
+ */
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = tableOid;
+ rte->relkind = RELKIND_RELATION; /* Don't be too picky. */
+ rte->lateral = false;
+ rte->inh = true;
+ rte->inFromCl = true;
+ query->rtable = list_make1(rte);
+
+ /* Set up RTE/RelOptInfo arrays */
+ setup_simple_rel_arrays(root);
+
+ /* Build RelOptInfo */
+ rel = build_simple_rel(root, 1, NULL);
+
+ heap = heap_open(tableOid, NoLock);
+ index = index_open(indexOid, NoLock);
+
+ /*
+ * Determine if it's safe to proceed.
+ *
+ * Currently, parallel workers can't access the leader's temporary tables.
+ * Furthermore, any index predicate or index expressions must be parallel
+ * safe.
+ */
+ if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP ||
+ !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)) ||
+ !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index)))
+ {
+ parallel_workers = 0;
+ goto done;
+ }
+
+ /*
+ * If parallel_workers storage parameter is set for the table, accept that
+ * as the number of parallel worker processes to launch (though still cap
+ * at max_parallel_maintenance_workers). Note that we deliberately do not
+ * consider any other factor when parallel_workers is set. (e.g., memory
+ * use by workers.)
+ */
+ if (rel->rel_parallel_workers != -1)
+ {
+ parallel_workers = Min(rel->rel_parallel_workers,
+ max_parallel_maintenance_workers);
+ goto done;
+ }
+
+ /*
+ * Estimate heap relation size ourselves, since rel->pages cannot be
+ * trusted (heap RTE was marked as inheritance parent)
+ */
+ estimate_rel_size(heap, NULL, &heap_blocks, &reltuples, &allvisfrac);
+
+ /*
+ * Determine number of workers to scan the heap relation using generic
+ * model
+ */
+ parallel_workers = compute_parallel_worker(rel, heap_blocks, -1,
+ max_parallel_maintenance_workers);
+
+ /*
+ * Cap workers based on available maintenance_work_mem as needed.
+ *
+ * Note that each tuplesort participant receives an even share of the
+ * total maintenance_work_mem budget. Aim to leave participants
+ * (including the leader as a participant) with no less than 32MB of
+ * memory. This means that a maintenance_work_mem setting of 64MB is the
+ * smallest value capable of launching a single parallel worker to sort
+ * (leader and worker each receive 32MB).
+ */
+ while (parallel_workers > 0 &&
+ maintenance_work_mem / (parallel_workers + 1) < 32768L)
+ parallel_workers--;
+
+done:
+ index_close(index, NoLock);
+ heap_close(heap, NoLock);
+
+ return parallel_workers;
+}
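To make the intended call pattern concrete, here is a minimal caller sketch (not the patch's actual caller; heapRelation, indexRelation, and indexInfo are assumed to be already open, locked, and btree-related), showing how the planner's answer feeds ii_ParallelWorkers before the build:

/* Hypothetical sketch: ask the planner how many workers to request. */
#include "nodes/execnodes.h"
#include "optimizer/planner.h"
#include "utils/rel.h"

static void
request_index_build_workers(Relation heapRelation, Relation indexRelation,
                            IndexInfo *indexInfo)
{
    /* 0 means "build serially"; >0 is the number of workers to request */
    indexInfo->ii_ParallelWorkers =
        plan_create_index_workers(RelationGetRelid(heapRelation),
                                  RelationGetRelid(indexRelation));
}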
+
/*
* get_partitioned_child_rels
* Returns a list of the RT indexes of the partitioned child relations
case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
event_name = "ParallelBitmapScan";
break;
+ case WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN:
+ event_name = "ParallelCreateIndexScan";
+ break;
case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
event_name = "ProcArrayGroupUpdate";
break;
* Open a file that was previously created in another backend (or this one)
* with BufFileCreateShared in the same SharedFileSet using the same name.
* The backend that created the file must have called BufFileClose() or
- * BufFileExport() to make sure that it is ready to be opened by other
+ * BufFileExportShared() to make sure that it is ready to be opened by other
* backends and render it read-only.
*/
BufFile *
}
#endif
+
+/*
+ * Return the current file size. Counts any holes left behind by
+ * BufFileAppend as part of the size.
+ */
+off_t
+BufFileSize(BufFile *file)
+{
+ return ((file->numFiles - 1) * (off_t) MAX_PHYSICAL_FILESIZE) +
+ FileGetSize(file->files[file->numFiles - 1]);
+}
+
+/*
+ * Append the contents of source file (managed within shared fileset) to
+ * end of target file (managed within same shared fileset).
+ *
+ * Note that this operation subsumes ownership of the underlying resources
+ * from "source". Caller should never call BufFileClose on source after
+ * calling here. Resource owners for source and target must match, too.
+ *
+ * This operation works by manipulating lists of segment files, so the
+ * file content is always appended at a MAX_PHYSICAL_FILESIZE-aligned
+ * boundary, typically creating empty holes before the boundary. These
+ * areas do not contain any interesting data, and cannot be read from by
+ * caller.
+ *
+ * Returns the block number within target where the contents of source
+ * begins. Caller should apply this as an offset when working off block
+ * positions that are in terms of the original BufFile space.
+ */
+long
+BufFileAppend(BufFile *target, BufFile *source)
+{
+ long startBlock = target->numFiles * BUFFILE_SEG_SIZE;
+ int newNumFiles = target->numFiles + source->numFiles;
+ int i;
+
+ Assert(target->fileset != NULL);
+ Assert(source->readOnly);
+ Assert(!source->dirty);
+ Assert(source->fileset != NULL);
+
+ if (target->resowner != source->resowner)
+ elog(ERROR, "could not append BufFile with non-matching resource owner");
+
+ target->files = (File *)
+ repalloc(target->files, sizeof(File) * newNumFiles);
+ target->offsets = (off_t *)
+ repalloc(target->offsets, sizeof(off_t) * newNumFiles);
+ for (i = target->numFiles; i < newNumFiles; i++)
+ {
+ target->files[i] = source->files[i - target->numFiles];
+ target->offsets[i] = 0L;
+ }
+ target->numFiles = newNumFiles;
+
+ return startBlock;
+}
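A brief usage sketch (not part of the patch) of the two new routines: the leader opens a worker's exported file, records its size, and appends it, translating worker-local block numbers by the returned offset. fileset, filename, and targetfile are placeholders for state established elsewhere.

/* Hypothetical leader-side concatenation sketch. */
BufFile    *workerfile = BufFileOpenShared(fileset, filename);
off_t       workersize = BufFileSize(workerfile);   /* e.g. for instrumentation */
long        offset = BufFileAppend(targetfile, workerfile);

/*
 * A block the worker knew as block N is now block N + offset in the
 * combined file.  Do not call BufFileClose(workerfile) afterwards;
 * BufFileAppend has subsumed its resources.
 */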
return VfdCache[file].fileMode;
}
+/*
+ * FileGetSize - returns the size of file
+ */
+off_t
+FileGetSize(File file)
+{
+ Assert(FileIsValid(file));
+ return VfdCache[file].fileSize;
+}
+
/*
* Make room for another allocatedDescs[] array entry if needed and possible.
* Returns true if an array element is available.
qstate->sortCollations,
qstate->sortNullsFirsts,
work_mem,
+ NULL,
qstate->rescan_needed);
else
osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
qstate->sortCollation,
qstate->sortNullsFirst,
work_mem,
+ NULL,
qstate->rescan_needed);
osastate->number_of_rows = 0;
bool allowSystemTableMods = false;
int work_mem = 1024;
int maintenance_work_mem = 16384;
+int max_parallel_maintenance_workers = 2;
/*
* Primary determinants of sizes of shared-memory structures.
check_autovacuum_max_workers, NULL, NULL
},
+ {
+ {"max_parallel_maintenance_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Sets the maximum number of parallel processes per maintenance operation."),
+ NULL
+ },
+ &max_parallel_maintenance_workers,
+ 2, 0, 1024,
+ NULL, NULL, NULL
+ },
+
{
{"max_parallel_workers_per_gather", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
gettext_noop("Sets the maximum number of parallel processes per executor node."),
#effective_io_concurrency = 1 # 1-1000; 0 disables prefetching
#max_worker_processes = 8 # (change requires restart)
+#max_parallel_maintenance_workers = 2 # taken from max_parallel_workers
#max_parallel_workers_per_gather = 2 # taken from max_parallel_workers
#parallel_leader_participation = on
#max_parallel_workers = 8 # maximum number of max_worker_processes that
- # can be used in parallel queries
+ # can be used in parallel operations
#old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate
# (change requires restart)
#backend_flush_after = 0 # measured in pages, 0 disables
probe query__done(const char *);
probe statement__status(const char *);
- probe sort__start(int, bool, int, int, bool);
+ probe sort__start(int, bool, int, int, bool, int);
probe sort__done(bool, long);
probe buffer__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool);
* care that all calls for a single LogicalTapeSet are made in the same
* palloc context.
*
+ * To support parallel sort operations involving coordinated callers to
+ * tuplesort.c routines across multiple workers, it is necessary to
+ * concatenate each worker BufFile/tapeset into one single logical tapeset
+ * managed by the leader. Workers should have produced one final
+ * materialized tape (their entire output) when this happens in leader.
+ * There will always be the same number of runs as input tapes, and the same
+ * number of input tapes as participants (worker Tuplesortstates).
+ *
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
#include "postgres.h"
#include "storage/buffile.h"
+#include "utils/builtins.h"
#include "utils/logtape.h"
#include "utils/memutils.h"
* a frozen tape. (When reading from an unfrozen tape, we use a larger
* read buffer that holds multiple blocks, so the "current" block is
* ambiguous.)
+ *
+ * When concatenation of worker tape BufFiles is performed, an offset to
+ * the first block in the unified BufFile space is applied during reads.
*/
long firstBlockNumber;
long curBlockNumber;
long nextBlockNumber;
+ long offsetBlockNumber;
/*
* Buffer for current data block(s).
*/
char *buffer; /* physical buffer (separately palloc'd) */
int buffer_size; /* allocated size of the buffer */
+ int max_size; /* highest useful, safe buffer_size */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
} LogicalTape;
* by ltsGetFreeBlock(), and it is always greater than or equal to
* nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
* blocks that have been allocated for a tape, but have not been written
- * to the underlying file yet.
+ * to the underlying file yet. nHoleBlocks tracks the total number of
+ * blocks that are in unused holes between worker spaces following BufFile
+ * concatenation.
*/
long nBlocksAllocated; /* # of blocks allocated */
long nBlocksWritten; /* # of blocks used in underlying file */
+ long nHoleBlocks; /* # of "hole" blocks left */
/*
* We store the numbers of recycled-and-available blocks in freeBlocks[].
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
+static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
+ SharedFileSet *fileset);
/*
* previous tape isn't flushed to disk until the end of the sort, so you
* get one-block hole, where the last block of the previous tape will
* later go.
+ *
+ * Note that BufFile concatenation can leave "holes" in BufFile between
+ * worker-owned block ranges. These are tracked for reporting purposes
+ * only. We never read from nor write to these hole blocks, and so they
+ * are not considered here.
*/
while (blocknum > lts->nBlocksWritten)
{
do
{
char *thisbuf = lt->buffer + lt->nbytes;
+ long datablocknum = lt->nextBlockNumber;
/* Fetch next block number */
- if (lt->nextBlockNumber == -1L)
+ if (datablocknum == -1L)
break; /* EOF */
+ /* Apply worker offset, needed for leader tapesets */
+ datablocknum += lt->offsetBlockNumber;
/* Read the block */
- ltsReadBlock(lts, lt->nextBlockNumber, (void *) thisbuf);
+ ltsReadBlock(lts, datablocknum, (void *) thisbuf);
if (!lt->frozen)
- ltsReleaseBlock(lts, lt->nextBlockNumber);
+ ltsReleaseBlock(lts, datablocknum);
lt->curBlockNumber = lt->nextBlockNumber;
lt->nbytes += TapeBlockGetNBytes(thisbuf);
lts->blocksSorted = false;
}
+/*
+ * Claim ownership of a set of logical tapes from existing shared BufFiles.
+ *
+ * Caller should be leader process. Though tapes are marked as frozen in
+ * workers, they are not frozen when opened within leader, since unfrozen tapes
+ * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
+ * for random access.)
+ */
+static void
+ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
+ SharedFileSet *fileset)
+{
+ LogicalTape *lt = NULL;
+ long tapeblocks;
+ long nphysicalblocks = 0L;
+ int i;
+
+ /* Should have at least one worker tape, plus leader's tape */
+ Assert(lts->nTapes >= 2);
+
+ /*
+ * Build concatenated view of all BufFiles, remembering the block number
+ * where each source file begins. No changes are needed for leader/last
+ * tape.
+ */
+ for (i = 0; i < lts->nTapes - 1; i++)
+ {
+ char filename[MAXPGPATH];
+ BufFile *file;
+
+ lt = &lts->tapes[i];
+
+ pg_itoa(i, filename);
+ file = BufFileOpenShared(fileset, filename);
+
+ /*
+ * Stash first BufFile, and concatenate subsequent BufFiles to that.
+ * Store block offset into each tape as we go.
+ */
+ lt->firstBlockNumber = shared[i].firstblocknumber;
+ if (i == 0)
+ {
+ lts->pfile = file;
+ lt->offsetBlockNumber = 0L;
+ }
+ else
+ {
+ lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
+ }
+ /* Don't allocate more for read buffer than could possibly help */
+ lt->max_size = Min(MaxAllocSize, shared[i].buffilesize);
+ tapeblocks = shared[i].buffilesize / BLCKSZ;
+ nphysicalblocks += tapeblocks;
+ }
+
+ /*
+ * Set # of allocated blocks, as well as # blocks written. Use extent of
+ * new BufFile space (from 0 to end of last worker's tape space) for this.
+ * Allocated/written blocks should include space used by holes left
+ * between concatenated BufFiles.
+ */
+ lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
+ lts->nBlocksWritten = lts->nBlocksAllocated;
+
+ /*
+ * Compute number of hole blocks so that we can later work backwards, and
+ * instrument number of physical blocks. We don't simply use physical
+ * blocks directly for instrumentation because this would break if we ever
+ * subsequently wrote to worker tape.
+ *
+ * Working backwards like this keeps our options open. If shared BufFiles
+ * ever support being written to post-export, logtape.c can automatically
+ * take advantage of that. We'd then support writing to the leader tape
+ * while recycling space from worker tapes, because the leader tape has a
+ * zero offset (write routines won't need to have extra logic to apply an
+ * offset).
+ *
+ * The only thing that currently prevents writing to the leader tape from
+ * working is the fact that BufFiles opened using BufFileOpenShared() are
+ * read-only by definition, but that could be changed if it seemed
+ * worthwhile. For now, writing to the leader tape will raise a "Bad file
+ * descriptor" error, so tuplesort must avoid writing to the leader tape
+ * altogether.
+ */
+ lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
+}
+
/*
* Create a set of logical tapes in a temporary underlying file.
*
- * Each tape is initialized in write state.
+ * Each tape is initialized in write state. Serial callers pass ntapes,
+ * NULL argument for shared, and -1 for worker. Parallel worker callers
+ * pass ntapes, a shared file handle, NULL shared argument, and their own
+ * worker number. Leader callers, which claim shared worker tapes here,
+ * must supply non-sentinel values for all arguments except worker number,
+ * which should be -1.
+ *
+ * Leader caller is passing back an array of metadata each worker captured
+ * when LogicalTapeFreeze() was called for their final result tapes. Passed
+ * tapes array is actually sized ntapes - 1, because it includes only
+ * worker tapes, whereas leader requires its own leader tape. Note that we
+ * rely on the assumption that reclaimed worker tapes will only be read
+ * from once by leader, and never written to again (tapes are initialized
+ * for writing, but that's only to be consistent). Leader may not write to
+ * its own tape purely due to a restriction in the shared buffile
+ * infrastructure that may be lifted in the future.
*/
LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes)
+LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
+ int worker)
{
LogicalTapeSet *lts;
LogicalTape *lt;
Assert(ntapes > 0);
lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
ntapes * sizeof(LogicalTape));
- lts->pfile = BufFileCreateTemp(false);
lts->nBlocksAllocated = 0L;
lts->nBlocksWritten = 0L;
+ lts->nHoleBlocks = 0L;
lts->forgetFreeSpace = false;
lts->blocksSorted = true; /* a zero-length array is sorted ... */
lts->freeBlocksLen = 32; /* reasonable initial guess */
lt->dirty = false;
lt->firstBlockNumber = -1L;
lt->curBlockNumber = -1L;
+ lt->nextBlockNumber = -1L;
+ lt->offsetBlockNumber = 0L;
lt->buffer = NULL;
lt->buffer_size = 0;
+ /* palloc() larger than MaxAllocSize would fail */
+ lt->max_size = MaxAllocSize;
lt->pos = 0;
lt->nbytes = 0;
}
+
+ /*
+ * Create temp BufFile storage as required.
+ *
+ * Leader concatenates worker tapes, which requires special adjustment to
+ * final tapeset data. Things are simpler for the worker case and the
+ * serial case, though. They are generally very similar -- workers use a
+ * shared fileset, whereas serial sorts use a conventional serial BufFile.
+ */
+ if (shared)
+ ltsConcatWorkerTapes(lts, shared, fileset);
+ else if (fileset)
+ {
+ char filename[MAXPGPATH];
+
+ pg_itoa(worker, filename);
+ lts->pfile = BufFileCreateShared(fileset, filename);
+ }
+ else
+ lts->pfile = BufFileCreateTemp(false);
+
return lts;
}
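The three calling conventions described in the header comment reduce to the following sketch (fileset, worker_shares, worker_identifier, and nworkers are placeholders for caller state):

/* Serial sort: private temp file, no shared state. */
lts = LogicalTapeSetCreate(ntapes, NULL, NULL, -1);

/* Parallel worker: tapes are written into the shared fileset. */
lts = LogicalTapeSetCreate(ntapes, NULL, fileset, worker_identifier);

/* Leader: claim worker tapes (plus its own, unwritable tape) for merging. */
lts = LogicalTapeSetCreate(nworkers + 1, worker_shares, fileset, -1);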
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(lt->writing);
+ Assert(lt->offsetBlockNumber == 0L);
/* Allocate data buffer and first block on first write */
if (lt->buffer == NULL)
if (buffer_size < BLCKSZ)
buffer_size = BLCKSZ;
- /*
- * palloc() larger than MaxAllocSize would fail (a multi-gigabyte
- * buffer is unlikely to be helpful, anyway)
- */
- if (buffer_size > MaxAllocSize)
- buffer_size = MaxAllocSize;
+ /* palloc() larger than max_size is unlikely to be helpful */
+ if (buffer_size > lt->max_size)
+ buffer_size = lt->max_size;
/* round down to BLCKSZ boundary */
buffer_size -= buffer_size % BLCKSZ;
* tape is rewound (after rewind is too late!). It performs a rewind
* and switch to read mode "for free". An immediately following rewind-
* for-read call is OK but not necessary.
+ *
+ * share output argument is set with details of storage used for tape after
+ * freezing, which may be passed to LogicalTapeSetCreate within leader
+ * process later. This metadata is only of interest to worker callers
+ * freezing their final output for leader (single materialized tape).
+ * Serial sorts should set share to NULL.
*/
void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
+LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
{
LogicalTape *lt;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
Assert(lt->writing);
+ Assert(lt->offsetBlockNumber == 0L);
/*
* Completion of a write phase. Flush last partial data block, and rewind
else
lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
lt->nbytes = TapeBlockGetNBytes(lt->buffer);
+
+ /* Handle extra steps when caller is to share its tapeset */
+ if (share)
+ {
+ BufFileExportShared(lts->pfile);
+ share->firstblocknumber = lt->firstBlockNumber;
+ share->buffilesize = BufFileSize(lts->pfile);
+ }
}
/*
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum];
+ Assert(lt->offsetBlockNumber == 0L);
/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
Assert(lt->buffer_size == BLCKSZ);
long
LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
- return lts->nBlocksAllocated;
+ return lts->nBlocksAllocated - lts->nHoleBlocks;
}
* above. Nonetheless, with large workMem we can have many tapes (but not
* too many -- see the comments in tuplesort_merge_order).
*
+ * This module supports parallel sorting. Parallel sorts involve coordination
+ * among one or more worker processes, and a leader process, each with its own
+ * tuplesort state. The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a full tapeset
+ * consisting of worker tapes with one run to merge; a run for every
+ * worker process. This is then merged. Worker processes are guaranteed to
+ * produce exactly one output run from their partial input.
+ *
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
#define DATUM_SORT 2
#define CLUSTER_SORT 3
+/* Sort parallel code from state for sort__start probes */
+#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
+ (state)->worker >= 0 ? 1 : 2)
+
/* GUC variables */
#ifdef TRACE_SORT
bool trace_sort = false;
int markpos_offset; /* saved "current", or offset in tape block */
bool markpos_eof; /* saved "eof_reached" */
+ /*
+ * These variables are used during parallel sorting.
+ *
+ * worker is our worker identifier. Follows the general convention that
+ * -1 value relates to a leader tuplesort, and values >= 0 worker
+ * tuplesorts. (-1 can also be a serial tuplesort.)
+ *
+ * shared is mutable shared memory state, which is used to coordinate
+ * parallel sorts.
+ *
+ * nParticipants is the number of worker Tuplesortstates known by the
+ * leader to have actually been launched, which implies that they must
+ * finish a run that the leader can merge. Typically includes a worker state held
+ * by the leader process itself. Set in the leader Tuplesortstate only.
+ */
+ int worker;
+ Sharedsort *shared;
+ int nParticipants;
+
/*
* The sortKeys variable is used by every case other than the hash index
* case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
#endif
};
+/*
+ * Private mutable state of tuplesort-parallel-operation. This is allocated
+ * in shared memory.
+ */
+struct Sharedsort
+{
+ /* mutex protects all fields prior to tapes */
+ slock_t mutex;
+
+ /*
+ * currentWorker generates ordinal identifier numbers for parallel sort
+ * workers. These start from 0, and are always gapless.
+ *
+ * Workers increment workersFinished to indicate having finished. If this
+ * is equal to state.nParticipants within the leader, leader is ready to
+ * merge worker runs.
+ */
+ int currentWorker;
+ int workersFinished;
+
+ /* Temporary file space */
+ SharedFileSet fileset;
+
+ /* Size of tapes flexible array */
+ int nTapes;
+
+ /*
+ * Tapes array used by workers to report back information needed by the
+ * leader to concatenate all worker tapes into one for merging
+ */
+ TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
+};
+
/*
* Is the given tuple allocated from the slab memory arena?
*/
#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
+#define SERIAL(state) ((state)->shared == NULL)
+#define WORKER(state) ((state)->shared && (state)->worker != -1)
+#define LEADER(state) ((state)->shared && (state)->worker == -1)
/*
* NOTES about on-tape representation of tuples:
} while(0)
-static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static bool consider_abort_common(Tuplesortstate *state);
-static void inittapes(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
+static int worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
*/
static Tuplesortstate *
-tuplesort_begin_common(int workMem, bool randomAccess)
+tuplesort_begin_common(int workMem, SortCoordinate coordinate,
+ bool randomAccess)
{
Tuplesortstate *state;
MemoryContext sortcontext;
MemoryContext tuplecontext;
MemoryContext oldcontext;
+ /* See leader_takeover_tapes() remarks on randomAccess support */
+ if (coordinate && randomAccess)
+ elog(ERROR, "random access disallowed under parallel sort");
+
/*
* Create a working memory context for this sort operation. All data
* needed by the sort will live inside this context.
state->bounded = false;
state->tuples = true;
state->boundUsed = false;
- state->allowedMem = workMem * (int64) 1024;
+
+ /*
+ * workMem is forced to be at least 64KB, the current minimum valid value
+ * for the work_mem GUC. This is a defense against parallel sort callers
+ * that divide out memory among many workers in a way that leaves each
+ * with very little memory.
+ */
+ state->allowedMem = Max(workMem, 64) * (int64) 1024;
state->availMem = state->allowedMem;
state->sortcontext = sortcontext;
state->tuplecontext = tuplecontext;
state->result_tape = -1; /* flag that result tape has not been formed */
+ /*
+ * Initialize parallel-related state based on coordination information
+ * from caller
+ */
+ if (!coordinate)
+ {
+ /* Serial sort */
+ state->shared = NULL;
+ state->worker = -1;
+ state->nParticipants = -1;
+ }
+ else if (coordinate->isWorker)
+ {
+ /* Parallel worker produces exactly one final run from all input */
+ state->shared = coordinate->sharedsort;
+ state->worker = worker_get_identifier(state);
+ state->nParticipants = -1;
+ }
+ else
+ {
+ /* Parallel leader state only used for final merge */
+ state->shared = coordinate->sharedsort;
+ state->worker = -1;
+ state->nParticipants = coordinate->nParticipants;
+ Assert(state->nParticipants >= 1);
+ }
+
MemoryContextSwitchTo(oldcontext);
return state;
int nkeys, AttrNumber *attNums,
Oid *sortOperators, Oid *sortCollations,
bool *nullsFirstFlags,
- int workMem, bool randomAccess)
+ int workMem, SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
int i;
false, /* no unique check */
nkeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_heap;
state->copytup = copytup_heap;
Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,
Relation indexRel,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
ScanKey indexScanKey;
MemoryContext oldcontext;
int i;
false, /* no unique check */
state->nKeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_cluster;
state->copytup = copytup_cluster;
tuplesort_begin_index_btree(Relation heapRel,
Relation indexRel,
bool enforceUnique,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
ScanKey indexScanKey;
MemoryContext oldcontext;
int i;
enforceUnique,
state->nKeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_index_btree;
state->copytup = copytup_index;
uint32 high_mask,
uint32 low_mask,
uint32 max_buckets,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
- bool nullsFirstFlag,
- int workMem, bool randomAccess)
+ bool nullsFirstFlag, int workMem,
+ SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
int16 typlen;
bool typbyval;
false, /* no unique check */
1,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_datum;
state->copytup = copytup_datum;
* delayed calls at the moment.)
*
* This is a hint only. The tuplesort may still return more tuples than
- * requested.
+ * requested. Parallel leader tuplesorts will always ignore the hint.
*/
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
Assert(state->status == TSS_INITIAL);
Assert(state->memtupcount == 0);
Assert(!state->bounded);
+ Assert(!WORKER(state));
#ifdef DEBUG_BOUNDED_SORT
/* Honor GUC setting that disables the feature (for easy testing) */
return;
#endif
+ /* Parallel leader ignores hint */
+ if (LEADER(state))
+ return;
+
/* We want to be able to compute bound * 2, so limit the setting */
if (bound > (int64) (INT_MAX / 2))
return;
if (trace_sort)
{
if (state->tapeset)
- elog(LOG, "external sort ended, %ld disk blocks used: %s",
- spaceUsed, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%s of %d ended, %ld disk blocks used: %s",
+ SERIAL(state) ? "external sort" : "parallel external sort",
+ state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
else
- elog(LOG, "internal sort ended, %ld KB used: %s",
- spaceUsed, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%s of %d ended, %ld KB used: %s",
+ SERIAL(state) ? "internal sort" : "unperformed parallel sort",
+ state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
}
TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{
+ Assert(!LEADER(state));
+
switch (state->status)
{
case TSS_INITIAL:
/*
* Nope; time to switch to tape-based operation.
*/
- inittapes(state);
+ inittapes(state, true);
/*
* Dump all tuples.
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "performsort starting: %s",
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "performsort of %d starting: %s",
+ state->worker, pg_rusage_show(&state->ru_start));
#endif
switch (state->status)
/*
* We were able to accumulate all the tuples within the allowed
- * amount of memory. Just qsort 'em and we're done.
+ * amount of memory, or we are the leader, which takes over worker tapes
*/
- tuplesort_sort_memtuples(state);
+ if (SERIAL(state))
+ {
+ /* Just qsort 'em and we're done */
+ tuplesort_sort_memtuples(state);
+ state->status = TSS_SORTEDINMEM;
+ }
+ else if (WORKER(state))
+ {
+ /*
+ * Parallel workers must still dump out tuples to tape. No
+ * merge is required to produce single output run, though.
+ */
+ inittapes(state, false);
+ dumptuples(state, true);
+ worker_nomergeruns(state);
+ state->status = TSS_SORTEDONTAPE;
+ }
+ else
+ {
+ /*
+ * Leader will take over worker tapes and merge worker runs.
+ * Note that mergeruns sets the correct state->status.
+ */
+ leader_takeover_tapes(state);
+ mergeruns(state);
+ }
state->current = 0;
state->eof_reached = false;
+ state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
- state->status = TSS_SORTEDINMEM;
break;
case TSS_BOUNDED:
/*
* Finish tape-based sort. First, flush all tuples remaining in
* memory out to tape; then merge until we have a single remaining
- * run (or, if !randomAccess, one run per tape). Note that
- * mergeruns sets the correct state->status.
+ * run (or, if !randomAccess and !WORKER(), one run per tape).
+ * Note that mergeruns sets the correct state->status.
*/
dumptuples(state, true);
mergeruns(state);
if (trace_sort)
{
if (state->status == TSS_FINALMERGE)
- elog(LOG, "performsort done (except %d-way final merge): %s",
- state->activeTapes,
+ elog(LOG, "performsort of %d done (except %d-way final merge): %s",
+ state->worker, state->activeTapes,
pg_rusage_show(&state->ru_start));
else
- elog(LOG, "performsort done: %s",
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "performsort of %d done: %s",
+ state->worker, pg_rusage_show(&state->ru_start));
}
#endif
unsigned int tuplen;
size_t nmoved;
+ Assert(!WORKER(state));
+
switch (state->status)
{
case TSS_SORTEDINMEM:
*/
Assert(forward);
Assert(ntuples >= 0);
+ Assert(!WORKER(state));
switch (state->status)
{
/*
* inittapes - initialize for tape sorting.
*
- * This is called only if we have found we don't have room to sort in memory.
+ * This is called only if we have found we won't sort in memory.
*/
static void
-inittapes(Tuplesortstate *state)
+inittapes(Tuplesortstate *state, bool mergeruns)
{
int maxTapes,
j;
- int64 tapeSpace;
- /* Compute number of tapes to use: merge order plus 1 */
- maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ Assert(!LEADER(state));
- state->maxTapes = maxTapes;
- state->tapeRange = maxTapes - 1;
+ if (mergeruns)
+ {
+ /* Compute number of tapes to use: merge order plus 1 */
+ maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ }
+ else
+ {
+ /* Workers can sometimes produce single run, output without merge */
+ Assert(WORKER(state));
+ maxTapes = MINORDER + 1;
+ }
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "switching to external sort with %d tapes: %s",
- maxTapes, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%d switching to external sort with %d tapes: %s",
+ state->worker, maxTapes, pg_rusage_show(&state->ru_start));
#endif
- /*
- * Decrease availMem to reflect the space needed for tape buffers, when
- * writing the initial runs; but don't decrease it to the point that we
- * have no room for tuples. (That case is only likely to occur if sorting
- * pass-by-value Datums; in all other scenarios the memtuples[] array is
- * unlikely to occupy more than half of allowedMem. In the pass-by-value
- * case it's not important to account for tuple space, so we don't care if
- * LACKMEM becomes inaccurate.)
- */
- tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
-
- if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
- USEMEM(state, tapeSpace);
-
- /*
- * Make sure that the temp file(s) underlying the tape set are created in
- * suitable temp tablespaces.
- */
- PrepareTempTablespaces();
-
- /*
- * Create the tape set and allocate the per-tape data arrays.
- */
- state->tapeset = LogicalTapeSetCreate(maxTapes);
-
- state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+ /* Create the tape set and allocate the per-tape data arrays */
+ inittapestate(state, maxTapes);
+ state->tapeset =
+ LogicalTapeSetCreate(maxTapes, NULL,
+ state->shared ? &state->shared->fileset : NULL,
+ state->worker);
state->currentRun = 0;
state->status = TSS_BUILDRUNS;
}
+/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+ int64 tapeSpace;
+
+ /*
+ * Decrease availMem to reflect the space needed for tape buffers; but
+ * don't decrease it to the point that we have no room for tuples. (That
+ * case is only likely to occur if sorting pass-by-value Datums; in all
+ * other scenarios the memtuples[] array is unlikely to occupy more than
+ * half of allowedMem. In the pass-by-value case it's not important to
+ * account for tuple space, so we don't care if LACKMEM becomes
+ * inaccurate.)
+ */
+ tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
+
+ if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
+ USEMEM(state, tapeSpace);
+
+ /*
+ * Make sure that the temp file(s) underlying the tape set are created in
+ * suitable temp tablespaces. For parallel sorts, this should have been
+ * called already, but it doesn't matter if it is called a second time.
+ */
+ PrepareTempTablespaces();
+
+ state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
+ state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+
+ /* Record # of tapes allocated (for duration of sort) */
+ state->maxTapes = maxTapes;
+ /* Record maximum # of tapes usable as inputs when merging */
+ state->tapeRange = maxTapes - 1;
+}
+
/*
* selectnewtape -- select new tape for new initial run.
*
*/
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
- (state->availMem) / 1024, numInputTapes);
+ elog(LOG, "%d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+ state->worker, state->availMem / 1024, numInputTapes);
#endif
state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
* pass remains. If we don't have to produce a materialized sorted
* tape, we can stop at this point and do the final merge on-the-fly.
*/
- if (!state->randomAccess)
+ if (!state->randomAccess && !WORKER(state))
{
bool allOneRun = true;
* a waste of cycles anyway...
*/
state->result_tape = state->tp_tapenum[state->tapeRange];
- LogicalTapeFreeze(state->tapeset, state->result_tape);
+ if (!WORKER(state))
+ LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+ else
+ worker_freeze_result_tape(state);
state->status = TSS_SORTEDONTAPE;
/* Release the read buffers of all the other tapes, by rewinding them. */
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "%d finished %d-way merge step: %s", state->worker,
+ state->activeTapes, pg_rusage_show(&state->ru_start));
#endif
}
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "starting quicksort of run %d: %s",
- state->currentRun, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%d starting quicksort of run %d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
#endif
/*
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished quicksort of run %d: %s",
- state->currentRun, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%d finished quicksort of run %d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
#endif
memtupwrite = state->memtupcount;
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished writing run %d to tape %d: %s",
- state->currentRun, state->destTape,
+ elog(LOG, "%d finished writing run %d to tape %d: %s",
+ state->worker, state->currentRun, state->destTape,
pg_rusage_show(&state->ru_start));
#endif
Assert(state->status == TSS_INITIAL);
Assert(state->bounded);
Assert(tupcount >= state->bound);
+ Assert(SERIAL(state));
/* Reverse sort direction so largest entry will be at root */
reversedirection(state);
Assert(state->status == TSS_BOUNDED);
Assert(state->bounded);
Assert(tupcount == state->bound);
+ Assert(SERIAL(state));
/*
* We can unheapify in place because each delete-top call will remove the
static void
tuplesort_sort_memtuples(Tuplesortstate *state)
{
+ Assert(!LEADER(state));
+
if (state->memtupcount > 1)
{
/* Can we use the single-key sort function? */
&tuplen, sizeof(tuplen));
}
+/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+ Size tapesSize;
+
+ Assert(nWorkers > 0);
+
+ /* Make sure that BufFile shared state is MAXALIGN'd */
+ tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+ tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+ return tapesSize;
+}
+
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates. nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+ int i;
+
+ Assert(nWorkers > 0);
+
+ SpinLockInit(&shared->mutex);
+ shared->currentWorker = 0;
+ shared->workersFinished = 0;
+ SharedFileSetInit(&shared->fileset, seg);
+ shared->nTapes = nWorkers;
+ for (i = 0; i < nWorkers; i++)
+ {
+ shared->tapes[i].firstblocknumber = 0L;
+ shared->tapes[i].buffilesize = 0;
+ }
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all worker processes.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+ /* Attach to SharedFileSet */
+ SharedFileSetAttach(&shared->fileset, seg);
+}
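Putting the three routines together, a caller's setup might look like the following sketch; PARALLEL_KEY_TUPLESORT is a hypothetical TOC key, and pcxt, toc, and seg come from the surrounding parallel-context machinery rather than from tuplesort itself.

/* Leader, before launching workers (sketch). */
Size        estsort = tuplesort_estimate_shared(nworkers);

shm_toc_estimate_chunk(&pcxt->estimator, estsort);
shm_toc_estimate_keys(&pcxt->estimator, 1);
InitializeParallelDSM(pcxt);
sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);

/* Each worker, after attaching to the DSM segment (sketch). */
sharedsort = (Sharedsort *) shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
tuplesort_attach_shared(sharedsort, seg);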
+
+/*
+ * worker_get_identifier - Assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless. logtape.c requires this.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber, to avoid making any assumption about
+ * caller's requirements. However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1. This
+ * includes the leader, as well as serial Tuplesort processes.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int worker;
+
+ Assert(WORKER(state));
+
+ SpinLockAcquire(&shared->mutex);
+ worker = shared->currentWorker++;
+ SpinLockRelease(&shared->mutex);
+
+ return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * This is called by workers just after the result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly. They do so because
+ * workers require a few additional steps over similar serial
+ * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
+ * steps are around freeing now unneeded resources, and representing to
+ * leader that worker's input run is available for its merge.
+ *
+ * There should only be one final output run for each worker, which consists
+ * of all tuples that were originally input into worker.
+ */
+static void
+worker_freeze_result_tape(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ TapeShare output;
+
+ Assert(WORKER(state));
+ Assert(state->result_tape != -1);
+ Assert(state->memtupcount == 0);
+
+ /*
+ * Free most remaining memory, in case caller is sensitive to our holding
+ * on to it. memtuples may not be a tiny merge heap at this point.
+ */
+ pfree(state->memtuples);
+ /* Be tidy */
+ state->memtuples = NULL;
+ state->memtupsize = 0;
+
+ /*
+ * Parallel worker requires result tape metadata, which is to be stored in
+ * shared memory for leader
+ */
+ LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+
+ /* Store properties of output tape, and update finished worker count */
+ SpinLockAcquire(&shared->mutex);
+ shared->tapes[state->worker] = output;
+ shared->workersFinished++;
+ SpinLockRelease(&shared->mutex);
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This is called as an alternative to mergeruns() with a worker when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+ Assert(WORKER(state));
+ Assert(state->result_tape == -1);
+
+ state->result_tape = state->tp_tapenum[state->destTape];
+ worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, leader Tuplesortstate has performed no actual sorting. By now, all
+ * sorting has occurred in workers, all of which must have already returned
+ * from tuplesort_performsort().
+ *
+ * When this returns, leader process is left in a state that is virtually
+ * indistinguishable from it having generated runs as a serial external sort
+ * might have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int nParticipants = state->nParticipants;
+ int workersFinished;
+ int j;
+
+ Assert(LEADER(state));
+ Assert(nParticipants >= 1);
+
+ SpinLockAcquire(&shared->mutex);
+ workersFinished = shared->workersFinished;
+ SpinLockRelease(&shared->mutex);
+
+ if (nParticipants != workersFinished)
+ elog(ERROR, "cannot take over tapes before all workers finish");
+
+ /*
+ * Create the tapeset from worker tapes, including a leader-owned tape at
+ * the end. Parallel workers are far more expensive than logical tapes,
+ * so the number of tapes allocated here should never be excessive.
+ *
+ * We still have a leader tape, though it's not possible to write to it
+ * due to restrictions in the shared fileset infrastructure used by
+ * logtape.c. It will never be written to in practice because
+ * randomAccess is disallowed for parallel sorts.
+ */
+ inittapestate(state, nParticipants + 1);
+ state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
+ &shared->fileset, state->worker);
+
+ /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+ state->currentRun = nParticipants;
+
+ /*
+ * Initialize variables of Algorithm D to be consistent with runs from
+ * workers having been generated in the leader.
+ *
+ * There will always be exactly 1 run per worker, and exactly one input
+ * tape per run, because workers always output exactly 1 run, even when
+ * there were no input tuples for workers to sort.
+ */
+ for (j = 0; j < state->maxTapes; j++)
+ {
+ /* One real run; no dummy runs for worker tapes */
+ state->tp_fib[j] = 1;
+ state->tp_runs[j] = 1;
+ state->tp_dummy[j] = 0;
+ state->tp_tapenum[j] = j;
+ }
+ /* Leader tape gets one dummy run, and no real runs */
+ state->tp_fib[state->tapeRange] = 0;
+ state->tp_runs[state->tapeRange] = 0;
+ state->tp_dummy[state->tapeRange] = 1;
+
+ state->Level = 1;
+ state->destTape = 0;
+
+ state->status = TSS_BUILDRUNS;
+}
+
/*
* Convenience routine to free a tuple previously loaded into sort memory
*/
#include "catalog/pg_index.h"
#include "lib/stringinfo.h"
#include "storage/bufmgr.h"
+#include "storage/shm_toc.h"
/* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */
typedef uint16 BTCycleId;
/*
* external entry points for btree, in nbtree.c
*/
-extern IndexBuildResult *btbuild(Relation heap, Relation index,
- struct IndexInfo *indexInfo);
extern void btbuildempty(Relation index);
extern bool btinsert(Relation rel, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
/*
* prototypes for functions in nbtsort.c
*/
-typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */
-
-extern BTSpool *_bt_spoolinit(Relation heap, Relation index,
- bool isunique, bool isdead);
-extern void _bt_spooldestroy(BTSpool *btspool);
-extern void _bt_spool(BTSpool *btspool, ItemPointer self,
- Datum *values, bool *isnull);
-extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
+extern IndexBuildResult *btbuild(Relation heap, Relation index,
+ struct IndexInfo *indexInfo);
+extern void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc);
#endif /* NBTREE_H */
#define IsParallelWorker() (ParallelWorkerNumber >= 0)
-extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers);
+extern ParallelContext *CreateParallelContext(const char *library_name,
+ const char *function_name, int nworkers,
+ bool serializable_okay);
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
BlockNumber phs_startblock; /* starting block number */
pg_atomic_uint64 phs_nallocated; /* number of blocks allocated to
* workers so far. */
+ bool phs_snapshot_any; /* SnapshotAny, not phs_snapshot_data? */
char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
} ParallelHeapScanDescData;
Relation indexRelation,
IndexInfo *indexInfo,
bool isprimary,
- bool isreindex);
+ bool isreindex,
+ bool parallel);
extern double IndexBuildHeapScan(Relation heapRelation,
Relation indexRelation,
IndexInfo *indexInfo,
bool allow_sync,
IndexBuildCallback callback,
- void *callback_state);
+ void *callback_state,
+ HeapScanDesc scan);
extern double IndexBuildHeapRangeScan(Relation heapRelation,
Relation indexRelation,
IndexInfo *indexInfo,
BlockNumber start_blockno,
BlockNumber end_blockno,
IndexBuildCallback callback,
- void *callback_state);
+ void *callback_state,
+ HeapScanDesc scan);
extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot);
extern PGDLLIMPORT bool allowSystemTableMods;
extern PGDLLIMPORT int work_mem;
extern PGDLLIMPORT int maintenance_work_mem;
+extern PGDLLIMPORT int max_parallel_maintenance_workers;
extern int VacuumCostPageHit;
extern int VacuumCostPageMiss;
* ReadyForInserts is it valid for inserts?
* Concurrent are we doing a concurrent index build?
* BrokenHotChain did we detect any broken HOT chains?
+ * ParallelWorkers # of workers requested (excludes leader)
* AmCache private cache area for index AM
* Context memory context holding this IndexInfo
*
- * ii_Concurrent and ii_BrokenHotChain are used only during index build;
- * they're conventionally set to false otherwise.
+ * ii_Concurrent, ii_BrokenHotChain, and ii_ParallelWorkers are used only
+ * during index build; they're conventionally zeroed otherwise.
* ----------------
*/
typedef struct IndexInfo
bool ii_ReadyForInserts;
bool ii_Concurrent;
bool ii_BrokenHotChain;
+ int ii_ParallelWorkers;
Oid ii_Am;
void *ii_AmCache;
MemoryContext ii_Context;
extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
extern int compute_parallel_worker(RelOptInfo *rel, double heap_pages,
- double index_pages);
+ double index_pages, int max_workers);
extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
Path *bitmapqual);
extern void generate_partition_wise_join_paths(PlannerInfo *root,
extern Expr *preprocess_phv_expression(PlannerInfo *root, Expr *expr);
extern bool plan_cluster_use_sort(Oid tableOid, Oid indexOid);
+extern int plan_create_index_workers(Oid tableOid, Oid indexOid);
extern List *get_partitioned_child_rels(PlannerInfo *root, Index rti,
bool *part_cols_updated);
WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH,
WAIT_EVENT_PARALLEL_BITMAP_SCAN,
+ WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN,
WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
WAIT_EVENT_CLOG_GROUP_UPDATE,
WAIT_EVENT_REPLICATION_ORIGIN_DROP,
extern int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
extern void BufFileTell(BufFile *file, int *fileno, off_t *offset);
extern int BufFileSeekBlock(BufFile *file, long blknum);
+extern off_t BufFileSize(BufFile *file);
+extern long BufFileAppend(BufFile *target, BufFile *source);
extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
extern void BufFileExportShared(BufFile *file);
extern int FileGetRawDesc(File file);
extern int FileGetRawFlags(File file);
extern mode_t FileGetRawMode(File file);
+extern off_t FileGetSize(File file);
/* Operations used for sharing named temporary files */
extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure);
#ifndef LOGTAPE_H
#define LOGTAPE_H
+#include "storage/sharedfileset.h"
+
/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
typedef struct LogicalTapeSet LogicalTapeSet;
+/*
+ * The approach tuplesort.c takes to parallel external sorts is that workers,
+ * whose state is almost the same as independent serial sorts, are made to
+ * produce a final materialized tape of sorted output in all cases. This is
+ * frozen, just like any case requiring a final materialized tape. However,
+ * there is one difference, which is that freezing will also export an
+ * underlying shared fileset BufFile for sharing. Freezing produces TapeShare
+ * metadata for the worker when this happens, which is passed along through
+ * shared memory to leader.
+ *
+ * The leader process can then pass an array of TapeShare metadata (one per
+ * worker participant) to LogicalTapeSetCreate(), alongside a handle to a
+ * shared fileset, which is sufficient to construct a new logical tapeset that
+ * consists of each of the tapes materialized by workers.
+ *
+ * Note that while logtape.c does create an empty leader tape at the end of the
+ * tapeset in the leader case, it can never be written to due to a restriction
+ * in the shared buffile infrastructure.
+ */
+typedef struct TapeShare
+{
+ /*
+ * firstblocknumber is first block that should be read from materialized
+ * tape.
+ *
+ * buffilesize is the size of associated BufFile following freezing.
+ */
+ long firstblocknumber;
+ off_t buffilesize;
+} TapeShare;
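As a sketch of the handoff (names other than the logtape.c/buffile.c routines are placeholders), a worker freezes its final tape and publishes the resulting metadata, and the leader later passes the collected array back to LogicalTapeSetCreate():

/* Worker (sketch): freeze final output tape and export its metadata. */
TapeShare   output;

LogicalTapeFreeze(lts, result_tape, &output);
shared_tapes[worker_number] = output;   /* made visible via shared memory */

/* Leader (sketch): one tapeset spanning all worker tapes plus its own. */
leader_lts = LogicalTapeSetCreate(nworkers + 1, shared_tapes, fileset, -1);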
+
/*
* prototypes for functions in logtape.c
*/
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
+extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *shared,
+ SharedFileSet *fileset, int worker);
extern void LogicalTapeSetClose(LogicalTapeSet *lts);
extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
size_t buffer_size);
extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
+extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
+ TapeShare *share);
extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
* if necessary). It works efficiently for both small and large amounts
* of data. Small amounts are sorted in-memory using qsort(). Large
* amounts are sorted using temporary files and a standard external sort
- * algorithm.
+ * algorithm. Parallel sorts use a variant of this external sort
+ * algorithm, and are typically only used for large amounts of data.
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
#include "access/itup.h"
#include "executor/tuptable.h"
#include "fmgr.h"
+#include "storage/dsm.h"
#include "utils/relcache.h"
-/* Tuplesortstate is an opaque type whose details are not known outside
- * tuplesort.c.
+/*
+ * Tuplesortstate and Sharedsort are opaque types whose details are not
+ * known outside tuplesort.c.
*/
typedef struct Tuplesortstate Tuplesortstate;
+typedef struct Sharedsort Sharedsort;
+
+/*
+ * Tuplesort parallel coordination state, allocated by each participant in
+ * local memory. Participant caller initializes everything. See usage notes
+ * below.
+ */
+typedef struct SortCoordinateData
+{
+ /* Worker process? If not, must be leader. */
+ bool isWorker;
+
+ /*
+ * Leader-process-passed number of participants known launched (workers
+ * set this to -1). Includes state within leader needed for it to
+ * participate as a worker, if any.
+ */
+ int nParticipants;
+
+ /* Private opaque state (points to shared memory) */
+ Sharedsort *sharedsort;
+} SortCoordinateData;
+
+typedef struct SortCoordinateData *SortCoordinate;
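For example (a sketch only; sharedsort, heapRel, indexRel, enforceUnique, sortmem, and nlaunched are assumed to be provided by the caller), worker and leader participants fill in the struct differently before calling the same begin routine:

/* Worker-side coordination (sketch). */
SortCoordinateData coordinate;

coordinate.isWorker = true;
coordinate.nParticipants = -1;
coordinate.sharedsort = sharedsort;
state = tuplesort_begin_index_btree(heapRel, indexRel, enforceUnique,
                                    sortmem, &coordinate, false);

/* Leader-side coordination (sketch): pass the number of launched workers. */
coordinate.isWorker = false;
coordinate.nParticipants = nlaunched;
coordinate.sharedsort = sharedsort;
state = tuplesort_begin_index_btree(heapRel, indexRel, enforceUnique,
                                    sortmem, &coordinate, false);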
/*
* Data structures for reporting sort statistics. Note that
* sorting HeapTuples and two more for sorting IndexTuples. Yet another
* API supports sorting bare Datums.
*
+ * Serial sort callers should pass NULL for their coordinate argument.
+ *
* The "heap" API actually stores/sorts MinimalTuples, which means it doesn't
* preserve the system columns (tuple identity and transaction visibility
* info). The sort keys are specified by column numbers within the tuples
*
* The "index_hash" API is similar to index_btree, but the tuples are
* actually sorted by their hash codes not the raw data.
+ *
+ * Parallel sort callers are required to coordinate multiple tuplesort states
+ * in a leader process and one or more worker processes. The leader process
+ * must launch workers, and have each perform an independent "partial"
+ * tuplesort, typically fed by the parallel heap interface. The leader later
+ * produces the final output (internally, it merges runs output by workers).
+ *
+ * Callers must do the following to perform a sort in parallel using multiple
+ * worker processes:
+ *
+ * 1. Request tuplesort-private shared memory for n workers. Use
+ * tuplesort_estimate_shared() to get the required size.
+ * 2. Have leader process initialize allocated shared memory using
+ * tuplesort_initialize_shared(). Launch workers.
+ * 3. Initialize a coordinate argument within the leader process and within
+ * each worker process. This has a pointer to the shared
+ * tuplesort-private structure, as well as some caller-initialized fields.
+ * The leader's coordinate argument reliably indicates the number of
+ * workers launched (this field is unused by workers).
+ * 4. Begin a tuplesort using some appropriate tuplesort_begin* routine
+ * (passing the coordinate argument) within each worker. The workMem
+ * arguments need not be identical. All other arguments should match
+ * exactly, though.
+ * 5. tuplesort_attach_shared() should be called by all workers. Feed tuples
+ * to each worker, and call tuplesort_performsort() within each when input
+ * is exhausted.
+ * 6. Call tuplesort_end() in each worker process. Worker processes can shut
+ * down once tuplesort_end() returns.
+ * 7. Begin a tuplesort in the leader using the same tuplesort_begin*
+ * routine, passing a leader-appropriate coordinate argument (this can
+ * happen as early as during step 3, actually, since we only need to know
+ * the number of workers successfully launched). The leader must now wait
+ * for workers to finish. The caller must use its own mechanism to ensure
+ * that the next step isn't reached until all workers have called and
+ * returned from tuplesort_performsort(). (Note that it's okay if workers
+ * have already also called tuplesort_end() by then.)
+ * 8. Call tuplesort_performsort() in leader. Consume output using the
+ * appropriate tuplesort_get* routine. Leader can skip this step if
+ * tuplesort turns out to be unnecessary.
+ * 9. Call tuplesort_end() in leader.
+ *
+ * This division of labor assumes nothing about how input tuples are produced,
+ * but does require that the caller combine the state of multiple tuplesorts for
+ * any purpose other than producing the final output. For example, callers
+ * must consider that tuplesort_get_stats() reports on only one worker's role
+ * in a sort (or the leader's role), and not statistics for the sort as a
+ * whole.
+ *
+ * Note that callers may use the leader process to sort runs as if it were an
+ * independent worker process (prior to the process performing a leader sort
+ * to produce the final sorted output). Doing so only requires a second
+ * "partial" tuplesort within the leader process, initialized like that of a
+ * worker process. The steps above don't touch on this directly. The only
+ * difference is that the tuplesort_attach_shared() call is never needed
+ * within the leader process, because the backend as a whole holds the
+ * shared fileset reference. A worker Tuplesortstate in the leader is
+ * expected to do exactly the same amount of total initial processing work
+ * as a worker process Tuplesortstate, since the leader process has nothing
+ * else to do before workers finish.
+ *
+ * Note that only a very small amount of memory will be allocated prior to
+ * the leader state first consuming input, and that workers will free the
+ * vast majority of their memory upon returning from tuplesort_performsort().
+ * Callers can rely on this to arrange for memory to be used in a way that
+ * respects a workMem-style budget across an entire parallel sort operation.
+ *
+ * Callers are responsible for parallel safety in general. However, they
+ * can at least rely on there being no parallel safety hazards within
+ * tuplesort, because tuplesort thinks of the sort as several independent
+ * sorts whose results are combined. Since, in general, the behavior of
+ * sort operators is immutable, the caller need only worry about the
+ * parallel safety of whatever process is used to generate the input tuples
+ * (typically, the caller uses a parallel heap scan).
*/
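To make the numbered steps above concrete, here is a minimal, non-authoritative sketch of one possible caller, loosely modeled on a B-tree build. The launch_workers() and wait_for_workers() helpers, the parameter names, and the placement of the Sharedsort at the start of the DSM segment are assumptions for illustration only; the real driver for parallel CREATE INDEX lives in nbtsort.c. randomAccess is passed as false throughout, since parallel sorts do not support random access (see the note further down).

    #include "postgres.h"
    #include "storage/dsm.h"
    #include "utils/tuplesort.h"

    extern void launch_workers(int nworkers);   /* hypothetical */
    extern void wait_for_workers(void);         /* hypothetical */

    /* Worker side: steps 3-6 */
    static void
    worker_sort(Relation heap, Relation index, bool isunique, int sortmem,
                Sharedsort *sharedsort, dsm_segment *seg)
    {
        SortCoordinateData coordinate;
        Tuplesortstate *sortstate;

        coordinate.isWorker = true;
        coordinate.nParticipants = -1;      /* only the leader knows this */
        coordinate.sharedsort = sharedsort;

        sortstate = tuplesort_begin_index_btree(heap, index, isunique, sortmem,
                                                &coordinate, false);
        tuplesort_attach_shared(sharedsort, seg);

        /* ... feed this worker's share of the heap tuples here ... */

        tuplesort_performsort(sortstate);
        tuplesort_end(sortstate);
    }

    /* Leader side: steps 1-3 and 7-9 */
    static void
    leader_sort(Relation heap, Relation index, bool isunique, int sortmem,
                dsm_segment *seg, int nworkers)
    {
        Sharedsort *sharedsort;
        SortCoordinateData coordinate;
        Tuplesortstate *sortstate;

        /*
         * Steps 1-2: the segment is assumed to have been sized with
         * tuplesort_estimate_shared(nworkers) before it was created, and the
         * Sharedsort is assumed to sit at its start.
         */
        sharedsort = (Sharedsort *) dsm_segment_address(seg);
        tuplesort_initialize_shared(sharedsort, nworkers, seg);

        launch_workers(nworkers);       /* hypothetical; each runs worker_sort() */

        /* Steps 3 and 7: the leader's coordinate reflects workers launched */
        coordinate.isWorker = false;
        coordinate.nParticipants = nworkers;
        coordinate.sharedsort = sharedsort;

        sortstate = tuplesort_begin_index_btree(heap, index, isunique, sortmem,
                                                &coordinate, false);

        wait_for_workers();             /* all workers have performsort'd */

        /* Steps 8-9: merge the workers' runs and consume the output */
        tuplesort_performsort(sortstate);
        /* ... read sorted tuples with tuplesort_getindextuple() ... */
        tuplesort_end(sortstate);
    }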
extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
int nkeys, AttrNumber *attNums,
Oid *sortOperators, Oid *sortCollations,
bool *nullsFirstFlags,
- int workMem, bool randomAccess);
+ int workMem, SortCoordinate coordinate,
+ bool randomAccess);
extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
- Relation indexRel,
- int workMem, bool randomAccess);
+ Relation indexRel, int workMem,
+ SortCoordinate coordinate, bool randomAccess);
extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel,
Relation indexRel,
bool enforceUnique,
- int workMem, bool randomAccess);
+ int workMem, SortCoordinate coordinate,
+ bool randomAccess);
extern Tuplesortstate *tuplesort_begin_index_hash(Relation heapRel,
Relation indexRel,
uint32 high_mask,
uint32 low_mask,
uint32 max_buckets,
- int workMem, bool randomAccess);
+ int workMem, SortCoordinate coordinate,
+ bool randomAccess);
extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
Oid sortOperator, Oid sortCollation,
bool nullsFirstFlag,
- int workMem, bool randomAccess);
+ int workMem, SortCoordinate coordinate,
+ bool randomAccess);
extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
extern int tuplesort_merge_order(int64 allowedMem);
+extern Size tuplesort_estimate_shared(int nworkers);
+extern void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers,
+ dsm_segment *seg);
+extern void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg);
+
/*
* These routines may only be called if randomAccess was specified 'true'.
* Likewise, backwards scan in gettuple/getdatum is only allowed if
- * randomAccess was specified.
+ * randomAccess was specified. Note that parallel sorts do not support
+ * randomAccess.
*/
extern void tuplesort_rescan(Tuplesortstate *state);
BTBuildState
BTCycleId
BTIndexStat
+BTLeader
BTMetaPageData
BTOneVacInfo
BTPS_State
BTScanPos
BTScanPosData
BTScanPosItem
+BTShared
BTSortArrayContext
BTSpool
BTStack
SharedTuplestore
SharedTuplestoreAccessor
SharedTypmodTableEntry
+Sharedsort
ShellTypeInfo
ShippableCacheEntry
ShippableCacheKey
SortBy
SortByDir
SortByNulls
+SortCoordinate
+SortCoordinateData
SortGroupClause
SortItem
SortPath
TablespaceList
TablespaceListCell
TapeBlockTrailer
+TapeShare
TarMethodData
TarMethodFile
TargetEntry