]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/async.c
Small improvements in comments in async.c.
[postgresql] / src / backend / commands / async.c
index f12b3a9f1382cbc68b1b92256bebf3df511154ab..91baede4e363f148e00bae53f9d6bdd4a1d5f4a3 100644 (file)
@@ -3,7 +3,7 @@
  * async.c
  *       Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
  *       either, but just process the queue directly.
  *
  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
- *       can call inbound-notify processing immediately if this backend is idle
- *       (ie, it is waiting for a frontend command and is not within a transaction
- *       block).  Otherwise the handler may only set a flag, which will cause the
- *       processing to occur just before we next go idle.
+ *       sets the process's latch, which triggers the event to be processed
+ *       immediately if this backend is idle (i.e., it is waiting for a frontend
+ *       command and is not within a transaction block. C.f.
+ *       ProcessClientReadInterrupt()).  Otherwise the handler may only set a
+ *       flag, which will cause the processing to occur just before we next go
+ *       idle.
  *
  *       Inbound-notify processing consists of reading all of the notifications
  *       that have arrived since scanning last time. We read every notification
 #include "miscadmin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
 #include "storage/procsignal.h"
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
+#include "utils/timestamp.h"
 
 
 /*
  *
  * This struct declaration has the maximal length, but in a real queue entry
  * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).     AsyncQueueEntryEmptySize is the minimum possible
+ * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
  * entry size, if both channel and payload strings are empty (but note it
  * doesn't include alignment padding).
  *
@@ -193,7 +198,7 @@ typedef struct QueuePosition
 
 /* choose logically smaller QueuePosition */
 #define QUEUE_POS_MIN(x,y) \
-       (asyncQueuePagePrecedesLogically((x).page, (y).page) ? (x) : \
+       (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
         (x).page != (y).page ? (y) : \
         (x).offset < (y).offset ? (x) : (y))
 
@@ -206,8 +211,6 @@ typedef struct QueueBackendStatus
        QueuePosition pos;                      /* backend has read queue up to here */
 } QueueBackendStatus;
 
-#define InvalidPid                             (-1)
-
 /*
  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
  *
@@ -221,6 +224,7 @@ typedef struct QueueBackendStatus
  * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
  * of other backends and also change the head and tail pointers.
  *
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need both locks, we always first
  * get AsyncQueueLock and then AsyncCtlLock.
  *
@@ -231,11 +235,11 @@ typedef struct QueueBackendStatus
 typedef struct AsyncQueueControl
 {
        QueuePosition head;                     /* head points to the next free location */
-       QueuePosition tail;                     /* the global tail is equivalent to the tail
+       QueuePosition tail;                     /* the global tail is equivalent to the pos
                                                                 * of the "slowest" backend */
        TimestampTz lastQueueFillWarn;          /* time of last queue-full msg */
-       QueueBackendStatus backend[1];          /* actually of length MaxBackends+1 */
-       /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
+       QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
+       /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
 } AsyncQueueControl;
 
 static AsyncQueueControl *asyncQueueControl;
@@ -265,7 +269,7 @@ static SlruCtlData AsyncCtlData;
  *
  * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
  * pages, because more than that would confuse slru.c into thinking there
- * was a wraparound condition. With the default BLCKSZ this means there
+ * was a wraparound condition.  With the default BLCKSZ this means there
  * can be up to 8GB of queued-and-not-read data.
  *
  * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
@@ -300,7 +304,7 @@ typedef enum
 typedef struct
 {
        ListenActionKind action;
-       char            channel[1];             /* actually, as long as needed */
+       char            channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
 } ListenAction;
 
 static List *pendingActions = NIL;             /* list of ListenAction */
@@ -334,33 +338,28 @@ static List *pendingNotifies = NIL;               /* list of Notifications */
 static List *upperPendingNotifies = NIL;               /* list of upper-xact lists */
 
 /*
- * State for inbound notifications consists of two flags: one saying whether
- * the signal handler is currently allowed to call ProcessIncomingNotify
- * directly, and one saying whether the signal has occurred but the handler
- * was not allowed to call ProcessIncomingNotify at the time.
- *
- * NB: the "volatile" on these declarations is critical!  If your compiler
- * does not grok "volatile", you'd be best advised to compile this file
- * with all optimization turned off.
+ * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * called from inside a signal handler. That just sets the
+ * notifyInterruptPending flag and sets the process
+ * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * actually deal with the interrupt.
  */
-static volatile sig_atomic_t notifyInterruptEnabled = 0;
-static volatile sig_atomic_t notifyInterruptOccurred = 0;
+volatile sig_atomic_t notifyInterruptPending = false;
 
 /* True if we've registered an on_shmem_exit cleanup */
 static bool unlistenExitRegistered = false;
 
+/* True if we're currently registered as a listener in asyncQueueControl */
+static bool amRegisteredListener = false;
+
 /* has this backend sent notifications in the current transaction? */
 static bool backendHasSentNotifications = false;
 
-/* has this backend executed its first LISTEN in the current transaction? */
-static bool backendHasExecutedInitialListen = false;
-
 /* GUC parameter */
 bool           Trace_notify = false;
 
 /* local function prototypes */
-static bool asyncQueuePagePrecedesPhysically(int p, int q);
-static bool asyncQueuePagePrecedesLogically(int p, int q);
+static bool asyncQueuePagePrecedes(int p, int q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
 static void Exec_ListenPreCommit(void);
@@ -370,13 +369,14 @@ static void Exec_UnlistenAllCommit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
-static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
+static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
 static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
+static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
 static bool SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
-static bool asyncQueueProcessPageEntries(QueuePosition *current,
+static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                         QueuePosition stop,
                                                         char *page_buffer);
 static void asyncQueueAdvanceTail(void);
@@ -387,30 +387,16 @@ static void NotifyMyFrontEnd(const char *channel,
 static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
 static void ClearPendingActionsAndNotifies(void);
 
-
 /*
  * We will work on the page range of 0..QUEUE_MAX_PAGE.
- *
- * asyncQueuePagePrecedesPhysically just checks numerically without any magic
- * if one page precedes another one.  This is wrong for normal operation but
- * is helpful when clearing pg_notify/ during startup.
- *
- * asyncQueuePagePrecedesLogically compares using wraparound logic, as is
- * required by slru.c.
  */
 static bool
-asyncQueuePagePrecedesPhysically(int p, int q)
-{
-       return p < q;
-}
-
-static bool
-asyncQueuePagePrecedesLogically(int p, int q)
+asyncQueuePagePrecedes(int p, int q)
 {
        int                     diff;
 
        /*
-        * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.      Both inputs should be
+        * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.  Both inputs should be
         * in the range 0..QUEUE_MAX_PAGE.
         */
        Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
@@ -433,8 +419,8 @@ AsyncShmemSize(void)
        Size            size;
 
        /* This had better match AsyncShmemInit */
-       size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
-       size = add_size(size, sizeof(AsyncQueueControl));
+       size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+       size = add_size(size, offsetof(AsyncQueueControl, backend));
 
        size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
 
@@ -454,12 +440,11 @@ AsyncShmemInit(void)
        /*
         * Create or attach to the AsyncQueueControl structure.
         *
-        * The used entries in the backend[] array run from 1 to MaxBackends.
-        * sizeof(AsyncQueueControl) already includes space for the unused zero'th
-        * entry, but we need to add on space for the used entries.
+        * The used entries in the backend[] array run from 1 to MaxBackends; the
+        * zero'th entry is unused but must be allocated.
         */
-       size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
-       size = add_size(size, sizeof(AsyncQueueControl));
+       size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+       size = add_size(size, offsetof(AsyncQueueControl, backend));
 
        asyncQueueControl = (AsyncQueueControl *)
                ShmemInitStruct("Async Queue Control", size, &found);
@@ -483,7 +468,7 @@ AsyncShmemInit(void)
        /*
         * Set up SLRU management of the pg_notify data.
         */
-       AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
+       AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
        SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
                                  AsyncCtlLock, "pg_notify");
        /* Override default assumption that writes should be fsync'd */
@@ -493,15 +478,8 @@ AsyncShmemInit(void)
        {
                /*
                 * During start or reboot, clean out the pg_notify directory.
-                *
-                * Since we want to remove every file, we temporarily use
-                * asyncQueuePagePrecedesPhysically() and pass INT_MAX as the
-                * comparison value; every file in the directory should therefore
-                * appear to be less than that.
                 */
-               AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically;
-               (void) SlruScanDirectory(AsyncCtl, INT_MAX, true);
-               AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
+               (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
 
                /* Now initialize page zero to empty */
                LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
@@ -628,7 +606,8 @@ queue_listen(ListenActionKind action, const char *channel)
        oldcontext = MemoryContextSwitchTo(CurTransactionContext);
 
        /* space for terminating null is included in sizeof(ListenAction) */
-       actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel));
+       actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
+                                                                        strlen(channel) + 1);
        actrec->action = action;
        strcpy(actrec->channel, channel);
 
@@ -745,6 +724,7 @@ static void
 Async_UnlistenOnExit(int code, Datum arg)
 {
        Exec_UnlistenAllCommit();
+       asyncQueueUnregister();
 }
 
 /*
@@ -760,7 +740,7 @@ AtPrepare_Notify(void)
        if (pendingActions || pendingNotifies)
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN or NOTIFY")));
+                                errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
 }
 
 /*
@@ -789,8 +769,6 @@ PreCommit_Notify(void)
        if (Trace_notify)
                elog(DEBUG1, "PreCommit_Notify");
 
-       Assert(backendHasExecutedInitialListen == false);
-
        /* Preflight for any pending listen/unlisten actions */
        foreach(p, pendingActions)
        {
@@ -849,7 +827,7 @@ PreCommit_Notify(void)
                while (nextNotify != NULL)
                {
                        /*
-                        * Add the pending notifications to the queue.  We acquire and
+                        * Add the pending notifications to the queue.  We acquire and
                         * release AsyncQueueLock once per page, which might be overkill
                         * but it does allow readers to get in while we're doing this.
                         *
@@ -913,11 +891,9 @@ AtCommit_Notify(void)
                }
        }
 
-       /*
-        * If we did an initial LISTEN, listenChannels now has the entry, so we no
-        * longer need or want the flag to be set.
-        */
-       backendHasExecutedInitialListen = false;
+       /* If no longer listening to anything, get out of listener array */
+       if (amRegisteredListener && listenChannels == NIL)
+               asyncQueueUnregister();
 
        /* And clean up */
        ClearPendingActionsAndNotifies();
@@ -935,26 +911,19 @@ Exec_ListenPreCommit(void)
         * Nothing to do if we are already listening to something, nor if we
         * already ran this routine in this transaction.
         */
-       if (listenChannels != NIL || backendHasExecutedInitialListen)
+       if (amRegisteredListener)
                return;
 
        if (Trace_notify)
                elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
 
-       /*
-        * We need this variable to detect an aborted initial LISTEN. In that case
-        * we would set up our pointer but not listen on any channel. This flag
-        * gets cleared in AtCommit_Notify or AtAbort_Notify().
-        */
-       backendHasExecutedInitialListen = true;
-
        /*
         * Before registering, make sure we will unlisten before dying. (Note:
         * this action does not get undone if we abort later.)
         */
        if (!unlistenExitRegistered)
        {
-               on_shmem_exit(Async_UnlistenOnExit, 0);
+               before_shmem_exit(Async_UnlistenOnExit, 0);
                unlistenExitRegistered = true;
        }
 
@@ -971,6 +940,9 @@ Exec_ListenPreCommit(void)
        QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
        LWLockRelease(AsyncQueueLock);
 
+       /* Now we are listed in the global array, so remember we're listening */
+       amRegisteredListener = true;
+
        /*
         * Try to move our pointer forward as far as possible. This will skip over
         * already-committed notifications. Still, we could get notifications that
@@ -1043,10 +1015,6 @@ Exec_UnlistenCommit(const char *channel)
         * We do not complain about unlistening something not being listened;
         * should we?
         */
-
-       /* If no longer listening to anything, get out of listener array */
-       if (listenChannels == NIL)
-               asyncQueueUnregister();
 }
 
 /*
@@ -1062,8 +1030,6 @@ Exec_UnlistenAllCommit(void)
 
        list_free_deep(listenChannels);
        listenChannels = NIL;
-
-       asyncQueueUnregister();
 }
 
 /*
@@ -1077,12 +1043,12 @@ Exec_UnlistenAllCommit(void)
  * The reason that this is not done in AtCommit_Notify is that there is
  * a nonzero chance of errors here (for example, encoding conversion errors
  * while trying to format messages to our frontend).  An error during
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
+ * AtCommit_Notify would be a PANIC condition.  The timing is also arranged
  * to ensure that a transaction's self-notifies are delivered to the frontend
  * before it gets the terminating ReadyForQuery message.
  *
  * Note that we send signals and process the queue even if the transaction
- * eventually aborted. This is because we need to clean out whatever got
+ * eventually aborted.  This is because we need to clean out whatever got
  * added to the queue.
  *
  * NOTE: we are outside of any transaction here.
@@ -1090,6 +1056,7 @@ Exec_UnlistenAllCommit(void)
 void
 ProcessCompletedNotifies(void)
 {
+       MemoryContext caller_context;
        bool            signalled;
 
        /* Nothing to do if we didn't send any notifications */
@@ -1103,6 +1070,12 @@ ProcessCompletedNotifies(void)
         */
        backendHasSentNotifications = false;
 
+       /*
+        * We must preserve the caller's memory context (probably MessageContext)
+        * across the transaction we do here.
+        */
+       caller_context = CurrentMemoryContext;
+
        if (Trace_notify)
                elog(DEBUG1, "ProcessCompletedNotifies");
 
@@ -1135,6 +1108,8 @@ ProcessCompletedNotifies(void)
 
        CommitTransactionCommand();
 
+       MemoryContextSwitchTo(caller_context);
+
        /* We don't need pq_flush() here since postgres.c will do one shortly */
 }
 
@@ -1163,7 +1138,7 @@ IsListeningOn(const char *channel)
 
 /*
  * Remove our entry from the listeners array when we are no longer listening
- * on any channel.     NB: must not fail if we're already not listening.
+ * on any channel.  NB: must not fail if we're already not listening.
  */
 static void
 asyncQueueUnregister(void)
@@ -1172,6 +1147,9 @@ asyncQueueUnregister(void)
 
        Assert(listenChannels == NIL);          /* else caller error */
 
+       if (!amRegisteredListener)      /* nothing to do */
+               return;
+
        LWLockAcquire(AsyncQueueLock, LW_SHARED);
        /* check if entry is valid and oldest ... */
        advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
@@ -1180,6 +1158,9 @@ asyncQueueUnregister(void)
        QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
        LWLockRelease(AsyncQueueLock);
 
+       /* mark ourselves as no longer listed in the global array */
+       amRegisteredListener = false;
+
        /* If we were the laziest backend, try to advance the tail pointer */
        if (advanceTail)
                asyncQueueAdvanceTail();
@@ -1199,7 +1180,7 @@ asyncQueueIsFull(void)
        /*
         * The queue is full if creating a new head page would create a page that
         * logically precedes the current global tail pointer, ie, the head
-        * pointer would wrap around compared to the tail.      We cannot create such
+        * pointer would wrap around compared to the tail.  We cannot create such
         * a head page for fear of confusing slru.c.  For safety we round the tail
         * pointer back to a segment boundary (compare the truncation logic in
         * asyncQueueAdvanceTail).
@@ -1213,16 +1194,16 @@ asyncQueueIsFull(void)
                nexthead = 0;                   /* wrap around */
        boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
        boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
-       return asyncQueuePagePrecedesLogically(nexthead, boundary);
+       return asyncQueuePagePrecedes(nexthead, boundary);
 }
 
 /*
  * Advance the QueuePosition to the next entry, assuming that the current
- * entry is of length entryLength.     If we jump to a new page the function
+ * entry is of length entryLength.  If we jump to a new page the function
  * returns true, else false.
  */
 static bool
-asyncQueueAdvance(QueuePosition *position, int entryLength)
+asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 {
        int                     pageno = QUEUE_POS_PAGE(*position);
        int                     offset = QUEUE_POS_OFFSET(*position);
@@ -1287,7 +1268,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  * the last byte which simplifies reading the page later.
  *
  * We are passed the list cell containing the next notification to write
- * and return the first still-unwritten cell back.     Eventually we will return
+ * and return the first still-unwritten cell back.  Eventually we will return
  * NULL indicating all is done.
  *
  * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
@@ -1297,6 +1278,7 @@ static ListCell *
 asyncQueueAddEntries(ListCell *nextNotify)
 {
        AsyncQueueEntry qe;
+       QueuePosition queue_head;
        int                     pageno;
        int                     offset;
        int                     slotno;
@@ -1304,8 +1286,21 @@ asyncQueueAddEntries(ListCell *nextNotify)
        /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
        LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
 
+       /*
+        * We work with a local copy of QUEUE_HEAD, which we write back to shared
+        * memory upon exiting.  The reason for this is that if we have to advance
+        * to a new page, SimpleLruZeroPage might fail (out of disk space, for
+        * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
+        * subsequent insertions would try to put entries into a page that slru.c
+        * thinks doesn't exist yet.)  So, use a local position variable.  Note
+        * that if we do fail, any already-inserted queue entries are forgotten;
+        * this is okay, since they'd be useless anyway after our transaction
+        * rolls back.
+        */
+       queue_head = QUEUE_HEAD;
+
        /* Fetch the current page */
-       pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
+       pageno = QUEUE_POS_PAGE(queue_head);
        slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
        /* Note we mark the page dirty before writing in it */
        AsyncCtl->shared->page_dirty[slotno] = true;
@@ -1317,7 +1312,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
                /* Construct a valid queue entry in local variable qe */
                asyncQueueNotificationToEntry(n, &qe);
 
-               offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
+               offset = QUEUE_POS_OFFSET(queue_head);
 
                /* Check whether the entry really fits on the current page */
                if (offset + qe.length <= QUEUE_PAGESIZE)
@@ -1343,49 +1338,63 @@ asyncQueueAddEntries(ListCell *nextNotify)
                           &qe,
                           qe.length);
 
-               /* Advance QUEUE_HEAD appropriately, and note if page is full */
-               if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
+               /* Advance queue_head appropriately, and detect if page is full */
+               if (asyncQueueAdvance(&(queue_head), qe.length))
                {
                        /*
                         * Page is full, so we're done here, but first fill the next page
                         * with zeroes.  The reason to do this is to ensure that slru.c's
                         * idea of the head page is always the same as ours, which avoids
-                        * boundary problems in SimpleLruTruncate.      The test in
+                        * boundary problems in SimpleLruTruncate.  The test in
                         * asyncQueueIsFull() ensured that there is room to create this
                         * page without overrunning the queue.
                         */
-                       slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+                       slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
                        /* And exit the loop */
                        break;
                }
        }
 
+       /* Success, so update the global QUEUE_HEAD */
+       QUEUE_HEAD = queue_head;
+
        LWLockRelease(AsyncCtlLock);
 
        return nextNotify;
 }
 
 /*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
+ * SQL function to return the fraction of the notification queue currently
+ * occupied.
+ */
+Datum
+pg_notification_queue_usage(PG_FUNCTION_ARGS)
+{
+       double          usage;
+
+       LWLockAcquire(AsyncQueueLock, LW_SHARED);
+       usage = asyncQueueUsage();
+       LWLockRelease(AsyncQueueLock);
+
+       PG_RETURN_FLOAT8(usage);
+}
+
+/*
+ * Return the fraction of the queue that is currently occupied.
  *
- * Caller must hold exclusive AsyncQueueLock.
+ * The caller must hold AysncQueueLock in (at least) shared mode.
  */
-static void
-asyncQueueFillWarning(void)
+static double
+asyncQueueUsage(void)
 {
        int                     headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
        int                     tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
        int                     occupied;
-       double          fillDegree;
-       TimestampTz t;
 
        occupied = headPage - tailPage;
 
        if (occupied == 0)
-               return;                                 /* fast exit for common case */
+               return (double) 0;              /* fast exit for common case */
 
        if (occupied < 0)
        {
@@ -1393,8 +1402,24 @@ asyncQueueFillWarning(void)
                occupied += QUEUE_MAX_PAGE + 1;
        }
 
-       fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+       return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+}
+
+/*
+ * Check whether the queue is at least half full, and emit a warning if so.
+ *
+ * This is unlikely given the size of the queue, but possible.
+ * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
+ *
+ * Caller must hold exclusive AsyncQueueLock.
+ */
+static void
+asyncQueueFillWarning(void)
+{
+       double          fillDegree;
+       TimestampTz t;
 
+       fillDegree = asyncQueueUsage();
        if (fillDegree < 0.5)
                return;
 
@@ -1519,21 +1544,12 @@ void
 AtAbort_Notify(void)
 {
        /*
-        * If we LISTEN but then roll back the transaction we have set our pointer
-        * but have not made any entry in listenChannels. In that case, remove our
-        * pointer again.
+        * If we LISTEN but then roll back the transaction after PreCommit_Notify,
+        * we have registered as a listener but have not made any entry in
+        * listenChannels.  In that case, deregister again.
         */
-       if (backendHasExecutedInitialListen)
-       {
-               /*
-                * Checking listenChannels should be redundant but it can't hurt doing
-                * it for safety reasons.
-                */
-               if (listenChannels == NIL)
-                       asyncQueueUnregister();
-
-               backendHasExecutedInitialListen = false;
-       }
+       if (amRegisteredListener && listenChannels == NIL)
+               asyncQueueUnregister();
 
        /* And clean up */
        ClearPendingActionsAndNotifies();
@@ -1637,158 +1653,45 @@ AtSubAbort_Notify(void)
 /*
  * HandleNotifyInterrupt
  *
- *             This is called when PROCSIG_NOTIFY_INTERRUPT is received.
- *
- *             If we are idle (notifyInterruptEnabled is set), we can safely invoke
- *             ProcessIncomingNotify directly.  Otherwise, just set a flag
- *             to do it later.
+ *             Signal handler portion of interrupt handling. Let the backend know
+ *             that there's a pending notify interrupt. If we're currently reading
+ *             from the client, this will interrupt the read and
+ *             ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
  */
 void
 HandleNotifyInterrupt(void)
 {
        /*
         * Note: this is called by a SIGNAL HANDLER. You must be very wary what
-        * you do here. Some helpful soul had this routine sprinkled with
-        * TPRINTFs, which would likely lead to corruption of stdio buffers if
-        * they were ever turned on.
+        * you do here.
         */
 
-       /* Don't joggle the elbow of proc_exit */
-       if (proc_exit_inprogress)
-               return;
-
-       if (notifyInterruptEnabled)
-       {
-               bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
+       /* signal that work needs to be done */
+       notifyInterruptPending = true;
 
-               /*
-                * We may be called while ImmediateInterruptOK is true; turn it off
-                * while messing with the NOTIFY state.  (We would have to save and
-                * restore it anyway, because PGSemaphore operations inside
-                * ProcessIncomingNotify() might reset it.)
-                */
-               ImmediateInterruptOK = false;
-
-               /*
-                * I'm not sure whether some flavors of Unix might allow another
-                * SIGUSR1 occurrence to recursively interrupt this routine. To cope
-                * with the possibility, we do the same sort of dance that
-                * EnableNotifyInterrupt must do --- see that routine for comments.
-                */
-               notifyInterruptEnabled = 0;             /* disable any recursive signal */
-               notifyInterruptOccurred = 1;    /* do at least one iteration */
-               for (;;)
-               {
-                       notifyInterruptEnabled = 1;
-                       if (!notifyInterruptOccurred)
-                               break;
-                       notifyInterruptEnabled = 0;
-                       if (notifyInterruptOccurred)
-                       {
-                               /* Here, it is finally safe to do stuff. */
-                               if (Trace_notify)
-                                       elog(DEBUG1, "HandleNotifyInterrupt: perform async notify");
-
-                               ProcessIncomingNotify();
-
-                               if (Trace_notify)
-                                       elog(DEBUG1, "HandleNotifyInterrupt: done");
-                       }
-               }
-
-               /*
-                * Restore ImmediateInterruptOK, and check for interrupts if needed.
-                */
-               ImmediateInterruptOK = save_ImmediateInterruptOK;
-               if (save_ImmediateInterruptOK)
-                       CHECK_FOR_INTERRUPTS();
-       }
-       else
-       {
-               /*
-                * In this path it is NOT SAFE to do much of anything, except this:
-                */
-               notifyInterruptOccurred = 1;
-       }
+       /* make sure the event is processed in due course */
+       SetLatch(MyLatch);
 }
 
 /*
- * EnableNotifyInterrupt
+ * ProcessNotifyInterrupt
  *
- *             This is called by the PostgresMain main loop just before waiting
- *             for a frontend command.  If we are truly idle (ie, *not* inside
- *             a transaction block), then process any pending inbound notifies,
- *             and enable the signal handler to process future notifies directly.
- *
- *             NOTE: the signal handler starts out disabled, and stays so until
- *             PostgresMain calls this the first time.
+ *             This is called just after waiting for a frontend command.  If a
+ *             interrupt arrives (via HandleNotifyInterrupt()) while reading, the
+ *             read will be interrupted via the process's latch, and this routine
+ *             will get called.  If we are truly idle (ie, *not* inside a transaction
+ *             block), process the incoming notifies.
  */
 void
-EnableNotifyInterrupt(void)
+ProcessNotifyInterrupt(void)
 {
        if (IsTransactionOrTransactionBlock())
                return;                                 /* not really idle */
 
-       /*
-        * This code is tricky because we are communicating with a signal handler
-        * that could interrupt us at any point.  If we just checked
-        * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
-        * fail to respond promptly to a signal that happens in between those two
-        * steps.  (A very small time window, perhaps, but Murphy's Law says you
-        * can hit it...)  Instead, we first set the enable flag, then test the
-        * occurred flag.  If we see an unserviced interrupt has occurred, we
-        * re-clear the enable flag before going off to do the service work. (That
-        * prevents re-entrant invocation of ProcessIncomingNotify() if another
-        * interrupt occurs.) If an interrupt comes in between the setting and
-        * clearing of notifyInterruptEnabled, then it will have done the service
-        * work and left notifyInterruptOccurred zero, so we have to check again
-        * after clearing enable.  The whole thing has to be in a loop in case
-        * another interrupt occurs while we're servicing the first. Once we get
-        * out of the loop, enable is set and we know there is no unserviced
-        * interrupt.
-        *
-        * NB: an overenthusiastic optimizing compiler could easily break this
-        * code. Hopefully, they all understand what "volatile" means these days.
-        */
-       for (;;)
-       {
-               notifyInterruptEnabled = 1;
-               if (!notifyInterruptOccurred)
-                       break;
-               notifyInterruptEnabled = 0;
-               if (notifyInterruptOccurred)
-               {
-                       if (Trace_notify)
-                               elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
-
-                       ProcessIncomingNotify();
-
-                       if (Trace_notify)
-                               elog(DEBUG1, "EnableNotifyInterrupt: done");
-               }
-       }
+       while (notifyInterruptPending)
+               ProcessIncomingNotify();
 }
 
-/*
- * DisableNotifyInterrupt
- *
- *             This is called by the PostgresMain main loop just after receiving
- *             a frontend command.  Signal handler execution of inbound notifies
- *             is disabled until the next EnableNotifyInterrupt call.
- *
- *             The PROCSIG_CATCHUP_INTERRUPT signal handler also needs to call this,
- *             so as to prevent conflicts if one signal interrupts the other.  So we
- *             must return the previous state of the flag.
- */
-bool
-DisableNotifyInterrupt(void)
-{
-       bool            result = (notifyInterruptEnabled != 0);
-
-       notifyInterruptEnabled = 0;
-
-       return result;
-}
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
@@ -1798,7 +1701,7 @@ DisableNotifyInterrupt(void)
 static void
 asyncQueueReadAllNotifications(void)
 {
-       QueuePosition pos;
+       volatile QueuePosition pos;
        QueuePosition oldpos;
        QueuePosition head;
        bool            advanceTail;
@@ -1872,7 +1775,7 @@ asyncQueueReadAllNotifications(void)
                        /*
                         * We copy the data from SLRU into a local buffer, so as to avoid
                         * holding the AsyncCtlLock while we are examining the entries and
-                        * possibly transmitting them to our frontend.  Copy only the part
+                        * possibly transmitting them to our frontend.  Copy only the part
                         * of the page we will actually inspect.
                         */
                        slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
@@ -1946,7 +1849,7 @@ asyncQueueReadAllNotifications(void)
  * and deliver relevant ones to my frontend.
  *
  * The current page must have been fetched into page_buffer from shared
- * memory.     (We could access the page right in shared memory, but that
+ * memory.  (We could access the page right in shared memory, but that
  * would imply holding the AsyncCtlLock throughout this routine.)
  *
  * We stop if we reach the "stop" position, or reach a notification from an
@@ -1958,7 +1861,7 @@ asyncQueueReadAllNotifications(void)
  * The QueuePosition *current is advanced past all processed messages.
  */
 static bool
-asyncQueueProcessPageEntries(QueuePosition *current,
+asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                         QueuePosition stop,
                                                         char *page_buffer)
 {
@@ -1985,7 +1888,27 @@ asyncQueueProcessPageEntries(QueuePosition *current,
                /* Ignore messages destined for other databases */
                if (qe->dboid == MyDatabaseId)
                {
-                       if (TransactionIdDidCommit(qe->xid))
+                       if (TransactionIdIsInProgress(qe->xid))
+                       {
+                               /*
+                                * The source transaction is still in progress, so we can't
+                                * process this message yet.  Break out of the loop, but first
+                                * back up *current so we will reprocess the message next
+                                * time.  (Note: it is unlikely but not impossible for
+                                * TransactionIdDidCommit to fail, so we can't really avoid
+                                * this advance-then-back-up behavior when dealing with an
+                                * uncommitted message.)
+                                *
+                                * Note that we must test TransactionIdIsInProgress before we
+                                * test TransactionIdDidCommit, else we might return a message
+                                * from a transaction that is not yet visible to snapshots;
+                                * compare the comments at the head of tqual.c.
+                                */
+                               *current = thisentry;
+                               reachedStop = true;
+                               break;
+                       }
+                       else if (TransactionIdDidCommit(qe->xid))
                        {
                                /* qe->data is the null-terminated channel name */
                                char       *channel = qe->data;
@@ -1998,27 +1921,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
                                        NotifyMyFrontEnd(channel, payload, qe->srcPid);
                                }
                        }
-                       else if (TransactionIdDidAbort(qe->xid))
-                       {
-                               /*
-                                * If the source transaction aborted, we just ignore its
-                                * notifications.
-                                */
-                       }
                        else
                        {
                                /*
-                                * The transaction has neither committed nor aborted so far,
-                                * so we can't process its message yet.  Break out of the
-                                * loop, but first back up *current so we will reprocess the
-                                * message next time.  (Note: it is unlikely but not
-                                * impossible for TransactionIdDidCommit to fail, so we can't
-                                * really avoid this advance-then-back-up behavior when
-                                * dealing with an uncommitted message.)
+                                * The source transaction aborted or crashed, so we just
+                                * ignore its notifications.
                                 */
-                               *current = thisentry;
-                               reachedStop = true;
-                               break;
                        }
                }
 
@@ -2064,7 +1972,7 @@ asyncQueueAdvanceTail(void)
         */
        newtailpage = QUEUE_POS_PAGE(min);
        boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
-       if (asyncQueuePagePrecedesLogically(oldtailpage, boundary))
+       if (asyncQueuePagePrecedes(oldtailpage, boundary))
        {
                /*
                 * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
@@ -2077,9 +1985,10 @@ asyncQueueAdvanceTail(void)
 /*
  * ProcessIncomingNotify
  *
- *             Deal with arriving NOTIFYs from other backends.
- *             This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
- *             signal handler, or the next time control reaches the outer idle loop.
+ *             Deal with arriving NOTIFYs from other backends as soon as it's safe to
+ *             do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
+ *             signal handler, but isn't anymore.
+ *
  *             Scan the queue for arriving notifications and report them to my front
  *             end.
  *
@@ -2088,18 +1997,13 @@ asyncQueueAdvanceTail(void)
 static void
 ProcessIncomingNotify(void)
 {
-       bool            catchup_enabled;
-
        /* We *must* reset the flag */
-       notifyInterruptOccurred = 0;
+       notifyInterruptPending = false;
 
        /* Do nothing else if we aren't actively listening */
        if (listenChannels == NIL)
                return;
 
-       /* Must prevent catchup interrupt while I am running */
-       catchup_enabled = DisableCatchupInterrupt();
-
        if (Trace_notify)
                elog(DEBUG1, "ProcessIncomingNotify");
 
@@ -2124,9 +2028,6 @@ ProcessIncomingNotify(void)
 
        if (Trace_notify)
                elog(DEBUG1, "ProcessIncomingNotify: done");
-
-       if (catchup_enabled)
-               EnableCatchupInterrupt();
 }
 
 /*
@@ -2147,7 +2048,7 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
                pq_endmessage(&buf);
 
                /*
-                * NOTE: we do not do pq_flush() here.  For a self-notify, it will
+                * NOTE: we do not do pq_flush() here.  For a self-notify, it will
                 * happen at the end of the transaction, and for incoming notifies
                 * ProcessIncomingNotify will do it after finding all the notifies.
                 */