]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/async.c
Improve LISTEN startup time when there are many unread notifications.
[postgresql] / src / backend / commands / async.c
index 91baede4e363f148e00bae53f9d6bdd4a1d5f4a3..914388ba715198213ec2f190c2cb50c2495475bd 100644 (file)
@@ -202,12 +202,19 @@ typedef struct QueuePosition
         (x).page != (y).page ? (y) : \
         (x).offset < (y).offset ? (x) : (y))
 
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+       (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+        (x).page != (y).page ? (x) : \
+        (x).offset > (y).offset ? (x) : (y))
+
 /*
  * Struct describing a listening backend's status
  */
 typedef struct QueueBackendStatus
 {
        int32           pid;                    /* either a PID or InvalidPid */
+       Oid                     dboid;                  /* backend's database OID, or InvalidOid */
        QueuePosition pos;                      /* backend has read queue up to here */
 } QueueBackendStatus;
 
@@ -235,8 +242,8 @@ typedef struct QueueBackendStatus
 typedef struct AsyncQueueControl
 {
        QueuePosition head;                     /* head points to the next free location */
-       QueuePosition tail;                     /* the global tail is equivalent to the pos
-                                                                * of the "slowest" backend */
+       QueuePosition tail;                     /* the global tail is equivalent to the pos of
+                                                                * the "slowest" backend */
        TimestampTz lastQueueFillWarn;          /* time of last queue-full msg */
        QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
        /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -247,6 +254,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_HEAD                                     (asyncQueueControl->head)
 #define QUEUE_TAIL                                     (asyncQueueControl->tail)
 #define QUEUE_BACKEND_PID(i)           (asyncQueueControl->backend[i].pid)
+#define QUEUE_BACKEND_DBOID(i)         (asyncQueueControl->backend[i].dboid)
 #define QUEUE_BACKEND_POS(i)           (asyncQueueControl->backend[i].pos)
 
 /*
@@ -461,6 +469,7 @@ AsyncShmemInit(void)
                for (i = 0; i <= MaxBackends; i++)
                {
                        QUEUE_BACKEND_PID(i) = InvalidPid;
+                       QUEUE_BACKEND_DBOID(i) = InvalidOid;
                        SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
                }
        }
@@ -907,6 +916,10 @@ AtCommit_Notify(void)
 static void
 Exec_ListenPreCommit(void)
 {
+       QueuePosition head;
+       QueuePosition max;
+       int                     i;
+
        /*
         * Nothing to do if we are already listening to something, nor if we
         * already ran this routine in this transaction.
@@ -934,10 +947,34 @@ Exec_ListenPreCommit(void)
         * over already-committed notifications.  This ensures we cannot miss any
         * not-yet-committed notifications.  We might get a few more but that
         * doesn't hurt.
+        *
+        * In some scenarios there might be a lot of committed notifications that
+        * have not yet been pruned away (because some backend is being lazy about
+        * reading them).  To reduce our startup time, we can look at other
+        * backends and adopt the maximum "pos" pointer of any backend that's in
+        * our database; any notifications it's already advanced over are surely
+        * committed and need not be re-examined by us.  (We must consider only
+        * backends connected to our DB, because others will not have bothered to
+        * check committed-ness of notifications in our DB.)  But we only bother
+        * with that if there's more than a page worth of notifications
+        * outstanding, otherwise scanning all the other backends isn't worth it.
+        *
+        * We need exclusive lock here so we can look at other backends' entries.
         */
-       LWLockAcquire(AsyncQueueLock, LW_SHARED);
-       QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+       LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+       head = QUEUE_HEAD;
+       max = QUEUE_TAIL;
+       if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
+       {
+               for (i = 1; i <= MaxBackends; i++)
+               {
+                       if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+                               max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+               }
+       }
+       QUEUE_BACKEND_POS(MyBackendId) = max;
        QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
+       QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
        LWLockRelease(AsyncQueueLock);
 
        /* Now we are listed in the global array, so remember we're listening */
@@ -953,7 +990,8 @@ Exec_ListenPreCommit(void)
         *
         * This will also advance the global tail pointer if possible.
         */
-       asyncQueueReadAllNotifications();
+       if (!QUEUE_POS_EQUAL(max, head))
+               asyncQueueReadAllNotifications();
 }
 
 /*
@@ -1156,6 +1194,7 @@ asyncQueueUnregister(void)
                QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
        /* ... then mark it invalid */
        QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+       QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
        LWLockRelease(AsyncQueueLock);
 
        /* mark ourselves as no longer listed in the global array */