(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
+/*
+ * Choose the logically larger of two QueuePositions: the one on the later
+ * page per asyncQueuePagePrecedes (which is presumed to account for
+ * page-number wraparound -- confirm against its definition), or, on the
+ * same page, the one with the larger offset.  NOTE: both arguments are
+ * evaluated more than once, so avoid side effects in them.
+ */
+#define QUEUE_POS_MAX(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+	 (x).page != (y).page ? (x) : \
+	 (x).offset > (y).offset ? (x) : (y))
+
/*
 * Struct describing a listening backend's status.  One entry per backend
 * slot in AsyncQueueControl->backend[]; an unused slot has pid == InvalidPid
 * and dboid == InvalidOid (see the init loop and the unregister path).
 */
typedef struct QueueBackendStatus
{
	int32		pid;			/* either a PID or InvalidPid */
+	Oid			dboid;			/* backend's database OID, or InvalidOid */
	QueuePosition pos;			/* backend has read queue up to here */
} QueueBackendStatus;
typedef struct AsyncQueueControl
{
	QueuePosition head;			/* head points to the next free location */
-	QueuePosition tail;			/* the global tail is equivalent to the pos
-								 * of the "slowest" backend */
+	QueuePosition tail;			/* the global tail is equivalent to the pos of
+								 * the "slowest" backend */
	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
/*
 * Accessor macros for the shared queue state.  NOTE(review): the visible
 * call sites take AsyncQueueLock before touching these -- confirm that the
 * same discipline holds at call sites outside this excerpt.
 */
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
+#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
/*
	for (i = 0; i <= MaxBackends; i++)
	{
		/* Mark every slot free: no PID, no database, position reset to 0/0 */
		QUEUE_BACKEND_PID(i) = InvalidPid;
+		QUEUE_BACKEND_DBOID(i) = InvalidOid;
		SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
	}
}
static void
Exec_ListenPreCommit(void)
{
+	QueuePosition head;
+	QueuePosition max;
+	int			i;
+
	/*
	 * Nothing to do if we are already listening to something, nor if we
	 * already ran this routine in this transaction.
	 * over already-committed notifications. This ensures we cannot miss any
	 * not-yet-committed notifications. We might get a few more but that
	 * doesn't hurt.
+	 *
+	 * In some scenarios there might be a lot of committed notifications that
+	 * have not yet been pruned away (because some backend is being lazy about
+	 * reading them). To reduce our startup time, we can look at other
+	 * backends and adopt the maximum "pos" pointer of any backend that's in
+	 * our database; any notifications it's already advanced over are surely
+	 * committed and need not be re-examined by us. (We must consider only
+	 * backends connected to our DB, because others will not have bothered to
+	 * check committed-ness of notifications in our DB.) But we only bother
+	 * with that if there's more than a page worth of notifications
+	 * outstanding, otherwise scanning all the other backends isn't worth it.
+	 *
+	 * We need exclusive lock here so we can look at other backends' entries.
	 */
-	LWLockAcquire(AsyncQueueLock, LW_SHARED);
-	QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+	head = QUEUE_HEAD;
+	max = QUEUE_TAIL;
+	if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
+	{
+		/* over a page outstanding: adopt the furthest same-database pos */
+		for (i = 1; i <= MaxBackends; i++)
+		{
+			if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+				max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+		}
+	}
+	QUEUE_BACKEND_POS(MyBackendId) = max;
	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
+	QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
	LWLockRelease(AsyncQueueLock);
	/* Now we are listed in the global array, so remember we're listening */
	 *
	 * This will also advance the global tail pointer if possible.
	 */
-	asyncQueueReadAllNotifications();
+	/* If we adopted a pos already equal to the head, there's nothing to read */
+	if (!QUEUE_POS_EQUAL(max, head))
+		asyncQueueReadAllNotifications();
}
/*
		   QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
	/* ... then mark it invalid */
	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+	/* clear dboid too, so Exec_ListenPreCommit's per-DB scan skips this slot */
+	QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
	LWLockRelease(AsyncQueueLock);
	/* mark ourselves as no longer listed in the global array */