]> granicus.if.org Git - postgresql/blob - src/backend/storage/ipc/shm_mq.c
5f6226c9bb960c6046d23bd603450ad2839bc77a
[postgresql] / src / backend / storage / ipc / shm_mq.c
1 /*-------------------------------------------------------------------------
2  *
3  * shm_mq.c
4  *        single-reader, single-writer shared memory message queue
5  *
6  * Both the sender and the receiver must have a PGPROC; their respective
7  * process latches are used for synchronization.  Only the sender may send,
8  * and only the receiver may receive.  This is intended to allow a user
9  * backend to communicate with worker backends that it has registered.
10  *
11  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  * src/include/storage/shm_mq.h
15  *
16  *-------------------------------------------------------------------------
17  */
18
19 #include "postgres.h"
20
21 #include "miscadmin.h"
22 #include "postmaster/bgworker.h"
23 #include "storage/procsignal.h"
24 #include "storage/shm_mq.h"
25 #include "storage/spin.h"
26
27 /*
28  * This structure represents the actual queue, stored in shared memory.
29  *
30  * Some notes on synchronization:
31  *
32  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
33  * mq_sender and mq_bytes_written can only be changed by the sender.  However,
34  * because most of these fields are 8 bytes and we don't assume that 8 byte
35  * reads and writes are atomic, the spinlock must be taken whenever the field
36  * is updated, and whenever it is read by a process other than the one allowed
37  * to modify it. But the process that is allowed to modify it is also allowed
38  * to read it without the lock.  On architectures where 8-byte writes are
39  * atomic, we could replace these spinlocks with memory barriers, but
40  * testing found no performance benefit, so it seems best to keep things
41  * simple for now.
42  *
43  * mq_detached can be set by either the sender or the receiver, so the mutex
44  * must be held to read or write it.  Memory barriers could be used here as
45  * well, if needed.
46  *
47  * mq_ring_size and mq_ring_offset never change after initialization, and
48  * can therefore be read without the lock.
49  *
50  * Importantly, mq_ring can be safely read and written without a lock.  Were
51  * this not the case, we'd have to hold the spinlock for much longer
52  * intervals, and performance might suffer.  Fortunately, that's not
53  * necessary.  At any given time, the difference between mq_bytes_read and
54  * mq_bytes_written defines the number of bytes within mq_ring that contain
55  * unread data, and mq_bytes_read defines the position where those bytes
56  * begin.  The sender can increase the number of unread bytes at any time,
57  * but only the receiver can give license to overwrite those bytes, by
58  * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
59  * the unread bytes it knows to be present without the lock.  Conversely,
60  * the sender can write to the unused portion of the ring buffer without
61  * the lock, because nobody else can be reading or writing those bytes.  The
62  * receiver could be making more bytes unused by incrementing mq_bytes_read,
63  * but that's OK.  Note that it would be unsafe for the receiver to read any
64  * data it's already marked as read, or to write any data; and it would be
65  * unsafe for the sender to reread any data after incrementing
66  * mq_bytes_written, but fortunately there's no need for any of that.
67  */
68 struct shm_mq
69 {
70         slock_t         mq_mutex;
71         PGPROC     *mq_receiver;
72         PGPROC     *mq_sender;
73         uint64          mq_bytes_read;
74         uint64          mq_bytes_written;
75         Size            mq_ring_size;
76         bool            mq_detached;
77         uint8           mq_ring_offset;
78         char            mq_ring[FLEXIBLE_ARRAY_MEMBER];
79 };
80
81 /*
82  * This structure is a backend-private handle for access to a queue.
83  *
84  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
85  * a pointer to the dynamic shared memory segment that contains it.
86  *
87  * If this queue is intended to connect the current process with a background
88  * worker that started it, the user can pass a pointer to the worker handle
89  * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
90  * is to allow us to begin sending to or receiving from that queue before the
91  * process we'll be communicating with has even been started.  If it fails
92  * to start, the handle will allow us to notice that and fail cleanly, rather
93  * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
94  * simple cases - e.g. where there are just 2 processes communicating; in
95  * more complex scenarios, every process may not have a BackgroundWorkerHandle
96  * available, or may need to watch for the failure of more than one other
97  * process at a time.
98  *
99  * When a message exists as a contiguous chunk of bytes in the queue - that is,
100  * it is smaller than the size of the ring buffer and does not wrap around
101  * the end - we return the message to the caller as a pointer into the buffer.
102  * For messages that are larger or happen to wrap, we reassemble the message
103  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
104  * the buffer, and mqh_buflen is the number of bytes allocated for it.
105  *
106  * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_length_word_complete
107  * are used to track the state of non-blocking operations.  When the caller
108  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
109  * are expected to retry the call at a later time with the same argument;
110  * we need to retain enough state to pick up where we left off.
111  * mqh_length_word_complete tracks whether we are done sending or receiving
112  * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
113  * the number of bytes read or written for either the length word or the
114  * message itself, and mqh_expected_bytes - which is used only for reads -
115  * tracks the expected total size of the payload.
116  *
117  * mqh_counterparty_attached tracks whether we know the counterparty to have
118  * attached to the queue at some previous point.  This lets us avoid some
119  * mutex acquisitions.
120  *
121  * mqh_context is the memory context in effect at the time we attached to
122  * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
123  * we make sure any other allocations we do happen in this context as well,
124  * to avoid nasty surprises.
125  */
126 struct shm_mq_handle
127 {
128         shm_mq     *mqh_queue;
129         dsm_segment *mqh_segment;
130         BackgroundWorkerHandle *mqh_handle;
131         char       *mqh_buffer;
132         Size            mqh_buflen;
133         Size            mqh_consume_pending;
134         Size            mqh_partial_bytes;
135         Size            mqh_expected_bytes;
136         bool            mqh_length_word_complete;
137         bool            mqh_counterparty_attached;
138         MemoryContext mqh_context;
139 };
140
141 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
142                                   const void *data, bool nowait, Size *bytes_written);
143 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
144                                          bool nowait, Size *nbytesp, void **datap);
145 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
146                                                  BackgroundWorkerHandle *handle);
147 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
148                                          BackgroundWorkerHandle *handle);
149 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
150 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
151 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
152 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
153 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
154 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
155
156 /* Minimum queue size is enough for header and at least one chunk of data. */
157 const Size      shm_mq_minimum_size =
158 MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
159
160 #define MQH_INITIAL_BUFSIZE                             8192
161
162 /*
163  * Initialize a new shared message queue.
164  */
165 shm_mq *
166 shm_mq_create(void *address, Size size)
167 {
168         shm_mq     *mq = address;
169         Size            data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
170
171         /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
172         size = MAXALIGN_DOWN(size);
173
174         /* Queue size must be large enough to hold some data. */
175         Assert(size > data_offset);
176
177         /* Initialize queue header. */
178         SpinLockInit(&mq->mq_mutex);
179         mq->mq_receiver = NULL;
180         mq->mq_sender = NULL;
181         mq->mq_bytes_read = 0;
182         mq->mq_bytes_written = 0;
183         mq->mq_ring_size = size - data_offset;
184         mq->mq_detached = false;
185         mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
186
187         return mq;
188 }
189
190 /*
191  * Set the identity of the process that will receive from a shared message
192  * queue.
193  */
194 void
195 shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
196 {
197         volatile shm_mq *vmq = mq;
198         PGPROC     *sender;
199
200         SpinLockAcquire(&mq->mq_mutex);
201         Assert(vmq->mq_receiver == NULL);
202         vmq->mq_receiver = proc;
203         sender = vmq->mq_sender;
204         SpinLockRelease(&mq->mq_mutex);
205
206         if (sender != NULL)
207                 SetLatch(&sender->procLatch);
208 }
209
210 /*
211  * Set the identity of the process that will send to a shared message queue.
212  */
213 void
214 shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
215 {
216         volatile shm_mq *vmq = mq;
217         PGPROC     *receiver;
218
219         SpinLockAcquire(&mq->mq_mutex);
220         Assert(vmq->mq_sender == NULL);
221         vmq->mq_sender = proc;
222         receiver = vmq->mq_receiver;
223         SpinLockRelease(&mq->mq_mutex);
224
225         if (receiver != NULL)
226                 SetLatch(&receiver->procLatch);
227 }
228
229 /*
230  * Get the configured receiver.
231  */
232 PGPROC *
233 shm_mq_get_receiver(shm_mq *mq)
234 {
235         volatile shm_mq *vmq = mq;
236         PGPROC     *receiver;
237
238         SpinLockAcquire(&mq->mq_mutex);
239         receiver = vmq->mq_receiver;
240         SpinLockRelease(&mq->mq_mutex);
241
242         return receiver;
243 }
244
245 /*
246  * Get the configured sender.
247  */
248 PGPROC *
249 shm_mq_get_sender(shm_mq *mq)
250 {
251         volatile shm_mq *vmq = mq;
252         PGPROC     *sender;
253
254         SpinLockAcquire(&mq->mq_mutex);
255         sender = vmq->mq_sender;
256         SpinLockRelease(&mq->mq_mutex);
257
258         return sender;
259 }
260
261 /*
262  * Attach to a shared message queue so we can send or receive messages.
263  *
264  * The memory context in effect at the time this function is called should
265  * be one which will last for at least as long as the message queue itself.
266  * We'll allocate the handle in that context, and future allocations that
267  * are needed to buffer incoming data will happen in that context as well.
268  *
269  * If seg != NULL, the queue will be automatically detached when that dynamic
270  * shared memory segment is detached.
271  *
272  * If handle != NULL, the queue can be read or written even before the
273  * other process has attached.  We'll wait for it to do so if needed.  The
274  * handle must be for a background worker initialized with bgw_notify_pid
275  * equal to our PID.
276  *
277  * shm_mq_detach() should be called when done.  This will free the
278  * shm_mq_handle and mark the queue itself as detached, so that our
279  * counterpart won't get stuck waiting for us to fill or drain the queue
280  * after we've already lost interest.
281  */
282 shm_mq_handle *
283 shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
284 {
285         shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
286
287         Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
288         mqh->mqh_queue = mq;
289         mqh->mqh_segment = seg;
290         mqh->mqh_buffer = NULL;
291         mqh->mqh_handle = handle;
292         mqh->mqh_buflen = 0;
293         mqh->mqh_consume_pending = 0;
294         mqh->mqh_context = CurrentMemoryContext;
295         mqh->mqh_partial_bytes = 0;
296         mqh->mqh_length_word_complete = false;
297         mqh->mqh_counterparty_attached = false;
298
299         if (seg != NULL)
300                 on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
301
302         return mqh;
303 }
304
305 /*
306  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
307  * been passed to shm_mq_attach.
308  */
309 void
310 shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
311 {
312         Assert(mqh->mqh_handle == NULL);
313         mqh->mqh_handle = handle;
314 }
315
316 /*
317  * Write a message into a shared message queue.
318  */
319 shm_mq_result
320 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
321 {
322         shm_mq_iovec iov;
323
324         iov.data = data;
325         iov.len = nbytes;
326
327         return shm_mq_sendv(mqh, &iov, 1, nowait);
328 }
329
330 /*
331  * Write a message into a shared message queue, gathered from multiple
332  * addresses.
333  *
334  * When nowait = false, we'll wait on our process latch when the ring buffer
335  * fills up, and then continue writing once the receiver has drained some data.
336  * The process latch is reset after each wait.
337  *
338  * When nowait = true, we do not manipulate the state of the process latch;
339  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
340  * this case, the caller should call this function again, with the same
341  * arguments, each time the process latch is set.  (Once begun, the sending
342  * of a message cannot be aborted except by detaching from the queue; changing
343  * the length or payload will corrupt the queue.)
344  */
345 shm_mq_result
346 shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
347 {
348         shm_mq_result res;
349         shm_mq     *mq = mqh->mqh_queue;
350         Size            nbytes = 0;
351         Size            bytes_written;
352         int                     i;
353         int                     which_iov = 0;
354         Size            offset;
355
356         Assert(mq->mq_sender == MyProc);
357
358         /* Compute total size of write. */
359         for (i = 0; i < iovcnt; ++i)
360                 nbytes += iov[i].len;
361
362         /* Try to write, or finish writing, the length word into the buffer. */
363         while (!mqh->mqh_length_word_complete)
364         {
365                 Assert(mqh->mqh_partial_bytes < sizeof(Size));
366                 res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
367                                                                 ((char *) &nbytes) +mqh->mqh_partial_bytes,
368                                                                 nowait, &bytes_written);
369
370                 if (res == SHM_MQ_DETACHED)
371                 {
372                         /* Reset state in case caller tries to send another message. */
373                         mqh->mqh_partial_bytes = 0;
374                         mqh->mqh_length_word_complete = false;
375                         return res;
376                 }
377                 mqh->mqh_partial_bytes += bytes_written;
378
379                 if (mqh->mqh_partial_bytes >= sizeof(Size))
380                 {
381                         Assert(mqh->mqh_partial_bytes == sizeof(Size));
382
383                         mqh->mqh_partial_bytes = 0;
384                         mqh->mqh_length_word_complete = true;
385                 }
386
387                 if (res != SHM_MQ_SUCCESS)
388                         return res;
389
390                 /* Length word can't be split unless bigger than required alignment. */
391                 Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
392         }
393
394         /* Write the actual data bytes into the buffer. */
395         Assert(mqh->mqh_partial_bytes <= nbytes);
396         offset = mqh->mqh_partial_bytes;
397         do
398         {
399                 Size            chunksize;
400
401                 /* Figure out which bytes need to be sent next. */
402                 if (offset >= iov[which_iov].len)
403                 {
404                         offset -= iov[which_iov].len;
405                         ++which_iov;
406                         if (which_iov >= iovcnt)
407                                 break;
408                         continue;
409                 }
410
411                 /*
412                  * We want to avoid copying the data if at all possible, but every
413                  * chunk of bytes we write into the queue has to be MAXALIGN'd, except
414                  * the last.  Thus, if a chunk other than the last one ends on a
415                  * non-MAXALIGN'd boundary, we have to combine the tail end of its
416                  * data with data from one or more following chunks until we either
417                  * reach the last chunk or accumulate a number of bytes which is
418                  * MAXALIGN'd.
419                  */
420                 if (which_iov + 1 < iovcnt &&
421                         offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
422                 {
423                         char            tmpbuf[MAXIMUM_ALIGNOF];
424                         int                     j = 0;
425
426                         for (;;)
427                         {
428                                 if (offset < iov[which_iov].len)
429                                 {
430                                         tmpbuf[j] = iov[which_iov].data[offset];
431                                         j++;
432                                         offset++;
433                                         if (j == MAXIMUM_ALIGNOF)
434                                                 break;
435                                 }
436                                 else
437                                 {
438                                         offset -= iov[which_iov].len;
439                                         which_iov++;
440                                         if (which_iov >= iovcnt)
441                                                 break;
442                                 }
443                         }
444
445                         res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
446
447                         if (res == SHM_MQ_DETACHED)
448                         {
449                                 /* Reset state in case caller tries to send another message. */
450                                 mqh->mqh_partial_bytes = 0;
451                                 mqh->mqh_length_word_complete = false;
452                                 return res;
453                         }
454
455                         mqh->mqh_partial_bytes += bytes_written;
456                         if (res != SHM_MQ_SUCCESS)
457                                 return res;
458                         continue;
459                 }
460
461                 /*
462                  * If this is the last chunk, we can write all the data, even if it
463                  * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
464                  * MAXALIGN_DOWN the write size.
465                  */
466                 chunksize = iov[which_iov].len - offset;
467                 if (which_iov + 1 < iovcnt)
468                         chunksize = MAXALIGN_DOWN(chunksize);
469                 res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
470                                                                 nowait, &bytes_written);
471
472                 if (res == SHM_MQ_DETACHED)
473                 {
474                         /* Reset state in case caller tries to send another message. */
475                         mqh->mqh_length_word_complete = false;
476                         mqh->mqh_partial_bytes = 0;
477                         return res;
478                 }
479
480                 mqh->mqh_partial_bytes += bytes_written;
481                 offset += bytes_written;
482                 if (res != SHM_MQ_SUCCESS)
483                         return res;
484         } while (mqh->mqh_partial_bytes < nbytes);
485
486         /* Reset for next message. */
487         mqh->mqh_partial_bytes = 0;
488         mqh->mqh_length_word_complete = false;
489
490         /* Notify receiver of the newly-written data, and return. */
491         return shm_mq_notify_receiver(mq);
492 }
493
494 /*
495  * Receive a message from a shared message queue.
496  *
497  * We set *nbytes to the message length and *data to point to the message
498  * payload.  If the entire message exists in the queue as a single,
499  * contiguous chunk, *data will point directly into shared memory; otherwise,
500  * it will point to a temporary buffer.  This mostly avoids data copying in
501  * the hoped-for case where messages are short compared to the buffer size,
502  * while still allowing longer messages.  In either case, the return value
503  * remains valid until the next receive operation is perfomed on the queue.
504  *
505  * When nowait = false, we'll wait on our process latch when the ring buffer
506  * is empty and we have not yet received a full message.  The sender will
507  * set our process latch after more data has been written, and we'll resume
508  * processing.  Each call will therefore return a complete message
509  * (unless the sender detaches the queue).
510  *
511  * When nowait = true, we do not manipulate the state of the process latch;
512  * instead, whenever the buffer is empty and we need to read from it, we
513  * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
514  * function again after the process latch has been set.
515  */
516 shm_mq_result
517 shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
518 {
519         shm_mq     *mq = mqh->mqh_queue;
520         shm_mq_result res;
521         Size            rb = 0;
522         Size            nbytes;
523         void       *rawdata;
524
525         Assert(mq->mq_receiver == MyProc);
526
527         /* We can't receive data until the sender has attached. */
528         if (!mqh->mqh_counterparty_attached)
529         {
530                 if (nowait)
531                 {
532                         int                     counterparty_gone;
533
534                         /*
535                          * We shouldn't return at this point at all unless the sender
536                          * hasn't attached yet.  However, the correct return value depends
537                          * on whether the sender is still attached.  If we first test
538                          * whether the sender has ever attached and then test whether the
539                          * sender has detached, there's a race condition: a sender that
540                          * attaches and detaches very quickly might fool us into thinking
541                          * the sender never attached at all.  So, test whether our
542                          * counterparty is definitively gone first, and only afterwards
543                          * check whether the sender ever attached in the first place.
544                          */
545                         counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
546                         if (shm_mq_get_sender(mq) == NULL)
547                         {
548                                 if (counterparty_gone)
549                                         return SHM_MQ_DETACHED;
550                                 else
551                                         return SHM_MQ_WOULD_BLOCK;
552                         }
553                 }
554                 else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
555                                  && shm_mq_get_sender(mq) == NULL)
556                 {
557                         mq->mq_detached = true;
558                         return SHM_MQ_DETACHED;
559                 }
560                 mqh->mqh_counterparty_attached = true;
561         }
562
563         /* Consume any zero-copy data from previous receive operation. */
564         if (mqh->mqh_consume_pending > 0)
565         {
566                 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
567                 mqh->mqh_consume_pending = 0;
568         }
569
570         /* Try to read, or finish reading, the length word from the buffer. */
571         while (!mqh->mqh_length_word_complete)
572         {
573                 /* Try to receive the message length word. */
574                 Assert(mqh->mqh_partial_bytes < sizeof(Size));
575                 res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
576                                                                    nowait, &rb, &rawdata);
577                 if (res != SHM_MQ_SUCCESS)
578                         return res;
579
580                 /*
581                  * Hopefully, we'll receive the entire message length word at once.
582                  * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
583                  * multiple reads.
584                  */
585                 if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
586                 {
587                         Size            needed;
588
589                         nbytes = *(Size *) rawdata;
590
591                         /* If we've already got the whole message, we're done. */
592                         needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
593                         if (rb >= needed)
594                         {
595                                 /*
596                                  * Technically, we could consume the message length
597                                  * information at this point, but the extra write to shared
598                                  * memory wouldn't be free and in most cases we would reap no
599                                  * benefit.
600                                  */
601                                 mqh->mqh_consume_pending = needed;
602                                 *nbytesp = nbytes;
603                                 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
604                                 return SHM_MQ_SUCCESS;
605                         }
606
607                         /*
608                          * We don't have the whole message, but we at least have the whole
609                          * length word.
610                          */
611                         mqh->mqh_expected_bytes = nbytes;
612                         mqh->mqh_length_word_complete = true;
613                         shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
614                         rb -= MAXALIGN(sizeof(Size));
615                 }
616                 else
617                 {
618                         Size            lengthbytes;
619
620                         /* Can't be split unless bigger than required alignment. */
621                         Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
622
623                         /* Message word is split; need buffer to reassemble. */
624                         if (mqh->mqh_buffer == NULL)
625                         {
626                                 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
627                                                                                                          MQH_INITIAL_BUFSIZE);
628                                 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
629                         }
630                         Assert(mqh->mqh_buflen >= sizeof(Size));
631
632                         /* Copy and consume partial length word. */
633                         if (mqh->mqh_partial_bytes + rb > sizeof(Size))
634                                 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
635                         else
636                                 lengthbytes = rb;
637                         memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
638                                    lengthbytes);
639                         mqh->mqh_partial_bytes += lengthbytes;
640                         shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
641                         rb -= lengthbytes;
642
643                         /* If we now have the whole word, we're ready to read payload. */
644                         if (mqh->mqh_partial_bytes >= sizeof(Size))
645                         {
646                                 Assert(mqh->mqh_partial_bytes == sizeof(Size));
647                                 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
648                                 mqh->mqh_length_word_complete = true;
649                                 mqh->mqh_partial_bytes = 0;
650                         }
651                 }
652         }
653         nbytes = mqh->mqh_expected_bytes;
654
655         if (mqh->mqh_partial_bytes == 0)
656         {
657                 /*
658                  * Try to obtain the whole message in a single chunk.  If this works,
659                  * we need not copy the data and can return a pointer directly into
660                  * shared memory.
661                  */
662                 res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
663                 if (res != SHM_MQ_SUCCESS)
664                         return res;
665                 if (rb >= nbytes)
666                 {
667                         mqh->mqh_length_word_complete = false;
668                         mqh->mqh_consume_pending = MAXALIGN(nbytes);
669                         *nbytesp = nbytes;
670                         *datap = rawdata;
671                         return SHM_MQ_SUCCESS;
672                 }
673
674                 /*
675                  * The message has wrapped the buffer.  We'll need to copy it in order
676                  * to return it to the client in one chunk.  First, make sure we have
677                  * a large enough buffer available.
678                  */
679                 if (mqh->mqh_buflen < nbytes)
680                 {
681                         Size            newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
682
683                         while (newbuflen < nbytes)
684                                 newbuflen *= 2;
685
686                         if (mqh->mqh_buffer != NULL)
687                         {
688                                 pfree(mqh->mqh_buffer);
689                                 mqh->mqh_buffer = NULL;
690                                 mqh->mqh_buflen = 0;
691                         }
692                         mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
693                         mqh->mqh_buflen = newbuflen;
694                 }
695         }
696
697         /* Loop until we've copied the entire message. */
698         for (;;)
699         {
700                 Size            still_needed;
701
702                 /* Copy as much as we can. */
703                 Assert(mqh->mqh_partial_bytes + rb <= nbytes);
704                 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
705                 mqh->mqh_partial_bytes += rb;
706
707                 /*
708                  * Update count of bytes read, with alignment padding.  Note that this
709                  * will never actually insert any padding except at the end of a
710                  * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
711                  * and each read and write is as well.
712                  */
713                 Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
714                 shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
715
716                 /* If we got all the data, exit the loop. */
717                 if (mqh->mqh_partial_bytes >= nbytes)
718                         break;
719
720                 /* Wait for some more data. */
721                 still_needed = nbytes - mqh->mqh_partial_bytes;
722                 res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
723                 if (res != SHM_MQ_SUCCESS)
724                         return res;
725                 if (rb > still_needed)
726                         rb = still_needed;
727         }
728
729         /* Return the complete message, and reset for next message. */
730         *nbytesp = nbytes;
731         *datap = mqh->mqh_buffer;
732         mqh->mqh_length_word_complete = false;
733         mqh->mqh_partial_bytes = 0;
734         return SHM_MQ_SUCCESS;
735 }
736
737 /*
738  * Wait for the other process that's supposed to use this queue to attach
739  * to it.
740  *
741  * The return value is SHM_MQ_DETACHED if the worker has already detached or
742  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
743  * Note that we will only be able to detect that the worker has died before
744  * attaching if a background worker handle was passed to shm_mq_attach().
745  */
746 shm_mq_result
747 shm_mq_wait_for_attach(shm_mq_handle *mqh)
748 {
749         shm_mq     *mq = mqh->mqh_queue;
750         PGPROC    **victim;
751
752         if (shm_mq_get_receiver(mq) == MyProc)
753                 victim = &mq->mq_sender;
754         else
755         {
756                 Assert(shm_mq_get_sender(mq) == MyProc);
757                 victim = &mq->mq_receiver;
758         }
759
760         if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
761                 return SHM_MQ_SUCCESS;
762         else
763                 return SHM_MQ_DETACHED;
764 }
765
766 /*
767  * Detach a shared message queue.
768  *
769  * The purpose of this function is to make sure that the process
770  * with which we're communicating doesn't block forever waiting for us to
771  * fill or drain the queue once we've lost interest.  Whem the sender
772  * detaches, the receiver can read any messages remaining in the queue;
773  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
774  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
775  */
776 void
777 shm_mq_detach(shm_mq *mq)
778 {
779         volatile shm_mq *vmq = mq;
780         PGPROC     *victim;
781
782         SpinLockAcquire(&mq->mq_mutex);
783         if (vmq->mq_sender == MyProc)
784                 victim = vmq->mq_receiver;
785         else
786         {
787                 Assert(vmq->mq_receiver == MyProc);
788                 victim = vmq->mq_sender;
789         }
790         vmq->mq_detached = true;
791         SpinLockRelease(&mq->mq_mutex);
792
793         if (victim != NULL)
794                 SetLatch(&victim->procLatch);
795 }
796
797 /*
798  * Get the shm_mq from handle.
799  */
800 shm_mq *
801 shm_mq_get_queue(shm_mq_handle *mqh)
802 {
803         return mqh->mqh_queue;
804 }
805
806 /*
807  * Write bytes into a shared message queue.
808  */
809 static shm_mq_result
810 shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
811                                   bool nowait, Size *bytes_written)
812 {
813         shm_mq     *mq = mqh->mqh_queue;
814         Size            sent = 0;
815         uint64          used;
816         Size            ringsize = mq->mq_ring_size;
817         Size            available;
818
819         while (sent < nbytes)
820         {
821                 bool            detached;
822                 uint64          rb;
823
824                 /* Compute number of ring buffer bytes used and available. */
825                 rb = shm_mq_get_bytes_read(mq, &detached);
826                 Assert(mq->mq_bytes_written >= rb);
827                 used = mq->mq_bytes_written - rb;
828                 Assert(used <= ringsize);
829                 available = Min(ringsize - used, nbytes - sent);
830
831                 /* Bail out if the queue has been detached. */
832                 if (detached)
833                 {
834                         *bytes_written = sent;
835                         return SHM_MQ_DETACHED;
836                 }
837
838                 if (available == 0 && !mqh->mqh_counterparty_attached)
839                 {
840                         /*
841                          * The queue is full, so if the receiver isn't yet known to be
842                          * attached, we must wait for that to happen.
843                          */
844                         if (nowait)
845                         {
846                                 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
847                                 {
848                                         *bytes_written = sent;
849                                         return SHM_MQ_DETACHED;
850                                 }
851                                 if (shm_mq_get_receiver(mq) == NULL)
852                                 {
853                                         *bytes_written = sent;
854                                         return SHM_MQ_WOULD_BLOCK;
855                                 }
856                         }
857                         else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
858                                                                                    mqh->mqh_handle))
859                         {
860                                 mq->mq_detached = true;
861                                 *bytes_written = sent;
862                                 return SHM_MQ_DETACHED;
863                         }
864                         mqh->mqh_counterparty_attached = true;
865
866                         /*
867                          * The receiver may have read some data after attaching, so we
868                          * must not wait without rechecking the queue state.
869                          */
870                 }
871                 else if (available == 0)
872                 {
873                         shm_mq_result res;
874
875                         /* Let the receiver know that we need them to read some data. */
876                         res = shm_mq_notify_receiver(mq);
877                         if (res != SHM_MQ_SUCCESS)
878                         {
879                                 *bytes_written = sent;
880                                 return res;
881                         }
882
883                         /* Skip manipulation of our latch if nowait = true. */
884                         if (nowait)
885                         {
886                                 *bytes_written = sent;
887                                 return SHM_MQ_WOULD_BLOCK;
888                         }
889
890                         /*
891                          * Wait for our latch to be set.  It might already be set for some
892                          * unrelated reason, but that'll just result in one extra trip
893                          * through the loop.  It's worth it to avoid resetting the latch
894                          * at top of loop, because setting an already-set latch is much
895                          * cheaper than setting one that has been reset.
896                          */
897                         WaitLatch(MyLatch, WL_LATCH_SET, 0);
898
899                         /* An interrupt may have occurred while we were waiting. */
900                         CHECK_FOR_INTERRUPTS();
901
902                         /* Reset the latch so we don't spin. */
903                         ResetLatch(MyLatch);
904                 }
905                 else
906                 {
907                         Size            offset = mq->mq_bytes_written % (uint64) ringsize;
908                         Size            sendnow = Min(available, ringsize - offset);
909
910                         /* Write as much data as we can via a single memcpy(). */
911                         memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
912                                    (char *) data + sent, sendnow);
913                         sent += sendnow;
914
915                         /*
916                          * Update count of bytes written, with alignment padding.  Note
917                          * that this will never actually insert any padding except at the
918                          * end of a run of bytes, because the buffer size is a multiple of
919                          * MAXIMUM_ALIGNOF, and each read is as well.
920                          */
921                         Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
922                         shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
923
924                         /*
925                          * For efficiency, we don't set the reader's latch here.  We'll do
926                          * that only when the buffer fills up or after writing an entire
927                          * message.
928                          */
929                 }
930         }
931
932         *bytes_written = sent;
933         return SHM_MQ_SUCCESS;
934 }
935
936 /*
937  * Wait until at least *nbytesp bytes are available to be read from the
938  * shared message queue, or until the buffer wraps around.  If the queue is
939  * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
940  * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
941  * to the location at which data bytes can be read, *nbytesp is set to the
942  * number of bytes which can be read at that address, and the return value
943  * is SHM_MQ_SUCCESS.
944  */
945 static shm_mq_result
946 shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
947                                          Size *nbytesp, void **datap)
948 {
949         Size            ringsize = mq->mq_ring_size;
950         uint64          used;
951         uint64          written;
952
953         for (;;)
954         {
955                 Size            offset;
956                 bool            detached;
957
958                 /* Get bytes written, so we can compute what's available to read. */
959                 written = shm_mq_get_bytes_written(mq, &detached);
960                 used = written - mq->mq_bytes_read;
961                 Assert(used <= ringsize);
962                 offset = mq->mq_bytes_read % (uint64) ringsize;
963
964                 /* If we have enough data or buffer has wrapped, we're done. */
965                 if (used >= bytes_needed || offset + used >= ringsize)
966                 {
967                         *nbytesp = Min(used, ringsize - offset);
968                         *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
969                         return SHM_MQ_SUCCESS;
970                 }
971
972                 /*
973                  * Fall out before waiting if the queue has been detached.
974                  *
975                  * Note that we don't check for this until *after* considering whether
976                  * the data already available is enough, since the receiver can finish
977                  * receiving a message stored in the buffer even after the sender has
978                  * detached.
979                  */
980                 if (detached)
981                         return SHM_MQ_DETACHED;
982
983                 /* Skip manipulation of our latch if nowait = true. */
984                 if (nowait)
985                         return SHM_MQ_WOULD_BLOCK;
986
987                 /*
988                  * Wait for our latch to be set.  It might already be set for some
989                  * unrelated reason, but that'll just result in one extra trip through
990                  * the loop.  It's worth it to avoid resetting the latch at top of
991                  * loop, because setting an already-set latch is much cheaper than
992                  * setting one that has been reset.
993                  */
994                 WaitLatch(MyLatch, WL_LATCH_SET, 0);
995
996                 /* An interrupt may have occurred while we were waiting. */
997                 CHECK_FOR_INTERRUPTS();
998
999                 /* Reset the latch so we don't spin. */
1000                 ResetLatch(MyLatch);
1001         }
1002 }
1003
1004 /*
1005  * Test whether a counterparty who may not even be alive yet is definitely gone.
1006  */
1007 static bool
1008 shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
1009 {
1010         bool            detached;
1011         pid_t           pid;
1012
1013         /* Acquire the lock just long enough to check the pointer. */
1014         SpinLockAcquire(&mq->mq_mutex);
1015         detached = mq->mq_detached;
1016         SpinLockRelease(&mq->mq_mutex);
1017
1018         /* If the queue has been detached, counterparty is definitely gone. */
1019         if (detached)
1020                 return true;
1021
1022         /* If there's a handle, check worker status. */
1023         if (handle != NULL)
1024         {
1025                 BgwHandleStatus status;
1026
1027                 /* Check for unexpected worker death. */
1028                 status = GetBackgroundWorkerPid(handle, &pid);
1029                 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1030                 {
1031                         /* Mark it detached, just to make it official. */
1032                         SpinLockAcquire(&mq->mq_mutex);
1033                         mq->mq_detached = true;
1034                         SpinLockRelease(&mq->mq_mutex);
1035                         return true;
1036                 }
1037         }
1038
1039         /* Counterparty is not definitively gone. */
1040         return false;
1041 }
1042
1043 /*
1044  * This is used when a process is waiting for its counterpart to attach to the
1045  * queue.  We exit when the other process attaches as expected, or, if
1046  * handle != NULL, when the referenced background process or the postmaster
1047  * dies.  Note that if handle == NULL, and the process fails to attach, we'll
1048  * potentially get stuck here forever waiting for a process that may never
1049  * start.  We do check for interrupts, though.
1050  *
1051  * ptr is a pointer to the memory address that we're expecting to become
1052  * non-NULL when our counterpart attaches to the queue.
1053  */
1054 static bool
1055 shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
1056                                          BackgroundWorkerHandle *handle)
1057 {
1058         bool            result = false;
1059
1060         for (;;)
1061         {
1062                 BgwHandleStatus status;
1063                 pid_t           pid;
1064                 bool            detached;
1065
1066                 /* Acquire the lock just long enough to check the pointer. */
1067                 SpinLockAcquire(&mq->mq_mutex);
1068                 detached = mq->mq_detached;
1069                 result = (*ptr != NULL);
1070                 SpinLockRelease(&mq->mq_mutex);
1071
1072                 /* Fail if detached; else succeed if initialized. */
1073                 if (detached)
1074                 {
1075                         result = false;
1076                         break;
1077                 }
1078                 if (result)
1079                         break;
1080
1081                 if (handle != NULL)
1082                 {
1083                         /* Check for unexpected worker death. */
1084                         status = GetBackgroundWorkerPid(handle, &pid);
1085                         if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1086                         {
1087                                 result = false;
1088                                 break;
1089                         }
1090                 }
1091
1092                 /* Wait to be signalled. */
1093                 WaitLatch(MyLatch, WL_LATCH_SET, 0);
1094
1095                 /* An interrupt may have occurred while we were waiting. */
1096                 CHECK_FOR_INTERRUPTS();
1097
1098                 /* Reset the latch so we don't spin. */
1099                 ResetLatch(MyLatch);
1100         }
1101
1102         return result;
1103 }
1104
1105 /*
1106  * Get the number of bytes read.  The receiver need not use this to access
1107  * the count of bytes read, but the sender must.
1108  */
1109 static uint64
1110 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
1111 {
1112         uint64          v;
1113
1114         SpinLockAcquire(&mq->mq_mutex);
1115         v = mq->mq_bytes_read;
1116         *detached = mq->mq_detached;
1117         SpinLockRelease(&mq->mq_mutex);
1118
1119         return v;
1120 }
1121
1122 /*
1123  * Increment the number of bytes read.
1124  */
1125 static void
1126 shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
1127 {
1128         PGPROC     *sender;
1129
1130         SpinLockAcquire(&mq->mq_mutex);
1131         mq->mq_bytes_read += n;
1132         sender = mq->mq_sender;
1133         SpinLockRelease(&mq->mq_mutex);
1134
1135         /* We shouldn't have any bytes to read without a sender. */
1136         Assert(sender != NULL);
1137         SetLatch(&sender->procLatch);
1138 }
1139
1140 /*
1141  * Get the number of bytes written.  The sender need not use this to access
1142  * the count of bytes written, but the receiver must.
1143  */
1144 static uint64
1145 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
1146 {
1147         uint64          v;
1148
1149         SpinLockAcquire(&mq->mq_mutex);
1150         v = mq->mq_bytes_written;
1151         *detached = mq->mq_detached;
1152         SpinLockRelease(&mq->mq_mutex);
1153
1154         return v;
1155 }
1156
1157 /*
1158  * Increment the number of bytes written.
1159  */
1160 static void
1161 shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
1162 {
1163         SpinLockAcquire(&mq->mq_mutex);
1164         mq->mq_bytes_written += n;
1165         SpinLockRelease(&mq->mq_mutex);
1166 }
1167
1168 /*
1169  * Set sender's latch, unless queue is detached.
1170  */
1171 static shm_mq_result
1172 shm_mq_notify_receiver(volatile shm_mq *mq)
1173 {
1174         PGPROC     *receiver;
1175         bool            detached;
1176
1177         SpinLockAcquire(&mq->mq_mutex);
1178         detached = mq->mq_detached;
1179         receiver = mq->mq_receiver;
1180         SpinLockRelease(&mq->mq_mutex);
1181
1182         if (detached)
1183                 return SHM_MQ_DETACHED;
1184         if (receiver)
1185                 SetLatch(&receiver->procLatch);
1186         return SHM_MQ_SUCCESS;
1187 }
1188
1189 /* Shim for on_dsm_callback. */
1190 static void
1191 shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1192 {
1193         shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
1194
1195         shm_mq_detach(mq);
1196 }