From a5fe473ad79d8d2c85a625621c165e8c601274e4 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Mon, 1 Aug 2016 16:12:01 -0400 Subject: [PATCH] Minor cleanup for access/transam/parallel.c. ParallelMessagePending *must* be marked volatile, because it's set by a signal handler. On the other hand, it's pointless for HandleParallelMessageInterrupt to save/restore errno; that must be, and is, done at the outer level of the SIGUSR1 signal handler. Calling CHECK_FOR_INTERRUPTS() inside HandleParallelMessages, which itself is called from CHECK_FOR_INTERRUPTS(), seems both useless and hazardous. The comment claiming that this is needed to handle the error queue going away is certainly misguided, in any case. Improve a couple of error message texts, and use ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE to report loss of parallel worker connection, since that's what's used in e.g. tqueue.c. (Maybe it would be worth inventing a dedicated ERRCODE for this type of failure? But I do not think ERRCODE_INTERNAL_ERROR is appropriate.) Minor stylistic cleanups. --- src/backend/access/transam/parallel.c | 25 ++++++++++++------------- src/include/access/parallel.h | 13 ++++++------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index eef1dc2b18..a303fca35c 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -14,9 +14,9 @@ #include "postgres.h" +#include "access/parallel.h" #include "access/xact.h" #include "access/xlog.h" -#include "access/parallel.h" #include "catalog/namespace.h" #include "commands/async.h" #include "libpq/libpq.h" @@ -35,6 +35,7 @@ #include "utils/resowner.h" #include "utils/snapmgr.h" + /* * We don't want to waste a lot of memory on an error queue which, most of * the time, will process only a handful of small messages. However, it is @@ -94,7 +95,7 @@ typedef struct FixedParallelState int ParallelWorkerNumber = -1; /* Is there a parallel message pending which we need to receive? */ -bool ParallelMessagePending = false; +volatile bool ParallelMessagePending = false; /* Are we initializing a parallel worker? */ bool InitializingParallelWorker = false; @@ -106,12 +107,13 @@ static FixedParallelState *MyFixedParallelState; static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); /* Private functions. */ -static void HandleParallelMessage(ParallelContext *, int, StringInfo msg); +static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg); static void ParallelErrorContext(void *arg); static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); static void ParallelWorkerMain(Datum main_arg); static void WaitForParallelWorkersToExit(ParallelContext *pcxt); + /* * Establish a new parallel context. This should be done after entering * parallel mode, and (unless there is an error) the context should be @@ -681,17 +683,17 @@ ParallelContextActive(void) /* * Handle receipt of an interrupt indicating a parallel worker message. + * + * Note: this is called within a signal handler! All we can do is set + * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke + * HandleParallelMessages(). */ void HandleParallelMessageInterrupt(void) { - int save_errno = errno; - InterruptPending = true; ParallelMessagePending = true; SetLatch(MyLatch); - - errno = save_errno; } /* @@ -742,11 +744,8 @@ HandleParallelMessages(void) } else ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ - errmsg("lost connection to parallel worker"))); - - /* This might make the error queue go away. */ - CHECK_FOR_INTERRUPTS(); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("lost connection to parallel worker"))); } } } @@ -833,7 +832,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) default: { - elog(ERROR, "unknown message type: %c (%d bytes)", + elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)", msgtype, msg->len); } } diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 2a0832fec0..2f8f36fea4 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -19,7 +19,6 @@ #include "postmaster/bgworker.h" #include "storage/shm_mq.h" #include "storage/shm_toc.h" -#include "utils/elog.h" typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc); @@ -47,7 +46,7 @@ typedef struct ParallelContext ParallelWorkerInfo *worker; } ParallelContext; -extern bool ParallelMessagePending; +extern volatile bool ParallelMessagePending; extern int ParallelWorkerNumber; extern bool InitializingParallelWorker; @@ -55,17 +54,17 @@ extern bool InitializingParallelWorker; extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers); extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers); -extern void InitializeParallelDSM(ParallelContext *); +extern void InitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelDSM(ParallelContext *pcxt); -extern void LaunchParallelWorkers(ParallelContext *); -extern void WaitForParallelWorkersToFinish(ParallelContext *); -extern void DestroyParallelContext(ParallelContext *); +extern void LaunchParallelWorkers(ParallelContext *pcxt); +extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); +extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); extern void HandleParallelMessageInterrupt(void); extern void HandleParallelMessages(void); extern void AtEOXact_Parallel(bool isCommit); extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId); -extern void ParallelWorkerReportLastRecEnd(XLogRecPtr); +extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end); #endif /* PARALLEL_H */ -- 2.40.0