]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/async.c
Small improvements in comments in async.c.
[postgresql] / src / backend / commands / async.c
index 9845cf9a4d70d03f5ff7580a141dcea9808d8718..91baede4e363f148e00bae53f9d6bdd4a1d5f4a3 100644 (file)
@@ -3,7 +3,7 @@
  * async.c
  *       Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
- * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
  *       either, but just process the queue directly.
  *
  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
- *       can call inbound-notify processing immediately if this backend is idle
- *       (ie, it is waiting for a frontend command and is not within a transaction
- *       block).  Otherwise the handler may only set a flag, which will cause the
- *       processing to occur just before we next go idle.
+ *       sets the process's latch, which triggers the event to be processed
+ *       immediately if this backend is idle (i.e., it is waiting for a frontend
+ *       command and is not within a transaction block. C.f.
+ *       ProcessClientReadInterrupt()).  Otherwise the handler may only set a
+ *       flag, which will cause the processing to occur just before we next go
+ *       idle.
  *
  *       Inbound-notify processing consists of reading all of the notifications
  *       that have arrived since scanning last time. We read every notification
 #include "miscadmin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
 #include "storage/procsignal.h"
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
  *
  * This struct declaration has the maximal length, but in a real queue entry
  * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).     AsyncQueueEntryEmptySize is the minimum possible
+ * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
  * entry size, if both channel and payload strings are empty (but note it
  * doesn't include alignment padding).
  *
@@ -207,8 +211,6 @@ typedef struct QueueBackendStatus
        QueuePosition pos;                      /* backend has read queue up to here */
 } QueueBackendStatus;
 
-#define InvalidPid                             (-1)
-
 /*
  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
  *
@@ -222,6 +224,7 @@ typedef struct QueueBackendStatus
  * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
  * of other backends and also change the head and tail pointers.
  *
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need both locks, we always first
  * get AsyncQueueLock and then AsyncCtlLock.
  *
@@ -232,11 +235,11 @@ typedef struct QueueBackendStatus
 typedef struct AsyncQueueControl
 {
        QueuePosition head;                     /* head points to the next free location */
-       QueuePosition tail;                     /* the global tail is equivalent to the tail
+       QueuePosition tail;                     /* the global tail is equivalent to the pos
                                                                 * of the "slowest" backend */
        TimestampTz lastQueueFillWarn;          /* time of last queue-full msg */
-       QueueBackendStatus backend[1];          /* actually of length MaxBackends+1 */
-       /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
+       QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
+       /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
 } AsyncQueueControl;
 
 static AsyncQueueControl *asyncQueueControl;
@@ -266,7 +269,7 @@ static SlruCtlData AsyncCtlData;
  *
  * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
  * pages, because more than that would confuse slru.c into thinking there
- * was a wraparound condition. With the default BLCKSZ this means there
+ * was a wraparound condition.  With the default BLCKSZ this means there
  * can be up to 8GB of queued-and-not-read data.
  *
  * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
@@ -301,7 +304,7 @@ typedef enum
 typedef struct
 {
        ListenActionKind action;
-       char            channel[1];             /* actually, as long as needed */
+       char            channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
 } ListenAction;
 
 static List *pendingActions = NIL;             /* list of ListenAction */
@@ -335,17 +338,13 @@ static List *pendingNotifies = NIL;               /* list of Notifications */
 static List *upperPendingNotifies = NIL;               /* list of upper-xact lists */
 
 /*
- * State for inbound notifications consists of two flags: one saying whether
- * the signal handler is currently allowed to call ProcessIncomingNotify
- * directly, and one saying whether the signal has occurred but the handler
- * was not allowed to call ProcessIncomingNotify at the time.
- *
- * NB: the "volatile" on these declarations is critical!  If your compiler
- * does not grok "volatile", you'd be best advised to compile this file
- * with all optimization turned off.
+ * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * called from inside a signal handler. That just sets the
+ * notifyInterruptPending flag and sets the process
+ * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * actually deal with the interrupt.
  */
-static volatile sig_atomic_t notifyInterruptEnabled = 0;
-static volatile sig_atomic_t notifyInterruptOccurred = 0;
+volatile sig_atomic_t notifyInterruptPending = false;
 
 /* True if we've registered an on_shmem_exit cleanup */
 static bool unlistenExitRegistered = false;
@@ -370,13 +369,14 @@ static void Exec_UnlistenAllCommit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
-static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
+static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
 static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
+static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
 static bool SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
-static bool asyncQueueProcessPageEntries(QueuePosition *current,
+static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                         QueuePosition stop,
                                                         char *page_buffer);
 static void asyncQueueAdvanceTail(void);
@@ -396,7 +396,7 @@ asyncQueuePagePrecedes(int p, int q)
        int                     diff;
 
        /*
-        * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.      Both inputs should be
+        * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.  Both inputs should be
         * in the range 0..QUEUE_MAX_PAGE.
         */
        Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
@@ -419,8 +419,8 @@ AsyncShmemSize(void)
        Size            size;
 
        /* This had better match AsyncShmemInit */
-       size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
-       size = add_size(size, sizeof(AsyncQueueControl));
+       size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+       size = add_size(size, offsetof(AsyncQueueControl, backend));
 
        size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
 
@@ -440,12 +440,11 @@ AsyncShmemInit(void)
        /*
         * Create or attach to the AsyncQueueControl structure.
         *
-        * The used entries in the backend[] array run from 1 to MaxBackends.
-        * sizeof(AsyncQueueControl) already includes space for the unused zero'th
-        * entry, but we need to add on space for the used entries.
+        * The used entries in the backend[] array run from 1 to MaxBackends; the
+        * zero'th entry is unused but must be allocated.
         */
-       size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
-       size = add_size(size, sizeof(AsyncQueueControl));
+       size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+       size = add_size(size, offsetof(AsyncQueueControl, backend));
 
        asyncQueueControl = (AsyncQueueControl *)
                ShmemInitStruct("Async Queue Control", size, &found);
@@ -607,7 +606,8 @@ queue_listen(ListenActionKind action, const char *channel)
        oldcontext = MemoryContextSwitchTo(CurTransactionContext);
 
        /* space for terminating null is included in sizeof(ListenAction) */
-       actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel));
+       actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
+                                                                        strlen(channel) + 1);
        actrec->action = action;
        strcpy(actrec->channel, channel);
 
@@ -827,7 +827,7 @@ PreCommit_Notify(void)
                while (nextNotify != NULL)
                {
                        /*
-                        * Add the pending notifications to the queue.  We acquire and
+                        * Add the pending notifications to the queue.  We acquire and
                         * release AsyncQueueLock once per page, which might be overkill
                         * but it does allow readers to get in while we're doing this.
                         *
@@ -923,7 +923,7 @@ Exec_ListenPreCommit(void)
         */
        if (!unlistenExitRegistered)
        {
-               on_shmem_exit(Async_UnlistenOnExit, 0);
+               before_shmem_exit(Async_UnlistenOnExit, 0);
                unlistenExitRegistered = true;
        }
 
@@ -1043,12 +1043,12 @@ Exec_UnlistenAllCommit(void)
  * The reason that this is not done in AtCommit_Notify is that there is
  * a nonzero chance of errors here (for example, encoding conversion errors
  * while trying to format messages to our frontend).  An error during
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
+ * AtCommit_Notify would be a PANIC condition.  The timing is also arranged
  * to ensure that a transaction's self-notifies are delivered to the frontend
  * before it gets the terminating ReadyForQuery message.
  *
  * Note that we send signals and process the queue even if the transaction
- * eventually aborted. This is because we need to clean out whatever got
+ * eventually aborted.  This is because we need to clean out whatever got
  * added to the queue.
  *
  * NOTE: we are outside of any transaction here.
@@ -1138,7 +1138,7 @@ IsListeningOn(const char *channel)
 
 /*
  * Remove our entry from the listeners array when we are no longer listening
- * on any channel.     NB: must not fail if we're already not listening.
+ * on any channel.  NB: must not fail if we're already not listening.
  */
 static void
 asyncQueueUnregister(void)
@@ -1147,7 +1147,7 @@ asyncQueueUnregister(void)
 
        Assert(listenChannels == NIL);          /* else caller error */
 
-       if (!amRegisteredListener)                      /* nothing to do */
+       if (!amRegisteredListener)      /* nothing to do */
                return;
 
        LWLockAcquire(AsyncQueueLock, LW_SHARED);
@@ -1180,7 +1180,7 @@ asyncQueueIsFull(void)
        /*
         * The queue is full if creating a new head page would create a page that
         * logically precedes the current global tail pointer, ie, the head
-        * pointer would wrap around compared to the tail.      We cannot create such
+        * pointer would wrap around compared to the tail.  We cannot create such
         * a head page for fear of confusing slru.c.  For safety we round the tail
         * pointer back to a segment boundary (compare the truncation logic in
         * asyncQueueAdvanceTail).
@@ -1199,11 +1199,11 @@ asyncQueueIsFull(void)
 
 /*
  * Advance the QueuePosition to the next entry, assuming that the current
- * entry is of length entryLength.     If we jump to a new page the function
+ * entry is of length entryLength.  If we jump to a new page the function
  * returns true, else false.
  */
 static bool
-asyncQueueAdvance(QueuePosition *position, int entryLength)
+asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 {
        int                     pageno = QUEUE_POS_PAGE(*position);
        int                     offset = QUEUE_POS_OFFSET(*position);
@@ -1268,7 +1268,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  * the last byte which simplifies reading the page later.
  *
  * We are passed the list cell containing the next notification to write
- * and return the first still-unwritten cell back.     Eventually we will return
+ * and return the first still-unwritten cell back.  Eventually we will return
  * NULL indicating all is done.
  *
  * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
@@ -1345,7 +1345,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
                         * Page is full, so we're done here, but first fill the next page
                         * with zeroes.  The reason to do this is to ensure that slru.c's
                         * idea of the head page is always the same as ours, which avoids
-                        * boundary problems in SimpleLruTruncate.      The test in
+                        * boundary problems in SimpleLruTruncate.  The test in
                         * asyncQueueIsFull() ensured that there is room to create this
                         * page without overrunning the queue.
                         */
@@ -1364,26 +1364,37 @@ asyncQueueAddEntries(ListCell *nextNotify)
 }
 
 /*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
+ * SQL function to return the fraction of the notification queue currently
+ * occupied.
+ */
+Datum
+pg_notification_queue_usage(PG_FUNCTION_ARGS)
+{
+       double          usage;
+
+       LWLockAcquire(AsyncQueueLock, LW_SHARED);
+       usage = asyncQueueUsage();
+       LWLockRelease(AsyncQueueLock);
+
+       PG_RETURN_FLOAT8(usage);
+}
+
+/*
+ * Return the fraction of the queue that is currently occupied.
  *
- * Caller must hold exclusive AsyncQueueLock.
+ * The caller must hold AysncQueueLock in (at least) shared mode.
  */
-static void
-asyncQueueFillWarning(void)
+static double
+asyncQueueUsage(void)
 {
        int                     headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
        int                     tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
        int                     occupied;
-       double          fillDegree;
-       TimestampTz t;
 
        occupied = headPage - tailPage;
 
        if (occupied == 0)
-               return;                                 /* fast exit for common case */
+               return (double) 0;              /* fast exit for common case */
 
        if (occupied < 0)
        {
@@ -1391,8 +1402,24 @@ asyncQueueFillWarning(void)
                occupied += QUEUE_MAX_PAGE + 1;
        }
 
-       fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+       return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+}
 
+/*
+ * Check whether the queue is at least half full, and emit a warning if so.
+ *
+ * This is unlikely given the size of the queue, but possible.
+ * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
+ *
+ * Caller must hold exclusive AsyncQueueLock.
+ */
+static void
+asyncQueueFillWarning(void)
+{
+       double          fillDegree;
+       TimestampTz t;
+
+       fillDegree = asyncQueueUsage();
        if (fillDegree < 0.5)
                return;
 
@@ -1626,158 +1653,45 @@ AtSubAbort_Notify(void)
 /*
  * HandleNotifyInterrupt
  *
- *             This is called when PROCSIG_NOTIFY_INTERRUPT is received.
- *
- *             If we are idle (notifyInterruptEnabled is set), we can safely invoke
- *             ProcessIncomingNotify directly.  Otherwise, just set a flag
- *             to do it later.
+ *             Signal handler portion of interrupt handling. Let the backend know
+ *             that there's a pending notify interrupt. If we're currently reading
+ *             from the client, this will interrupt the read and
+ *             ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
  */
 void
 HandleNotifyInterrupt(void)
 {
        /*
         * Note: this is called by a SIGNAL HANDLER. You must be very wary what
-        * you do here. Some helpful soul had this routine sprinkled with
-        * TPRINTFs, which would likely lead to corruption of stdio buffers if
-        * they were ever turned on.
+        * you do here.
         */
 
-       /* Don't joggle the elbow of proc_exit */
-       if (proc_exit_inprogress)
-               return;
-
-       if (notifyInterruptEnabled)
-       {
-               bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
-
-               /*
-                * We may be called while ImmediateInterruptOK is true; turn it off
-                * while messing with the NOTIFY state.  (We would have to save and
-                * restore it anyway, because PGSemaphore operations inside
-                * ProcessIncomingNotify() might reset it.)
-                */
-               ImmediateInterruptOK = false;
-
-               /*
-                * I'm not sure whether some flavors of Unix might allow another
-                * SIGUSR1 occurrence to recursively interrupt this routine. To cope
-                * with the possibility, we do the same sort of dance that
-                * EnableNotifyInterrupt must do --- see that routine for comments.
-                */
-               notifyInterruptEnabled = 0;             /* disable any recursive signal */
-               notifyInterruptOccurred = 1;    /* do at least one iteration */
-               for (;;)
-               {
-                       notifyInterruptEnabled = 1;
-                       if (!notifyInterruptOccurred)
-                               break;
-                       notifyInterruptEnabled = 0;
-                       if (notifyInterruptOccurred)
-                       {
-                               /* Here, it is finally safe to do stuff. */
-                               if (Trace_notify)
-                                       elog(DEBUG1, "HandleNotifyInterrupt: perform async notify");
-
-                               ProcessIncomingNotify();
+       /* signal that work needs to be done */
+       notifyInterruptPending = true;
 
-                               if (Trace_notify)
-                                       elog(DEBUG1, "HandleNotifyInterrupt: done");
-                       }
-               }
-
-               /*
-                * Restore ImmediateInterruptOK, and check for interrupts if needed.
-                */
-               ImmediateInterruptOK = save_ImmediateInterruptOK;
-               if (save_ImmediateInterruptOK)
-                       CHECK_FOR_INTERRUPTS();
-       }
-       else
-       {
-               /*
-                * In this path it is NOT SAFE to do much of anything, except this:
-                */
-               notifyInterruptOccurred = 1;
-       }
+       /* make sure the event is processed in due course */
+       SetLatch(MyLatch);
 }
 
 /*
- * EnableNotifyInterrupt
- *
- *             This is called by the PostgresMain main loop just before waiting
- *             for a frontend command.  If we are truly idle (ie, *not* inside
- *             a transaction block), then process any pending inbound notifies,
- *             and enable the signal handler to process future notifies directly.
+ * ProcessNotifyInterrupt
  *
- *             NOTE: the signal handler starts out disabled, and stays so until
- *             PostgresMain calls this the first time.
+ *             This is called just after waiting for a frontend command.  If a
+ *             interrupt arrives (via HandleNotifyInterrupt()) while reading, the
+ *             read will be interrupted via the process's latch, and this routine
+ *             will get called.  If we are truly idle (ie, *not* inside a transaction
+ *             block), process the incoming notifies.
  */
 void
-EnableNotifyInterrupt(void)
+ProcessNotifyInterrupt(void)
 {
        if (IsTransactionOrTransactionBlock())
                return;                                 /* not really idle */
 
-       /*
-        * This code is tricky because we are communicating with a signal handler
-        * that could interrupt us at any point.  If we just checked
-        * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
-        * fail to respond promptly to a signal that happens in between those two
-        * steps.  (A very small time window, perhaps, but Murphy's Law says you
-        * can hit it...)  Instead, we first set the enable flag, then test the
-        * occurred flag.  If we see an unserviced interrupt has occurred, we
-        * re-clear the enable flag before going off to do the service work. (That
-        * prevents re-entrant invocation of ProcessIncomingNotify() if another
-        * interrupt occurs.) If an interrupt comes in between the setting and
-        * clearing of notifyInterruptEnabled, then it will have done the service
-        * work and left notifyInterruptOccurred zero, so we have to check again
-        * after clearing enable.  The whole thing has to be in a loop in case
-        * another interrupt occurs while we're servicing the first. Once we get
-        * out of the loop, enable is set and we know there is no unserviced
-        * interrupt.
-        *
-        * NB: an overenthusiastic optimizing compiler could easily break this
-        * code. Hopefully, they all understand what "volatile" means these days.
-        */
-       for (;;)
-       {
-               notifyInterruptEnabled = 1;
-               if (!notifyInterruptOccurred)
-                       break;
-               notifyInterruptEnabled = 0;
-               if (notifyInterruptOccurred)
-               {
-                       if (Trace_notify)
-                               elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
-
-                       ProcessIncomingNotify();
-
-                       if (Trace_notify)
-                               elog(DEBUG1, "EnableNotifyInterrupt: done");
-               }
-       }
+       while (notifyInterruptPending)
+               ProcessIncomingNotify();
 }
 
-/*
- * DisableNotifyInterrupt
- *
- *             This is called by the PostgresMain main loop just after receiving
- *             a frontend command.  Signal handler execution of inbound notifies
- *             is disabled until the next EnableNotifyInterrupt call.
- *
- *             The PROCSIG_CATCHUP_INTERRUPT signal handler also needs to call this,
- *             so as to prevent conflicts if one signal interrupts the other.  So we
- *             must return the previous state of the flag.
- */
-bool
-DisableNotifyInterrupt(void)
-{
-       bool            result = (notifyInterruptEnabled != 0);
-
-       notifyInterruptEnabled = 0;
-
-       return result;
-}
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
@@ -1787,7 +1701,7 @@ DisableNotifyInterrupt(void)
 static void
 asyncQueueReadAllNotifications(void)
 {
-       QueuePosition pos;
+       volatile QueuePosition pos;
        QueuePosition oldpos;
        QueuePosition head;
        bool            advanceTail;
@@ -1861,7 +1775,7 @@ asyncQueueReadAllNotifications(void)
                        /*
                         * We copy the data from SLRU into a local buffer, so as to avoid
                         * holding the AsyncCtlLock while we are examining the entries and
-                        * possibly transmitting them to our frontend.  Copy only the part
+                        * possibly transmitting them to our frontend.  Copy only the part
                         * of the page we will actually inspect.
                         */
                        slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
@@ -1935,7 +1849,7 @@ asyncQueueReadAllNotifications(void)
  * and deliver relevant ones to my frontend.
  *
  * The current page must have been fetched into page_buffer from shared
- * memory.     (We could access the page right in shared memory, but that
+ * memory.  (We could access the page right in shared memory, but that
  * would imply holding the AsyncCtlLock throughout this routine.)
  *
  * We stop if we reach the "stop" position, or reach a notification from an
@@ -1947,7 +1861,7 @@ asyncQueueReadAllNotifications(void)
  * The QueuePosition *current is advanced past all processed messages.
  */
 static bool
-asyncQueueProcessPageEntries(QueuePosition *current,
+asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                         QueuePosition stop,
                                                         char *page_buffer)
 {
@@ -1974,7 +1888,27 @@ asyncQueueProcessPageEntries(QueuePosition *current,
                /* Ignore messages destined for other databases */
                if (qe->dboid == MyDatabaseId)
                {
-                       if (TransactionIdDidCommit(qe->xid))
+                       if (TransactionIdIsInProgress(qe->xid))
+                       {
+                               /*
+                                * The source transaction is still in progress, so we can't
+                                * process this message yet.  Break out of the loop, but first
+                                * back up *current so we will reprocess the message next
+                                * time.  (Note: it is unlikely but not impossible for
+                                * TransactionIdDidCommit to fail, so we can't really avoid
+                                * this advance-then-back-up behavior when dealing with an
+                                * uncommitted message.)
+                                *
+                                * Note that we must test TransactionIdIsInProgress before we
+                                * test TransactionIdDidCommit, else we might return a message
+                                * from a transaction that is not yet visible to snapshots;
+                                * compare the comments at the head of tqual.c.
+                                */
+                               *current = thisentry;
+                               reachedStop = true;
+                               break;
+                       }
+                       else if (TransactionIdDidCommit(qe->xid))
                        {
                                /* qe->data is the null-terminated channel name */
                                char       *channel = qe->data;
@@ -1987,27 +1921,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
                                        NotifyMyFrontEnd(channel, payload, qe->srcPid);
                                }
                        }
-                       else if (TransactionIdDidAbort(qe->xid))
-                       {
-                               /*
-                                * If the source transaction aborted, we just ignore its
-                                * notifications.
-                                */
-                       }
                        else
                        {
                                /*
-                                * The transaction has neither committed nor aborted so far,
-                                * so we can't process its message yet.  Break out of the
-                                * loop, but first back up *current so we will reprocess the
-                                * message next time.  (Note: it is unlikely but not
-                                * impossible for TransactionIdDidCommit to fail, so we can't
-                                * really avoid this advance-then-back-up behavior when
-                                * dealing with an uncommitted message.)
+                                * The source transaction aborted or crashed, so we just
+                                * ignore its notifications.
                                 */
-                               *current = thisentry;
-                               reachedStop = true;
-                               break;
                        }
                }
 
@@ -2066,9 +1985,10 @@ asyncQueueAdvanceTail(void)
 /*
  * ProcessIncomingNotify
  *
- *             Deal with arriving NOTIFYs from other backends.
- *             This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
- *             signal handler, or the next time control reaches the outer idle loop.
+ *             Deal with arriving NOTIFYs from other backends as soon as it's safe to
+ *             do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
+ *             signal handler, but isn't anymore.
+ *
  *             Scan the queue for arriving notifications and report them to my front
  *             end.
  *
@@ -2077,18 +1997,13 @@ asyncQueueAdvanceTail(void)
 static void
 ProcessIncomingNotify(void)
 {
-       bool            catchup_enabled;
-
        /* We *must* reset the flag */
-       notifyInterruptOccurred = 0;
+       notifyInterruptPending = false;
 
        /* Do nothing else if we aren't actively listening */
        if (listenChannels == NIL)
                return;
 
-       /* Must prevent catchup interrupt while I am running */
-       catchup_enabled = DisableCatchupInterrupt();
-
        if (Trace_notify)
                elog(DEBUG1, "ProcessIncomingNotify");
 
@@ -2113,9 +2028,6 @@ ProcessIncomingNotify(void)
 
        if (Trace_notify)
                elog(DEBUG1, "ProcessIncomingNotify: done");
-
-       if (catchup_enabled)
-               EnableCatchupInterrupt();
 }
 
 /*
@@ -2136,7 +2048,7 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
                pq_endmessage(&buf);
 
                /*
-                * NOTE: we do not do pq_flush() here.  For a self-notify, it will
+                * NOTE: we do not do pq_flush() here.  For a self-notify, it will
                 * happen at the end of the transaction, and for incoming notifies
                 * ProcessIncomingNotify will do it after finding all the notifies.
                 */