* async.c
* Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
*
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* either, but just process the queue directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
- * can call inbound-notify processing immediately if this backend is idle
- * (ie, it is waiting for a frontend command and is not within a transaction
- * block). Otherwise the handler may only set a flag, which will cause the
- * processing to occur just before we next go idle.
+ * sets the process's latch, which triggers the event to be processed
+ * immediately if this backend is idle (i.e., it is waiting for a frontend
+ * command and is not within a transaction block. C.f.
+ * ProcessClientReadInterrupt()). Otherwise the handler may only set a
+ * flag, which will cause the processing to occur just before we next go
+ * idle.
*
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
+#include "utils/timestamp.h"
/*
*
* This struct declaration has the maximal length, but in a real queue entry
* the data area is only big enough for the actual channel and payload strings
- * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
+ * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
* entry size, if both channel and payload strings are empty (but note it
* doesn't include alignment padding).
*
/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
- (asyncQueuePagePrecedesLogically((x).page, (y).page) ? (x) : \
+ (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
-#define InvalidPid (-1)
-
/*
* Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
*
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
* of other backends and also change the head and tail pointers.
*
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need both locks, we always first
* get AsyncQueueLock and then AsyncCtlLock.
*
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the tail
+ QueuePosition tail; /* the global tail is equivalent to the pos
* of the "slowest" backend */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
- QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
- /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
+ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
+ /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;
static AsyncQueueControl *asyncQueueControl;
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
- * was a wraparound condition. With the default BLCKSZ this means there
+ * was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
typedef struct
{
ListenActionKind action;
- char channel[1]; /* actually, as long as needed */
+ char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;
static List *pendingActions = NIL; /* list of ListenAction */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
/*
- * State for inbound notifications consists of two flags: one saying whether
- * the signal handler is currently allowed to call ProcessIncomingNotify
- * directly, and one saying whether the signal has occurred but the handler
- * was not allowed to call ProcessIncomingNotify at the time.
- *
- * NB: the "volatile" on these declarations is critical! If your compiler
- * does not grok "volatile", you'd be best advised to compile this file
- * with all optimization turned off.
+ * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * called from inside a signal handler. That just sets the
+ * notifyInterruptPending flag and sets the process
+ * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * actually deal with the interrupt.
*/
-static volatile sig_atomic_t notifyInterruptEnabled = 0;
-static volatile sig_atomic_t notifyInterruptOccurred = 0;
+volatile sig_atomic_t notifyInterruptPending = false;
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;
+/* True if we're currently registered as a listener in asyncQueueControl */
+static bool amRegisteredListener = false;
+
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
-/* has this backend executed its first LISTEN in the current transaction? */
-static bool backendHasExecutedInitialListen = false;
-
/* GUC parameter */
bool Trace_notify = false;
/* local function prototypes */
-static bool asyncQueuePagePrecedesPhysically(int p, int q);
-static bool asyncQueuePagePrecedesLogically(int p, int q);
+static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
-static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
+static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
+static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
-static bool asyncQueueProcessPageEntries(QueuePosition *current,
+static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer);
static void asyncQueueAdvanceTail(void);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void);
-
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
- *
- * asyncQueuePagePrecedesPhysically just checks numerically without any magic
- * if one page precedes another one. This is wrong for normal operation but
- * is helpful when clearing pg_notify/ during startup.
- *
- * asyncQueuePagePrecedesLogically compares using wraparound logic, as is
- * required by slru.c.
*/
static bool
-asyncQueuePagePrecedesPhysically(int p, int q)
-{
- return p < q;
-}
-
-static bool
-asyncQueuePagePrecedesLogically(int p, int q)
+asyncQueuePagePrecedes(int p, int q)
{
int diff;
/*
- * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
+ * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
* in the range 0..QUEUE_MAX_PAGE.
*/
Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
Size size;
/* This had better match AsyncShmemInit */
- size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
- size = add_size(size, sizeof(AsyncQueueControl));
+ size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+ size = add_size(size, offsetof(AsyncQueueControl, backend));
size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
/*
* Create or attach to the AsyncQueueControl structure.
*
- * The used entries in the backend[] array run from 1 to MaxBackends.
- * sizeof(AsyncQueueControl) already includes space for the unused zero'th
- * entry, but we need to add on space for the used entries.
+ * The used entries in the backend[] array run from 1 to MaxBackends; the
+ * zero'th entry is unused but must be allocated.
*/
- size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
- size = add_size(size, sizeof(AsyncQueueControl));
+ size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+ size = add_size(size, offsetof(AsyncQueueControl, backend));
asyncQueueControl = (AsyncQueueControl *)
ShmemInitStruct("Async Queue Control", size, &found);
/*
* Set up SLRU management of the pg_notify data.
*/
- AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
+ AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
AsyncCtlLock, "pg_notify");
/* Override default assumption that writes should be fsync'd */
{
/*
* During start or reboot, clean out the pg_notify directory.
- *
- * Since we want to remove every file, we temporarily use
- * asyncQueuePagePrecedesPhysically() and pass INT_MAX as the
- * comparison value; every file in the directory should therefore
- * appear to be less than that.
*/
- AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically;
- (void) SlruScanDirectory(AsyncCtl, INT_MAX, true);
- AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
+ (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
/* Now initialize page zero to empty */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
/* space for terminating null is included in sizeof(ListenAction) */
- actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel));
+ actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
+ strlen(channel) + 1);
actrec->action = action;
strcpy(actrec->channel, channel);
Async_UnlistenOnExit(int code, Datum arg)
{
Exec_UnlistenAllCommit();
+ asyncQueueUnregister();
}
/*
if (pendingActions || pendingNotifies)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN or NOTIFY")));
+ errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
}
/*
if (Trace_notify)
elog(DEBUG1, "PreCommit_Notify");
- Assert(backendHasExecutedInitialListen == false);
-
/* Preflight for any pending listen/unlisten actions */
foreach(p, pendingActions)
{
while (nextNotify != NULL)
{
/*
- * Add the pending notifications to the queue. We acquire and
+ * Add the pending notifications to the queue. We acquire and
* release AsyncQueueLock once per page, which might be overkill
* but it does allow readers to get in while we're doing this.
*
}
}
- /*
- * If we did an initial LISTEN, listenChannels now has the entry, so we no
- * longer need or want the flag to be set.
- */
- backendHasExecutedInitialListen = false;
+ /* If no longer listening to anything, get out of listener array */
+ if (amRegisteredListener && listenChannels == NIL)
+ asyncQueueUnregister();
/* And clean up */
ClearPendingActionsAndNotifies();
* Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction.
*/
- if (listenChannels != NIL || backendHasExecutedInitialListen)
+ if (amRegisteredListener)
return;
if (Trace_notify)
elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
- /*
- * We need this variable to detect an aborted initial LISTEN. In that case
- * we would set up our pointer but not listen on any channel. This flag
- * gets cleared in AtCommit_Notify or AtAbort_Notify().
- */
- backendHasExecutedInitialListen = true;
-
/*
* Before registering, make sure we will unlisten before dying. (Note:
* this action does not get undone if we abort later.)
*/
if (!unlistenExitRegistered)
{
- on_shmem_exit(Async_UnlistenOnExit, 0);
+ before_shmem_exit(Async_UnlistenOnExit, 0);
unlistenExitRegistered = true;
}
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
LWLockRelease(AsyncQueueLock);
+ /* Now we are listed in the global array, so remember we're listening */
+ amRegisteredListener = true;
+
/*
* Try to move our pointer forward as far as possible. This will skip over
* already-committed notifications. Still, we could get notifications that
* We do not complain about unlistening something not being listened;
* should we?
*/
-
- /* If no longer listening to anything, get out of listener array */
- if (listenChannels == NIL)
- asyncQueueUnregister();
}
/*
list_free_deep(listenChannels);
listenChannels = NIL;
-
- asyncQueueUnregister();
}
/*
* The reason that this is not done in AtCommit_Notify is that there is
* a nonzero chance of errors here (for example, encoding conversion errors
* while trying to format messages to our frontend). An error during
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
+ * AtCommit_Notify would be a PANIC condition. The timing is also arranged
* to ensure that a transaction's self-notifies are delivered to the frontend
* before it gets the terminating ReadyForQuery message.
*
* Note that we send signals and process the queue even if the transaction
- * eventually aborted. This is because we need to clean out whatever got
+ * eventually aborted. This is because we need to clean out whatever got
* added to the queue.
*
* NOTE: we are outside of any transaction here.
void
ProcessCompletedNotifies(void)
{
+ MemoryContext caller_context;
bool signalled;
/* Nothing to do if we didn't send any notifications */
*/
backendHasSentNotifications = false;
+ /*
+ * We must preserve the caller's memory context (probably MessageContext)
+ * across the transaction we do here.
+ */
+ caller_context = CurrentMemoryContext;
+
if (Trace_notify)
elog(DEBUG1, "ProcessCompletedNotifies");
CommitTransactionCommand();
+ MemoryContextSwitchTo(caller_context);
+
/* We don't need pq_flush() here since postgres.c will do one shortly */
}
/*
* Remove our entry from the listeners array when we are no longer listening
- * on any channel. NB: must not fail if we're already not listening.
+ * on any channel. NB: must not fail if we're already not listening.
*/
static void
asyncQueueUnregister(void)
Assert(listenChannels == NIL); /* else caller error */
+ if (!amRegisteredListener) /* nothing to do */
+ return;
+
LWLockAcquire(AsyncQueueLock, LW_SHARED);
/* check if entry is valid and oldest ... */
advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
LWLockRelease(AsyncQueueLock);
+ /* mark ourselves as no longer listed in the global array */
+ amRegisteredListener = false;
+
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
/*
* The queue is full if creating a new head page would create a page that
* logically precedes the current global tail pointer, ie, the head
- * pointer would wrap around compared to the tail. We cannot create such
+ * pointer would wrap around compared to the tail. We cannot create such
* a head page for fear of confusing slru.c. For safety we round the tail
* pointer back to a segment boundary (compare the truncation logic in
* asyncQueueAdvanceTail).
nexthead = 0; /* wrap around */
boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
- return asyncQueuePagePrecedesLogically(nexthead, boundary);
+ return asyncQueuePagePrecedes(nexthead, boundary);
}
/*
* Advance the QueuePosition to the next entry, assuming that the current
- * entry is of length entryLength. If we jump to a new page the function
+ * entry is of length entryLength. If we jump to a new page the function
* returns true, else false.
*/
static bool
-asyncQueueAdvance(QueuePosition *position, int entryLength)
+asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
{
int pageno = QUEUE_POS_PAGE(*position);
int offset = QUEUE_POS_OFFSET(*position);
* the last byte which simplifies reading the page later.
*
* We are passed the list cell containing the next notification to write
- * and return the first still-unwritten cell back. Eventually we will return
+ * and return the first still-unwritten cell back. Eventually we will return
* NULL indicating all is done.
*
* We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
asyncQueueAddEntries(ListCell *nextNotify)
{
AsyncQueueEntry qe;
+ QueuePosition queue_head;
int pageno;
int offset;
int slotno;
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+ /*
+ * We work with a local copy of QUEUE_HEAD, which we write back to shared
+ * memory upon exiting. The reason for this is that if we have to advance
+ * to a new page, SimpleLruZeroPage might fail (out of disk space, for
+ * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
+ * subsequent insertions would try to put entries into a page that slru.c
+ * thinks doesn't exist yet.) So, use a local position variable. Note
+ * that if we do fail, any already-inserted queue entries are forgotten;
+ * this is okay, since they'd be useless anyway after our transaction
+ * rolls back.
+ */
+ queue_head = QUEUE_HEAD;
+
/* Fetch the current page */
- pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
+ pageno = QUEUE_POS_PAGE(queue_head);
slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
/* Note we mark the page dirty before writing in it */
AsyncCtl->shared->page_dirty[slotno] = true;
/* Construct a valid queue entry in local variable qe */
asyncQueueNotificationToEntry(n, &qe);
- offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
+ offset = QUEUE_POS_OFFSET(queue_head);
/* Check whether the entry really fits on the current page */
if (offset + qe.length <= QUEUE_PAGESIZE)
&qe,
qe.length);
- /* Advance QUEUE_HEAD appropriately, and note if page is full */
- if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
+ /* Advance queue_head appropriately, and detect if page is full */
+ if (asyncQueueAdvance(&(queue_head), qe.length))
{
/*
* Page is full, so we're done here, but first fill the next page
* with zeroes. The reason to do this is to ensure that slru.c's
* idea of the head page is always the same as ours, which avoids
- * boundary problems in SimpleLruTruncate. The test in
+ * boundary problems in SimpleLruTruncate. The test in
* asyncQueueIsFull() ensured that there is room to create this
* page without overrunning the queue.
*/
- slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+ slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
/* And exit the loop */
break;
}
}
+ /* Success, so update the global QUEUE_HEAD */
+ QUEUE_HEAD = queue_head;
+
LWLockRelease(AsyncCtlLock);
return nextNotify;
}
/*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
+ * SQL function to return the fraction of the notification queue currently
+ * occupied.
+ */
+Datum
+pg_notification_queue_usage(PG_FUNCTION_ARGS)
+{
+ double usage;
+
+ LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ usage = asyncQueueUsage();
+ LWLockRelease(AsyncQueueLock);
+
+ PG_RETURN_FLOAT8(usage);
+}
+
+/*
+ * Return the fraction of the queue that is currently occupied.
*
- * Caller must hold exclusive AsyncQueueLock.
+ * The caller must hold AysncQueueLock in (at least) shared mode.
*/
-static void
-asyncQueueFillWarning(void)
+static double
+asyncQueueUsage(void)
{
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
int occupied;
- double fillDegree;
- TimestampTz t;
occupied = headPage - tailPage;
if (occupied == 0)
- return; /* fast exit for common case */
+ return (double) 0; /* fast exit for common case */
if (occupied < 0)
{
occupied += QUEUE_MAX_PAGE + 1;
}
- fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+ return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+}
+
+/*
+ * Check whether the queue is at least half full, and emit a warning if so.
+ *
+ * This is unlikely given the size of the queue, but possible.
+ * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
+ *
+ * Caller must hold exclusive AsyncQueueLock.
+ */
+static void
+asyncQueueFillWarning(void)
+{
+ double fillDegree;
+ TimestampTz t;
+ fillDegree = asyncQueueUsage();
if (fillDegree < 0.5)
return;
AtAbort_Notify(void)
{
/*
- * If we LISTEN but then roll back the transaction we have set our pointer
- * but have not made any entry in listenChannels. In that case, remove our
- * pointer again.
+ * If we LISTEN but then roll back the transaction after PreCommit_Notify,
+ * we have registered as a listener but have not made any entry in
+ * listenChannels. In that case, deregister again.
*/
- if (backendHasExecutedInitialListen)
- {
- /*
- * Checking listenChannels should be redundant but it can't hurt doing
- * it for safety reasons.
- */
- if (listenChannels == NIL)
- asyncQueueUnregister();
-
- backendHasExecutedInitialListen = false;
- }
+ if (amRegisteredListener && listenChannels == NIL)
+ asyncQueueUnregister();
/* And clean up */
ClearPendingActionsAndNotifies();
/*
* HandleNotifyInterrupt
*
- * This is called when PROCSIG_NOTIFY_INTERRUPT is received.
- *
- * If we are idle (notifyInterruptEnabled is set), we can safely invoke
- * ProcessIncomingNotify directly. Otherwise, just set a flag
- * to do it later.
+ * Signal handler portion of interrupt handling. Let the backend know
+ * that there's a pending notify interrupt. If we're currently reading
+ * from the client, this will interrupt the read and
+ * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
*/
void
HandleNotifyInterrupt(void)
{
/*
* Note: this is called by a SIGNAL HANDLER. You must be very wary what
- * you do here. Some helpful soul had this routine sprinkled with
- * TPRINTFs, which would likely lead to corruption of stdio buffers if
- * they were ever turned on.
+ * you do here.
*/
- /* Don't joggle the elbow of proc_exit */
- if (proc_exit_inprogress)
- return;
-
- if (notifyInterruptEnabled)
- {
- bool save_ImmediateInterruptOK = ImmediateInterruptOK;
+ /* signal that work needs to be done */
+ notifyInterruptPending = true;
- /*
- * We may be called while ImmediateInterruptOK is true; turn it off
- * while messing with the NOTIFY state. (We would have to save and
- * restore it anyway, because PGSemaphore operations inside
- * ProcessIncomingNotify() might reset it.)
- */
- ImmediateInterruptOK = false;
-
- /*
- * I'm not sure whether some flavors of Unix might allow another
- * SIGUSR1 occurrence to recursively interrupt this routine. To cope
- * with the possibility, we do the same sort of dance that
- * EnableNotifyInterrupt must do --- see that routine for comments.
- */
- notifyInterruptEnabled = 0; /* disable any recursive signal */
- notifyInterruptOccurred = 1; /* do at least one iteration */
- for (;;)
- {
- notifyInterruptEnabled = 1;
- if (!notifyInterruptOccurred)
- break;
- notifyInterruptEnabled = 0;
- if (notifyInterruptOccurred)
- {
- /* Here, it is finally safe to do stuff. */
- if (Trace_notify)
- elog(DEBUG1, "HandleNotifyInterrupt: perform async notify");
-
- ProcessIncomingNotify();
-
- if (Trace_notify)
- elog(DEBUG1, "HandleNotifyInterrupt: done");
- }
- }
-
- /*
- * Restore ImmediateInterruptOK, and check for interrupts if needed.
- */
- ImmediateInterruptOK = save_ImmediateInterruptOK;
- if (save_ImmediateInterruptOK)
- CHECK_FOR_INTERRUPTS();
- }
- else
- {
- /*
- * In this path it is NOT SAFE to do much of anything, except this:
- */
- notifyInterruptOccurred = 1;
- }
+ /* make sure the event is processed in due course */
+ SetLatch(MyLatch);
}
/*
- * EnableNotifyInterrupt
+ * ProcessNotifyInterrupt
*
- * This is called by the PostgresMain main loop just before waiting
- * for a frontend command. If we are truly idle (ie, *not* inside
- * a transaction block), then process any pending inbound notifies,
- * and enable the signal handler to process future notifies directly.
- *
- * NOTE: the signal handler starts out disabled, and stays so until
- * PostgresMain calls this the first time.
+ * This is called just after waiting for a frontend command. If a
+ * interrupt arrives (via HandleNotifyInterrupt()) while reading, the
+ * read will be interrupted via the process's latch, and this routine
+ * will get called. If we are truly idle (ie, *not* inside a transaction
+ * block), process the incoming notifies.
*/
void
-EnableNotifyInterrupt(void)
+ProcessNotifyInterrupt(void)
{
if (IsTransactionOrTransactionBlock())
return; /* not really idle */
- /*
- * This code is tricky because we are communicating with a signal handler
- * that could interrupt us at any point. If we just checked
- * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
- * fail to respond promptly to a signal that happens in between those two
- * steps. (A very small time window, perhaps, but Murphy's Law says you
- * can hit it...) Instead, we first set the enable flag, then test the
- * occurred flag. If we see an unserviced interrupt has occurred, we
- * re-clear the enable flag before going off to do the service work. (That
- * prevents re-entrant invocation of ProcessIncomingNotify() if another
- * interrupt occurs.) If an interrupt comes in between the setting and
- * clearing of notifyInterruptEnabled, then it will have done the service
- * work and left notifyInterruptOccurred zero, so we have to check again
- * after clearing enable. The whole thing has to be in a loop in case
- * another interrupt occurs while we're servicing the first. Once we get
- * out of the loop, enable is set and we know there is no unserviced
- * interrupt.
- *
- * NB: an overenthusiastic optimizing compiler could easily break this
- * code. Hopefully, they all understand what "volatile" means these days.
- */
- for (;;)
- {
- notifyInterruptEnabled = 1;
- if (!notifyInterruptOccurred)
- break;
- notifyInterruptEnabled = 0;
- if (notifyInterruptOccurred)
- {
- if (Trace_notify)
- elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
-
- ProcessIncomingNotify();
-
- if (Trace_notify)
- elog(DEBUG1, "EnableNotifyInterrupt: done");
- }
- }
+ while (notifyInterruptPending)
+ ProcessIncomingNotify();
}
-/*
- * DisableNotifyInterrupt
- *
- * This is called by the PostgresMain main loop just after receiving
- * a frontend command. Signal handler execution of inbound notifies
- * is disabled until the next EnableNotifyInterrupt call.
- *
- * The PROCSIG_CATCHUP_INTERRUPT signal handler also needs to call this,
- * so as to prevent conflicts if one signal interrupts the other. So we
- * must return the previous state of the flag.
- */
-bool
-DisableNotifyInterrupt(void)
-{
- bool result = (notifyInterruptEnabled != 0);
-
- notifyInterruptEnabled = 0;
-
- return result;
-}
/*
* Read all pending notifications from the queue, and deliver appropriate
static void
asyncQueueReadAllNotifications(void)
{
- QueuePosition pos;
+ volatile QueuePosition pos;
QueuePosition oldpos;
QueuePosition head;
bool advanceTail;
/*
* We copy the data from SLRU into a local buffer, so as to avoid
* holding the AsyncCtlLock while we are examining the entries and
- * possibly transmitting them to our frontend. Copy only the part
+ * possibly transmitting them to our frontend. Copy only the part
* of the page we will actually inspect.
*/
slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
* and deliver relevant ones to my frontend.
*
* The current page must have been fetched into page_buffer from shared
- * memory. (We could access the page right in shared memory, but that
+ * memory. (We could access the page right in shared memory, but that
* would imply holding the AsyncCtlLock throughout this routine.)
*
* We stop if we reach the "stop" position, or reach a notification from an
* The QueuePosition *current is advanced past all processed messages.
*/
static bool
-asyncQueueProcessPageEntries(QueuePosition *current,
+asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer)
{
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
- if (TransactionIdDidCommit(qe->xid))
+ if (TransactionIdIsInProgress(qe->xid))
+ {
+ /*
+ * The source transaction is still in progress, so we can't
+ * process this message yet. Break out of the loop, but first
+ * back up *current so we will reprocess the message next
+ * time. (Note: it is unlikely but not impossible for
+ * TransactionIdDidCommit to fail, so we can't really avoid
+ * this advance-then-back-up behavior when dealing with an
+ * uncommitted message.)
+ *
+ * Note that we must test TransactionIdIsInProgress before we
+ * test TransactionIdDidCommit, else we might return a message
+ * from a transaction that is not yet visible to snapshots;
+ * compare the comments at the head of tqual.c.
+ */
+ *current = thisentry;
+ reachedStop = true;
+ break;
+ }
+ else if (TransactionIdDidCommit(qe->xid))
{
/* qe->data is the null-terminated channel name */
char *channel = qe->data;
NotifyMyFrontEnd(channel, payload, qe->srcPid);
}
}
- else if (TransactionIdDidAbort(qe->xid))
- {
- /*
- * If the source transaction aborted, we just ignore its
- * notifications.
- */
- }
else
{
/*
- * The transaction has neither committed nor aborted so far,
- * so we can't process its message yet. Break out of the
- * loop, but first back up *current so we will reprocess the
- * message next time. (Note: it is unlikely but not
- * impossible for TransactionIdDidCommit to fail, so we can't
- * really avoid this advance-then-back-up behavior when
- * dealing with an uncommitted message.)
+ * The source transaction aborted or crashed, so we just
+ * ignore its notifications.
*/
- *current = thisentry;
- reachedStop = true;
- break;
}
}
*/
newtailpage = QUEUE_POS_PAGE(min);
boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
- if (asyncQueuePagePrecedesLogically(oldtailpage, boundary))
+ if (asyncQueuePagePrecedes(oldtailpage, boundary))
{
/*
* SimpleLruTruncate() will ask for AsyncCtlLock but will also release
/*
* ProcessIncomingNotify
*
- * Deal with arriving NOTIFYs from other backends.
- * This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
- * signal handler, or the next time control reaches the outer idle loop.
+ * Deal with arriving NOTIFYs from other backends as soon as it's safe to
+ * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
+ * signal handler, but isn't anymore.
+ *
* Scan the queue for arriving notifications and report them to my front
* end.
*
static void
ProcessIncomingNotify(void)
{
- bool catchup_enabled;
-
/* We *must* reset the flag */
- notifyInterruptOccurred = 0;
+ notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
if (listenChannels == NIL)
return;
- /* Must prevent catchup interrupt while I am running */
- catchup_enabled = DisableCatchupInterrupt();
-
if (Trace_notify)
elog(DEBUG1, "ProcessIncomingNotify");
if (Trace_notify)
elog(DEBUG1, "ProcessIncomingNotify: done");
-
- if (catchup_enabled)
- EnableCatchupInterrupt();
}
/*
pq_endmessage(&buf);
/*
- * NOTE: we do not do pq_flush() here. For a self-notify, it will
+ * NOTE: we do not do pq_flush() here. For a self-notify, it will
* happen at the end of the transaction, and for incoming notifies
* ProcessIncomingNotify will do it after finding all the notifies.
*/