granicus.if.org Git - postgresql/commitdiff
Improve shm_mq portability around MAXIMUM_ALIGNOF and sizeof(Size).
author: Robert Haas <rhaas@postgresql.org>
Tue, 18 Mar 2014 15:19:13 +0000 (11:19 -0400)
committer: Robert Haas <rhaas@postgresql.org>
Tue, 18 Mar 2014 15:23:13 +0000 (11:23 -0400)
Revise the original decision to expose a uint64-based interface and
use Size everywhere possible.  Avoid assuming that MAXIMUM_ALIGNOF is
8, or making any assumption about the relationship between that value
and sizeof(Size).  If MAXIMUM_ALIGNOF is bigger, we'll now insert
padding after the length word; if it's smaller, we are now prepared
to read and write the length word in chunks.

Per discussion with Tom Lane.

src/backend/storage/ipc/shm_mq.c
src/include/storage/shm_mq.h

index 2d298a35983b1722565fb70f87fa420afeedb43b..b31f4fb693cb640065057f0f746464af40bfa038 100644 (file)
@@ -72,7 +72,7 @@ struct shm_mq
        PGPROC     *mq_sender;
        uint64          mq_bytes_read;
        uint64          mq_bytes_written;
-       uint64          mq_ring_size;
+       Size            mq_ring_size;
        bool            mq_detached;
        uint8           mq_ring_offset;
        char            mq_ring[FLEXIBLE_ARRAY_MEMBER];
@@ -103,15 +103,16 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
- * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word
+ * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
  * are expected to retry the call at a later time with the same argument;
  * we need to retain enough state to pick up where we left off.
- * mqh_did_length_word tracks whether we read or wrote the length word,
- * mqh_partial_message_bytes tracks the number of payload bytes read or
- * written, and mqh_expected_bytes - which is used only for reads - tracks
- * the expected total size of the payload.
+ * mqh_length_word_complete tracks whether we are done sending or receiving
+ * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
+ * the number of bytes read or written for either the length word or the
+ * message itself, and mqh_expected_bytes - which is used only for reads -
+ * tracks the expected total size of the payload.
  *
  * mqh_counterparty_attached tracks whether we know the counterparty to have
  * attached to the queue at some previous point.  This lets us avoid some
@@ -128,25 +129,25 @@ struct shm_mq_handle
        dsm_segment *mqh_segment;
        BackgroundWorkerHandle *mqh_handle;
        char       *mqh_buffer;
-       uint64          mqh_buflen;
-       uint64          mqh_consume_pending;
-       uint64          mqh_partial_message_bytes;
-       uint64          mqh_expected_bytes;
-       bool            mqh_did_length_word;
+       Size            mqh_buflen;
+       Size            mqh_consume_pending;
+       Size            mqh_partial_bytes;
+       Size            mqh_expected_bytes;
+       bool            mqh_length_word_complete;
        bool            mqh_counterparty_attached;
        MemoryContext mqh_context;
 };
 
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes,
-                                 void *data, bool nowait, uint64 *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed,
-                                        bool nowait, uint64 *nbytesp, void **datap);
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+                                 void *data, bool nowait, Size *bytes_written);
+static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
+                                        bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
                                         BackgroundWorkerHandle *handle);
 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
-static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n);
+static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
-static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n);
+static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
 
@@ -163,7 +164,7 @@ shm_mq *
 shm_mq_create(void *address, Size size)
 {
        shm_mq     *mq = address;
-       uint64          data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+       Size            data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
 
        /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
        size = MAXALIGN_DOWN(size);
@@ -289,8 +290,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
        mqh->mqh_buflen = 0;
        mqh->mqh_consume_pending = 0;
        mqh->mqh_context = CurrentMemoryContext;
-       mqh->mqh_partial_message_bytes = 0;
-       mqh->mqh_did_length_word = false;
+       mqh->mqh_partial_bytes = 0;
+       mqh->mqh_length_word_complete = false;
        mqh->mqh_counterparty_attached = false;
 
        if (seg != NULL)
@@ -314,41 +315,48 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
  * the length or payload will corrupt the queue.)
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
 {
        shm_mq_result   res;
        shm_mq             *mq = mqh->mqh_queue;
-       uint64                  bytes_written;
+       Size                    bytes_written;
 
        Assert(mq->mq_sender == MyProc);
 
-       /* Write the message length into the buffer. */
-       if (!mqh->mqh_did_length_word)
+       /* Try to write, or finish writing, the length word into the buffer. */
+       while (!mqh->mqh_length_word_complete)
        {
-               res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait,
-                                                               &bytes_written);
+               Assert(mqh->mqh_partial_bytes < sizeof(Size));
+               res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
+                                                               ((char *) &nbytes) + mqh->mqh_partial_bytes,
+                                                               nowait, &bytes_written);
+               mqh->mqh_partial_bytes += bytes_written;
                if (res != SHM_MQ_SUCCESS)
                        return res;
 
-               /*
-                * We're sure to have sent the length in full, since we always
-                * write a MAXALIGN'd chunk.
-                */
-               Assert(bytes_written == MAXALIGN64(sizeof(uint64)));
-               mqh->mqh_did_length_word = true;
+               if (mqh->mqh_partial_bytes >= sizeof(Size))
+               {
+                       Assert(mqh->mqh_partial_bytes == sizeof(Size));
+
+                       mqh->mqh_partial_bytes = 0;
+                       mqh->mqh_length_word_complete = true;
+               }
+
+               /* Length word can't be split unless bigger than required alignment. */
+               Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
        }
 
        /* Write the actual data bytes into the buffer. */
-       Assert(mqh->mqh_partial_message_bytes <= nbytes);
-       res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes,
-                                                       ((char *) data) + mqh->mqh_partial_message_bytes,
+       Assert(mqh->mqh_partial_bytes <= nbytes);
+       res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
+                                                       ((char *) data) + mqh->mqh_partial_bytes,
                                                        nowait, &bytes_written);
        if (res == SHM_MQ_WOULD_BLOCK)
-               mqh->mqh_partial_message_bytes += bytes_written;
+               mqh->mqh_partial_bytes += bytes_written;
        else
        {
-               mqh->mqh_partial_message_bytes = 0;
-               mqh->mqh_did_length_word = false;
+               mqh->mqh_partial_bytes = 0;
+               mqh->mqh_length_word_complete = false;
        }
        if (res != SHM_MQ_SUCCESS)
                return res;
@@ -380,13 +388,12 @@ shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
  * function again after the process latch has been set.
  */
 shm_mq_result
-shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
+shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 {
        shm_mq             *mq = mqh->mqh_queue;
        shm_mq_result   res;
-       uint64                  rb = 0;
-       uint64                  nbytes;
-       uint64                  needed;
+       Size                    rb = 0;
+       Size                    nbytes;
        void               *rawdata;
 
        Assert(mq->mq_receiver == MyProc);
@@ -414,44 +421,91 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
                mqh->mqh_consume_pending = 0;
        }
 
-       /* Determine the message length. */
-       if (mqh->mqh_did_length_word)
-       {
-               /* We've partially received a message; recall expected length. */
-               nbytes = mqh->mqh_expected_bytes;
-       }
-       else
+       /* Try to read, or finish reading, the length word from the buffer. */
+       while (!mqh->mqh_length_word_complete)
        {
                /* Try to receive the message length word. */
-               res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata);
+               Assert(mqh->mqh_partial_bytes < sizeof(Size));
+               res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+                                                                  nowait, &rb, &rawdata);
                if (res != SHM_MQ_SUCCESS)
                        return res;
-               Assert(rb >= sizeof(uint64));
-               memcpy(&nbytes, rawdata, sizeof(uint64));
-               mqh->mqh_expected_bytes = nbytes;
 
-               /* If we've already got the whole message, we're done. */
-               needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes);
-               if (rb >= needed)
+               /*
+                * Hopefully, we'll receive the entire message length word at once.
+                * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
+                * multiple reads.
+                */
+               if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
                {
+                       Size                    needed;
+
+                       nbytes = * (Size *) rawdata;
+
+                       /* If we've already got the whole message, we're done. */
+                       needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
+                       if (rb >= needed)
+                       {
+                               /*
+                                * Technically, we could consume the message length information
+                                * at this point, but the extra write to shared memory wouldn't
+                                * be free and in most cases we would reap no benefit.
+                                */
+                               mqh->mqh_consume_pending = needed;
+                               *nbytesp = nbytes;
+                               *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
+                               return SHM_MQ_SUCCESS;
+                       }
+
                        /*
-                        * Technically, we could consume the message length information at
-                        * this point, but the extra write to shared memory wouldn't be
-                        * free and in most cases we would reap no benefit.
+                        * We don't have the whole message, but we at least have the whole
+                        * length word.
                         */
-                       mqh->mqh_consume_pending = needed;
-                       *nbytesp = nbytes;
-                       *datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64));
-                       return SHM_MQ_SUCCESS;
+                       mqh->mqh_expected_bytes = nbytes;
+                       mqh->mqh_length_word_complete = true;
+                       shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+                       rb -= MAXALIGN(sizeof(Size));
                }
+               else
+               {
+                       Size    lengthbytes;
+
+                       /* Can't be split unless bigger than required alignment. */
+                       Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
 
-               /* Consume the length word. */
-               shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64)));
-               mqh->mqh_did_length_word = true;
-               rb -= MAXALIGN64(sizeof(uint64));
+                       /* Length word is split; need buffer to reassemble. */
+                       if (mqh->mqh_buffer == NULL)
+                       {
+                               mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
+                                                                                                        MQH_INITIAL_BUFSIZE);
+                               mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
+                       }
+                       Assert(mqh->mqh_buflen >= sizeof(Size));
+
+                       /* Copy and consume partial length word. */
+                       if (mqh->mqh_partial_bytes + rb > sizeof(Size))
+                               lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
+                       else
+                               lengthbytes = rb - mqh->mqh_partial_bytes;
+                       memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
+                                  lengthbytes);
+                       mqh->mqh_partial_bytes += lengthbytes;
+                       shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+                       rb -= lengthbytes;
+
+                       /* If we now have the whole word, we're ready to read payload. */
+                       if (mqh->mqh_partial_bytes >= sizeof(Size))
+                       {
+                               Assert(mqh->mqh_partial_bytes == sizeof(Size));
+                               mqh->mqh_expected_bytes = * (Size *) mqh->mqh_buffer;
+                               mqh->mqh_length_word_complete = true;
+                               mqh->mqh_partial_bytes = 0;
+                       }
+               }
        }
+       nbytes = mqh->mqh_expected_bytes;
 
-       if (mqh->mqh_partial_message_bytes == 0)
+       if (mqh->mqh_partial_bytes == 0)
        {
                /*
                 * Try to obtain the whole message in a single chunk.  If this works,
@@ -463,8 +517,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
                        return res;
                if (rb >= nbytes)
                {
-                       mqh->mqh_did_length_word = false;
-                       mqh->mqh_consume_pending = MAXALIGN64(nbytes);
+                       mqh->mqh_length_word_complete = false;
+                       mqh->mqh_consume_pending = MAXALIGN(nbytes);
                        *nbytesp = nbytes;
                        *datap = rawdata;
                        return SHM_MQ_SUCCESS;
@@ -477,7 +531,7 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
                 */
                if (mqh->mqh_buflen < nbytes)
                {
-                       uint64          newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
+                       Size    newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
 
                        while (newbuflen < nbytes)
                                newbuflen *= 2;
@@ -496,12 +550,12 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
        /* Loop until we've copied the entire message. */
        for (;;)
        {
-               uint64  still_needed;
+               Size    still_needed;
 
                /* Copy as much as we can. */
-               Assert(mqh->mqh_partial_message_bytes + rb <= nbytes);
-               memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb);
-               mqh->mqh_partial_message_bytes += rb;
+               Assert(mqh->mqh_partial_bytes + rb <= nbytes);
+               memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
+               mqh->mqh_partial_bytes += rb;
 
                /*
                 * Update count of bytes read, with alignment padding.  Note
@@ -509,16 +563,15 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
                 * end of a message, because the buffer size is a multiple of
                 * MAXIMUM_ALIGNOF, and each read and write is as well.
                 */
-               Assert(mqh->mqh_partial_message_bytes == nbytes ||
-                               rb == MAXALIGN64(rb));
-               shm_mq_inc_bytes_read(mq, MAXALIGN64(rb));
+               Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
+               shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
 
                /* If we got all the data, exit the loop. */
-               if (mqh->mqh_partial_message_bytes >= nbytes)
+               if (mqh->mqh_partial_bytes >= nbytes)
                        break;
 
                /* Wait for some more data. */
-               still_needed = nbytes - mqh->mqh_partial_message_bytes;
+               still_needed = nbytes - mqh->mqh_partial_bytes;
                res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
                if (res != SHM_MQ_SUCCESS)
                        return res;
@@ -529,8 +582,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
        /* Return the complete message, and reset for next message. */
        *nbytesp = nbytes;
        *datap = mqh->mqh_buffer;
-       mqh->mqh_did_length_word = false;
-       mqh->mqh_partial_message_bytes = 0;
+       mqh->mqh_length_word_complete = false;
+       mqh->mqh_partial_bytes = 0;
        return SHM_MQ_SUCCESS;
 }
 
@@ -598,14 +651,14 @@ shm_mq_detach(shm_mq *mq)
  * Write bytes into a shared message queue.
  */
 static shm_mq_result
-shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
-                                 uint64 *bytes_written)
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
+                                 Size *bytes_written)
 {
        shm_mq     *mq = mqh->mqh_queue;
-       uint64          sent = 0;
+       Size            sent = 0;
        uint64          used;
-       uint64          ringsize = mq->mq_ring_size;
-       uint64          available;
+       Size            ringsize = mq->mq_ring_size;
+       Size            available;
 
        while (sent < nbytes)
        {
@@ -651,7 +704,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
                        res = shm_mq_notify_receiver(mq);
                        if (res != SHM_MQ_SUCCESS)
                        {
-                               *bytes_written = res;
+                               *bytes_written = sent;
                                return res;
                        }
 
@@ -679,8 +732,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
                }
                else
                {
-                       uint64  offset = mq->mq_bytes_written % ringsize;
-                       uint64  sendnow = Min(available, ringsize - offset);
+                       Size    offset = mq->mq_bytes_written % (uint64) ringsize;
+                       Size    sendnow = Min(available, ringsize - offset);
 
                        /* Write as much data as we can via a single memcpy(). */
                        memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
@@ -693,8 +746,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
                         * end of a run of bytes, because the buffer size is a multiple of
                         * MAXIMUM_ALIGNOF, and each read is as well.
                         */
-                       Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow));
-                       shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow));
+                       Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+                       shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
                        /*
                         * For efficiency, we don't set the reader's latch here.  We'll
@@ -717,23 +770,23 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
  * bytes_needed.
  */
 static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait,
-                                        uint64 *nbytesp, void **datap)
+shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+                                        Size *nbytesp, void **datap)
 {
+       Size            ringsize = mq->mq_ring_size;
        uint64          used;
-       uint64          ringsize = mq->mq_ring_size;
        uint64          written;
 
        for (;;)
        {
-               uint64          offset;
+               Size            offset;
                bool            detached;
 
                /* Get bytes written, so we can compute what's available to read. */
                written = shm_mq_get_bytes_written(mq, &detached);
                used = written - mq->mq_bytes_read;
                Assert(used <= ringsize);
-               offset = mq->mq_bytes_read % ringsize;
+               offset = mq->mq_bytes_read % (uint64) ringsize;
 
                /* If we have enough data or buffer has wrapped, we're done. */
                if (used >= bytes_needed || offset + used >= ringsize)
@@ -872,7 +925,7 @@ shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
  * Increment the number of bytes read.
  */
 static void
-shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n)
+shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
 {
        PGPROC     *sender;
 
@@ -907,7 +960,7 @@ shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
  * Increment the number of bytes written.
  */
 static void
-shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n)
+shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
 {
        SpinLockAcquire(&mq->mq_mutex);
        mq->mq_bytes_written += n;
index 1bc1f5611e71ea1ceb6d271b6b8a876b3b59e041..c7dd90532bf4e74c79e1400ac8acfe03d2d7a343 100644 (file)
@@ -57,9 +57,9 @@ extern void shm_mq_detach(shm_mq *);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-                       uint64 nbytes, void *data, bool nowait);
+                       Size nbytes, void *data, bool nowait);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
-                          uint64 *nbytesp, void **datap, bool nowait);
+                          Size *nbytesp, void **datap, bool nowait);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);