1 /*-------------------------------------------------------------------------
4 * single-reader, single-writer shared memory message queue
6 * Both the sender and the receiver must have a PGPROC; their respective
7 * process latches are used for synchronization. Only the sender may send,
8 * and only the receiver may receive. This is intended to allow a user
9 * backend to communicate with worker backends that it has registered.
11 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
14 * src/backend/storage/ipc/shm_mq.c
16 *-------------------------------------------------------------------------
21 #include "miscadmin.h"
22 #include "postmaster/bgworker.h"
23 #include "storage/procsignal.h"
24 #include "storage/shm_mq.h"
25 #include "storage/spin.h"
28 * This structure represents the actual queue, stored in shared memory.
30 * Some notes on synchronization:
32 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
33 * mq_sender and mq_bytes_written can only be changed by the sender. However,
34 * because most of these fields are 8 bytes and we don't assume that 8 byte
35 * reads and writes are atomic, the spinlock must be taken whenever the field
36 * is updated, and whenever it is read by a process other than the one allowed
37 * to modify it. But the process that is allowed to modify it is also allowed
38 * to read it without the lock. On architectures where 8-byte writes are
39 * atomic, we could replace these spinlocks with memory barriers, but
40 * testing found no performance benefit, so it seems best to keep things
43 * mq_detached can be set by either the sender or the receiver, so the mutex
44 * must be held to read or write it. Memory barriers could be used here as
47 * mq_ring_size and mq_ring_offset never change after initialization, and
48 * can therefore be read without the lock.
50 * Importantly, mq_ring can be safely read and written without a lock. Were
51 * this not the case, we'd have to hold the spinlock for much longer
52 * intervals, and performance might suffer. Fortunately, that's not
53 * necessary. At any given time, the difference between mq_bytes_read and
54 * mq_bytes_written defines the number of bytes within mq_ring that contain
55 * unread data, and mq_bytes_read defines the position where those bytes
56 * begin. The sender can increase the number of unread bytes at any time,
57 * but only the receiver can give license to overwrite those bytes, by
58 * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
59 * the unread bytes it knows to be present without the lock. Conversely,
60 * the sender can write to the unused portion of the ring buffer without
61 * the lock, because nobody else can be reading or writing those bytes. The
62 * receiver could be making more bytes unused by incrementing mq_bytes_read,
63 * but that's OK. Note that it would be unsafe for the receiver to read any
64 * data it's already marked as read, or to write any data; and it would be
65 * unsafe for the sender to reread any data after incrementing
66 * mq_bytes_written, but fortunately there's no need for any of that.
74 uint64 mq_bytes_written; /* running count of bytes written; changed only by the sender */
78 char mq_ring[FLEXIBLE_ARRAY_MEMBER]; /* ring storage; data starts mq_ring_offset bytes in */
82 * This structure is a backend-private handle for access to a queue.
84 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
85 * a pointer to the dynamic shared memory segment that contains it.
87 * If this queue is intended to connect the current process with a background
88 * worker that started it, the user can pass a pointer to the worker handle
89 * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
90 * is to allow us to begin sending to or receiving from that queue before the
91 * process we'll be communicating with has even been started. If it fails
92 * to start, the handle will allow us to notice that and fail cleanly, rather
93 * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
94 * simple cases - e.g. where there are just 2 processes communicating; in
95 * more complex scenarios, every process may not have a BackgroundWorkerHandle
96 * available, or may need to watch for the failure of more than one other
99 * When a message exists as a contiguous chunk of bytes in the queue - that is,
100 * it is smaller than the size of the ring buffer and does not wrap around
101 * the end - we return the message to the caller as a pointer into the buffer.
102 * For messages that are larger or happen to wrap, we reassemble the message
103 * locally by copying the chunks into a backend-local buffer. mqh_buffer is
104 * the buffer, and mqh_buflen is the number of bytes allocated for it.
106 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
107 * are used to track the state of non-blocking operations. When the caller
108 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
109 * are expected to retry the call at a later time with the same argument;
110 * we need to retain enough state to pick up where we left off.
111 * mqh_length_word_complete tracks whether we are done sending or receiving
112 * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
113 * the number of bytes read or written for either the length word or the
114 * message itself, and mqh_expected_bytes - which is used only for reads -
115 * tracks the expected total size of the payload.
117 * mqh_counterparty_attached tracks whether we know the counterparty to have
118 * attached to the queue at some previous point. This lets us avoid some
119 * mutex acquisitions.
121 * mqh_context is the memory context in effect at the time we attached to
122 * the shm_mq. The shm_mq_handle itself is allocated in this context, and
123 * we make sure any other allocations we do happen in this context as well,
124 * to avoid nasty surprises.
129 dsm_segment *mqh_segment; /* DSM segment that contains the queue */
130 BackgroundWorkerHandle *mqh_handle; /* worker handle given at attach time or via shm_mq_set_handle */
133 Size mqh_consume_pending; /* bytes handed out zero-copy, marked as read on the next receive */
134 Size mqh_partial_bytes; /* bytes of the length word or payload transferred so far */
135 Size mqh_expected_bytes; /* payload size of the message being received (reads only) */
136 bool mqh_length_word_complete; /* has the whole length word been sent or received? */
137 bool mqh_counterparty_attached; /* counterparty known to have attached at some point */
138 MemoryContext mqh_context; /* CurrentMemoryContext as of shm_mq_attach */
/* Prototypes for the file-local (static) helper functions defined below. */
141 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
142 const void *data, bool nowait, Size *bytes_written);
143 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
144 bool nowait, Size *nbytesp, void **datap);
145 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
146 BackgroundWorkerHandle *handle);
147 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
148 BackgroundWorkerHandle *handle);
149 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
150 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
151 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
152 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
153 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
154 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
156 /* Minimum queue size is enough for header and at least one chunk of data. */
157 const Size shm_mq_minimum_size =
158 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
/*
 * Size used for the first allocation of the backend-local reassembly buffer
 * (mqh_buffer) in shm_mq_receive, and the floor when that buffer is enlarged.
 */
160 #define MQH_INITIAL_BUFSIZE 8192
163 * Initialize a new shared message queue.
/*
 * Lay out a queue header at "address" and initialize it for a region of
 * "size" bytes.
 *
 * address: start of the memory that will hold the queue.
 * size:    total bytes available; rounded down to a MAXALIGN multiple.
 *
 * The ring capacity is what remains after the MAXALIGN-ed header.  Sender and
 * receiver start out unset and both byte counters start at zero.
 *
 * NOTE(review): the minimum-size requirement is enforced only by Assert;
 * presumably callers are expected to pass at least shm_mq_minimum_size.
 */
166 shm_mq_create(void *address, Size size)
168 shm_mq *mq = address;
169 Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
171 /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
172 size = MAXALIGN_DOWN(size);
174 /* Queue size must be large enough to hold some data. */
175 Assert(size > data_offset);
177 /* Initialize queue header. */
178 SpinLockInit(&mq->mq_mutex);
179 mq->mq_receiver = NULL;
180 mq->mq_sender = NULL;
181 mq->mq_bytes_read = 0;
182 mq->mq_bytes_written = 0;
/* Ring capacity: everything after the aligned header. */
183 mq->mq_ring_size = size - data_offset;
184 mq->mq_detached = false;
/* Padding between the start of mq_ring and the first MAXALIGN-ed data byte. */
185 mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
191 * Set the identity of the process that will receive from a shared message
/*
 * Record "proc" as the receiver of "mq".  The receiver must not already be
 * set (asserted).  The current mq_sender is read in the same spinlock
 * acquisition, and its latch is set after the lock has been released.
 */
195 shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
197 volatile shm_mq *vmq = mq;
200 SpinLockAcquire(&mq->mq_mutex);
201 Assert(vmq->mq_receiver == NULL);
202 vmq->mq_receiver = proc;
203 sender = vmq->mq_sender;
204 SpinLockRelease(&mq->mq_mutex);
/*
 * NOTE(review): mq_sender can still be NULL at this point.  shm_mq_set_sender
 * tests its counterparty pointer for NULL before calling SetLatch; confirm
 * that the same guard protects the call below.
 */
207 SetLatch(&sender->procLatch);
211 * Set the identity of the process that will send to a shared message queue.
/*
 * Record "proc" as the sender of "mq".  The sender must not already be set
 * (asserted).  The current mq_receiver is read in the same spinlock
 * acquisition.
 */
214 shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
216 volatile shm_mq *vmq = mq;
219 SpinLockAcquire(&mq->mq_mutex);
220 Assert(vmq->mq_sender == NULL);
221 vmq->mq_sender = proc;
222 receiver = vmq->mq_receiver;
223 SpinLockRelease(&mq->mq_mutex);
/* Set the latch of a receiver that has already registered; done outside the lock. */
225 if (receiver != NULL)
226 SetLatch(&receiver->procLatch);
230 * Get the configured receiver.
/* Read mq_receiver while holding the queue spinlock. */
233 shm_mq_get_receiver(shm_mq *mq)
235 volatile shm_mq *vmq = mq;
238 SpinLockAcquire(&mq->mq_mutex);
239 receiver = vmq->mq_receiver;
240 SpinLockRelease(&mq->mq_mutex);
246 * Get the configured sender.
/* Read mq_sender while holding the queue spinlock. */
249 shm_mq_get_sender(shm_mq *mq)
251 volatile shm_mq *vmq = mq;
254 SpinLockAcquire(&mq->mq_mutex);
255 sender = vmq->mq_sender;
256 SpinLockRelease(&mq->mq_mutex);
262 * Attach to a shared message queue so we can send or receive messages.
264 * The memory context in effect at the time this function is called should
265 * be one which will last for at least as long as the message queue itself.
266 * We'll allocate the handle in that context, and future allocations that
267 * are needed to buffer incoming data will happen in that context as well.
269 * If seg != NULL, the queue will be automatically detached when that dynamic
270 * shared memory segment is detached.
272 * If handle != NULL, the queue can be read or written even before the
273 * other process has attached. We'll wait for it to do so if needed. The
274 * handle must be for a background worker initialized with bgw_notify_pid
277 * shm_mq_detach() should be called when done. This will free the
278 * shm_mq_handle and mark the queue itself as detached, so that our
279 * counterpart won't get stuck waiting for us to fill or drain the queue
280 * after we've already lost interest.
/*
 * Build a backend-private handle for "mq".
 *
 * mq:     queue to attach to; the caller must already be registered as its
 *         sender or receiver (asserted).
 * seg:    DSM segment used to register the detach callback.
 * handle: optional background worker handle, stored in mqh_handle.
 *
 * The handle is palloc-ed, and CurrentMemoryContext is remembered in
 * mqh_context for later buffer allocations.  All per-message state starts
 * out cleared.
 */
283 shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
285 shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
287 Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
289 mqh->mqh_segment = seg;
290 mqh->mqh_buffer = NULL;
291 mqh->mqh_handle = handle;
293 mqh->mqh_consume_pending = 0;
294 mqh->mqh_context = CurrentMemoryContext;
295 mqh->mqh_partial_bytes = 0;
296 mqh->mqh_length_word_complete = false;
297 mqh->mqh_counterparty_attached = false;
/*
 * Register shm_mq_detach_callback with the queue pointer as its argument.
 * NOTE(review): the function header allows seg == NULL; confirm that this
 * registration is skipped in that case.
 */
300 on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
306 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
307 * been passed to shm_mq_attach.
/* Store "handle" in mqh_handle; valid only while no handle is set (asserted). */
310 shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
312 Assert(mqh->mqh_handle == NULL);
313 mqh->mqh_handle = handle;
317 * Write a message into a shared message queue.
/* Convenience wrapper: forwards to shm_mq_sendv with a single iovec entry. */
320 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
327 return shm_mq_sendv(mqh, &iov, 1, nowait);
331 * Write a message into a shared message queue, gathered from multiple
334 * When nowait = false, we'll wait on our process latch when the ring buffer
335 * fills up, and then continue writing once the receiver has drained some data.
336 * The process latch is reset after each wait.
338 * When nowait = true, we do not manipulate the state of the process latch;
339 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
340 * this case, the caller should call this function again, with the same
341 * arguments, each time the process latch is set. (Once begun, the sending
342 * of a message cannot be aborted except by detaching from the queue; changing
343 * the length or payload will corrupt the queue.)
/*
 * Send one message assembled from iovcnt iovec entries.
 *
 * The work happens in three phases:
 *   1. send the Size-typed length word (sum of all iov lengths);
 *   2. send the payload bytes taken from the iovec array;
 *   3. clear the per-message state and notify the receiver.
 *
 * mqh_partial_bytes and mqh_length_word_complete record progress, so that a
 * call which stopped early can be resumed by calling again with the same
 * arguments.  Whenever shm_mq_send_bytes reports SHM_MQ_DETACHED, that state
 * is cleared.
 */
346 shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
349 shm_mq *mq = mqh->mqh_queue;
356 Assert(mq->mq_sender == MyProc);
358 /* Compute total size of write. */
359 for (i = 0; i < iovcnt; ++i)
360 nbytes += iov[i].len;
362 /* Try to write, or finish writing, the length word into the buffer. */
363 while (!mqh->mqh_length_word_complete)
365 Assert(mqh->mqh_partial_bytes < sizeof(Size));
/* Send only the part of the length word that has not been written yet. */
366 res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
367 ((char *) &nbytes) +mqh->mqh_partial_bytes,
368 nowait, &bytes_written);
370 if (res == SHM_MQ_DETACHED)
372 /* Reset state in case caller tries to send another message. */
373 mqh->mqh_partial_bytes = 0;
374 mqh->mqh_length_word_complete = false;
377 mqh->mqh_partial_bytes += bytes_written;
379 if (mqh->mqh_partial_bytes >= sizeof(Size))
381 Assert(mqh->mqh_partial_bytes == sizeof(Size));
383 mqh->mqh_partial_bytes = 0;
384 mqh->mqh_length_word_complete = true;
387 if (res != SHM_MQ_SUCCESS)
390 /* Length word can't be split unless bigger than required alignment. */
391 Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
394 /* Write the actual data bytes into the buffer. */
395 Assert(mqh->mqh_partial_bytes <= nbytes);
396 offset = mqh->mqh_partial_bytes;
401 /* Figure out which bytes need to be sent next. */
402 if (offset >= iov[which_iov].len)
404 offset -= iov[which_iov].len;
406 if (which_iov >= iovcnt)
412 * We want to avoid copying the data if at all possible, but every
413 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
414 * the last. Thus, if a chunk other than the last one ends on a
415 * non-MAXALIGN'd boundary, we have to combine the tail end of its
416 * data with data from one or more following chunks until we either
417 * reach the last chunk or accumulate a number of bytes which is
/* A non-final iovec has fewer than MAXIMUM_ALIGNOF bytes left: gather into tmpbuf. */
420 if (which_iov + 1 < iovcnt &&
421 offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
423 char tmpbuf[MAXIMUM_ALIGNOF];
428 if (offset < iov[which_iov].len)
430 tmpbuf[j] = iov[which_iov].data[offset];
433 if (j == MAXIMUM_ALIGNOF)
438 offset -= iov[which_iov].len;
440 if (which_iov >= iovcnt)
445 res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
447 if (res == SHM_MQ_DETACHED)
449 /* Reset state in case caller tries to send another message. */
450 mqh->mqh_partial_bytes = 0;
451 mqh->mqh_length_word_complete = false;
455 mqh->mqh_partial_bytes += bytes_written;
456 if (res != SHM_MQ_SUCCESS)
462 * If this is the last chunk, we can write all the data, even if it
463 * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
464 * MAXALIGN_DOWN the write size.
466 chunksize = iov[which_iov].len - offset;
467 if (which_iov + 1 < iovcnt)
468 chunksize = MAXALIGN_DOWN(chunksize);
469 res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
470 nowait, &bytes_written);
472 if (res == SHM_MQ_DETACHED)
474 /* Reset state in case caller tries to send another message. */
475 mqh->mqh_length_word_complete = false;
476 mqh->mqh_partial_bytes = 0;
480 mqh->mqh_partial_bytes += bytes_written;
481 offset += bytes_written;
482 if (res != SHM_MQ_SUCCESS)
484 } while (mqh->mqh_partial_bytes < nbytes);
486 /* Reset for next message. */
487 mqh->mqh_partial_bytes = 0;
488 mqh->mqh_length_word_complete = false;
490 /* Notify receiver of the newly-written data, and return. */
491 return shm_mq_notify_receiver(mq);
495 * Receive a message from a shared message queue.
497 * We set *nbytesp to the message length and *datap to point to the message
498 * payload. If the entire message exists in the queue as a single,
499 * contiguous chunk, *datap will point directly into shared memory; otherwise,
500 * it will point to a temporary buffer. This mostly avoids data copying in
501 * the hoped-for case where messages are short compared to the buffer size,
502 * while still allowing longer messages. In either case, the return value
503 * remains valid until the next receive operation is performed on the queue.
505 * When nowait = false, we'll wait on our process latch when the ring buffer
506 * is empty and we have not yet received a full message. The sender will
507 * set our process latch after more data has been written, and we'll resume
508 * processing. Each call will therefore return a complete message
509 * (unless the sender detaches the queue).
511 * When nowait = true, we do not manipulate the state of the process latch;
512 * instead, whenever the buffer is empty and we need to read from it, we
513 * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
514 * function again after the process latch has been set.
/*
 * Receive one message.
 *
 * Steps, in order:
 *   1. unless the sender is already known to be attached, check for (or wait
 *      for) its attachment;
 *   2. mark as read any bytes handed out zero-copy by the previous call
 *      (mqh_consume_pending);
 *   3. obtain the length word, reassembling it in mqh_buffer when it arrives
 *      in pieces;
 *   4. obtain the payload, either as a pointer straight into the ring or by
 *      copying chunks into mqh_buffer.
 *
 * Progress is kept in mqh_partial_bytes, mqh_expected_bytes and
 * mqh_length_word_complete so that a later call can continue the same message.
 */
517 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
519 shm_mq *mq = mqh->mqh_queue;
525 Assert(mq->mq_receiver == MyProc);
527 /* We can't receive data until the sender has attached. */
528 if (!mqh->mqh_counterparty_attached)
532 int counterparty_gone; /* receives the bool result of shm_mq_counterparty_gone */
535 * We shouldn't return at this point at all unless the sender
536 * hasn't attached yet. However, the correct return value depends
537 * on whether the sender is still attached. If we first test
538 * whether the sender has ever attached and then test whether the
539 * sender has detached, there's a race condition: a sender that
540 * attaches and detaches very quickly might fool us into thinking
541 * the sender never attached at all. So, test whether our
542 * counterparty is definitively gone first, and only afterwards
543 * check whether the sender ever attached in the first place.
545 counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
546 if (shm_mq_get_sender(mq) == NULL)
548 if (counterparty_gone)
549 return SHM_MQ_DETACHED;
551 return SHM_MQ_WOULD_BLOCK;
554 else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
555 && shm_mq_get_sender(mq) == NULL)
/*
 * NOTE(review): mq_detached is written here without taking mq_mutex, whereas
 * shm_mq_detach and shm_mq_counterparty_gone set it under the spinlock.
 */
557 mq->mq_detached = true;
558 return SHM_MQ_DETACHED;
560 mqh->mqh_counterparty_attached = true;
563 /* Consume any zero-copy data from previous receive operation. */
564 if (mqh->mqh_consume_pending > 0)
566 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
567 mqh->mqh_consume_pending = 0;
570 /* Try to read, or finish reading, the length word from the buffer. */
571 while (!mqh->mqh_length_word_complete)
573 /* Try to receive the message length word. */
574 Assert(mqh->mqh_partial_bytes < sizeof(Size));
575 res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
576 nowait, &rb, &rawdata);
577 if (res != SHM_MQ_SUCCESS)
581 * Hopefully, we'll receive the entire message length word at once.
582 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
585 if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
589 nbytes = *(Size *) rawdata;
591 /* If we've already got the whole message, we're done. */
592 needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
596 * Technically, we could consume the message length
597 * information at this point, but the extra write to shared
598 * memory wouldn't be free and in most cases we would reap no
601 mqh->mqh_consume_pending = needed;
603 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
604 return SHM_MQ_SUCCESS;
608 * We don't have the whole message, but we at least have the whole
611 mqh->mqh_expected_bytes = nbytes;
612 mqh->mqh_length_word_complete = true;
613 shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
614 rb -= MAXALIGN(sizeof(Size));
620 /* Can't be split unless bigger than required alignment. */
621 Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
623 /* Message word is split; need buffer to reassemble. */
624 if (mqh->mqh_buffer == NULL)
626 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
627 MQH_INITIAL_BUFSIZE);
628 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
630 Assert(mqh->mqh_buflen >= sizeof(Size));
632 /* Copy and consume partial length word. */
633 if (mqh->mqh_partial_bytes + rb > sizeof(Size))
634 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
637 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
639 mqh->mqh_partial_bytes += lengthbytes;
640 shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
643 /* If we now have the whole word, we're ready to read payload. */
644 if (mqh->mqh_partial_bytes >= sizeof(Size))
646 Assert(mqh->mqh_partial_bytes == sizeof(Size));
647 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
648 mqh->mqh_length_word_complete = true;
649 mqh->mqh_partial_bytes = 0;
653 nbytes = mqh->mqh_expected_bytes;
655 if (mqh->mqh_partial_bytes == 0)
658 * Try to obtain the whole message in a single chunk. If this works,
659 * we need not copy the data and can return a pointer directly into
662 res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
663 if (res != SHM_MQ_SUCCESS)
667 mqh->mqh_length_word_complete = false;
668 mqh->mqh_consume_pending = MAXALIGN(nbytes);
671 return SHM_MQ_SUCCESS;
675 * The message has wrapped the buffer. We'll need to copy it in order
676 * to return it to the client in one chunk. First, make sure we have
677 * a large enough buffer available.
679 if (mqh->mqh_buflen < nbytes)
681 Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
683 while (newbuflen < nbytes)
686 if (mqh->mqh_buffer != NULL)
688 pfree(mqh->mqh_buffer);
689 mqh->mqh_buffer = NULL;
692 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
693 mqh->mqh_buflen = newbuflen;
697 /* Loop until we've copied the entire message. */
702 /* Copy as much as we can. */
703 Assert(mqh->mqh_partial_bytes + rb <= nbytes);
704 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
705 mqh->mqh_partial_bytes += rb;
708 * Update count of bytes read, with alignment padding. Note that this
709 * will never actually insert any padding except at the end of a
710 * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
711 * and each read and write is as well.
713 Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
714 shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
716 /* If we got all the data, exit the loop. */
717 if (mqh->mqh_partial_bytes >= nbytes)
720 /* Wait for some more data. */
721 still_needed = nbytes - mqh->mqh_partial_bytes;
722 res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
723 if (res != SHM_MQ_SUCCESS)
725 if (rb > still_needed)
729 /* Return the complete message, and reset for next message. */
731 *datap = mqh->mqh_buffer;
732 mqh->mqh_length_word_complete = false;
733 mqh->mqh_partial_bytes = 0;
734 return SHM_MQ_SUCCESS;
738 * Wait for the other process that's supposed to use this queue to attach
741 * The return value is SHM_MQ_DETACHED if the worker has already detached or
742 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
743 * Note that we will only be able to detect that the worker has died before
744 * attaching if a background worker handle was passed to shm_mq_attach().
/*
 * Pick the counterparty slot to watch: mq_sender when this process is the
 * receiver, otherwise mq_receiver (this process must then be the sender, as
 * asserted).  Then wait on that slot with shm_mq_wait_internal and translate
 * its boolean result into SHM_MQ_SUCCESS or SHM_MQ_DETACHED.
 */
747 shm_mq_wait_for_attach(shm_mq_handle *mqh)
749 shm_mq *mq = mqh->mqh_queue;
752 if (shm_mq_get_receiver(mq) == MyProc)
753 victim = &mq->mq_sender;
756 Assert(shm_mq_get_sender(mq) == MyProc);
757 victim = &mq->mq_receiver;
760 if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
761 return SHM_MQ_SUCCESS;
763 return SHM_MQ_DETACHED;
767 * Detach a shared message queue.
769 * The purpose of this function is to make sure that the process
770 * with which we're communicating doesn't block forever waiting for us to
771 * fill or drain the queue once we've lost interest. When the sender
772 * detaches, the receiver can read any messages remaining in the queue;
773 * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
774 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
/*
 * Under the spinlock, determine the counterparty (the receiver if this
 * process is the sender, otherwise the sender) and set mq_detached.  The
 * counterparty latch is set after the lock has been released.
 */
777 shm_mq_detach(shm_mq *mq)
779 volatile shm_mq *vmq = mq;
782 SpinLockAcquire(&mq->mq_mutex);
783 if (vmq->mq_sender == MyProc)
784 victim = vmq->mq_receiver;
787 Assert(vmq->mq_receiver == MyProc);
788 victim = vmq->mq_sender;
790 vmq->mq_detached = true;
791 SpinLockRelease(&mq->mq_mutex);
/*
 * NOTE(review): the counterparty may never have attached, in which case
 * victim is NULL; confirm that a NULL test guards the call below.
 */
794 SetLatch(&victim->procLatch);
798 * Get the shm_mq from handle.
/* Accessor: returns the mqh_queue pointer stored in the handle. */
801 shm_mq_get_queue(shm_mq_handle *mqh)
803 return mqh->mqh_queue;
807 * Write bytes into a shared message queue.
/*
 * Copy up to nbytes from "data" into the ring, looping until everything has
 * been sent or the call has to stop early.
 *
 * mqh:           handle whose queue is written to.
 * nbytes, data:  the bytes to send.
 * nowait:        when true, SHM_MQ_WOULD_BLOCK is returned where the code
 *                would otherwise wait on the process latch.
 * bytes_written: out parameter; set to the number of bytes sent on every
 *                return path visible here.
 *
 * Each pass computes the free space from mq_bytes_written and the value
 * returned by shm_mq_get_bytes_read.  A single memcpy is limited to the
 * contiguous space up to the end of the ring (ringsize - offset), so a write
 * that wraps takes more than one pass.  The written counter is advanced by
 * MAXALIGN(sendnow).
 *
 * NOTE(review): on the path where waiting for the receiver to attach fails,
 * mq_detached is set without taking mq_mutex, unlike the other writers of
 * that flag in this file.
 */
810 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
811 bool nowait, Size *bytes_written)
813 shm_mq *mq = mqh->mqh_queue;
816 Size ringsize = mq->mq_ring_size;
819 while (sent < nbytes)
824 /* Compute number of ring buffer bytes used and available. */
825 rb = shm_mq_get_bytes_read(mq, &detached);
826 Assert(mq->mq_bytes_written >= rb);
827 used = mq->mq_bytes_written - rb;
828 Assert(used <= ringsize);
/* Limited both by the free ring space and by what is still left to send. */
829 available = Min(ringsize - used, nbytes - sent);
831 /* Bail out if the queue has been detached. */
834 *bytes_written = sent;
835 return SHM_MQ_DETACHED;
838 if (available == 0 && !mqh->mqh_counterparty_attached)
841 * The queue is full, so if the receiver isn't yet known to be
842 * attached, we must wait for that to happen.
846 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
848 *bytes_written = sent;
849 return SHM_MQ_DETACHED;
851 if (shm_mq_get_receiver(mq) == NULL)
853 *bytes_written = sent;
854 return SHM_MQ_WOULD_BLOCK;
857 else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
860 mq->mq_detached = true;
861 *bytes_written = sent;
862 return SHM_MQ_DETACHED;
864 mqh->mqh_counterparty_attached = true;
867 * The receiver may have read some data after attaching, so we
868 * must not wait without rechecking the queue state.
871 else if (available == 0)
875 /* Let the receiver know that we need them to read some data. */
876 res = shm_mq_notify_receiver(mq);
877 if (res != SHM_MQ_SUCCESS)
879 *bytes_written = sent;
883 /* Skip manipulation of our latch if nowait = true. */
886 *bytes_written = sent;
887 return SHM_MQ_WOULD_BLOCK;
891 * Wait for our latch to be set. It might already be set for some
892 * unrelated reason, but that'll just result in one extra trip
893 * through the loop. It's worth it to avoid resetting the latch
894 * at top of loop, because setting an already-set latch is much
895 * cheaper than setting one that has been reset.
897 WaitLatch(MyLatch, WL_LATCH_SET, 0);
899 /* An interrupt may have occurred while we were waiting. */
900 CHECK_FOR_INTERRUPTS();
902 /* Reset the latch so we don't spin. */
907 Size offset = mq->mq_bytes_written % (uint64) ringsize;
908 Size sendnow = Min(available, ringsize - offset);
910 /* Write as much data as we can via a single memcpy(). */
911 memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
912 (char *) data + sent, sendnow);
916 * Update count of bytes written, with alignment padding. Note
917 * that this will never actually insert any padding except at the
918 * end of a run of bytes, because the buffer size is a multiple of
919 * MAXIMUM_ALIGNOF, and each read is as well.
921 Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
922 shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
925 * For efficiency, we don't set the reader's latch here. We'll do
926 * that only when the buffer fills up or after writing an entire
932 *bytes_written = sent;
933 return SHM_MQ_SUCCESS;
937 * Wait until at least bytes_needed bytes are available to be read from the
938 * shared message queue, or until the buffer wraps around. If the queue is
939 * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
940 * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
941 * to the location at which data bytes can be read, *nbytesp is set to the
942 * number of bytes which can be read at that address, and the return value
/*
 * Locate readable bytes in the ring.
 *
 * mq:           queue to read from.
 * bytes_needed: number of bytes the caller would like to see at once.
 * nowait:       when true, SHM_MQ_WOULD_BLOCK is returned where the code
 *               would otherwise wait on the process latch.
 * nbytesp:      out parameter; number of contiguous bytes readable at *datap.
 * datap:        out parameter; address of the first readable byte.
 *
 * "used" is the count of unread bytes (bytes written minus mq_bytes_read) and
 * "offset" is the read position within the ring.  Success is reported with
 * fewer than bytes_needed bytes when the unread data runs up to the end of
 * the ring (offset + used >= ringsize); the count returned never extends past
 * the end of the ring.  Nothing is marked as read here.
 */
946 shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
947 Size *nbytesp, void **datap)
949 Size ringsize = mq->mq_ring_size;
958 /* Get bytes written, so we can compute what's available to read. */
959 written = shm_mq_get_bytes_written(mq, &detached);
960 used = written - mq->mq_bytes_read;
961 Assert(used <= ringsize);
962 offset = mq->mq_bytes_read % (uint64) ringsize;
964 /* If we have enough data or buffer has wrapped, we're done. */
965 if (used >= bytes_needed || offset + used >= ringsize)
967 *nbytesp = Min(used, ringsize - offset);
968 *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
969 return SHM_MQ_SUCCESS;
973 * Fall out before waiting if the queue has been detached.
975 * Note that we don't check for this until *after* considering whether
976 * the data already available is enough, since the receiver can finish
977 * receiving a message stored in the buffer even after the sender has
981 return SHM_MQ_DETACHED;
983 /* Skip manipulation of our latch if nowait = true. */
985 return SHM_MQ_WOULD_BLOCK;
988 * Wait for our latch to be set. It might already be set for some
989 * unrelated reason, but that'll just result in one extra trip through
990 * the loop. It's worth it to avoid resetting the latch at top of
991 * loop, because setting an already-set latch is much cheaper than
992 * setting one that has been reset.
994 WaitLatch(MyLatch, WL_LATCH_SET, 0);
996 /* An interrupt may have occurred while we were waiting. */
997 CHECK_FOR_INTERRUPTS();
999 /* Reset the latch so we don't spin. */
1000 ResetLatch(MyLatch);
1005 * Test whether a counterparty who may not even be alive yet is definitely gone.
/*
 * Decide whether the counterparty is definitely gone.
 *
 * The mq_detached flag is read under the spinlock first.  The worker status
 * is then obtained with GetBackgroundWorkerPid; a status that is neither
 * BGWH_STARTED nor BGWH_NOT_YET_STARTED causes the queue to be marked
 * detached, again under the spinlock.
 */
1008 shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
1013 /* Acquire the lock just long enough to check the detached flag. */
1014 SpinLockAcquire(&mq->mq_mutex);
1015 detached = mq->mq_detached;
1016 SpinLockRelease(&mq->mq_mutex);
1018 /* If the queue has been detached, counterparty is definitely gone. */
1022 /* If there's a handle, check worker status. */
1025 BgwHandleStatus status;
1027 /* Check for unexpected worker death. */
1028 status = GetBackgroundWorkerPid(handle, &pid);
1029 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1031 /* Mark it detached, just to make it official. */
1032 SpinLockAcquire(&mq->mq_mutex);
1033 mq->mq_detached = true;
1034 SpinLockRelease(&mq->mq_mutex);
1039 /* Counterparty is not definitively gone. */
1044 * This is used when a process is waiting for its counterpart to attach to the
1045 * queue. We exit when the other process attaches as expected, or, if
1046 * handle != NULL, when the referenced background process or the postmaster
1047 * dies. Note that if handle == NULL, and the process fails to attach, we'll
1048 * potentially get stuck here forever waiting for a process that may never
1049 * start. We do check for interrupts, though.
1051 * ptr is a pointer to the memory address that we're expecting to become
1052 * non-NULL when our counterpart attaches to the queue.
/*
 * Polling loop used while waiting for the counterparty to attach.
 *
 * Each pass reads, under the spinlock, both mq_detached and whether *ptr has
 * become non-NULL.  It then checks the worker status with
 * GetBackgroundWorkerPid, waits on the process latch, checks for interrupts
 * and resets the latch before the next pass.  "result" starts out false and
 * is set from the *ptr test.
 */
1055 shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
1056 BackgroundWorkerHandle *handle)
1058 bool result = false;
1062 BgwHandleStatus status;
1066 /* Acquire the lock just long enough to check the pointer. */
1067 SpinLockAcquire(&mq->mq_mutex);
1068 detached = mq->mq_detached;
1069 result = (*ptr != NULL);
1070 SpinLockRelease(&mq->mq_mutex);
1072 /* Fail if detached; else succeed if initialized. */
1083 /* Check for unexpected worker death. */
1084 status = GetBackgroundWorkerPid(handle, &pid);
1085 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1092 /* Wait to be signalled. */
1093 WaitLatch(MyLatch, WL_LATCH_SET, 0);
1095 /* An interrupt may have occurred while we were waiting. */
1096 CHECK_FOR_INTERRUPTS();
1098 /* Reset the latch so we don't spin. */
1099 ResetLatch(MyLatch);
1106 * Get the number of bytes read. The receiver need not use this to access
1107 * the count of bytes read, but the sender must.
/*
 * Snapshot mq_bytes_read into v and mq_detached into *detached, both within
 * a single spinlock acquisition.
 */
1110 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
1114 SpinLockAcquire(&mq->mq_mutex);
1115 v = mq->mq_bytes_read;
1116 *detached = mq->mq_detached;
1117 SpinLockRelease(&mq->mq_mutex);
1123 * Increment the number of bytes read.
/*
 * Add n to mq_bytes_read under the spinlock, then set the sender latch once
 * the lock has been released.  The sender waits on its latch when the ring
 * is full (see shm_mq_send_bytes).
 */
1126 shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
1130 SpinLockAcquire(&mq->mq_mutex);
1131 mq->mq_bytes_read += n;
1132 sender = mq->mq_sender;
1133 SpinLockRelease(&mq->mq_mutex);
1135 /* We shouldn't have any bytes to read without a sender. */
1136 Assert(sender != NULL);
1137 SetLatch(&sender->procLatch);
1141 * Get the number of bytes written. The sender need not use this to access
1142 * the count of bytes written, but the receiver must.
/*
 * Snapshot mq_bytes_written into v and mq_detached into *detached, both
 * within a single spinlock acquisition.
 */
1145 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
1149 SpinLockAcquire(&mq->mq_mutex);
1150 v = mq->mq_bytes_written;
1151 *detached = mq->mq_detached;
1152 SpinLockRelease(&mq->mq_mutex);
1158 * Increment the number of bytes written.
/*
 * Add n to mq_bytes_written under the spinlock.  No latch is set here;
 * waking the receiver is done separately by shm_mq_notify_receiver.
 */
1161 shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
1163 SpinLockAcquire(&mq->mq_mutex);
1164 mq->mq_bytes_written += n;
1165 SpinLockRelease(&mq->mq_mutex);
1169 * Set receiver's latch, unless queue is detached.
/*
 * Read mq_detached and mq_receiver in one spinlock acquisition.  The result
 * is either SHM_MQ_DETACHED, or SHM_MQ_SUCCESS after the receiver latch has
 * been set; SetLatch runs after the lock has been released.
 */
1171 static shm_mq_result
1172 shm_mq_notify_receiver(volatile shm_mq *mq)
1177 SpinLockAcquire(&mq->mq_mutex);
1178 detached = mq->mq_detached;
1179 receiver = mq->mq_receiver;
1180 SpinLockRelease(&mq->mq_mutex);
1183 return SHM_MQ_DETACHED;
/*
 * NOTE(review): receiver is dereferenced here; confirm whether it can still
 * be NULL when the sender notifies before the receiver has attached.
 */
1185 SetLatch(&receiver->procLatch);
1186 return SHM_MQ_SUCCESS;
1189 /* Shim for on_dsm_detach callback. */
/*
 * Callback with the signature expected by on_dsm_detach.  "arg" carries the
 * shm_mq pointer that shm_mq_attach passed via PointerGetDatum; "seg" is not
 * used in the lines shown here.
 */
1191 shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1193 shm_mq *mq = (shm_mq *) DatumGetPointer(arg);