granicus.if.org Git - postgresql/commitdiff
Extend shm_mq API with new functions shm_mq_sendv, shm_mq_set_handle.
author: Robert Haas <rhaas@postgresql.org>
Wed, 8 Oct 2014 18:35:43 +0000 (14:35 -0400)
committer: Robert Haas <rhaas@postgresql.org>
Wed, 8 Oct 2014 18:38:31 +0000 (14:38 -0400)
shm_mq_sendv sends a message to the queue assembled from multiple
locations.  This is expected to be used by forthcoming patches to
allow frontend/backend protocol messages to be sent via shm_mq, but
might be useful for other purposes as well.

shm_mq_set_handle associates a BackgroundWorkerHandle with an
already-existing shm_mq_handle.  This solves a timing problem when
creating a shm_mq to communicate with a newly-launched background
worker: if you attach to the queue first, and the background worker
fails to start, you might block forever trying to do I/O on the queue;
but if you start the background worker first and then die before
attaching to the queue, the background worker might block forever
trying to do I/O on the queue.  This lets you attach before starting
the worker (so that the worker is protected) and then associate the
BackgroundWorkerHandle later (so that you are also protected).

Patch by me, reviewed by Stephen Frost.

src/backend/storage/ipc/shm_mq.c
src/include/storage/shm_mq.h

index d96627a774e9b000c04612de9f906057c8bcd561..90df5930e1a33477473412da23768025a9a59bbf 100644 (file)
@@ -139,7 +139,7 @@ struct shm_mq_handle
 };
 
 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
-                                 void *data, bool nowait, Size *bytes_written);
+                                 const void *data, bool nowait, Size *bytes_written);
 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
                                         bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
@@ -300,8 +300,34 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
        return mqh;
 }
 
+/*
+ * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
+ * been passed to shm_mq_attach.
+ */
+void
+shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
+{
+       Assert(mqh->mqh_handle == NULL);
+       mqh->mqh_handle = handle;
+}
+
 /*
  * Write a message into a shared message queue.
+ */
+shm_mq_result
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+{
+       shm_mq_iovec    iov;
+
+       iov.data = data;
+       iov.len = nbytes;
+
+       return shm_mq_sendv(mqh, &iov, 1, nowait);
+}
+
+/*
+ * Write a message into a shared message queue, gathered from multiple
+ * addresses.
  *
  * When nowait = false, we'll wait on our process latch when the ring buffer
  * fills up, and then continue writing once the receiver has drained some data.
@@ -315,14 +341,22 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
  * the length or payload will corrupt the queue.)
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 {
        shm_mq_result res;
        shm_mq     *mq = mqh->mqh_queue;
+       Size            nbytes = 0;
        Size            bytes_written;
+       int                     i;
+       int                     which_iov = 0;
+       Size            offset;
 
        Assert(mq->mq_sender == MyProc);
 
+       /* Compute total size of write. */
+       for (i = 0; i < iovcnt; ++i)
+               nbytes += iov[i].len;
+
        /* Try to write, or finish writing, the length word into the buffer. */
        while (!mqh->mqh_length_word_complete)
        {
@@ -348,18 +382,80 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
 
        /* Write the actual data bytes into the buffer. */
        Assert(mqh->mqh_partial_bytes <= nbytes);
-       res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
-                                                       ((char *) data) + mqh->mqh_partial_bytes,
-                                                       nowait, &bytes_written);
-       if (res == SHM_MQ_WOULD_BLOCK)
-               mqh->mqh_partial_bytes += bytes_written;
-       else
+       offset = mqh->mqh_partial_bytes;
+       do
        {
-               mqh->mqh_partial_bytes = 0;
-               mqh->mqh_length_word_complete = false;
-       }
-       if (res != SHM_MQ_SUCCESS)
-               return res;
+               Size    chunksize;
+
+               /* Figure out which bytes need to be sent next. */
+               if (offset >= iov[which_iov].len)
+               {
+                       offset -= iov[which_iov].len;
+                       ++which_iov;
+                       if (which_iov >= iovcnt)
+                               break;
+                       continue;
+               }
+
+               /*
+                * We want to avoid copying the data if at all possible, but every
+                * chunk of bytes we write into the queue has to be MAXALIGN'd,
+                * except the last.  Thus, if a chunk other than the last one ends
+                * on a non-MAXALIGN'd boundary, we have to combine the tail end of
+                * its data with data from one or more following chunks until we
+                * either reach the last chunk or accumulate a number of bytes which
+                * is MAXALIGN'd.
+                */
+               if (which_iov + 1 < iovcnt &&
+                       offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
+               {
+                       char    tmpbuf[MAXIMUM_ALIGNOF];
+                       int             j = 0;
+
+                       for (;;)
+                       {
+                               if (offset < iov[which_iov].len)
+                               {
+                                       tmpbuf[j] = iov[which_iov].data[offset];
+                                       j++;
+                                       offset++;
+                                       if (j == MAXIMUM_ALIGNOF)
+                                               break;
+                               }
+                               else
+                               {
+                                       offset -= iov[which_iov].len;
+                                       which_iov++;
+                                       if (which_iov >= iovcnt)
+                                               break;
+                               }
+                       }
+                       res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
+                       mqh->mqh_partial_bytes += bytes_written;
+                       if (res != SHM_MQ_SUCCESS)
+                               return res;
+                       continue;
+               }
+
+               /*
+                * If this is the last chunk, we can write all the data, even if it
+                * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
+                * MAXALIGN_DOWN the write size.
+                */
+               chunksize = iov[which_iov].len - offset;
+               if (which_iov + 1 < iovcnt)
+                       chunksize = MAXALIGN_DOWN(chunksize);
+               res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
+                                                               nowait, &bytes_written);
+               mqh->mqh_partial_bytes += bytes_written;
+               offset += bytes_written;
+               if (res != SHM_MQ_SUCCESS)
+                       return res;
+       } while (mqh->mqh_partial_bytes < nbytes);
+
+       /* Reset for next message. */
+       mqh->mqh_partial_bytes = 0;
+       mqh->mqh_length_word_complete = false;
 
        /* Notify receiver of the newly-written data, and return. */
        return shm_mq_notify_receiver(mq);
@@ -653,8 +749,8 @@ shm_mq_detach(shm_mq *mq)
  * Write bytes into a shared message queue.
  */
 static shm_mq_result
-shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
-                                 Size *bytes_written)
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
+                                 bool nowait, Size *bytes_written)
 {
        shm_mq     *mq = mqh->mqh_queue;
        Size            sent = 0;
index 5bae3807afbf19ccb7a613405d5e4d37775fb621..063400ae2865fb56c629c0f12b69a3e0312cf18a 100644 (file)
@@ -25,6 +25,13 @@ typedef struct shm_mq shm_mq;
 struct shm_mq_handle;
 typedef struct shm_mq_handle shm_mq_handle;
 
+/* Descriptors for a single write spanning multiple locations. */
+typedef struct
+{
+       const char  *data;
+       Size    len;
+} shm_mq_iovec;
+
 /* Possible results of a send or receive operation. */
 typedef enum
 {
@@ -52,12 +59,17 @@ extern PGPROC *shm_mq_get_sender(shm_mq *);
 extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
                          BackgroundWorkerHandle *handle);
 
+/* Associate worker handle with shm_mq. */
+extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
+
 /* Break connection. */
 extern void shm_mq_detach(shm_mq *);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-                       Size nbytes, void *data, bool nowait);
+                       Size nbytes, const void *data, bool nowait);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
+                       shm_mq_iovec *iov, int iovcnt, bool nowait);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
                           Size *nbytesp, void **datap, bool nowait);