shm_mq: Reduce spinlock usage.
author		Robert Haas <rhaas@postgresql.org>
		Fri, 2 Mar 2018 17:16:59 +0000 (12:16 -0500)
committer	Robert Haas <rhaas@postgresql.org>
		Fri, 2 Mar 2018 17:16:59 +0000 (12:16 -0500)
Previously, mq_bytes_read and mq_bytes_written were protected by the
spinlock, but that turns out to cause pretty serious spinlock
contention on queries which send many tuples through a Gather or
Gather Merge node.  This patch changes things so that we instead
read and write those values using 8-byte atomics.  Since mq_bytes_read
can only be changed by the receiver and mq_bytes_written can only be
changed by the sender, the only purpose of the spinlock is to prevent
reads and writes of these values from being torn on platforms where
8-byte memory access is not atomic, making the conversion fairly
straightforward.
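
As a sketch of the conversion pattern (illustrative only, not code from the
patch; pg_atomic_uint64 and its accessors are the real API from
src/include/port/atomics.h):

    /* Before: a 64-bit counter guarded by a spinlock against torn access. */
    typedef struct
    {
        slock_t     mutex;
        uint64      counter;    /* read/write only while holding mutex */
    } locked_counter;

    /* After: an 8-byte atomic; loads and stores cannot be torn. */
    typedef struct
    {
        pg_atomic_uint64 counter;
    } atomic_counter;

    static uint64
    read_counter(atomic_counter *c)
    {
        return pg_atomic_read_u64(&c->counter); /* no lock taken */
    }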

Testing shows that this produces some slowdown if we're using emulated
64-bit atomics, but since native 64-bit atomics should be available on
any platform where performance is a primary concern, that seems OK.  It's
sometimes a lot faster, on platforms where such atomics are available.

Patch by me, reviewed by Andres Freund, who also suggested the
design.  Also tested by Rafia Sabih.

Discussion: http://postgr.es/m/CA+TgmoYuK0XXxmUNTFT9TSNiBtWnRwasBcHHRCOK9iYmDLQVPg@mail.gmail.com

src/backend/storage/ipc/shm_mq.c

index 5a2a16c995d6108a1d3b1d3c1ac66a73db34ce13..5e33b9dfc69f1db2b9ce0da779e0d9361a31f513 100644 (file)
  * Some notes on synchronization:
  *
  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
- * mq_sender and mq_bytes_written can only be changed by the sender.  However,
- * because most of these fields are 8 bytes and we don't assume that 8 byte
- * reads and writes are atomic, the spinlock must be taken whenever the field
- * is updated, and whenever it is read by a process other than the one allowed
- * to modify it. But the process that is allowed to modify it is also allowed
- * to read it without the lock.  On architectures where 8-byte writes are
- * atomic, we could replace these spinlocks with memory barriers, but
- * testing found no performance benefit, so it seems best to keep things
- * simple for now.
+ * mq_sender and mq_bytes_written can only be changed by the sender.
+ * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
+ * they cannot change once set, and thus may be read without a lock once this
+ * is known to be the case.
  *
- * mq_detached can be set by either the sender or the receiver, so the mutex
- * must be held to read or write it.  Memory barriers could be used here as
- * well, if needed.
+ * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
+ * they are written atomically using 8 byte loads and stores.  Memory barriers
+ * must be carefully used to synchronize reads and writes of these values with
+ * reads and writes of the actual data in mq_ring.
+ *
+ * mq_detached needs no locking.  It can be set by either the sender or the
+ * receiver, but only ever from false to true, so redundant writes don't
+ * matter.  It is important that if we set mq_detached and then set the
+ * counterparty's latch, the counterparty must be certain to see the change
+ * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
+ * ends with one, this should be OK.
  *
  * mq_ring_size and mq_ring_offset never change after initialization, and
  * can therefore be read without the lock.
  *
- * Importantly, mq_ring can be safely read and written without a lock.  Were
- * this not the case, we'd have to hold the spinlock for much longer
- * intervals, and performance might suffer.  Fortunately, that's not
- * necessary.  At any given time, the difference between mq_bytes_read and
+ * Importantly, mq_ring can be safely read and written without a lock.
+ * At any given time, the difference between mq_bytes_read and
  * mq_bytes_written defines the number of bytes within mq_ring that contain
  * unread data, and mq_bytes_read defines the position where those bytes
  * begin.  The sender can increase the number of unread bytes at any time,
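
The mq_detached handshake described above can be sketched as follows
(illustrative, not code from this file; the latch calls are the real API,
the surrounding loop is hypothetical):

    /* Detaching side: SetLatch() begins with a memory barrier, so the store
     * to mq_detached is visible to the counterparty once it wakes up. */
    mq->mq_detached = true;
    SetLatch(&counterparty->procLatch);

    /* Waiting side: ResetLatch() ends with a memory barrier, so a re-check
     * made after it cannot see a stale mq_detached. */
    for (;;)
    {
        if (mq->mq_detached)
            break;
        WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);
        ResetLatch(MyLatch);
    }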
@@ -71,8 +72,8 @@ struct shm_mq
        slock_t         mq_mutex;
        PGPROC     *mq_receiver;
        PGPROC     *mq_sender;
-       uint64          mq_bytes_read;
-       uint64          mq_bytes_written;
+       pg_atomic_uint64 mq_bytes_read;
+       pg_atomic_uint64 mq_bytes_written;
        Size            mq_ring_size;
        bool            mq_detached;
        uint8           mq_ring_offset;
@@ -150,11 +151,8 @@ static bool shm_mq_counterparty_gone(shm_mq *mq,
                                                 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
                                         BackgroundWorkerHandle *handle);
-static uint64 shm_mq_get_bytes_read(shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
-static uint64 shm_mq_get_bytes_written(shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
-static shm_mq_result shm_mq_notify_receiver(shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
@@ -182,8 +180,8 @@ shm_mq_create(void *address, Size size)
        SpinLockInit(&mq->mq_mutex);
        mq->mq_receiver = NULL;
        mq->mq_sender = NULL;
-       mq->mq_bytes_read = 0;
-       mq->mq_bytes_written = 0;
+       pg_atomic_init_u64(&mq->mq_bytes_read, 0);
+       pg_atomic_init_u64(&mq->mq_bytes_written, 0);
        mq->mq_ring_size = size - data_offset;
        mq->mq_detached = false;
        mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
@@ -348,6 +346,7 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 {
        shm_mq_result res;
        shm_mq     *mq = mqh->mqh_queue;
+       PGPROC     *receiver;
        Size            nbytes = 0;
        Size            bytes_written;
        int                     i;
@@ -488,8 +487,30 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
        mqh->mqh_partial_bytes = 0;
        mqh->mqh_length_word_complete = false;
 
+       /* If queue has been detached, let caller know. */
+       if (mq->mq_detached)
+               return SHM_MQ_DETACHED;
+
+       /*
+        * If the counterparty is known to have attached, we can read mq_receiver
+        * without acquiring the spinlock and assume it isn't NULL.  Otherwise,
+        * more caution is needed.
+        */
+       if (mqh->mqh_counterparty_attached)
+               receiver = mq->mq_receiver;
+       else
+       {
+               SpinLockAcquire(&mq->mq_mutex);
+               receiver = mq->mq_receiver;
+               SpinLockRelease(&mq->mq_mutex);
+               if (receiver == NULL)
+                       return SHM_MQ_SUCCESS;
+               mqh->mqh_counterparty_attached = true;
+       }
+
        /* Notify receiver of the newly-written data, and return. */
-       return shm_mq_notify_receiver(mq);
+       SetLatch(&receiver->procLatch);
+       return SHM_MQ_SUCCESS;
 }
 
 /*
@@ -843,18 +864,28 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
        while (sent < nbytes)
        {
-               bool            detached;
                uint64          rb;
+               uint64          wb;
 
                /* Compute number of ring buffer bytes used and available. */
-               rb = shm_mq_get_bytes_read(mq, &detached);
-               Assert(mq->mq_bytes_written >= rb);
-               used = mq->mq_bytes_written - rb;
+               rb = pg_atomic_read_u64(&mq->mq_bytes_read);
+               wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+               Assert(wb >= rb);
+               used = wb - rb;
                Assert(used <= ringsize);
                available = Min(ringsize - used, nbytes - sent);
 
-               /* Bail out if the queue has been detached. */
-               if (detached)
+               /*
+                * Bail out if the queue has been detached.  Note that we would be in
+                * trouble if the compiler decided to cache the value of
+                * mq->mq_detached in a register or on the stack across loop
+                * iterations.  It probably shouldn't do that anyway since we'll
+                * always return, call an external function that performs a system
+                * call, or reach a memory barrier at some point later in the loop,
+                * but just to be sure, insert a compiler barrier here.
+                */
+               pg_compiler_barrier();
+               if (mq->mq_detached)
                {
                        *bytes_written = sent;
                        return SHM_MQ_DETACHED;
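
To see the hazard this comment guards against, consider the transformation a
compiler could legally make without the barrier (hypothetical, sketch only):

    /* mq_detached is a plain bool the loop never writes, so the compiler
     * may hoist the load out of the loop entirely: */
    bool        detached = mq->mq_detached;     /* loaded once */

    while (sent < nbytes)
    {
        if (detached)           /* never observes a later detach */
            break;
        /* ... copy data, wait on latch ... */
    }

pg_compiler_barrier() forces a fresh load of mq_detached on every iteration.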
@@ -895,15 +926,13 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                }
                else if (available == 0)
                {
-                       shm_mq_result res;
-
-                       /* Let the receiver know that we need them to read some data. */
-                       res = shm_mq_notify_receiver(mq);
-                       if (res != SHM_MQ_SUCCESS)
-                       {
-                               *bytes_written = sent;
-                               return res;
-                       }
+                       /*
+                        * Since mqh->mqh_counterparty_attached is known to be true at this
+                        * point, mq_receiver has been set, and it can't change once set.
+                        * Therefore, we can read it without acquiring the spinlock.
+                        */
+                       Assert(mqh->mqh_counterparty_attached);
+                       SetLatch(&mq->mq_receiver->procLatch);
 
                        /* Skip manipulation of our latch if nowait = true. */
                        if (nowait)
@@ -929,10 +958,20 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                }
                else
                {
-                       Size            offset = mq->mq_bytes_written % (uint64) ringsize;
-                       Size            sendnow = Min(available, ringsize - offset);
+                       Size            offset;
+                       Size            sendnow;
 
-                       /* Write as much data as we can via a single memcpy(). */
+                       offset = wb % (uint64) ringsize;
+                       sendnow = Min(available, ringsize - offset);
+
+                       /*
+                        * Write as much data as we can via a single memcpy(). Make sure
+                        * these writes happen after the read of mq_bytes_read, above.
+                        * This barrier pairs with the one in shm_mq_inc_bytes_read.
+                        * (Since we're separating the read of mq_bytes_read from a
+                        * subsequent write to mq_ring, we need a full barrier here.)
+                        */
+                       pg_memory_barrier();
                        memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
                                   (char *) data + sent, sendnow);
                        sent += sendnow;
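
The pairing that comment describes, laid out side by side (a sketch of the
ordering constraints, not verbatim code):

    /*
     * Sender (shm_mq_send_bytes)        Receiver (shm_mq_inc_bytes_read)
     * ---------------------------       --------------------------------
     * rb = read(mq_bytes_read)          read data out of mq_ring
     * pg_memory_barrier()               pg_read_barrier()
     * memcpy into mq_ring               write(mq_bytes_read, old + n)
     *
     * The full barrier keeps the sender from overwriting ring bytes before
     * the load that proved them free; the read barrier keeps the receiver
     * from advancing the count before it has finished reading the bytes.
     */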
@@ -978,19 +1017,27 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
        for (;;)
        {
                Size            offset;
-               bool            detached;
+               uint64          read;
 
                /* Get bytes written, so we can compute what's available to read. */
-               written = shm_mq_get_bytes_written(mq, &detached);
-               used = written - mq->mq_bytes_read;
+               written = pg_atomic_read_u64(&mq->mq_bytes_written);
+               read = pg_atomic_read_u64(&mq->mq_bytes_read);
+               used = written - read;
                Assert(used <= ringsize);
-               offset = mq->mq_bytes_read % (uint64) ringsize;
+               offset = read % (uint64) ringsize;
 
                /* If we have enough data or buffer has wrapped, we're done. */
                if (used >= bytes_needed || offset + used >= ringsize)
                {
                        *nbytesp = Min(used, ringsize - offset);
                        *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
+
+                       /*
+                        * Separate the read of mq_bytes_written, above, from caller's
+                        * attempt to read the data itself.  Pairs with the barrier in
+                        * shm_mq_inc_bytes_written.
+                        */
+                       pg_read_barrier();
                        return SHM_MQ_SUCCESS;
                }
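
The analogous pairing in the other direction (again a sketch of the ordering,
not verbatim code):

    /*
     * Sender (shm_mq_inc_bytes_written)  Receiver (shm_mq_receive_bytes)
     * ----------------------------       -------------------------------
     * memcpy into mq_ring                wb = read(mq_bytes_written)
     * pg_write_barrier()                 pg_read_barrier()
     * write(mq_bytes_written, old + n)   caller reads data from mq_ring
     *
     * The write barrier publishes the ring contents before the count that
     * advertises them; the read barrier keeps the caller from reading ring
     * bytes before seeing the count that made them visible.
     */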
 
@@ -1002,7 +1049,7 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
                 * receiving a message stored in the buffer even after the sender has
                 * detached.
                 */
-               if (detached)
+               if (mq->mq_detached)
                        return SHM_MQ_DETACHED;
 
                /* Skip manipulation of our latch if nowait = true. */
@@ -1032,16 +1079,10 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 static bool
 shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
 {
-       bool            detached;
        pid_t           pid;
 
-       /* Acquire the lock just long enough to check the pointer. */
-       SpinLockAcquire(&mq->mq_mutex);
-       detached = mq->mq_detached;
-       SpinLockRelease(&mq->mq_mutex);
-
        /* If the queue has been detached, counterparty is definitely gone. */
-       if (detached)
+       if (mq->mq_detached)
                return true;
 
        /* If there's a handle, check worker status. */
@@ -1054,9 +1095,7 @@ shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
                if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
                {
                        /* Mark it detached, just to make it official. */
-                       SpinLockAcquire(&mq->mq_mutex);
                        mq->mq_detached = true;
-                       SpinLockRelease(&mq->mq_mutex);
                        return true;
                }
        }
@@ -1085,16 +1124,14 @@ shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
        {
                BgwHandleStatus status;
                pid_t           pid;
-               bool            detached;
 
                /* Acquire the lock just long enough to check the pointer. */
                SpinLockAcquire(&mq->mq_mutex);
-               detached = mq->mq_detached;
                result = (*ptr != NULL);
                SpinLockRelease(&mq->mq_mutex);
 
                /* Fail if detached; else succeed if initialized. */
-               if (detached)
+               if (mq->mq_detached)
                {
                        result = false;
                        break;
@@ -1126,23 +1163,6 @@ shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
        return result;
 }
 
-/*
- * Get the number of bytes read.  The receiver need not use this to access
- * the count of bytes read, but the sender must.
- */
-static uint64
-shm_mq_get_bytes_read(shm_mq *mq, bool *detached)
-{
-       uint64          v;
-
-       SpinLockAcquire(&mq->mq_mutex);
-       v = mq->mq_bytes_read;
-       *detached = mq->mq_detached;
-       SpinLockRelease(&mq->mq_mutex);
-
-       return v;
-}
-
 /*
  * Increment the number of bytes read.
  */
@@ -1151,63 +1171,50 @@ shm_mq_inc_bytes_read(shm_mq *mq, Size n)
 {
        PGPROC     *sender;
 
-       SpinLockAcquire(&mq->mq_mutex);
-       mq->mq_bytes_read += n;
+       /*
+        * Separate prior reads of mq_ring from the increment of mq_bytes_read
+        * which follows.  Pairs with the full barrier in shm_mq_send_bytes(). We
+        * only need a read barrier here because the increment of mq_bytes_read is
+        * actually a read followed by a dependent write.
+        */
+       pg_read_barrier();
+
+       /*
+        * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+        * else can be changing this value.  This method should be cheaper.
+        */
+       pg_atomic_write_u64(&mq->mq_bytes_read,
+                                               pg_atomic_read_u64(&mq->mq_bytes_read) + n);
+
+       /*
+        * We shouldn't have any bytes to read without a sender, so we can read
+        * mq_sender here without a lock.  Once it's initialized, it can't change.
+        */
        sender = mq->mq_sender;
-       SpinLockRelease(&mq->mq_mutex);
-
-       /* We shouldn't have any bytes to read without a sender. */
        Assert(sender != NULL);
        SetLatch(&sender->procLatch);
 }
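
For comparison, the alternative rejected in the comments above (both forms
are correct here only because the receiver is the sole writer of
mq_bytes_read):

    /* General-purpose: a locked read-modify-write, safe with any number of
     * concurrent writers, but typically implemented with a bus-locking
     * instruction. */
    pg_atomic_fetch_add_u64(&mq->mq_bytes_read, n);

    /* Single-writer shortcut, as used here: a plain 8-byte load and store.
     * No other process ever writes this value, so the unlocked
     * read-modify-write cannot race. */
    pg_atomic_write_u64(&mq->mq_bytes_read,
                        pg_atomic_read_u64(&mq->mq_bytes_read) + n);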
 
-/*
- * Get the number of bytes written.  The sender need not use this to access
- * the count of bytes written, but the receiver must.
- */
-static uint64
-shm_mq_get_bytes_written(shm_mq *mq, bool *detached)
-{
-       uint64          v;
-
-       SpinLockAcquire(&mq->mq_mutex);
-       v = mq->mq_bytes_written;
-       *detached = mq->mq_detached;
-       SpinLockRelease(&mq->mq_mutex);
-
-       return v;
-}
-
 /*
  * Increment the number of bytes written.
  */
 static void
 shm_mq_inc_bytes_written(shm_mq *mq, Size n)
 {
-       SpinLockAcquire(&mq->mq_mutex);
-       mq->mq_bytes_written += n;
-       SpinLockRelease(&mq->mq_mutex);
-}
-
-/*
- * Set receiver's latch, unless queue is detached.
- */
-static shm_mq_result
-shm_mq_notify_receiver(shm_mq *mq)
-{
-       PGPROC     *receiver;
-       bool            detached;
-
-       SpinLockAcquire(&mq->mq_mutex);
-       detached = mq->mq_detached;
-       receiver = mq->mq_receiver;
-       SpinLockRelease(&mq->mq_mutex);
-
-       if (detached)
-               return SHM_MQ_DETACHED;
-       if (receiver)
-               SetLatch(&receiver->procLatch);
-       return SHM_MQ_SUCCESS;
+       /*
+        * Separate prior reads of mq_ring from the write of mq_bytes_written
+        * which we're about to do.  Pairs with the read barrier found in
+        * shm_mq_receive_bytes.
+        */
+       pg_write_barrier();
+
+       /*
+        * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+        * else can be changing this value.  This method avoids taking the bus
+        * lock unnecessarily.
+        */
+       pg_atomic_write_u64(&mq->mq_bytes_written,
+                                               pg_atomic_read_u64(&mq->mq_bytes_written) + n);
 }
 
 /* Shim for on_dsm_callback. */