+/*
+ * Read all pending notifications from the queue, and deliver appropriate
+ * ones to my frontend. Stop when we reach queue head or an uncommitted
+ * notification.
+ */
+static void
+asyncQueueReadAllNotifications(void)
+{
+	QueuePosition pos;			/* our advancing read position */
+	QueuePosition oldpos;		/* position on entry, for tail-advance test */
+	QueuePosition head;			/* stop position: queue head as of entry */
+	bool advanceTail;
+	/* page_buffer must be adequately aligned, so use a union */
+	union {
+		char buf[QUEUE_PAGESIZE];
+		AsyncQueueEntry align;
+	} page_buffer;
+
+	/* Fetch current state */
+	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+	/* Assert checks that we have a valid state entry */
+	Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
+	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
+	head = QUEUE_HEAD;
+	LWLockRelease(AsyncQueueLock);
+
+	if (QUEUE_POS_EQUAL(pos, head))
+	{
+		/* Nothing to do, we have read all notifications already. */
+		return;
+	}
+
+	/*----------
+	 * Note that we deliver everything that we see in the queue and that
+	 * matches our _current_ listening state.
+	 * Especially we do not take into account different commit times.
+	 * Consider the following example:
+	 *
+	 * Backend 1:                 Backend 2:
+	 *
+	 * transaction starts
+	 * NOTIFY foo;
+	 * commit starts
+	 *                            transaction starts
+	 *                            LISTEN foo;
+	 *                            commit starts
+	 * commit to clog
+	 *                            commit to clog
+	 *
+	 * It could happen that backend 2 sees the notification from backend 1 in
+	 * the queue. Even though the notifying transaction committed before
+	 * the listening transaction, we still deliver the notification.
+	 *
+	 * The idea is that an additional notification does not do any harm, we
+	 * just need to make sure that we do not miss a notification.
+	 *
+	 * It is possible that we fail while trying to send a message to our
+	 * frontend (for example, because of encoding conversion failure).
+	 * If that happens it is critical that we not try to send the same
+	 * message over and over again. Therefore, we place a PG_TRY block
+	 * here that will forcibly advance our backend position before we lose
+	 * control to an error. (We could alternatively retake AsyncQueueLock
+	 * and move the position before handling each individual message, but
+	 * that seems like too much lock traffic.)
+	 *----------
+	 */
+	PG_TRY();
+	{
+		bool reachedStop;
+
+		do
+		{
+			int curpage = QUEUE_POS_PAGE(pos);
+			int curoffset = QUEUE_POS_OFFSET(pos);
+			int slotno;
+			int copysize;	/* number of page bytes worth inspecting */
+
+			/*
+			 * We copy the data from SLRU into a local buffer, so as to avoid
+			 * holding the AsyncCtlLock while we are examining the entries and
+			 * possibly transmitting them to our frontend. Copy only the part
+			 * of the page we will actually inspect.
+			 */
+			slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
+			InvalidTransactionId);
+			if (curpage == QUEUE_POS_PAGE(head))
+			{
+				/* we only want to read as far as head */
+				copysize = QUEUE_POS_OFFSET(head) - curoffset;
+				if (copysize < 0)
+					copysize = 0; /* just for safety */
+			}
+			else
+			{
+				/* fetch all the rest of the page */
+				copysize = QUEUE_PAGESIZE - curoffset;
+			}
+			memcpy(page_buffer.buf + curoffset,
+			AsyncCtl->shared->page_buffer[slotno] + curoffset,
+			copysize);
+			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+			LWLockRelease(AsyncCtlLock);
+
+			/*
+			 * Process messages up to the stop position, end of page, or an
+			 * uncommitted message.
+			 *
+			 * Our stop position is what we found to be the head's position
+			 * when we entered this function. It might have changed
+			 * already. But if it has, we will receive (or have already
+			 * received and queued) another signal and come here again.
+			 *
+			 * We are not holding AsyncQueueLock here! The queue can only
+			 * extend beyond the head pointer (see above) and we leave our
+			 * backend's pointer where it is so nobody will truncate or
+			 * rewrite pages under us. Especially we don't want to hold a lock
+			 * while sending the notifications to the frontend.
+			 */
+			reachedStop = asyncQueueProcessPageEntries(&pos, head,
+			page_buffer.buf);
+		} while (!reachedStop);
+	}
+	PG_CATCH();
+	{
+		/*
+		 * Update shared state.  This intentionally duplicates the success
+		 * path below: we must persist our advanced position before the
+		 * error propagates, so the failing message is not re-sent forever.
+		 *
+		 * NOTE(review): a shared lock appears sufficient here because only
+		 * this backend ever writes its own position entry -- confirm.
+		 */
+		LWLockAcquire(AsyncQueueLock, LW_SHARED);
+		QUEUE_BACKEND_POS(MyBackendId) = pos;
+		advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
+		LWLockRelease(AsyncQueueLock);
+
+		/* If we were the laziest backend, try to advance the tail pointer */
+		if (advanceTail)
+			asyncQueueAdvanceTail();
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	/* Update shared state */
+	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+	QUEUE_BACKEND_POS(MyBackendId) = pos;
+	advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
+	LWLockRelease(AsyncQueueLock);
+
+	/* If we were the laziest backend, try to advance the tail pointer */
+	if (advanceTail)
+		asyncQueueAdvanceTail();
+}
+
+/*
+ * Fetch notifications from the shared queue, beginning at position current,
+ * and deliver relevant ones to my frontend.
+ *
+ * The current page must have been fetched into page_buffer from shared
+ * memory. (We could access the page right in shared memory, but that
+ * would imply holding the AsyncCtlLock throughout this routine.)
+ *
+ * We stop if we reach the "stop" position, or reach a notification from an
+ * uncommitted transaction, or reach the end of the page.  In the
+ * uncommitted-transaction case, *current is left pointing AT that entry so
+ * it will be reprocessed on the next call.
+ *
+ * The function returns true once we have reached the stop position or an
+ * uncommitted notification, and false if we have finished with the page.
+ * In other words: once it returns true there is no need to look further.
+ */
+static bool
+asyncQueueProcessPageEntries(QueuePosition *current,
+							 QueuePosition stop,
+							 char *page_buffer)
+{
+	bool reachedStop = false;
+	bool reachedEndOfPage;
+	AsyncQueueEntry *qe;
+
+	do
+	{
+		/* Remember this entry's position in case we must back up below. */
+		QueuePosition thisentry = *current;
+
+		if (QUEUE_POS_EQUAL(thisentry, stop))
+			break;
+
+		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
+
+		/*
+		 * Advance *current over this message, possibly to the next page.
+		 * As noted in the comments for asyncQueueReadAllNotifications, we
+		 * must do this before possibly failing while processing the message.
+		 */
+		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
+
+		/* Ignore messages destined for other databases */
+		if (qe->dboid == MyDatabaseId)
+		{
+			if (TransactionIdDidCommit(qe->xid))
+			{
+				/* qe->data is the null-terminated channel name */
+				char *channel = qe->data;
+
+				if (IsListeningOn(channel))
+				{
+					/* payload follows channel name */
+					char *payload = qe->data + strlen(channel) + 1;
+
+					NotifyMyFrontEnd(channel, payload, qe->srcPid);
+				}
+			}
+			else if (TransactionIdDidAbort(qe->xid))
+			{
+				/*
+				 * If the source transaction aborted, we just ignore its
+				 * notifications.
+				 */
+			}
+			else
+			{
+				/*
+				 * The transaction has neither committed nor aborted so far,
+				 * so we can't process its message yet.  Break out of the
+				 * loop, but first back up *current so we will reprocess the
+				 * message next time.  Without the back-up, the caller would
+				 * persist the advanced position and the notification would
+				 * be lost forever.
+				 */
+				*current = thisentry;
+				reachedStop = true;
+				break;
+			}
+		}
+
+		/* Loop back if we're not at end of page */
+	} while (!reachedEndOfPage);
+
+	if (QUEUE_POS_EQUAL(*current, stop))
+		reachedStop = true;
+
+	return reachedStop;
+}
+
+/*
+ * Advance the shared queue tail variable to the minimum of all the
+ * per-backend tail pointers. Truncate pg_notify space if possible.
+ */
+static void
+asyncQueueAdvanceTail(void)
+{
+	QueuePosition oldestPos;
+	int slot;
+	int oldtailpage;
+	int newtailpage;
+	int boundary;
+
+	/*
+	 * Find the furthest-back queue position still needed by any live
+	 * backend; no backend can be behind by more than the whole queue, and
+	 * none can be ahead of the head.
+	 */
+	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+	oldestPos = QUEUE_HEAD;
+	for (slot = 1; slot <= MaxBackends; slot++)
+	{
+		if (QUEUE_BACKEND_PID(slot) != InvalidPid)
+			oldestPos = QUEUE_POS_MIN(oldestPos, QUEUE_BACKEND_POS(slot));
+	}
+	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
+	QUEUE_TAIL = oldestPos;
+	LWLockRelease(AsyncQueueLock);
+
+	/*
+	 * Truncation is only worthwhile once the global tail has crossed an
+	 * SLRU segment boundary, since SLRU storage is removed per segment.
+	 *
+	 * XXX it might be better to truncate only once every several segments,
+	 * to reduce the number of directory scans.
+	 */
+	newtailpage = QUEUE_POS_PAGE(oldestPos);
+	boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
+	if (asyncQueuePagePrecedesLogically(oldtailpage, boundary))
+	{
+		/*
+		 * SimpleLruTruncate() will ask for AsyncCtlLock but will also
+		 * release the lock again.
+		 */
+		SimpleLruTruncate(AsyncCtl, newtailpage);
+	}
+}
+