]> granicus.if.org Git - postgresql/commitdiff
Single-reader, single-writer, lightweight shared message queue.
authorRobert Haas <rhaas@postgresql.org>
Tue, 14 Jan 2014 17:23:22 +0000 (12:23 -0500)
committerRobert Haas <rhaas@postgresql.org>
Tue, 14 Jan 2014 17:23:22 +0000 (12:23 -0500)
This code provides infrastructure for user backends to communicate
relatively easily with background workers.  The message queue is
structured as a ring buffer and allows messages of arbitary length
to be sent and received.

Patch by me.  Review by KaiGai Kohei and Andres Freund.

src/backend/storage/ipc/Makefile
src/backend/storage/ipc/shm_mq.c [new file with mode: 0644]
src/include/storage/shm_mq.h [new file with mode: 0644]

index df0a49ed6cfb7bfc2ecfb3d21258d2badb2307d3..850347c36717997fd7c1ddd6636206062325aa72 100644 (file)
@@ -16,6 +16,6 @@ endif
 endif
 
 OBJS = dsm_impl.o dsm.o ipc.o ipci.o pmsignal.o procarray.o procsignal.o \
-       shmem.o shmqueue.o shm_toc.o sinval.o sinvaladt.o standby.o
+       shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o sinvaladt.o standby.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
new file mode 100644 (file)
index 0000000..2d298a3
--- /dev/null
@@ -0,0 +1,945 @@
+/*-------------------------------------------------------------------------
+ *
+ * shm_mq.c
+ *       single-reader, single-writer shared memory message queue
+ *
+ * Both the sender and the receiver must have a PGPROC; their respective
+ * process latches are used for synchronization.  Only the sender may send,
+ * and only the receiver may receive.  This is intended to allow a user
+ * backend to communicate with worker backends that it has registered.
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/shm_mq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+/*
+ * This structure represents the actual queue, stored in shared memory.
+ *
+ * Some notes on synchronization:
+ *
+ * mq_receiver and mq_bytes_read can only be changed by the receiver; and
+ * mq_sender and mq_bytes_written can only be changed by the sender.  However,
+ * because most of these fields are 8 bytes and we don't assume that 8 byte
+ * reads and writes are atomic, the spinlock must be taken whenever the field
+ * is updated, and whenever it is read by a process other than the one allowed
+ * to modify it. But the process that is allowed to modify it is also allowed
+ * to read it without the lock.  On architectures where 8-byte writes are
+ * atomic, we could replace these spinlocks with memory barriers, but
+ * testing found no performance benefit, so it seems best to keep things
+ * simple for now.
+ *
+ * mq_detached can be set by either the sender or the receiver, so the mutex
+ * must be held to read or write it.  Memory barriers could be used here as
+ * well, if needed.
+ *
+ * mq_ring_size and mq_ring_offset never change after initialization, and
+ * can therefore be read without the lock.
+ *
+ * Importantly, mq_ring can be safely read and written without a lock.  Were
+ * this not the case, we'd have to hold the spinlock for much longer
+ * intervals, and performance might suffer.  Fortunately, that's not
+ * necessary.  At any given time, the difference between mq_bytes_read and
+ * mq_bytes_written defines the number of bytes within mq_ring that contain
+ * unread data, and mq_bytes_read defines the position where those bytes
+ * begin.  The sender can increase the number of unread bytes at any time,
+ * but only the receiver can give license to overwrite those bytes, by
+ * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
+ * the unread bytes it knows to be present without the lock.  Conversely,
+ * the sender can write to the unused portion of the ring buffer without
+ * the lock, because nobody else can be reading or writing those bytes.  The
+ * receiver could be making more bytes unused by incrementing mq_bytes_read,
+ * but that's OK.  Note that it would be unsafe for the receiver to read any
+ * data it's already marked as read, or to write any data; and it would be
+ * unsafe for the sender to reread any data after incrementing
+ * mq_bytes_written, but fortunately there's no need for any of that.
+ */
+struct shm_mq
+{
+       slock_t         mq_mutex;
+       PGPROC     *mq_receiver;
+       PGPROC     *mq_sender;
+       uint64          mq_bytes_read;
+       uint64          mq_bytes_written;
+       uint64          mq_ring_size;
+       bool            mq_detached;
+       uint8           mq_ring_offset;
+       char            mq_ring[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * This structure is a backend-private handle for access to a queue.
+ *
+ * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
+ * a pointer to the dynamic shared memory segment that contains it.
+ *
+ * If this queue is intended to connect the current process with a background
+ * worker that started it, the user can pass a pointer to the worker handle
+ * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
+ * is to allow us to begin sending to or receiving from that queue before the
+ * process we'll be communicating with has even been started.  If it fails
+ * to start, the handle will allow us to notice that and fail cleanly, rather
+ * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
+ * simple cases - e.g. where there are just 2 processes communicating; in
+ * more complex scenarios, every process may not have a BackgroundWorkerHandle
+ * available, or may need to watch for the failure of more than one other
+ * process at a time.
+ *
+ * When a message exists as a contiguous chunk of bytes in the queue - that is,
+ * it is smaller than the size of the ring buffer and does not wrap around
+ * the end - we return the message to the caller as a pointer into the buffer.
+ * For messages that are larger or happen to wrap, we reassemble the message
+ * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
+ * the buffer, and mqh_buflen is the number of bytes allocated for it.
+ *
+ * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word
+ * are used to track the state of non-blocking operations.  When the caller
+ * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
+ * are expected to retry the call at a later time with the same argument;
+ * we need to retain enough state to pick up where we left off.
+ * mqh_did_length_word tracks whether we read or wrote the length word,
+ * mqh_partial_message_bytes tracks the number of payload bytes read or
+ * written, and mqh_expected_bytes - which is used only for reads - tracks
+ * the expected total size of the payload.
+ *
+ * mqh_counterparty_attached tracks whether we know the counterparty to have
+ * attached to the queue at some previous point.  This lets us avoid some
+ * mutex acquisitions.
+ *
+ * mqh_context is the memory context in effect at the time we attached to
+ * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
+ * we make sure any other allocations we do happen in this context as well,
+ * to avoid nasty surprises.
+ */
+struct shm_mq_handle
+{
+       shm_mq     *mqh_queue;
+       dsm_segment *mqh_segment;
+       BackgroundWorkerHandle *mqh_handle;
+       char       *mqh_buffer;
+       uint64          mqh_buflen;
+       uint64          mqh_consume_pending;
+       uint64          mqh_partial_message_bytes;
+       uint64          mqh_expected_bytes;
+       bool            mqh_did_length_word;
+       bool            mqh_counterparty_attached;
+       MemoryContext mqh_context;
+};
+
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes,
+                                 void *data, bool nowait, uint64 *bytes_written);
+static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed,
+                                        bool nowait, uint64 *nbytesp, void **datap);
+static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
+                                        BackgroundWorkerHandle *handle);
+static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
+static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n);
+static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
+static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n);
+static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
+static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
+
+/* Minimum queue size is enough for header and at least one chunk of data. */
+const Size shm_mq_minimum_size =
+       MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
+
+#define MQH_INITIAL_BUFSIZE                            8192
+
+/*
+ * Initialize a new shared message queue.
+ */
+shm_mq *
+shm_mq_create(void *address, Size size)
+{
+       shm_mq     *mq = address;
+       uint64          data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+
+       /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
+       size = MAXALIGN_DOWN(size);
+
+       /* Queue size must be large enough to hold some data. */
+       Assert(size > data_offset);
+
+       /* Initialize queue header. */
+       SpinLockInit(&mq->mq_mutex);
+       mq->mq_receiver = NULL;
+       mq->mq_sender = NULL;
+       mq->mq_bytes_read = 0;
+       mq->mq_bytes_written = 0;
+       mq->mq_ring_size = size - data_offset;
+       mq->mq_detached = false;
+       mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
+
+       return mq;
+}
+
+/*
+ * Set the identity of the process that will receive from a shared message
+ * queue.
+ */
+void
+shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
+{
+       volatile shm_mq *vmq = mq;
+       PGPROC   *sender;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       Assert(vmq->mq_receiver == NULL);
+       vmq->mq_receiver = proc;
+       sender = vmq->mq_sender;
+       SpinLockRelease(&mq->mq_mutex);
+
+       if (sender != NULL)
+               SetLatch(&sender->procLatch);
+}
+
+/*
+ * Set the identity of the process that will send to a shared message queue.
+ */
+void
+shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
+{
+       volatile shm_mq *vmq = mq;
+       PGPROC   *receiver;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       Assert(vmq->mq_sender == NULL);
+       vmq->mq_sender = proc;
+       receiver = vmq->mq_receiver;
+       SpinLockRelease(&mq->mq_mutex);
+
+       if (receiver != NULL)
+               SetLatch(&receiver->procLatch);
+}
+
+/*
+ * Get the configured receiver.
+ */
+PGPROC *
+shm_mq_get_receiver(shm_mq *mq)
+{
+       volatile shm_mq *vmq = mq;
+       PGPROC   *receiver;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       receiver = vmq->mq_receiver;
+       SpinLockRelease(&mq->mq_mutex);
+
+       return receiver;
+}
+
+/*
+ * Get the configured sender.
+ */
+PGPROC *
+shm_mq_get_sender(shm_mq *mq)
+{
+       volatile shm_mq *vmq = mq;
+       PGPROC   *sender;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       sender = vmq->mq_sender;
+       SpinLockRelease(&mq->mq_mutex);
+
+       return sender;
+}
+
+/*
+ * Attach to a shared message queue so we can send or receive messages.
+ *
+ * The memory context in effect at the time this function is called should
+ * be one which will last for at least as long as the message queue itself.
+ * We'll allocate the handle in that context, and future allocations that
+ * are needed to buffer incoming data will happen in that context as well.
+ *
+ * If seg != NULL, the queue will be automatically detached when that dynamic
+ * shared memory segment is detached.
+ *
+ * If handle != NULL, the queue can be read or written even before the
+ * other process has attached.  We'll wait for it to do so if needed.  The
+ * handle must be for a background worker initialized with bgw_notify_pid
+ * equal to our PID.
+ *
+ * shm_mq_detach() should be called when done.  This will free the
+ * shm_mq_handle and mark the queue itself as detached, so that our
+ * counterpart won't get stuck waiting for us to fill or drain the queue
+ * after we've already lost interest.
+ */
+shm_mq_handle *
+shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
+{
+       shm_mq_handle      *mqh = palloc(sizeof(shm_mq_handle));
+
+       Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
+       mqh->mqh_queue = mq;
+       mqh->mqh_segment = seg;
+       mqh->mqh_buffer = NULL;
+       mqh->mqh_handle = handle;
+       mqh->mqh_buflen = 0;
+       mqh->mqh_consume_pending = 0;
+       mqh->mqh_context = CurrentMemoryContext;
+       mqh->mqh_partial_message_bytes = 0;
+       mqh->mqh_did_length_word = false;
+       mqh->mqh_counterparty_attached = false;
+
+       if (seg != NULL)
+               on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
+
+       return mqh;
+}
+
+/*
+ * Write a message into a shared message queue.
+ *
+ * When nowait = false, we'll wait on our process latch when the ring buffer
+ * fills up, and then continue writing once the receiver has drained some data.
+ * The process latch is reset after each wait.
+ *
+ * When nowait = true, we do not manipulate the state of the process latch;
+ * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
+ * this case, the caller should call this function again, with the same
+ * arguments, each time the process latch is set.  (Once begun, the sending
+ * of a message cannot be aborted except by detaching from the queue; changing
+ * the length or payload will corrupt the queue.)
+ */
+shm_mq_result
+shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
+{
+       shm_mq_result   res;
+       shm_mq             *mq = mqh->mqh_queue;
+       uint64                  bytes_written;
+
+       Assert(mq->mq_sender == MyProc);
+
+       /* Write the message length into the buffer. */
+       if (!mqh->mqh_did_length_word)
+       {
+               res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait,
+                                                               &bytes_written);
+               if (res != SHM_MQ_SUCCESS)
+                       return res;
+
+               /*
+                * We're sure to have sent the length in full, since we always
+                * write a MAXALIGN'd chunk.
+                */
+               Assert(bytes_written == MAXALIGN64(sizeof(uint64)));
+               mqh->mqh_did_length_word = true;
+       }
+
+       /* Write the actual data bytes into the buffer. */
+       Assert(mqh->mqh_partial_message_bytes <= nbytes);
+       res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes,
+                                                       ((char *) data) + mqh->mqh_partial_message_bytes,
+                                                       nowait, &bytes_written);
+       if (res == SHM_MQ_WOULD_BLOCK)
+               mqh->mqh_partial_message_bytes += bytes_written;
+       else
+       {
+               mqh->mqh_partial_message_bytes = 0;
+               mqh->mqh_did_length_word = false;
+       }
+       if (res != SHM_MQ_SUCCESS)
+               return res;
+
+       /* Notify receiver of the newly-written data, and return. */
+       return shm_mq_notify_receiver(mq);
+}
+
+/*
+ * Receive a message from a shared message queue.
+ *
+ * We set *nbytes to the message length and *data to point to the message
+ * payload.  If the entire message exists in the queue as a single,
+ * contiguous chunk, *data will point directly into shared memory; otherwise,
+ * it will point to a temporary buffer.  This mostly avoids data copying in
+ * the hoped-for case where messages are short compared to the buffer size,
+ * while still allowing longer messages.  In either case, the return value
+ * remains valid until the next receive operation is perfomed on the queue.
+ *
+ * When nowait = false, we'll wait on our process latch when the ring buffer
+ * is empty and we have not yet received a full message.  The sender will
+ * set our process latch after more data has been written, and we'll resume
+ * processing.  Each call will therefore return a complete message
+ * (unless the sender detaches the queue).
+ *
+ * When nowait = true, we do not manipulate the state of the process latch;
+ * instead, whenever the buffer is empty and we need to read from it, we
+ * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
+ * function again after the process latch has been set.
+ */
+shm_mq_result
+shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
+{
+       shm_mq             *mq = mqh->mqh_queue;
+       shm_mq_result   res;
+       uint64                  rb = 0;
+       uint64                  nbytes;
+       uint64                  needed;
+       void               *rawdata;
+
+       Assert(mq->mq_receiver == MyProc);
+
+       /* We can't receive data until the sender has attached. */
+       if (!mqh->mqh_counterparty_attached)
+       {
+               if (nowait)
+               {
+                       if (shm_mq_get_sender(mq) == NULL)
+                               return SHM_MQ_WOULD_BLOCK;
+               }
+               else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle))
+               {
+                       mq->mq_detached = true;
+                       return SHM_MQ_DETACHED;
+               }
+               mqh->mqh_counterparty_attached = true;
+       }
+
+       /* Consume any zero-copy data from previous receive operation. */
+       if (mqh->mqh_consume_pending > 0)
+       {
+               shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+               mqh->mqh_consume_pending = 0;
+       }
+
+       /* Determine the message length. */
+       if (mqh->mqh_did_length_word)
+       {
+               /* We've partially received a message; recall expected length. */
+               nbytes = mqh->mqh_expected_bytes;
+       }
+       else
+       {
+               /* Try to receive the message length word. */
+               res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata);
+               if (res != SHM_MQ_SUCCESS)
+                       return res;
+               Assert(rb >= sizeof(uint64));
+               memcpy(&nbytes, rawdata, sizeof(uint64));
+               mqh->mqh_expected_bytes = nbytes;
+
+               /* If we've already got the whole message, we're done. */
+               needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes);
+               if (rb >= needed)
+               {
+                       /*
+                        * Technically, we could consume the message length information at
+                        * this point, but the extra write to shared memory wouldn't be
+                        * free and in most cases we would reap no benefit.
+                        */
+                       mqh->mqh_consume_pending = needed;
+                       *nbytesp = nbytes;
+                       *datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64));
+                       return SHM_MQ_SUCCESS;
+               }
+
+               /* Consume the length word. */
+               shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64)));
+               mqh->mqh_did_length_word = true;
+               rb -= MAXALIGN64(sizeof(uint64));
+       }
+
+       if (mqh->mqh_partial_message_bytes == 0)
+       {
+               /*
+                * Try to obtain the whole message in a single chunk.  If this works,
+                * we need not copy the data and can return a pointer directly into
+                * shared memory.
+                */
+               res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
+               if (res != SHM_MQ_SUCCESS)
+                       return res;
+               if (rb >= nbytes)
+               {
+                       mqh->mqh_did_length_word = false;
+                       mqh->mqh_consume_pending = MAXALIGN64(nbytes);
+                       *nbytesp = nbytes;
+                       *datap = rawdata;
+                       return SHM_MQ_SUCCESS;
+               }
+
+               /*
+                * The message has wrapped the buffer.  We'll need to copy it in order
+                * to return it to the client in one chunk.  First, make sure we have a
+                * large enough buffer available.
+                */
+               if (mqh->mqh_buflen < nbytes)
+               {
+                       uint64          newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
+
+                       while (newbuflen < nbytes)
+                               newbuflen *= 2;
+
+                       if (mqh->mqh_buffer != NULL)
+                       {
+                               pfree(mqh->mqh_buffer);
+                               mqh->mqh_buffer = NULL;
+                               mqh->mqh_buflen = 0;
+                       }
+                       mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
+                       mqh->mqh_buflen = newbuflen;
+               }
+       }
+
+       /* Loop until we've copied the entire message. */
+       for (;;)
+       {
+               uint64  still_needed;
+
+               /* Copy as much as we can. */
+               Assert(mqh->mqh_partial_message_bytes + rb <= nbytes);
+               memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb);
+               mqh->mqh_partial_message_bytes += rb;
+
+               /*
+                * Update count of bytes read, with alignment padding.  Note
+                * that this will never actually insert any padding except at the
+                * end of a message, because the buffer size is a multiple of
+                * MAXIMUM_ALIGNOF, and each read and write is as well.
+                */
+               Assert(mqh->mqh_partial_message_bytes == nbytes ||
+                               rb == MAXALIGN64(rb));
+               shm_mq_inc_bytes_read(mq, MAXALIGN64(rb));
+
+               /* If we got all the data, exit the loop. */
+               if (mqh->mqh_partial_message_bytes >= nbytes)
+                       break;
+
+               /* Wait for some more data. */
+               still_needed = nbytes - mqh->mqh_partial_message_bytes;
+               res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
+               if (res != SHM_MQ_SUCCESS)
+                       return res;
+               if (rb > still_needed)
+                       rb = still_needed;
+       }
+
+       /* Return the complete message, and reset for next message. */
+       *nbytesp = nbytes;
+       *datap = mqh->mqh_buffer;
+       mqh->mqh_did_length_word = false;
+       mqh->mqh_partial_message_bytes = 0;
+       return SHM_MQ_SUCCESS;
+}
+
+/*
+ * Wait for the other process that's supposed to use this queue to attach
+ * to it.
+ *
+ * The return value is SHM_MQ_DETACHED if the worker has already detached or
+ * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
+ * Note that we will only be able to detect that the worker has died before
+ * attaching if a background worker handle was passed to shm_mq_attach().
+ */
+shm_mq_result
+shm_mq_wait_for_attach(shm_mq_handle *mqh)
+{
+       shm_mq     *mq = mqh->mqh_queue;
+       PGPROC     **victim;
+
+       if (shm_mq_get_receiver(mq) == MyProc)
+               victim = &mq->mq_sender;
+       else
+       {
+               Assert(shm_mq_get_sender(mq) == MyProc);
+               victim = &mq->mq_receiver;
+       }
+
+       if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
+               return SHM_MQ_SUCCESS;
+       else
+               return SHM_MQ_DETACHED;
+}
+
+/*
+ * Detach a shared message queue.
+ *
+ * The purpose of this function is to make sure that the process
+ * with which we're communicating doesn't block forever waiting for us to
+ * fill or drain the queue once we've lost interest.  Whem the sender
+ * detaches, the receiver can read any messages remaining in the queue;
+ * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
+ * further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ */
+void
+shm_mq_detach(shm_mq *mq)
+{
+       volatile shm_mq *vmq = mq;
+       PGPROC     *victim;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       if (vmq->mq_sender == MyProc)
+               victim = vmq->mq_receiver;
+       else
+       {
+               Assert(vmq->mq_receiver == MyProc);
+               victim = vmq->mq_sender;
+       }
+       vmq->mq_detached = true;
+       SpinLockRelease(&mq->mq_mutex);
+
+       if (victim != NULL)
+               SetLatch(&victim->procLatch);
+}
+
+/*
+ * Write bytes into a shared message queue.
+ */
+static shm_mq_result
+shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
+                                 uint64 *bytes_written)
+{
+       shm_mq     *mq = mqh->mqh_queue;
+       uint64          sent = 0;
+       uint64          used;
+       uint64          ringsize = mq->mq_ring_size;
+       uint64          available;
+
+       while (sent < nbytes)
+       {
+               bool    detached;
+               uint64  rb;
+
+               /* Compute number of ring buffer bytes used and available. */
+               rb = shm_mq_get_bytes_read(mq, &detached);
+               Assert(mq->mq_bytes_written >= rb);
+               used = mq->mq_bytes_written - rb;
+               Assert(used <= ringsize);
+               available = Min(ringsize - used, nbytes - sent);
+
+               /* Bail out if the queue has been detached. */
+               if (detached)
+                       return SHM_MQ_DETACHED;
+
+               if (available == 0)
+               {
+                       shm_mq_result   res;
+
+                       /*
+                        * The queue is full, so if the receiver isn't yet known to be
+                        * attached, we must wait for that to happen.
+                        */
+                       if (!mqh->mqh_counterparty_attached)
+                       {
+                               if (nowait)
+                               {
+                                       if (shm_mq_get_receiver(mq) == NULL)
+                                               return SHM_MQ_WOULD_BLOCK;
+                               }
+                               else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
+                                                                                          mqh->mqh_handle))
+                               {
+                                       mq->mq_detached = true;
+                                       return SHM_MQ_DETACHED;
+                               }
+                               mqh->mqh_counterparty_attached = true;
+                       }
+
+                       /* Let the receiver know that we need them to read some data. */
+                       res = shm_mq_notify_receiver(mq);
+                       if (res != SHM_MQ_SUCCESS)
+                       {
+                               *bytes_written = res;
+                               return res;
+                       }
+
+                       /* Skip manipulation of our latch if nowait = true. */
+                       if (nowait)
+                       {
+                               *bytes_written = sent;
+                               return SHM_MQ_WOULD_BLOCK;
+                       }
+
+                       /*
+                        * Wait for our latch to be set.  It might already be set for
+                        * some unrelated reason, but that'll just result in one extra
+                        * trip through the loop.  It's worth it to avoid resetting the
+                        * latch at top of loop, because setting an already-set latch is
+                        * much cheaper than setting one that has been reset.
+                        */
+                       WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+
+                       /* An interrupt may have occurred while we were waiting. */
+                       CHECK_FOR_INTERRUPTS();
+
+                       /* Reset the latch so we don't spin. */
+                       ResetLatch(&MyProc->procLatch);
+               }
+               else
+               {
+                       uint64  offset = mq->mq_bytes_written % ringsize;
+                       uint64  sendnow = Min(available, ringsize - offset);
+
+                       /* Write as much data as we can via a single memcpy(). */
+                       memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
+                                  (char *) data + sent, sendnow);
+                       sent += sendnow;
+
+                       /*
+                        * Update count of bytes written, with alignment padding.  Note
+                        * that this will never actually insert any padding except at the
+                        * end of a run of bytes, because the buffer size is a multiple of
+                        * MAXIMUM_ALIGNOF, and each read is as well.
+                        */
+                       Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow));
+                       shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow));
+
+                       /*
+                        * For efficiency, we don't set the reader's latch here.  We'll
+                        * do that only when the buffer fills up or after writing an
+                        * entire message.
+                        */
+               }
+       }
+
+       *bytes_written = sent;
+       return SHM_MQ_SUCCESS;
+}
+
+/*
+ * Wait until at least *nbytesp bytes are available to be read from the
+ * shared message queue, or until the buffer wraps around.  On return,
+ * *datap is set to the location at which data bytes can be read.  The
+ * return value is the number of bytes available to be read starting at
+ * that offset; if the message has wrapped the buffer, it may be less than
+ * bytes_needed.
+ */
+static shm_mq_result
+shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait,
+                                        uint64 *nbytesp, void **datap)
+{
+       uint64          used;
+       uint64          ringsize = mq->mq_ring_size;
+       uint64          written;
+
+       for (;;)
+       {
+               uint64          offset;
+               bool            detached;
+
+               /* Get bytes written, so we can compute what's available to read. */
+               written = shm_mq_get_bytes_written(mq, &detached);
+               used = written - mq->mq_bytes_read;
+               Assert(used <= ringsize);
+               offset = mq->mq_bytes_read % ringsize;
+
+               /* If we have enough data or buffer has wrapped, we're done. */
+               if (used >= bytes_needed || offset + used >= ringsize)
+               {
+                       *nbytesp = Min(used, ringsize - offset);
+                       *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
+                       return SHM_MQ_SUCCESS;
+               }
+
+               /*
+                * Fall out before waiting if the queue has been detached.
+                *
+                * Note that we don't check for this until *after* considering
+                * whether the data already available is enough, since the
+                * receiver can finish receiving a message stored in the buffer
+                * even after the sender has detached.
+                */
+               if (detached)
+                       return SHM_MQ_DETACHED;
+
+               /* Skip manipulation of our latch if nowait = true. */
+               if (nowait)
+                       return SHM_MQ_WOULD_BLOCK;
+
+               /*
+                * Wait for our latch to be set.  It might already be set for
+                * some unrelated reason, but that'll just result in one extra
+                * trip through the loop.  It's worth it to avoid resetting the
+                * latch at top of loop, because setting an already-set latch is
+                * much cheaper than setting one that has been reset.
+                */
+               WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+
+               /* An interrupt may have occurred while we were waiting. */
+               CHECK_FOR_INTERRUPTS();
+
+               /* Reset the latch so we don't spin. */
+               ResetLatch(&MyProc->procLatch);
+       }
+}
+
+/*
+ * This is used when a process is waiting for its counterpart to attach to the
+ * queue.  We exit when the other process attaches as expected, or, if
+ * handle != NULL, when the referenced background process or the postmaster
+ * dies.  Note that if handle == NULL, and the process fails to attach, we'll
+ * potentially get stuck here forever waiting for a process that may never
+ * start.  We do check for interrupts, though.
+ *
+ * ptr is a pointer to the memory address that we're expecting to become
+ * non-NULL when our counterpart attaches to the queue.
+ */
+static bool
+shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
+                                        BackgroundWorkerHandle *handle)
+{
+       bool    save_set_latch_on_sigusr1;
+       bool    result = false;
+
+       save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
+       if (handle != NULL)
+               set_latch_on_sigusr1 = true;
+
+       PG_TRY();
+       {
+               for (;;)
+               {
+                       BgwHandleStatus status;
+                       pid_t   pid;
+                       bool    detached;
+
+                       /* Acquire the lock just long enough to check the pointer. */
+                       SpinLockAcquire(&mq->mq_mutex);
+                       detached = mq->mq_detached;
+                       result = (*ptr != NULL);
+                       SpinLockRelease(&mq->mq_mutex);
+
+                       /* Fail if detached; else succeed if initialized. */
+                       if (detached)
+                       {
+                               result = false;
+                               break;
+                       }
+                       if (result)
+                               break;
+
+                       if (handle != NULL)
+                       {
+                               /* Check for unexpected worker death. */
+                               status = GetBackgroundWorkerPid(handle, &pid);
+                               if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
+                               {
+                                       result = false;
+                                       break;
+                               }
+                       }
+
+                       /* Wait to be signalled. */
+                       WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+
+                       /* An interrupt may have occurred while we were waiting. */
+                       CHECK_FOR_INTERRUPTS();
+
+                       /* Reset the latch so we don't spin. */
+                       ResetLatch(&MyProc->procLatch);
+               }
+       }
+       PG_CATCH();
+       {
+               set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       return result;
+}
+
+/*
+ * Get the number of bytes read.  The receiver need not use this to access
+ * the count of bytes read, but the sender must.
+ */
+static uint64
+shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
+{
+       uint64  v;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       v = mq->mq_bytes_read;
+       *detached = mq->mq_detached;
+       SpinLockRelease(&mq->mq_mutex);
+
+       return v;
+}
+
+/*
+ * Increment the number of bytes read.
+ */
+static void
+shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n)
+{
+       PGPROC     *sender;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       mq->mq_bytes_read += n;
+       sender = mq->mq_sender;
+       SpinLockRelease(&mq->mq_mutex);
+
+       /* We shoudn't have any bytes to read without a sender. */
+       Assert(sender != NULL);
+       SetLatch(&sender->procLatch);
+}
+
+/*
+ * Get the number of bytes written.  The sender need not use this to access
+ * the count of bytes written, but the reciever must.
+ */
+static uint64
+shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
+{
+       uint64  v;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       v = mq->mq_bytes_written;
+       *detached = mq->mq_detached;
+       SpinLockRelease(&mq->mq_mutex);
+
+       return v;
+}
+
+/*
+ * Increment the number of bytes written.
+ */
+static void
+shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n)
+{
+       SpinLockAcquire(&mq->mq_mutex);
+       mq->mq_bytes_written += n;
+       SpinLockRelease(&mq->mq_mutex);
+}
+
+/*
+ * Set sender's latch, unless queue is detached.
+ */
+static shm_mq_result
+shm_mq_notify_receiver(volatile shm_mq *mq)
+{
+       PGPROC *receiver;
+       bool    detached;
+
+       SpinLockAcquire(&mq->mq_mutex);
+       detached = mq->mq_detached;
+       receiver = mq->mq_receiver;
+       SpinLockRelease(&mq->mq_mutex);
+
+       if (detached)
+               return SHM_MQ_DETACHED;
+       if (receiver)
+               SetLatch(&receiver->procLatch);
+       return SHM_MQ_SUCCESS;
+}
+
+/* Shim for on_dsm_callback. */
+static void
+shm_mq_detach_callback(dsm_segment *seg, Datum arg)
+{
+       shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
+
+       shm_mq_detach(mq);
+}
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
new file mode 100644 (file)
index 0000000..1ce88a1
--- /dev/null
@@ -0,0 +1,70 @@
+/*-------------------------------------------------------------------------
+ *
+ * shm_mq.h
+ *       single-reader, single-writer shared memory message queue
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/shm_mq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SHM_MQ_H
+#define SHM_MQ_H
+
+#include "postmaster/bgworker.h"
+#include "storage/dsm.h"
+#include "storage/proc.h"
+
+/* The queue itself, in shared memory. */
+struct shm_mq;
+typedef struct shm_mq shm_mq;
+
+/* Backend-private state. */
+struct shm_mq_handle;
+typedef struct shm_mq_handle shm_mq_handle;
+
+/* Possible results of a send or receive operation. */
+typedef enum
+{
+       SHM_MQ_SUCCESS,                 /* Sent or received a message. */
+       SHM_MQ_WOULD_BLOCK,             /* Not completed; retry later. */
+       SHM_MQ_DETACHED                 /* Other process has detached queue. */
+} shm_mq_result;
+
+/*
+ * Primitives to create a queue and set the sender and receiver.
+ *
+ * Both the sender and the receiver must be set before any messages are read
+ * or written, but they need not be set by the same process.  Each must be
+ * set exactly once.
+ */
+extern shm_mq *shm_mq_create(void *address, Size size);
+extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
+extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
+
+/* Accessor methods for sender and receiver. */
+extern PGPROC *shm_mq_get_receiver(shm_mq *);
+extern PGPROC *shm_mq_get_sender(shm_mq *);
+
+/* Set up backend-local queue state. */
+extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
+                         BackgroundWorkerHandle *handle);
+
+/* Break connection. */
+extern void shm_mq_detach(shm_mq *);
+
+/* Send or receive messages. */
+extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
+                       uint64 nbytes, void *data, bool nowait);
+extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
+                          uint64 *nbytesp, void **datap, bool nowait);
+
+/* Wait for our counterparty to attach to the queue. */
+extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
+
+/* Smallest possible queue. */
+extern const Size shm_mq_minimum_size;
+
+#endif   /* SHM_MQ_H */