From 34db06ef9a1d7f36391c64293bf1e0ce44a33915 Mon Sep 17 00:00:00 2001
From: Robert Haas
Date: Fri, 2 Mar 2018 12:16:59 -0500
Subject: [PATCH] shm_mq: Reduce spinlock usage.

Previously, mq_bytes_read and mq_bytes_written were protected by the
spinlock, but that turns out to cause pretty serious spinlock contention
on queries which send many tuples through a Gather or Gather Merge node.
This patch changes things so that we instead read and write those values
using 8-byte atomics. Since mq_bytes_read can only be changed by the
receiver and mq_bytes_written can only be changed by the sender, the only
purpose of the spinlock is to prevent reads and writes of these values
from being torn on platforms where 8-byte memory access is not atomic,
making the conversion fairly straightforward.

Testing shows that this produces some slowdown if we're using emulated
64-bit atomics, but since they should be available on any platform where
performance is a primary concern, that seems OK. It's faster, sometimes
a lot faster, on platforms where such atomics are available.

Patch by me, reviewed by Andres Freund, who also suggested the design.
Also tested by Rafia Sabih.

Discussion: http://postgr.es/m/CA+TgmoYuK0XXxmUNTFT9TSNiBtWnRwasBcHHRCOK9iYmDLQVPg@mail.gmail.com
---
 src/backend/storage/ipc/shm_mq.c | 251 ++++++++++++++++---------------
 1 file changed, 129 insertions(+), 122 deletions(-)

diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 5a2a16c995..5e33b9dfc6 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -31,27 +31,28 @@
  * Some notes on synchronization:
  *
  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
- * mq_sender and mq_bytes_written can only be changed by the sender. However,
- * because most of these fields are 8 bytes and we don't assume that 8 byte
- * reads and writes are atomic, the spinlock must be taken whenever the field
- * is updated, and whenever it is read by a process other than the one allowed
- * to modify it. But the process that is allowed to modify it is also allowed
- * to read it without the lock. On architectures where 8-byte writes are
- * atomic, we could replace these spinlocks with memory barriers, but
- * testing found no performance benefit, so it seems best to keep things
- * simple for now.
+ * mq_sender and mq_bytes_written can only be changed by the sender.
+ * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
+ * they cannot change once set, and thus may be read without a lock once this
+ * is known to be the case.
  *
- * mq_detached can be set by either the sender or the receiver, so the mutex
- * must be held to read or write it. Memory barriers could be used here as
- * well, if needed.
+ * mq_bytes_read and mq_bytes_written are not protected by the mutex; they
+ * are read and written atomically, using 8-byte loads and stores. Memory
+ * barriers must be carefully used to synchronize reads and writes of these
+ * values with reads and writes of the actual data in mq_ring.
+ *
+ * mq_detached needs no locking. It can be set by either the sender or the
+ * receiver, but only ever from false to true, so redundant writes don't
+ * matter. It is important that if we set mq_detached and then set the
+ * counterparty's latch, the counterparty must be certain to see the change
+ * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
+ * ends with one, this should be OK.
  *
  * mq_ring_size and mq_ring_offset never change after initialization, and
  * can therefore be read without the lock.
  *
- * Importantly, mq_ring can be safely read and written without a lock. Were
- * this not the case, we'd have to hold the spinlock for much longer
- * intervals, and performance might suffer. Fortunately, that's not
- * necessary. At any given time, the difference between mq_bytes_read and
+ * Importantly, mq_ring can be safely read and written without a lock.
+ * At any given time, the difference between mq_bytes_read and
  * mq_bytes_written defines the number of bytes within mq_ring that contain
  * unread data, and mq_bytes_read defines the position where those bytes
  * begin. The sender can increase the number of unread bytes at any time,
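
For illustration (not part of the patch): a minimal sketch of the
detach-then-notify ordering the synchronization notes above rely on,
assuming PostgreSQL's latch API and the shm_mq struct are in scope; the
helper name is hypothetical.

    /*
     * The plain store to mq_detached is ordered before the wakeup because
     * SetLatch() begins with a memory barrier, and the counterparty's
     * ResetLatch() ends with one, so a recheck of mq_detached after waking
     * is guaranteed to see the new value.
     */
    static void
    shm_mq_detach_sketch(shm_mq *mq, PGPROC *counterparty)
    {
        mq->mq_detached = true;     /* only ever goes false -> true */

        if (counterparty != NULL)
            SetLatch(&counterparty->procLatch);
    }
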
@@ -71,8 +72,8 @@ struct shm_mq
 	slock_t		mq_mutex;
 	PGPROC	   *mq_receiver;
 	PGPROC	   *mq_sender;
-	uint64		mq_bytes_read;
-	uint64		mq_bytes_written;
+	pg_atomic_uint64 mq_bytes_read;
+	pg_atomic_uint64 mq_bytes_written;
 	Size		mq_ring_size;
 	bool		mq_detached;
 	uint8		mq_ring_offset;
@@ -150,11 +151,8 @@ static bool shm_mq_counterparty_gone(shm_mq *mq,
 					 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
 					 BackgroundWorkerHandle *handle);
-static uint64 shm_mq_get_bytes_read(shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
-static uint64 shm_mq_get_bytes_written(shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
-static shm_mq_result shm_mq_notify_receiver(shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
@@ -182,8 +180,8 @@ shm_mq_create(void *address, Size size)
 	SpinLockInit(&mq->mq_mutex);
 	mq->mq_receiver = NULL;
 	mq->mq_sender = NULL;
-	mq->mq_bytes_read = 0;
-	mq->mq_bytes_written = 0;
+	pg_atomic_init_u64(&mq->mq_bytes_read, 0);
+	pg_atomic_init_u64(&mq->mq_bytes_written, 0);
 	mq->mq_ring_size = size - data_offset;
 	mq->mq_detached = false;
 	mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
@@ -348,6 +346,7 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
+	PGPROC	   *receiver;
 	Size		nbytes = 0;
 	Size		bytes_written;
 	int			i;
@@ -488,8 +487,30 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 	mqh->mqh_partial_bytes = 0;
 	mqh->mqh_length_word_complete = false;
 
+	/* If queue has been detached, let caller know. */
+	if (mq->mq_detached)
+		return SHM_MQ_DETACHED;
+
+	/*
+	 * If the counterparty is known to have attached, we can read mq_receiver
+	 * without acquiring the spinlock and assume it isn't NULL. Otherwise,
+	 * more caution is needed.
+	 */
+	if (mqh->mqh_counterparty_attached)
+		receiver = mq->mq_receiver;
+	else
+	{
+		SpinLockAcquire(&mq->mq_mutex);
+		receiver = mq->mq_receiver;
+		SpinLockRelease(&mq->mq_mutex);
+		if (receiver == NULL)
+			return SHM_MQ_SUCCESS;
+		mqh->mqh_counterparty_attached = true;
+	}
+
 	/* Notify receiver of the newly-written data, and return. */
-	return shm_mq_notify_receiver(mq);
+	SetLatch(&receiver->procLatch);
+	return SHM_MQ_SUCCESS;
 }
 
 /*
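
A minimal sketch of the counter arithmetic the following hunk switches to,
assuming postgres.h and port/atomics.h are in scope; ring_bytes_used is a
hypothetical helper, not patch code. Each counter only ever grows and has
exactly one writer, so two plain atomic reads suffice and no lock is held.
The read order here mirrors the sender's (mq_bytes_read first).

    static inline Size
    ring_bytes_used(shm_mq *mq)
    {
        uint64      rb = pg_atomic_read_u64(&mq->mq_bytes_read);
        uint64      wb = pg_atomic_read_u64(&mq->mq_bytes_written);

        Assert(wb >= rb);           /* the reader never outruns the writer */
        return (Size) (wb - rb);    /* unread bytes currently in mq_ring */
    }
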
@@ -843,18 +864,28 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
 	while (sent < nbytes)
 	{
-		bool		detached;
 		uint64		rb;
+		uint64		wb;
 
 		/* Compute number of ring buffer bytes used and available. */
-		rb = shm_mq_get_bytes_read(mq, &detached);
-		Assert(mq->mq_bytes_written >= rb);
-		used = mq->mq_bytes_written - rb;
+		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
+		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		Assert(wb >= rb);
+		used = wb - rb;
 		Assert(used <= ringsize);
 		available = Min(ringsize - used, nbytes - sent);
 
-		/* Bail out if the queue has been detached. */
-		if (detached)
+		/*
+		 * Bail out if the queue has been detached. Note that we would be in
+		 * trouble if the compiler decided to cache the value of
+		 * mq->mq_detached in a register or on the stack across loop
+		 * iterations. It probably shouldn't do that anyway since we'll
+		 * always return, call an external function that performs a system
+		 * call, or reach a memory barrier at some point later in the loop,
+		 * but just to be sure, insert a compiler barrier here.
+		 */
+		pg_compiler_barrier();
+		if (mq->mq_detached)
 		{
 			*bytes_written = sent;
 			return SHM_MQ_DETACHED;
@@ -895,15 +926,13 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else if (available == 0)
 		{
-			shm_mq_result res;
-
-			/* Let the receiver know that we need them to read some data. */
-			res = shm_mq_notify_receiver(mq);
-			if (res != SHM_MQ_SUCCESS)
-			{
-				*bytes_written = sent;
-				return res;
-			}
+			/*
+			 * Since mqh->mqh_counterparty_attached is known to be true at
+			 * this point, mq_receiver has been set, and it can't change once
+			 * set. Therefore, we can read it without acquiring the spinlock.
+			 */
+			Assert(mqh->mqh_counterparty_attached);
+			SetLatch(&mq->mq_receiver->procLatch);
 
 			/* Skip manipulation of our latch if nowait = true. */
 			if (nowait)
@@ -929,10 +958,20 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else
 		{
-			Size		offset = mq->mq_bytes_written % (uint64) ringsize;
-			Size		sendnow = Min(available, ringsize - offset);
+			Size		offset;
+			Size		sendnow;
 
-			/* Write as much data as we can via a single memcpy(). */
+			offset = wb % (uint64) ringsize;
+			sendnow = Min(available, ringsize - offset);
+
+			/*
+			 * Write as much data as we can via a single memcpy(). Make sure
+			 * these writes happen after the read of mq_bytes_read, above.
+			 * This barrier pairs with the one in shm_mq_inc_bytes_read.
+			 * (Since we're separating the read of mq_bytes_read from a
+			 * subsequent write to mq_ring, we need a full barrier here.)
+			 */
+			pg_memory_barrier();
 			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
 				   (char *) data + sent, sendnow);
 			sent += sendnow;
@@ -978,19 +1017,27 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 	for (;;)
 	{
 		Size		offset;
-		bool		detached;
+		uint64		read;
 
 		/* Get bytes written, so we can compute what's available to read. */
-		written = shm_mq_get_bytes_written(mq, &detached);
-		used = written - mq->mq_bytes_read;
+		written = pg_atomic_read_u64(&mq->mq_bytes_written);
+		read = pg_atomic_read_u64(&mq->mq_bytes_read);
+		used = written - read;
 		Assert(used <= ringsize);
-		offset = mq->mq_bytes_read % (uint64) ringsize;
+		offset = read % (uint64) ringsize;
 
 		/* If we have enough data or buffer has wrapped, we're done. */
 		if (used >= bytes_needed || offset + used >= ringsize)
 		{
 			*nbytesp = Min(used, ringsize - offset);
 			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];
+
+			/*
+			 * Separate the read of mq_bytes_written, above, from caller's
+			 * attempt to read the data itself. Pairs with the barrier in
+			 * shm_mq_inc_bytes_written.
+			 */
+			pg_read_barrier();
 			return SHM_MQ_SUCCESS;
 		}
 
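
The barrier pairing above, gathered in one place as an annotated sketch
(assuming the backend's barrier and atomics primitives; neither helper
below exists in the tree). The sender's full barrier in shm_mq_send_bytes()
pairs with the read barrier in shm_mq_inc_bytes_read(), and the write
barrier in shm_mq_inc_bytes_written() pairs with the pg_read_barrier()
just added to shm_mq_receive_bytes():

    /* Sender side: ring data first, then counter, with a barrier between. */
    static void
    publish_bytes_sketch(shm_mq *mq, Size offset, const char *src, Size n)
    {
        memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], src, n);
        pg_write_barrier();
        pg_atomic_write_u64(&mq->mq_bytes_written,
                            pg_atomic_read_u64(&mq->mq_bytes_written) + n);
    }

    /* Receiver side: counter first, then ring data, with a barrier between. */
    static Size
    visible_bytes_sketch(shm_mq *mq)
    {
        uint64      wb = pg_atomic_read_u64(&mq->mq_bytes_written);

        pg_read_barrier();      /* mq_ring reads must not precede this */
        return (Size) (wb - pg_atomic_read_u64(&mq->mq_bytes_read));
    }
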
@@ -1002,7 +1049,7 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 		 * receiving a message stored in the buffer even after the sender has
 		 * detached.
 		 */
-		if (detached)
+		if (mq->mq_detached)
 			return SHM_MQ_DETACHED;
 
 		/* Skip manipulation of our latch if nowait = true. */
@@ -1032,16 +1079,10 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 static bool
 shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
 {
-	bool		detached;
 	pid_t		pid;
 
-	/* Acquire the lock just long enough to check the pointer. */
-	SpinLockAcquire(&mq->mq_mutex);
-	detached = mq->mq_detached;
-	SpinLockRelease(&mq->mq_mutex);
-
 	/* If the queue has been detached, counterparty is definitely gone. */
-	if (detached)
+	if (mq->mq_detached)
 		return true;
 
 	/* If there's a handle, check worker status. */
@@ -1054,9 +1095,7 @@ shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
 		if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
 		{
 			/* Mark it detached, just to make it official. */
-			SpinLockAcquire(&mq->mq_mutex);
 			mq->mq_detached = true;
-			SpinLockRelease(&mq->mq_mutex);
 			return true;
 		}
 	}
@@ -1085,16 +1124,14 @@ shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
 	{
 		BgwHandleStatus status;
 		pid_t		pid;
-		bool		detached;
 
 		/* Acquire the lock just long enough to check the pointer. */
 		SpinLockAcquire(&mq->mq_mutex);
-		detached = mq->mq_detached;
 		result = (*ptr != NULL);
 		SpinLockRelease(&mq->mq_mutex);
 
 		/* Fail if detached; else succeed if initialized. */
-		if (detached)
+		if (mq->mq_detached)
 		{
 			result = false;
 			break;
@@ -1126,23 +1163,6 @@ shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
 	return result;
 }
 
-/*
- * Get the number of bytes read. The receiver need not use this to access
- * the count of bytes read, but the sender must.
- */
-static uint64
-shm_mq_get_bytes_read(shm_mq *mq, bool *detached)
-{
-	uint64		v;
-
-	SpinLockAcquire(&mq->mq_mutex);
-	v = mq->mq_bytes_read;
-	*detached = mq->mq_detached;
-	SpinLockRelease(&mq->mq_mutex);
-
-	return v;
-}
-
 /*
  * Increment the number of bytes read.
  */
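
The hunk below replaces a spinlocked "counter += n" with an unlocked
read-modify-write. A sketch of the idiom in isolation (hypothetical
helper, assuming port/atomics.h): because exactly one process ever
advances a given counter, the read cannot race with a concurrent write,
so pg_atomic_fetch_add_u64's locked bus cycle is unnecessary; the atomic
write only has to prevent torn stores.

    static inline void
    advance_counter_sketch(pg_atomic_uint64 *counter, Size n)
    {
        /* Single writer: nobody else can modify *counter between these. */
        pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + n);
    }
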
@@ -1151,63 +1171,50 @@ shm_mq_inc_bytes_read(shm_mq *mq, Size n)
 {
 	PGPROC	   *sender;
 
-	SpinLockAcquire(&mq->mq_mutex);
-	mq->mq_bytes_read += n;
+	/*
+	 * Separate prior reads of mq_ring from the increment of mq_bytes_read
+	 * which follows. Pairs with the full barrier in shm_mq_send_bytes(). We
+	 * only need a read barrier here because the increment of mq_bytes_read
+	 * is actually a read followed by a dependent write.
+	 */
+	pg_read_barrier();
+
+	/*
+	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+	 * else can be changing this value. This method should be cheaper.
+	 */
+	pg_atomic_write_u64(&mq->mq_bytes_read,
+						pg_atomic_read_u64(&mq->mq_bytes_read) + n);
+
+	/*
+	 * We shouldn't have any bytes to read without a sender, so we can read
+	 * mq_sender here without a lock. Once it's initialized, it can't change.
+	 */
 	sender = mq->mq_sender;
-	SpinLockRelease(&mq->mq_mutex);
-
-	/* We shouldn't have any bytes to read without a sender. */
 	Assert(sender != NULL);
 	SetLatch(&sender->procLatch);
 }
 
-/*
- * Get the number of bytes written. The sender need not use this to access
- * the count of bytes written, but the receiver must.
- */
-static uint64
-shm_mq_get_bytes_written(shm_mq *mq, bool *detached)
-{
-	uint64		v;
-
-	SpinLockAcquire(&mq->mq_mutex);
-	v = mq->mq_bytes_written;
-	*detached = mq->mq_detached;
-	SpinLockRelease(&mq->mq_mutex);
-
-	return v;
-}
-
 /*
  * Increment the number of bytes written.
  */
 static void
 shm_mq_inc_bytes_written(shm_mq *mq, Size n)
 {
-	SpinLockAcquire(&mq->mq_mutex);
-	mq->mq_bytes_written += n;
-	SpinLockRelease(&mq->mq_mutex);
-}
-
-/*
- * Set receiver's latch, unless queue is detached.
- */
-static shm_mq_result
-shm_mq_notify_receiver(shm_mq *mq)
-{
-	PGPROC	   *receiver;
-	bool		detached;
-
-	SpinLockAcquire(&mq->mq_mutex);
-	detached = mq->mq_detached;
-	receiver = mq->mq_receiver;
-	SpinLockRelease(&mq->mq_mutex);
-
-	if (detached)
-		return SHM_MQ_DETACHED;
-	if (receiver)
-		SetLatch(&receiver->procLatch);
-	return SHM_MQ_SUCCESS;
+	/*
+	 * Separate prior reads of mq_ring from the write of mq_bytes_written
+	 * which we're about to do. Pairs with the read barrier found in
+	 * shm_mq_receive_bytes.
+	 */
+	pg_write_barrier();
+
+	/*
+	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+	 * else can be changing this value. This method avoids taking the bus
+	 * lock unnecessarily.
+	 */
+	pg_atomic_write_u64(&mq->mq_bytes_written,
+						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
 }
 
 /* Shim for on_dsm_callback. */
-- 
2.40.0
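
As an appendix, a self-contained C11 analogue of the patch's
single-producer/single-consumer protocol; this is a sketch under stated
assumptions, not PostgreSQL code. Acquire/release orderings stand in for
pg_read_barrier()/pg_write_barrier() and the full barrier, and each of
the two monotonic counters is owned by one side, exactly as mq_bytes_read
and mq_bytes_written are.

    /* spsc_sketch.c: compile with "cc -std=c11 -c spsc_sketch.c". */
    #include <stdatomic.h>
    #include <stdint.h>
    #include <string.h>

    #define RING_SIZE 4096

    typedef struct spsc_queue
    {
        _Atomic uint64_t bytes_read;    /* advanced only by the consumer */
        _Atomic uint64_t bytes_written; /* advanced only by the producer */
        char        ring[RING_SIZE];
    } spsc_queue;

    /*
     * Producer: the acquire load of bytes_read keeps the ring writes below
     * from being reordered before it (the patch uses a full barrier there),
     * and the release store of bytes_written publishes the copied data
     * (the pg_write_barrier() in shm_mq_inc_bytes_written).
     */
    static size_t
    spsc_send(spsc_queue *q, const char *data, size_t n)
    {
        uint64_t    rb = atomic_load_explicit(&q->bytes_read,
                                              memory_order_acquire);
        uint64_t    wb = atomic_load_explicit(&q->bytes_written,
                                              memory_order_relaxed);
        size_t      offset = (size_t) (wb % RING_SIZE);
        size_t      avail = RING_SIZE - (size_t) (wb - rb);

        if (n > avail)
            n = avail;
        if (n > RING_SIZE - offset)
            n = RING_SIZE - offset; /* stop at ring end, as the patch does */
        memcpy(q->ring + offset, data, n);
        atomic_store_explicit(&q->bytes_written, wb + n, memory_order_release);
        return n;
    }

    /*
     * Consumer: the acquire load of bytes_written (the patch's
     * pg_read_barrier()) is ordered before the ring reads; the release
     * store of bytes_read hands the consumed space back to the producer.
     */
    static size_t
    spsc_recv(spsc_queue *q, char *out, size_t want)
    {
        uint64_t    wb = atomic_load_explicit(&q->bytes_written,
                                              memory_order_acquire);
        uint64_t    rb = atomic_load_explicit(&q->bytes_read,
                                              memory_order_relaxed);
        size_t      offset = (size_t) (rb % RING_SIZE);
        size_t      n = (size_t) (wb - rb);

        if (n > want)
            n = want;
        if (n > RING_SIZE - offset)
            n = RING_SIZE - offset;
        memcpy(out, q->ring + offset, n);
        atomic_store_explicit(&q->bytes_read, rb + n, memory_order_release);
        return n;
    }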