* Some notes on synchronization:
*
* mq_receiver and mq_bytes_read can only be changed by the receiver; and
- * mq_sender and mq_bytes_written can only be changed by the sender. However,
- * because most of these fields are 8 bytes and we don't assume that 8 byte
- * reads and writes are atomic, the spinlock must be taken whenever the field
- * is updated, and whenever it is read by a process other than the one allowed
- * to modify it. But the process that is allowed to modify it is also allowed
- * to read it without the lock. On architectures where 8-byte writes are
- * atomic, we could replace these spinlocks with memory barriers, but
- * testing found no performance benefit, so it seems best to keep things
- * simple for now.
+ * mq_sender and mq_bytes_written can only be changed by the sender.
+ * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
+ * they cannot change once set, and thus may be read without the lock once
+ * they are known to have been set.
*
- * mq_detached can be set by either the sender or the receiver, so the mutex
- * must be held to read or write it. Memory barriers could be used here as
- * well, if needed.
+ * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
+ * they are read and written atomically, using 8-byte loads and stores. Memory
+ * barriers must be carefully used to synchronize reads and writes of these
+ * values with reads and writes of the actual data in mq_ring.
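+ *
+ * As a condensed sketch of how these barriers are intended to pair up (the
+ * authoritative sequences live in shm_mq_send_bytes, shm_mq_receive_bytes,
+ * shm_mq_inc_bytes_written, and shm_mq_inc_bytes_read):
+ *
+ *    sender                          receiver
+ *    ------                          --------
+ *    read mq_bytes_read              read mq_bytes_written
+ *    pg_memory_barrier()             pg_read_barrier()
+ *    memcpy into mq_ring             memcpy out of mq_ring
+ *    pg_write_barrier()              pg_read_barrier()
+ *    advance mq_bytes_written        advance mq_bytes_read
+ *
+ * The sender's write barrier pairs with the receiver's first read barrier:
+ * data must reach mq_ring before the advanced count does. The sender's full
+ * barrier pairs with the receiver's second read barrier: ring bytes must not
+ * be overwritten until the receiver has finished reading them.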
+ *
+ * mq_detached needs no locking. It can be set by either the sender or the
+ * receiver, but only ever from false to true, so redundant writes don't
+ * matter. What matters is that if we set mq_detached and then set the
+ * counterparty's latch, the counterparty is certain to see the change after
+ * waking up. Since SetLatch begins with a memory barrier and ResetLatch ends
+ * with one, this should be OK.
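+ *
+ * A sketch of that handshake (the barriers here live inside the latch
+ * machinery, not in this file):
+ *
+ *    detaching process               counterparty
+ *    -----------------               ------------
+ *    mq_detached = true              ...wakes from WaitLatch()...
+ *    SetLatch()                      ResetLatch()
+ *                                    if (mq_detached) bail out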
*
* mq_ring_size and mq_ring_offset never change after initialization, and
* can therefore be read without the lock.
*
- * Importantly, mq_ring can be safely read and written without a lock. Were
- * this not the case, we'd have to hold the spinlock for much longer
- * intervals, and performance might suffer. Fortunately, that's not
- * necessary. At any given time, the difference between mq_bytes_read and
+ * Importantly, mq_ring can be safely read and written without a lock.
+ * At any given time, the difference between mq_bytes_read and
* mq_bytes_written defines the number of bytes within mq_ring that contain
* unread data, and mq_bytes_read defines the position where those bytes
* begin. The sender can increase the number of unread bytes at any time,
slock_t mq_mutex;
PGPROC *mq_receiver;
PGPROC *mq_sender;
- uint64 mq_bytes_read;
- uint64 mq_bytes_written;
+ pg_atomic_uint64 mq_bytes_read;
+ pg_atomic_uint64 mq_bytes_written;
Size mq_ring_size;
bool mq_detached;
uint8 mq_ring_offset;
BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
BackgroundWorkerHandle *handle);
-static uint64 shm_mq_get_bytes_read(shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
-static uint64 shm_mq_get_bytes_written(shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
-static shm_mq_result shm_mq_notify_receiver(shm_mq *mq);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
/* Minimum queue size is enough for header and at least one chunk of data. */
SpinLockInit(&mq->mq_mutex);
mq->mq_receiver = NULL;
mq->mq_sender = NULL;
- mq->mq_bytes_read = 0;
- mq->mq_bytes_written = 0;
+ pg_atomic_init_u64(&mq->mq_bytes_read, 0);
+ pg_atomic_init_u64(&mq->mq_bytes_written, 0);
mq->mq_ring_size = size - data_offset;
mq->mq_detached = false;
mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
{
shm_mq_result res;
shm_mq *mq = mqh->mqh_queue;
+ PGPROC *receiver;
Size nbytes = 0;
Size bytes_written;
int i;
mqh->mqh_partial_bytes = 0;
mqh->mqh_length_word_complete = false;
+ /* If queue has been detached, let caller know. */
+ if (mq->mq_detached)
+ return SHM_MQ_DETACHED;
+
+ /*
+ * If the counterparty is known to have attached, we can read mq_receiver
+ * without acquiring the spinlock and assume it isn't NULL. Otherwise,
+ * more caution is needed.
+ */
+ if (mqh->mqh_counterparty_attached)
+ receiver = mq->mq_receiver;
+ else
+ {
+ SpinLockAcquire(&mq->mq_mutex);
+ receiver = mq->mq_receiver;
+ SpinLockRelease(&mq->mq_mutex);
+ if (receiver == NULL)
+ return SHM_MQ_SUCCESS;
+ mqh->mqh_counterparty_attached = true;
+ }
+
/* Notify receiver of the newly-written data, and return. */
- return shm_mq_notify_receiver(mq);
+ SetLatch(&receiver->procLatch);
+ return SHM_MQ_SUCCESS;
}
/*
while (sent < nbytes)
{
- bool detached;
uint64 rb;
+ uint64 wb;
/* Compute number of ring buffer bytes used and available. */
- rb = shm_mq_get_bytes_read(mq, &detached);
- Assert(mq->mq_bytes_written >= rb);
- used = mq->mq_bytes_written - rb;
+ rb = pg_atomic_read_u64(&mq->mq_bytes_read);
+ wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+ Assert(wb >= rb);
+ used = wb - rb;
Assert(used <= ringsize);
available = Min(ringsize - used, nbytes - sent);
- /* Bail out if the queue has been detached. */
- if (detached)
+ /*
+ * Bail out if the queue has been detached. Note that we would be in
+ * trouble if the compiler decided to cache the value of
+ * mq->mq_detached in a register or on the stack across loop
+ * iterations. It probably shouldn't do that anyway since we'll
+ * always return, call an external function that performs a system
+ * call, or reach a memory barrier at some point later in the loop,
+ * but just to be sure, insert a compiler barrier here.
+ */
+ pg_compiler_barrier();
+ if (mq->mq_detached)
{
*bytes_written = sent;
return SHM_MQ_DETACHED;
}
else if (available == 0)
{
- shm_mq_result res;
-
- /* Let the receiver know that we need them to read some data. */
- res = shm_mq_notify_receiver(mq);
- if (res != SHM_MQ_SUCCESS)
- {
- *bytes_written = sent;
- return res;
- }
+ /*
+ * Since mqh->mqh_counterparty_attached is known to be true at this
+ * point, mq_receiver has been set, and it can't change once set.
+ * Therefore, we can read it without acquiring the spinlock.
+ */
+ Assert(mqh->mqh_counterparty_attached);
+ SetLatch(&mq->mq_receiver->procLatch);
/* Skip manipulation of our latch if nowait = true. */
if (nowait)
}
else
{
- Size offset = mq->mq_bytes_written % (uint64) ringsize;
- Size sendnow = Min(available, ringsize - offset);
+ Size offset;
+ Size sendnow;
- /* Write as much data as we can via a single memcpy(). */
+ offset = wb % (uint64) ringsize;
+ sendnow = Min(available, ringsize - offset);
+
+ /*
+ * Write as much data as we can via a single memcpy(). Make sure
+ * these writes happen after the read of mq_bytes_read, above.
+ * This barrier pairs with the one in shm_mq_inc_bytes_read.
+ * (Since we're separating the read of mq_bytes_read from a
+ * subsequent write to mq_ring, we need a full barrier here.)
+ */
+ pg_memory_barrier();
memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
(char *) data + sent, sendnow);
sent += sendnow;
for (;;)
{
Size offset;
- bool detached;
+ uint64 read;
/* Get bytes written, so we can compute what's available to read. */
- written = shm_mq_get_bytes_written(mq, &detached);
- used = written - mq->mq_bytes_read;
+ written = pg_atomic_read_u64(&mq->mq_bytes_written);
+ read = pg_atomic_read_u64(&mq->mq_bytes_read);
+ used = written - read;
Assert(used <= ringsize);
- offset = mq->mq_bytes_read % (uint64) ringsize;
+ offset = read % (uint64) ringsize;
/* If we have enough data or buffer has wrapped, we're done. */
if (used >= bytes_needed || offset + used >= ringsize)
{
*nbytesp = Min(used, ringsize - offset);
*datap = &mq->mq_ring[mq->mq_ring_offset + offset];
+
+ /*
+ * Separate the read of mq_bytes_written, above, from caller's
+ * attempt to read the data itself. Pairs with the barrier in
+ * shm_mq_inc_bytes_written.
+ */
+ pg_read_barrier();
return SHM_MQ_SUCCESS;
}
* receiving a message stored in the buffer even after the sender has
* detached.
*/
- if (detached)
+ if (mq->mq_detached)
return SHM_MQ_DETACHED;
/* Skip manipulation of our latch if nowait = true. */
static bool
shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
{
- bool detached;
pid_t pid;
- /* Acquire the lock just long enough to check the pointer. */
- SpinLockAcquire(&mq->mq_mutex);
- detached = mq->mq_detached;
- SpinLockRelease(&mq->mq_mutex);
-
/* If the queue has been detached, counterparty is definitely gone. */
- if (detached)
+ if (mq->mq_detached)
return true;
/* If there's a handle, check worker status. */
if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
{
/* Mark it detached, just to make it official. */
- SpinLockAcquire(&mq->mq_mutex);
mq->mq_detached = true;
- SpinLockRelease(&mq->mq_mutex);
return true;
}
}
{
BgwHandleStatus status;
pid_t pid;
- bool detached;
/* Acquire the lock just long enough to check the pointer. */
SpinLockAcquire(&mq->mq_mutex);
- detached = mq->mq_detached;
result = (*ptr != NULL);
SpinLockRelease(&mq->mq_mutex);
/* Fail if detached; else succeed if initialized. */
- if (detached)
+ if (mq->mq_detached)
{
result = false;
break;
return result;
}
-/*
- * Get the number of bytes read. The receiver need not use this to access
- * the count of bytes read, but the sender must.
- */
-static uint64
-shm_mq_get_bytes_read(shm_mq *mq, bool *detached)
-{
- uint64 v;
-
- SpinLockAcquire(&mq->mq_mutex);
- v = mq->mq_bytes_read;
- *detached = mq->mq_detached;
- SpinLockRelease(&mq->mq_mutex);
-
- return v;
-}
-
/*
* Increment the number of bytes read.
*/
{
PGPROC *sender;
- SpinLockAcquire(&mq->mq_mutex);
- mq->mq_bytes_read += n;
+ /*
+ * Separate prior reads of mq_ring from the increment of mq_bytes_read
+ * which follows. Pairs with the full barrier in shm_mq_send_bytes(). We
+ * only need a read barrier here because the increment of mq_bytes_read is
+ * actually a read followed by a dependent write.
+ */
+ pg_read_barrier();
+
+ /*
+ * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+ * else can be changing this value. This method should be cheaper.
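+ * (With native 64-bit atomics, pg_atomic_read_u64 and pg_atomic_write_u64
+ * should compile down to plain loads and stores, whereas a fetch-add is a
+ * locked read-modify-write.)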
+ */
+ pg_atomic_write_u64(&mq->mq_bytes_read,
+ pg_atomic_read_u64(&mq->mq_bytes_read) + n);
+
+ /*
+ * We shouldn't have any bytes to read without a sender, so we can read
+ * mq_sender here without a lock. Once it's initialized, it can't change.
+ */
sender = mq->mq_sender;
- SpinLockRelease(&mq->mq_mutex);
-
- /* We shouldn't have any bytes to read without a sender. */
Assert(sender != NULL);
SetLatch(&sender->procLatch);
}
-/*
- * Get the number of bytes written. The sender need not use this to access
- * the count of bytes written, but the receiver must.
- */
-static uint64
-shm_mq_get_bytes_written(shm_mq *mq, bool *detached)
-{
- uint64 v;
-
- SpinLockAcquire(&mq->mq_mutex);
- v = mq->mq_bytes_written;
- *detached = mq->mq_detached;
- SpinLockRelease(&mq->mq_mutex);
-
- return v;
-}
-
/*
* Increment the number of bytes written.
*/
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
- SpinLockAcquire(&mq->mq_mutex);
- mq->mq_bytes_written += n;
- SpinLockRelease(&mq->mq_mutex);
-}
-
-/*
- * Set receiver's latch, unless queue is detached.
- */
-static shm_mq_result
-shm_mq_notify_receiver(shm_mq *mq)
-{
- PGPROC *receiver;
- bool detached;
-
- SpinLockAcquire(&mq->mq_mutex);
- detached = mq->mq_detached;
- receiver = mq->mq_receiver;
- SpinLockRelease(&mq->mq_mutex);
-
- if (detached)
- return SHM_MQ_DETACHED;
- if (receiver)
- SetLatch(&receiver->procLatch);
- return SHM_MQ_SUCCESS;
+ /*
+ * Separate prior writes to mq_ring from the write of mq_bytes_written
+ * which we're about to do. Pairs with the read barrier found in
+ * shm_mq_receive_bytes.
+ */
+ pg_write_barrier();
+
+ /*
+ * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+ * else can be changing this value. This method avoids taking the bus
+ * lock unnecessarily.
+ */
+ pg_atomic_write_u64(&mq->mq_bytes_written,
+ pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}
/* Shim for on_dsm_callback. */