* parallel.c
* Infrastructure for launching parallel workers
*
- * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
#include "postgres.h"
+#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
+#include "catalog/pg_enum.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
-#include "optimizer/planmain.h"
+#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "storage/ipc.h"
+#include "storage/predicate.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
-#include "utils/resowner.h"
+#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B)
+#define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
+#define PARALLEL_KEY_ENUMBLACKLIST UINT64CONST(0xFFFFFFFFFFFF000D)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
PGPROC *parallel_master_pgproc;
pid_t parallel_master_pid;
BackendId parallel_master_backend_id;
+ TimestampTz xact_ts;
+ TimestampTz stmt_ts;
+ SerializableXactHandle serializable_xact_handle;
/* Mutex protects remaining fields. */
slock_t mutex;
{
{
"ParallelQueryMain", ParallelQueryMain
+ },
+ {
+ "_bt_parallel_build_main", _bt_parallel_build_main
}
};
/* Number of workers should be non-negative. */
Assert(nworkers >= 0);
- /*
- * If dynamic shared memory is not available, we won't be able to use
- * background workers.
- */
- if (dynamic_shared_memory_type == DSM_IMPL_NONE)
- nworkers = 0;
-
- /*
- * If we are running under serializable isolation, we can't use parallel
- * workers, at least not until somebody enhances that mechanism to be
- * parallel-aware.
- */
- if (IsolationIsSerializable())
- nworkers = 0;
-
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
Size asnaplen = 0;
Size tstatelen = 0;
Size reindexlen = 0;
+ Size relmapperlen = 0;
+ Size enumblacklistlen = 0;
Size segsize = 0;
int i;
FixedParallelState *fps;
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
reindexlen = EstimateReindexStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
+ relmapperlen = EstimateRelationMapSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
+ enumblacklistlen = EstimateEnumBlacklistSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, enumblacklistlen);
/* If you add more chunks here, you probably need to add keys. */
- shm_toc_estimate_keys(&pcxt->estimator, 8);
+ shm_toc_estimate_keys(&pcxt->estimator, 10);
/* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
fps->parallel_master_pgproc = MyProc;
fps->parallel_master_pid = MyProcPid;
fps->parallel_master_backend_id = MyBackendId;
+ fps->xact_ts = GetCurrentTransactionStartTimestamp();
+ fps->stmt_ts = GetCurrentStatementStartTimestamp();
+ fps->serializable_xact_handle = ShareSerializableXact();
SpinLockInit(&fps->mutex);
fps->last_xlog_end = 0;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
char *asnapspace;
char *tstatespace;
char *reindexspace;
+ char *relmapperspace;
char *error_queue_space;
char *session_dsm_handle_space;
char *entrypointstate;
+ char *enumblacklistspace;
Size lnamelen;
/* Serialize shared libraries we have loaded. */
SerializeReindexState(reindexlen, reindexspace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
+ /* Serialize relmapper state. */
+ relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
+ SerializeRelationMap(relmapperlen, relmapperspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
+ relmapperspace);
+
+ /* Serialize enum blacklist state. */
+ enumblacklistspace = shm_toc_allocate(pcxt->toc, enumblacklistlen);
+ SerializeEnumBlacklist(enumblacklistspace, enumblacklistlen);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENUMBLACKLIST,
+ enumblacklistspace);
+
/* Allocate space for worker information. */
pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
ReinitializeParallelDSM(ParallelContext *pcxt)
{
FixedParallelState *fps;
- char *error_queue_space;
- int i;
/* Wait for any old workers to exit. */
if (pcxt->nworkers_launched > 0)
fps->last_xlog_end = 0;
/* Recreate error queues (if they exist). */
- error_queue_space =
- shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, true);
- Assert(pcxt->nworkers == 0 || error_queue_space != NULL);
- for (i = 0; i < pcxt->nworkers; ++i)
+ if (pcxt->nworkers > 0)
{
- char *start;
- shm_mq *mq;
+ char *error_queue_space;
+ int i;
- start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
- mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
- shm_mq_set_receiver(mq, MyProc);
- pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ error_queue_space =
+ shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
}
}
* just end up waiting for the same worker again.
*/
rc = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_POSTMASTER_DEATH,
+ WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
-1, WAIT_EVENT_BGWORKER_STARTUP);
- /* emergency bailout if postmaster has died */
- if (rc & WL_POSTMASTER_DEATH)
- proc_exit(1);
-
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
}
}
}
- WaitLatch(MyLatch, WL_LATCH_SET, -1,
- WAIT_EVENT_PARALLEL_FINISH);
+ (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+ WAIT_EVENT_PARALLEL_FINISH);
ResetLatch(MyLatch);
}
char *asnapspace;
char *tstatespace;
char *reindexspace;
+ char *relmapperspace;
+ char *enumblacklistspace;
StringInfoData msgbuf;
char *session_dsm_handle_space;
Assert(ParallelWorkerNumber == -1);
memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
- /* Set up a memory context and resource owner. */
- Assert(CurrentResourceOwner == NULL);
- CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
+ /* Set up a memory context to work in, just for cleanliness. */
CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
"Parallel worker",
ALLOCSET_DEFAULT_SIZES);
/*
- * Now that we have a resource owner, we can attach to the dynamic shared
- * memory segment and read the table of contents.
+ * Attach to the dynamic shared memory segment for the parallel query, and
+ * find its table of contents.
+ *
+ * Note: at this point, we have not created any ResourceOwner in this
+ * process. This will result in our DSM mapping surviving until process
+ * exit, which is fine. If there were a ResourceOwner, it would acquire
+ * ownership of the mapping, but we have no need for that.
*/
seg = dsm_attach(DatumGetUInt32(main_arg));
if (seg == NULL)
return;
/*
- * Load libraries that were loaded by original backend. We want to do
- * this before restoring GUCs, because the libraries might define custom
- * variables.
+ * Restore transaction and statement start-time timestamps. This must
+ * happen before anything that would start a transaction, else asserts in
+ * xact.c will fire.
*/
- libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
- RestoreLibraryState(libraryspace);
+ SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
/*
* Identify the entry point to be called. In theory this could result in
/* Restore database connection. */
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
- fps->authenticated_user_id);
+ fps->authenticated_user_id,
+ 0);
/*
* Set the client encoding to the database encoding, since that is what
*/
SetClientEncoding(GetDatabaseEncoding());
+ /*
+ * Load libraries that were loaded by original backend. We want to do
+ * this before restoring GUCs, because the libraries might define custom
+ * variables.
+ */
+ libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
+ StartTransactionCommand();
+ RestoreLibraryState(libraryspace);
+
/* Restore GUC values from launching backend. */
gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
- StartTransactionCommand();
RestoreGUCState(gucspace);
CommitTransactionCommand();
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
RestoreReindexState(reindexspace);
+ /* Restore relmapper state. */
+ relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
+ RestoreRelationMap(relmapperspace);
+
+ /* Restore enum blacklist. */
+ enumblacklistspace = shm_toc_lookup(toc, PARALLEL_KEY_ENUMBLACKLIST,
+ false);
+ RestoreEnumBlacklist(enumblacklistspace);
+
+ /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
+ AttachSerializableXact(fps->serializable_xact_handle);
+
/*
* We've initialized all of our state now; nothing should change
* hereafter.
/* Must exit parallel mode to pop active snapshot. */
ExitParallelMode();
- /* Must pop active snapshot so resowner.c doesn't complain. */
+ /* Must pop active snapshot so snapmgr.c doesn't complain. */
PopActiveSnapshot();
/* Shut down the parallel-worker transaction. */