ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
shm_mq_set_sender(mq, MyProc);
mqh = shm_mq_attach(mq, seg, NULL);
- pq_redirect_to_shm_mq(mq, mqh);
+ pq_redirect_to_shm_mq(seg, mqh);
pq_set_parallel_master(fps->parallel_master_pid,
fps->parallel_master_backend_id);
static pid_t pq_mq_parallel_master_pid = 0;
static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
+static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);
static void mq_comm_reset(void);
static int mq_flush(void);
static int mq_flush_if_writable(void);
* message queue.
*/
void
-pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
+pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
{
PqCommMethods = &PqCommMqMethods;
- pq_mq = mq;
+ pq_mq = shm_mq_get_queue(mqh);
pq_mq_handle = mqh;
whereToSendOutput = DestRemote;
FrontendProtocol = PG_PROTOCOL_LATEST;
+ on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+}
+
+/*
+ * When the DSM that contains our shm_mq goes away, we need to stop sending
+ * messages to it.
+ */
+static void
+pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
+{
+ pq_mq = NULL;
+ pq_mq_handle = NULL;
+ whereToSendOutput = DestNone;
}
/*
if (pq_mq != NULL)
shm_mq_detach(pq_mq);
pq_mq = NULL;
+ pq_mq_handle = NULL;
return EOF;
}
+ /*
+ * If the message queue is already gone, just ignore the message. This
+ * doesn't necessarily indicate a problem; for example, DEBUG messages
+ * can be generated late in the shutdown sequence, after all DSMs have
+ * already been detached.
+ */
+ if (pq_mq == NULL)
+ return 0;
+
pq_mq_busy = true;
iov[0].data = &msgtype;
#include "lib/stringinfo.h"
#include "storage/shm_mq.h"
-extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh);
extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);