From 3bd261ca18c67eafe18088e58fab511e3b965418 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Tue, 18 Mar 2014 11:19:13 -0400 Subject: [PATCH] Improve shm_mq portability around MAXIMUM_ALIGNOF and sizeof(Size). Revise the original decision to expose a uint64-based interface and use Size everywhere possible. Avoid assuming that MAXIMUM_ALIGNOF is 8, or making any assumption about the relationship between that value and sizeof(Size). If MAXIMUM_ALIGNOF is bigger, we'll now insert padding after the length word; if it's smaller, we are now prepared to read and write the length word in chunks. Per discussion with Tom Lane. --- src/backend/storage/ipc/shm_mq.c | 251 +++++++++++++++++++------------ src/include/storage/shm_mq.h | 4 +- 2 files changed, 154 insertions(+), 101 deletions(-) diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index 2d298a3598..b31f4fb693 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -72,7 +72,7 @@ struct shm_mq PGPROC *mq_sender; uint64 mq_bytes_read; uint64 mq_bytes_written; - uint64 mq_ring_size; + Size mq_ring_size; bool mq_detached; uint8 mq_ring_offset; char mq_ring[FLEXIBLE_ARRAY_MEMBER]; @@ -103,15 +103,16 @@ struct shm_mq * locally by copying the chunks into a backend-local buffer. mqh_buffer is * the buffer, and mqh_buflen is the number of bytes allocated for it. * - * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word + * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete * are used to track the state of non-blocking operations. When the caller * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they * are expected to retry the call at a later time with the same argument; * we need to retain enough state to pick up where we left off. 
- * mqh_did_length_word tracks whether we read or wrote the length word, - * mqh_partial_message_bytes tracks the number of payload bytes read or - * written, and mqh_expected_bytes - which is used only for reads - tracks - * the expected total size of the payload. + * mqh_length_word_complete tracks whether we are done sending or receiving + * (whichever we're doing) the entire length word. mqh_partial_bytes tracks + * the number of bytes read or written for either the length word or the + * message itself, and mqh_expected_bytes - which is used only for reads - + * tracks the expected total size of the payload. * * mqh_counterparty_attached tracks whether we know the counterparty to have * attached to the queue at some previous point. This lets us avoid some @@ -128,25 +129,25 @@ struct shm_mq_handle dsm_segment *mqh_segment; BackgroundWorkerHandle *mqh_handle; char *mqh_buffer; - uint64 mqh_buflen; - uint64 mqh_consume_pending; - uint64 mqh_partial_message_bytes; - uint64 mqh_expected_bytes; - bool mqh_did_length_word; + Size mqh_buflen; + Size mqh_consume_pending; + Size mqh_partial_bytes; + Size mqh_expected_bytes; + bool mqh_length_word_complete; bool mqh_counterparty_attached; MemoryContext mqh_context; }; -static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes, - void *data, bool nowait, uint64 *bytes_written); -static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, - bool nowait, uint64 *nbytesp, void **datap); +static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, + void *data, bool nowait, Size *bytes_written); +static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, + bool nowait, Size *nbytesp, void **datap); static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr, BackgroundWorkerHandle *handle); static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached); -static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n); +static void 
shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n); static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached); -static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n); +static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n); static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq); static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); @@ -163,7 +164,7 @@ shm_mq * shm_mq_create(void *address, Size size) { shm_mq *mq = address; - uint64 data_offset = MAXALIGN(offsetof(shm_mq, mq_ring)); + Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring)); /* If the size isn't MAXALIGN'd, just discard the odd bytes. */ size = MAXALIGN_DOWN(size); @@ -289,8 +290,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) mqh->mqh_buflen = 0; mqh->mqh_consume_pending = 0; mqh->mqh_context = CurrentMemoryContext; - mqh->mqh_partial_message_bytes = 0; - mqh->mqh_did_length_word = false; + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; mqh->mqh_counterparty_attached = false; if (seg != NULL) @@ -314,41 +315,48 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) * the length or payload will corrupt the queue.) */ shm_mq_result -shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait) +shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait) { shm_mq_result res; shm_mq *mq = mqh->mqh_queue; - uint64 bytes_written; + Size bytes_written; Assert(mq->mq_sender == MyProc); - /* Write the message length into the buffer. */ - if (!mqh->mqh_did_length_word) + /* Try to write, or finish writing, the length word into the buffer. 
*/ + while (!mqh->mqh_length_word_complete) { - res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait, - &bytes_written); + Assert(mqh->mqh_partial_bytes < sizeof(Size)); + res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes, + ((char *) &nbytes) + mqh->mqh_partial_bytes, + nowait, &bytes_written); + mqh->mqh_partial_bytes += bytes_written; if (res != SHM_MQ_SUCCESS) return res; - /* - * We're sure to have sent the length in full, since we always - * write a MAXALIGN'd chunk. - */ - Assert(bytes_written == MAXALIGN64(sizeof(uint64))); - mqh->mqh_did_length_word = true; + if (mqh->mqh_partial_bytes >= sizeof(Size)) + { + Assert(mqh->mqh_partial_bytes == sizeof(Size)); + + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = true; + } + + /* Length word can't be split unless bigger than required alignment. */ + Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF); } /* Write the actual data bytes into the buffer. */ - Assert(mqh->mqh_partial_message_bytes <= nbytes); - res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes, - ((char *) data) + mqh->mqh_partial_message_bytes, + Assert(mqh->mqh_partial_bytes <= nbytes); + res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes, + ((char *) data) + mqh->mqh_partial_bytes, nowait, &bytes_written); if (res == SHM_MQ_WOULD_BLOCK) - mqh->mqh_partial_message_bytes += bytes_written; + mqh->mqh_partial_bytes += bytes_written; else { - mqh->mqh_partial_message_bytes = 0; - mqh->mqh_did_length_word = false; + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; } if (res != SHM_MQ_SUCCESS) return res; @@ -380,13 +388,12 @@ shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait) * function again after the process latch has been set. 
*/ shm_mq_result -shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) +shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait) { shm_mq *mq = mqh->mqh_queue; shm_mq_result res; - uint64 rb = 0; - uint64 nbytes; - uint64 needed; + Size rb = 0; + Size nbytes; void *rawdata; Assert(mq->mq_receiver == MyProc); @@ -414,44 +421,91 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) mqh->mqh_consume_pending = 0; } - /* Determine the message length. */ - if (mqh->mqh_did_length_word) - { - /* We've partially received a message; recall expected length. */ - nbytes = mqh->mqh_expected_bytes; - } - else + /* Try to read, or finish reading, the length word from the buffer. */ + while (!mqh->mqh_length_word_complete) { /* Try to receive the message length word. */ - res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata); + Assert(mqh->mqh_partial_bytes < sizeof(Size)); + res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes, + nowait, &rb, &rawdata); if (res != SHM_MQ_SUCCESS) return res; - Assert(rb >= sizeof(uint64)); - memcpy(&nbytes, rawdata, sizeof(uint64)); - mqh->mqh_expected_bytes = nbytes; - /* If we've already got the whole message, we're done. */ - needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes); - if (rb >= needed) + /* + * Hopefully, we'll receive the entire message length word at once. + * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over + * multiple reads. + */ + if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size)) { + Size needed; + + nbytes = * (Size *) rawdata; + + /* If we've already got the whole message, we're done. */ + needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes); + if (rb >= needed) + { + /* + * Technically, we could consume the message length information + * at this point, but the extra write to shared memory wouldn't + * be free and in most cases we would reap no benefit. 
+ */ + mqh->mqh_consume_pending = needed; + *nbytesp = nbytes; + *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size)); + return SHM_MQ_SUCCESS; + } + /* - * Technically, we could consume the message length information at - * this point, but the extra write to shared memory wouldn't be - * free and in most cases we would reap no benefit. + * We don't have the whole message, but we at least have the whole + * length word. */ - mqh->mqh_consume_pending = needed; - *nbytesp = nbytes; - *datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64)); - return SHM_MQ_SUCCESS; + mqh->mqh_expected_bytes = nbytes; + mqh->mqh_length_word_complete = true; + shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size))); + rb -= MAXALIGN(sizeof(Size)); } + else + { + Size lengthbytes; + + /* Can't be split unless bigger than required alignment. */ + Assert(sizeof(Size) > MAXIMUM_ALIGNOF); - /* Consume the length word. */ - shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64))); - mqh->mqh_did_length_word = true; - rb -= MAXALIGN64(sizeof(uint64)); + /* Message word is split; need buffer to reassemble. */ + if (mqh->mqh_buffer == NULL) + { + mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, + MQH_INITIAL_BUFSIZE); + mqh->mqh_buflen = MQH_INITIAL_BUFSIZE; + } + Assert(mqh->mqh_buflen >= sizeof(Size)); + + /* Copy and consume partial length word. */ + if (mqh->mqh_partial_bytes + rb > sizeof(Size)) + lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes; + else + lengthbytes = rb - mqh->mqh_partial_bytes; + memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, + lengthbytes); + mqh->mqh_partial_bytes += lengthbytes; + shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes)); + rb -= lengthbytes; + + /* If we now have the whole word, we're ready to read payload. 
*/ + if (mqh->mqh_partial_bytes >= sizeof(Size)) + { + Assert(mqh->mqh_partial_bytes == sizeof(Size)); + mqh->mqh_expected_bytes = * (Size *) mqh->mqh_buffer; + mqh->mqh_length_word_complete = true; + mqh->mqh_partial_bytes = 0; + } + } } + nbytes = mqh->mqh_expected_bytes; - if (mqh->mqh_partial_message_bytes == 0) + if (mqh->mqh_partial_bytes == 0) { /* * Try to obtain the whole message in a single chunk. If this works, @@ -463,8 +517,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) return res; if (rb >= nbytes) { - mqh->mqh_did_length_word = false; - mqh->mqh_consume_pending = MAXALIGN64(nbytes); + mqh->mqh_length_word_complete = false; + mqh->mqh_consume_pending = MAXALIGN(nbytes); *nbytesp = nbytes; *datap = rawdata; return SHM_MQ_SUCCESS; @@ -477,7 +531,7 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) */ if (mqh->mqh_buflen < nbytes) { - uint64 newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE); + Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE); while (newbuflen < nbytes) newbuflen *= 2; @@ -496,12 +550,12 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) /* Loop until we've copied the entire message. */ for (;;) { - uint64 still_needed; + Size still_needed; /* Copy as much as we can. */ - Assert(mqh->mqh_partial_message_bytes + rb <= nbytes); - memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb); - mqh->mqh_partial_message_bytes += rb; + Assert(mqh->mqh_partial_bytes + rb <= nbytes); + memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb); + mqh->mqh_partial_bytes += rb; /* * Update count of bytes read, with alignment padding. Note @@ -509,16 +563,15 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) * end of a message, because the buffer size is a multiple of * MAXIMUM_ALIGNOF, and each read and write is as well. 
*/ - Assert(mqh->mqh_partial_message_bytes == nbytes || - rb == MAXALIGN64(rb)); - shm_mq_inc_bytes_read(mq, MAXALIGN64(rb)); + Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb)); + shm_mq_inc_bytes_read(mq, MAXALIGN(rb)); /* If we got all the data, exit the loop. */ - if (mqh->mqh_partial_message_bytes >= nbytes) + if (mqh->mqh_partial_bytes >= nbytes) break; /* Wait for some more data. */ - still_needed = nbytes - mqh->mqh_partial_message_bytes; + still_needed = nbytes - mqh->mqh_partial_bytes; res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata); if (res != SHM_MQ_SUCCESS) return res; @@ -529,8 +582,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait) /* Return the complete message, and reset for next message. */ *nbytesp = nbytes; *datap = mqh->mqh_buffer; - mqh->mqh_did_length_word = false; - mqh->mqh_partial_message_bytes = 0; + mqh->mqh_length_word_complete = false; + mqh->mqh_partial_bytes = 0; return SHM_MQ_SUCCESS; } @@ -598,14 +651,14 @@ shm_mq_detach(shm_mq *mq) * Write bytes into a shared message queue. 
*/ static shm_mq_result -shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, - uint64 *bytes_written) +shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait, + Size *bytes_written) { shm_mq *mq = mqh->mqh_queue; - uint64 sent = 0; + Size sent = 0; uint64 used; - uint64 ringsize = mq->mq_ring_size; - uint64 available; + Size ringsize = mq->mq_ring_size; + Size available; while (sent < nbytes) { @@ -651,7 +704,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, res = shm_mq_notify_receiver(mq); if (res != SHM_MQ_SUCCESS) { - *bytes_written = res; + *bytes_written = sent; return res; } @@ -679,8 +732,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, } else { - uint64 offset = mq->mq_bytes_written % ringsize; - uint64 sendnow = Min(available, ringsize - offset); + Size offset = mq->mq_bytes_written % (uint64) ringsize; + Size sendnow = Min(available, ringsize - offset); /* Write as much data as we can via a single memcpy(). */ memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], @@ -693,8 +746,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, * end of a run of bytes, because the buffer size is a multiple of * MAXIMUM_ALIGNOF, and each read is as well. */ - Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow)); - shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow)); + Assert(sent == nbytes || sendnow == MAXALIGN(sendnow)); + shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow)); /* * For efficiency, we don't set the reader's latch here. We'll @@ -717,23 +770,23 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait, * bytes_needed. 
*/ static shm_mq_result -shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait, - uint64 *nbytesp, void **datap) +shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait, + Size *nbytesp, void **datap) { + Size ringsize = mq->mq_ring_size; uint64 used; - uint64 ringsize = mq->mq_ring_size; uint64 written; for (;;) { - uint64 offset; + Size offset; bool detached; /* Get bytes written, so we can compute what's available to read. */ written = shm_mq_get_bytes_written(mq, &detached); used = written - mq->mq_bytes_read; Assert(used <= ringsize); - offset = mq->mq_bytes_read % ringsize; + offset = mq->mq_bytes_read % (uint64) ringsize; /* If we have enough data or buffer has wrapped, we're done. */ if (used >= bytes_needed || offset + used >= ringsize) @@ -872,7 +925,7 @@ shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached) * Increment the number of bytes read. */ static void -shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n) +shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n) { PGPROC *sender; @@ -907,7 +960,7 @@ shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached) * Increment the number of bytes written. */ static void -shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n) +shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n) { SpinLockAcquire(&mq->mq_mutex); mq->mq_bytes_written += n; diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 1bc1f5611e..c7dd90532b 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -57,9 +57,9 @@ extern void shm_mq_detach(shm_mq *); /* Send or receive messages. */ extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, - uint64 nbytes, void *data, bool nowait); + Size nbytes, void *data, bool nowait); extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, - uint64 *nbytesp, void **datap, bool nowait); + Size *nbytesp, void **datap, bool nowait); /* Wait for our counterparty to attach to the queue. 
*/ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh); -- 2.40.0