#include "postgres.h"
+#include "access/parallel.h"
#include "access/xact.h"
#include "access/xlog.h"
-#include "access/parallel.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "libpq/libpq.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
+
/*
* We don't want to waste a lot of memory on an error queue which, most of
* the time, will process only a handful of small messages. However, it is
int ParallelWorkerNumber = -1;
/* Is there a parallel message pending which we need to receive? */
-bool ParallelMessagePending = false;
+volatile bool ParallelMessagePending = false;
/* Are we initializing a parallel worker? */
bool InitializingParallelWorker = false;
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
/* Private functions. */
-static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
+static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void ParallelErrorContext(void *arg);
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void ParallelWorkerMain(Datum main_arg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
+
/*
* Establish a new parallel context. This should be done after entering
* parallel mode, and (unless there is an error) the context should be
/*
* Handle receipt of an interrupt indicating a parallel worker message.
+ *
+ * Note: this is called within a signal handler! All we can do is set
+ * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
+ * HandleParallelMessages().
*/
void
HandleParallelMessageInterrupt(void)
{
- int save_errno = errno;
-
InterruptPending = true;
ParallelMessagePending = true;
SetLatch(MyLatch);
-
- errno = save_errno;
}
/*
}
else
ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
- errmsg("lost connection to parallel worker")));
-
- /* This might make the error queue go away. */
- CHECK_FOR_INTERRUPTS();
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to parallel worker")));
}
}
}
default:
{
- elog(ERROR, "unknown message type: %c (%d bytes)",
+ elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
msgtype, msg->len);
}
}
#include "postmaster/bgworker.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
-#include "utils/elog.h"
typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
ParallelWorkerInfo *worker;
} ParallelContext;
-extern bool ParallelMessagePending;
+extern volatile bool ParallelMessagePending;
extern int ParallelWorkerNumber;
extern bool InitializingParallelWorker;
extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
-extern void InitializeParallelDSM(ParallelContext *);
+extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
-extern void LaunchParallelWorkers(ParallelContext *);
-extern void WaitForParallelWorkersToFinish(ParallelContext *);
-extern void DestroyParallelContext(ParallelContext *);
+extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
+extern void DestroyParallelContext(ParallelContext *pcxt);
extern bool ParallelContextActive(void);
extern void HandleParallelMessageInterrupt(void);
extern void HandleParallelMessages(void);
extern void AtEOXact_Parallel(bool isCommit);
extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
-extern void ParallelWorkerReportLastRecEnd(XLogRecPtr);
+extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end);
#endif /* PARALLEL_H */