]> granicus.if.org Git - postgresql/commitdiff
Minor cleanup for access/transam/parallel.c.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 1 Aug 2016 20:12:01 +0000 (16:12 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 1 Aug 2016 20:12:01 +0000 (16:12 -0400)
ParallelMessagePending *must* be marked volatile, because it's set
by a signal handler.  On the other hand, it's pointless for
HandleParallelMessageInterrupt to save/restore errno; that must be,
and is, done at the outer level of the SIGUSR1 signal handler.

Calling CHECK_FOR_INTERRUPTS() inside HandleParallelMessages, which itself
is called from CHECK_FOR_INTERRUPTS(), seems both useless and hazardous.
The comment claiming that this is needed to handle the error queue going
away is certainly misguided, in any case.

Improve a couple of error message texts, and use
ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE to report loss of parallel worker
connection, since that's what's used in e.g. tqueue.c.  (Maybe it would be
worth inventing a dedicated ERRCODE for this type of failure?  But I do not
think ERRCODE_INTERNAL_ERROR is appropriate.)

Minor stylistic cleanups.

src/backend/access/transam/parallel.c
src/include/access/parallel.h

index eef1dc2b1843de5376e415a12ec1eb86a6a56182..a303fca35ce6f79f951d4e888179d8a7cd76ce77 100644 (file)
@@ -14,9 +14,9 @@
 
 #include "postgres.h"
 
+#include "access/parallel.h"
 #include "access/xact.h"
 #include "access/xlog.h"
-#include "access/parallel.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
 #include "libpq/libpq.h"
@@ -35,6 +35,7 @@
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
 
+
 /*
  * We don't want to waste a lot of memory on an error queue which, most of
  * the time, will process only a handful of small messages.  However, it is
@@ -94,7 +95,7 @@ typedef struct FixedParallelState
 int                    ParallelWorkerNumber = -1;
 
 /* Is there a parallel message pending which we need to receive? */
-bool           ParallelMessagePending = false;
+volatile bool ParallelMessagePending = false;
 
 /* Are we initializing a parallel worker? */
 bool           InitializingParallelWorker = false;
@@ -106,12 +107,13 @@ static FixedParallelState *MyFixedParallelState;
 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
 
 /* Private functions. */
-static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
+static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 
+
 /*
  * Establish a new parallel context.  This should be done after entering
  * parallel mode, and (unless there is an error) the context should be
@@ -681,17 +683,17 @@ ParallelContextActive(void)
 
 /*
  * Handle receipt of an interrupt indicating a parallel worker message.
+ *
+ * Note: this is called within a signal handler!  All we can do is set
+ * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
+ * HandleParallelMessages().
  */
 void
 HandleParallelMessageInterrupt(void)
 {
-       int                     save_errno = errno;
-
        InterruptPending = true;
        ParallelMessagePending = true;
        SetLatch(MyLatch);
-
-       errno = save_errno;
 }
 
 /*
@@ -742,11 +744,8 @@ HandleParallelMessages(void)
                                }
                                else
                                        ereport(ERROR,
-                                                       (errcode(ERRCODE_INTERNAL_ERROR),       /* XXX: wrong errcode? */
-                                                        errmsg("lost connection to parallel worker")));
-
-                               /* This might make the error queue go away. */
-                               CHECK_FOR_INTERRUPTS();
+                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                  errmsg("lost connection to parallel worker")));
                        }
                }
        }
@@ -833,7 +832,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
                default:
                        {
-                               elog(ERROR, "unknown message type: %c (%d bytes)",
+                               elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
                                         msgtype, msg->len);
                        }
        }
index 2a0832fec02d0973d14665ecde359da22c399c8b..2f8f36fea4a199983a39638fde3a8228ae00992e 100644 (file)
@@ -19,7 +19,6 @@
 #include "postmaster/bgworker.h"
 #include "storage/shm_mq.h"
 #include "storage/shm_toc.h"
-#include "utils/elog.h"
 
 typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
 
@@ -47,7 +46,7 @@ typedef struct ParallelContext
        ParallelWorkerInfo *worker;
 } ParallelContext;
 
-extern bool ParallelMessagePending;
+extern volatile bool ParallelMessagePending;
 extern int     ParallelWorkerNumber;
 extern bool InitializingParallelWorker;
 
@@ -55,17 +54,17 @@ extern bool InitializingParallelWorker;
 
 extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
 extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
-extern void InitializeParallelDSM(ParallelContext *);
+extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
-extern void LaunchParallelWorkers(ParallelContext *);
-extern void WaitForParallelWorkersToFinish(ParallelContext *);
-extern void DestroyParallelContext(ParallelContext *);
+extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
+extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);
 
 extern void HandleParallelMessageInterrupt(void);
 extern void HandleParallelMessages(void);
 extern void AtEOXact_Parallel(bool isCommit);
 extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
-extern void ParallelWorkerReportLastRecEnd(XLogRecPtr);
+extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end);
 
 #endif   /* PARALLEL_H */