* async.c
* Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
*
- * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* either, but just process the queue directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
- * can call inbound-notify processing immediately if this backend is idle
- * (ie, it is waiting for a frontend command and is not within a transaction
- * block). Otherwise the handler may only set a flag, which will cause the
- * processing to occur just before we next go idle.
+ * sets the process's latch, which triggers the event to be processed
+ * immediately if this backend is idle (i.e., it is waiting for a frontend
+ * command and is not within a transaction block. Cf.
+ * ProcessClientReadInterrupt()). Otherwise the handler may only set a
+ * flag, which will cause the processing to occur just before we next go
+ * idle.
*
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
*
* This struct declaration has the maximal length, but in a real queue entry
* the data area is only big enough for the actual channel and payload strings
- * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
+ * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible
* entry size, if both channel and payload strings are empty (but note it
* doesn't include alignment padding).
*
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
-#define InvalidPid (-1)
-
/*
* Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
*
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
* of other backends and also change the head and tail pointers.
*
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need both locks, we always first
* get AsyncQueueLock and then AsyncCtlLock.
*
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the tail
+ QueuePosition tail; /* the global tail is equivalent to the pos
* of the "slowest" backend */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
- QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
- /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
+ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
+ /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;
static AsyncQueueControl *asyncQueueControl;
*
* The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
* pages, because more than that would confuse slru.c into thinking there
- * was a wraparound condition. With the default BLCKSZ this means there
+ * was a wraparound condition. With the default BLCKSZ this means there
* can be up to 8GB of queued-and-not-read data.
*
* Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
typedef struct
{
ListenActionKind action;
- char channel[1]; /* actually, as long as needed */
+ char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;
static List *pendingActions = NIL; /* list of ListenAction */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
/*
- * State for inbound notifications consists of two flags: one saying whether
- * the signal handler is currently allowed to call ProcessIncomingNotify
- * directly, and one saying whether the signal has occurred but the handler
- * was not allowed to call ProcessIncomingNotify at the time.
- *
- * NB: the "volatile" on these declarations is critical! If your compiler
- * does not grok "volatile", you'd be best advised to compile this file
- * with all optimization turned off.
+ * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * called from inside a signal handler. That just sets the
+ * notifyInterruptPending flag and sets the process
+ * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * actually deal with the interrupt.
*/
-static volatile sig_atomic_t notifyInterruptEnabled = 0;
-static volatile sig_atomic_t notifyInterruptOccurred = 0;
+volatile sig_atomic_t notifyInterruptPending = false;
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
-static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
+static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
+static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
-static bool asyncQueueProcessPageEntries(QueuePosition *current,
+static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer);
static void asyncQueueAdvanceTail(void);
int diff;
/*
- * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
+ * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
* in the range 0..QUEUE_MAX_PAGE.
*/
Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
Size size;
/* This had better match AsyncShmemInit */
- size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
- size = add_size(size, sizeof(AsyncQueueControl));
+ size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+ size = add_size(size, offsetof(AsyncQueueControl, backend));
size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
/*
* Create or attach to the AsyncQueueControl structure.
*
- * The used entries in the backend[] array run from 1 to MaxBackends.
- * sizeof(AsyncQueueControl) already includes space for the unused zero'th
- * entry, but we need to add on space for the used entries.
+ * The used entries in the backend[] array run from 1 to MaxBackends; the
+ * zero'th entry is unused but must be allocated.
*/
- size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
- size = add_size(size, sizeof(AsyncQueueControl));
+ size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
+ size = add_size(size, offsetof(AsyncQueueControl, backend));
asyncQueueControl = (AsyncQueueControl *)
ShmemInitStruct("Async Queue Control", size, &found);
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
- /* space for terminating null is included in sizeof(ListenAction) */
+ /* the "+ 1" below reserves space for the channel's terminating null */
- actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel));
+ actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
+ strlen(channel) + 1);
actrec->action = action;
strcpy(actrec->channel, channel);
while (nextNotify != NULL)
{
/*
- * Add the pending notifications to the queue. We acquire and
+ * Add the pending notifications to the queue. We acquire and
* release AsyncQueueLock once per page, which might be overkill
* but it does allow readers to get in while we're doing this.
*
*/
if (!unlistenExitRegistered)
{
- on_shmem_exit(Async_UnlistenOnExit, 0);
+ before_shmem_exit(Async_UnlistenOnExit, 0);
unlistenExitRegistered = true;
}
* The reason that this is not done in AtCommit_Notify is that there is
* a nonzero chance of errors here (for example, encoding conversion errors
* while trying to format messages to our frontend). An error during
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
+ * AtCommit_Notify would be a PANIC condition. The timing is also arranged
* to ensure that a transaction's self-notifies are delivered to the frontend
* before it gets the terminating ReadyForQuery message.
*
* Note that we send signals and process the queue even if the transaction
- * eventually aborted. This is because we need to clean out whatever got
+ * eventually aborted. This is because we need to clean out whatever got
* added to the queue.
*
* NOTE: we are outside of any transaction here.
/*
* Remove our entry from the listeners array when we are no longer listening
- * on any channel. NB: must not fail if we're already not listening.
+ * on any channel. NB: must not fail if we're already not listening.
*/
static void
asyncQueueUnregister(void)
Assert(listenChannels == NIL); /* else caller error */
- if (!amRegisteredListener) /* nothing to do */
+ if (!amRegisteredListener) /* nothing to do */
return;
LWLockAcquire(AsyncQueueLock, LW_SHARED);
/*
* The queue is full if creating a new head page would create a page that
* logically precedes the current global tail pointer, ie, the head
- * pointer would wrap around compared to the tail. We cannot create such
+ * pointer would wrap around compared to the tail. We cannot create such
* a head page for fear of confusing slru.c. For safety we round the tail
* pointer back to a segment boundary (compare the truncation logic in
* asyncQueueAdvanceTail).
/*
* Advance the QueuePosition to the next entry, assuming that the current
- * entry is of length entryLength. If we jump to a new page the function
+ * entry is of length entryLength. If we jump to a new page the function
* returns true, else false.
*/
static bool
-asyncQueueAdvance(QueuePosition *position, int entryLength)
+asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
{
int pageno = QUEUE_POS_PAGE(*position);
int offset = QUEUE_POS_OFFSET(*position);
* the last byte which simplifies reading the page later.
*
* We are passed the list cell containing the next notification to write
- * and return the first still-unwritten cell back. Eventually we will return
+ * and return the first still-unwritten cell back. Eventually we will return
* NULL indicating all is done.
*
* We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
* Page is full, so we're done here, but first fill the next page
* with zeroes. The reason to do this is to ensure that slru.c's
* idea of the head page is always the same as ours, which avoids
- * boundary problems in SimpleLruTruncate. The test in
+ * boundary problems in SimpleLruTruncate. The test in
* asyncQueueIsFull() ensured that there is room to create this
* page without overrunning the queue.
*/
}
/*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
+ * SQL function to return the fraction of the notification queue currently
+ * occupied.
+ */
+Datum
+pg_notification_queue_usage(PG_FUNCTION_ARGS)
+{
+ double usage;
+
+ LWLockAcquire(AsyncQueueLock, LW_SHARED); /* shared mode suffices: asyncQueueUsage() only reads the queue head and tail */
+ usage = asyncQueueUsage();
+ LWLockRelease(AsyncQueueLock);
+
+ PG_RETURN_FLOAT8(usage);
+}
+
+/*
+ * Return the fraction of the queue that is currently occupied.
*
- * Caller must hold exclusive AsyncQueueLock.
+ * The caller must hold AsyncQueueLock in (at least) shared mode.
*/
-static void
-asyncQueueFillWarning(void)
+static double
+asyncQueueUsage(void)
{
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
int occupied;
- double fillDegree;
- TimestampTz t;
occupied = headPage - tailPage;
if (occupied == 0)
- return; /* fast exit for common case */
+ return (double) 0; /* fast exit for common case */
if (occupied < 0)
{
occupied += QUEUE_MAX_PAGE + 1;
}
- fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+ return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+}
+/*
+ * Check whether the queue is at least half full, and emit a warning if so.
+ *
+ * This is unlikely given the size of the queue, but possible.
+ * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
+ *
+ * Caller must hold exclusive AsyncQueueLock.
+ */
+static void
+asyncQueueFillWarning(void)
+{
+ double fillDegree;
+ TimestampTz t;
+
+ fillDegree = asyncQueueUsage();
if (fillDegree < 0.5)
return;
/*
* HandleNotifyInterrupt
*
- * This is called when PROCSIG_NOTIFY_INTERRUPT is received.
- *
- * If we are idle (notifyInterruptEnabled is set), we can safely invoke
- * ProcessIncomingNotify directly. Otherwise, just set a flag
- * to do it later.
+ * Signal handler portion of interrupt handling. Let the backend know
+ * that there's a pending notify interrupt. If we're currently reading
+ * from the client, this will interrupt the read and
+ * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
*/
void
HandleNotifyInterrupt(void)
{
/*
* Note: this is called by a SIGNAL HANDLER. You must be very wary what
- * you do here. Some helpful soul had this routine sprinkled with
- * TPRINTFs, which would likely lead to corruption of stdio buffers if
- * they were ever turned on.
+ * you do here.
*/
- /* Don't joggle the elbow of proc_exit */
- if (proc_exit_inprogress)
- return;
-
- if (notifyInterruptEnabled)
- {
- bool save_ImmediateInterruptOK = ImmediateInterruptOK;
-
- /*
- * We may be called while ImmediateInterruptOK is true; turn it off
- * while messing with the NOTIFY state. (We would have to save and
- * restore it anyway, because PGSemaphore operations inside
- * ProcessIncomingNotify() might reset it.)
- */
- ImmediateInterruptOK = false;
-
- /*
- * I'm not sure whether some flavors of Unix might allow another
- * SIGUSR1 occurrence to recursively interrupt this routine. To cope
- * with the possibility, we do the same sort of dance that
- * EnableNotifyInterrupt must do --- see that routine for comments.
- */
- notifyInterruptEnabled = 0; /* disable any recursive signal */
- notifyInterruptOccurred = 1; /* do at least one iteration */
- for (;;)
- {
- notifyInterruptEnabled = 1;
- if (!notifyInterruptOccurred)
- break;
- notifyInterruptEnabled = 0;
- if (notifyInterruptOccurred)
- {
- /* Here, it is finally safe to do stuff. */
- if (Trace_notify)
- elog(DEBUG1, "HandleNotifyInterrupt: perform async notify");
-
- ProcessIncomingNotify();
+ /* signal that work needs to be done */
+ notifyInterruptPending = true;
- if (Trace_notify)
- elog(DEBUG1, "HandleNotifyInterrupt: done");
- }
- }
-
- /*
- * Restore ImmediateInterruptOK, and check for interrupts if needed.
- */
- ImmediateInterruptOK = save_ImmediateInterruptOK;
- if (save_ImmediateInterruptOK)
- CHECK_FOR_INTERRUPTS();
- }
- else
- {
- /*
- * In this path it is NOT SAFE to do much of anything, except this:
- */
- notifyInterruptOccurred = 1;
- }
+ /* make sure the event is processed in due course */
+ SetLatch(MyLatch);
}
/*
- * EnableNotifyInterrupt
- *
- * This is called by the PostgresMain main loop just before waiting
- * for a frontend command. If we are truly idle (ie, *not* inside
- * a transaction block), then process any pending inbound notifies,
- * and enable the signal handler to process future notifies directly.
+ * ProcessNotifyInterrupt
*
- * NOTE: the signal handler starts out disabled, and stays so until
- * PostgresMain calls this the first time.
+ * This is called just after waiting for a frontend command. If an
+ * interrupt arrives (via HandleNotifyInterrupt()) while reading, the
+ * read will be interrupted via the process's latch, and this routine
+ * will get called. If we are truly idle (ie, *not* inside a transaction
+ * block), process the incoming notifies.
*/
void
-EnableNotifyInterrupt(void)
+ProcessNotifyInterrupt(void)
{
if (IsTransactionOrTransactionBlock())
return; /* not really idle */
- /*
- * This code is tricky because we are communicating with a signal handler
- * that could interrupt us at any point. If we just checked
- * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
- * fail to respond promptly to a signal that happens in between those two
- * steps. (A very small time window, perhaps, but Murphy's Law says you
- * can hit it...) Instead, we first set the enable flag, then test the
- * occurred flag. If we see an unserviced interrupt has occurred, we
- * re-clear the enable flag before going off to do the service work. (That
- * prevents re-entrant invocation of ProcessIncomingNotify() if another
- * interrupt occurs.) If an interrupt comes in between the setting and
- * clearing of notifyInterruptEnabled, then it will have done the service
- * work and left notifyInterruptOccurred zero, so we have to check again
- * after clearing enable. The whole thing has to be in a loop in case
- * another interrupt occurs while we're servicing the first. Once we get
- * out of the loop, enable is set and we know there is no unserviced
- * interrupt.
- *
- * NB: an overenthusiastic optimizing compiler could easily break this
- * code. Hopefully, they all understand what "volatile" means these days.
- */
- for (;;)
- {
- notifyInterruptEnabled = 1;
- if (!notifyInterruptOccurred)
- break;
- notifyInterruptEnabled = 0;
- if (notifyInterruptOccurred)
- {
- if (Trace_notify)
- elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
-
- ProcessIncomingNotify();
-
- if (Trace_notify)
- elog(DEBUG1, "EnableNotifyInterrupt: done");
- }
- }
+ while (notifyInterruptPending)
+ ProcessIncomingNotify();
}
-/*
- * DisableNotifyInterrupt
- *
- * This is called by the PostgresMain main loop just after receiving
- * a frontend command. Signal handler execution of inbound notifies
- * is disabled until the next EnableNotifyInterrupt call.
- *
- * The PROCSIG_CATCHUP_INTERRUPT signal handler also needs to call this,
- * so as to prevent conflicts if one signal interrupts the other. So we
- * must return the previous state of the flag.
- */
-bool
-DisableNotifyInterrupt(void)
-{
- bool result = (notifyInterruptEnabled != 0);
-
- notifyInterruptEnabled = 0;
-
- return result;
-}
/*
* Read all pending notifications from the queue, and deliver appropriate
static void
asyncQueueReadAllNotifications(void)
{
- QueuePosition pos;
+ volatile QueuePosition pos;
QueuePosition oldpos;
QueuePosition head;
bool advanceTail;
/*
* We copy the data from SLRU into a local buffer, so as to avoid
* holding the AsyncCtlLock while we are examining the entries and
- * possibly transmitting them to our frontend. Copy only the part
+ * possibly transmitting them to our frontend. Copy only the part
* of the page we will actually inspect.
*/
slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
* and deliver relevant ones to my frontend.
*
* The current page must have been fetched into page_buffer from shared
- * memory. (We could access the page right in shared memory, but that
+ * memory. (We could access the page right in shared memory, but that
* would imply holding the AsyncCtlLock throughout this routine.)
*
* We stop if we reach the "stop" position, or reach a notification from an
* The QueuePosition *current is advanced past all processed messages.
*/
static bool
-asyncQueueProcessPageEntries(QueuePosition *current,
+asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer)
{
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
- if (TransactionIdDidCommit(qe->xid))
+ if (TransactionIdIsInProgress(qe->xid))
+ {
+ /*
+ * The source transaction is still in progress, so we can't
+ * process this message yet. Break out of the loop, but first
+ * back up *current so we will reprocess the message next
+ * time. (Note: it is unlikely but not impossible for
+ * TransactionIdDidCommit to fail, so we can't really avoid
+ * this advance-then-back-up behavior when dealing with an
+ * uncommitted message.)
+ *
+ * Note that we must test TransactionIdIsInProgress before we
+ * test TransactionIdDidCommit, else we might return a message
+ * from a transaction that is not yet visible to snapshots;
+ * compare the comments at the head of tqual.c.
+ */
+ *current = thisentry;
+ reachedStop = true;
+ break;
+ }
+ else if (TransactionIdDidCommit(qe->xid))
{
/* qe->data is the null-terminated channel name */
char *channel = qe->data;
NotifyMyFrontEnd(channel, payload, qe->srcPid);
}
}
- else if (TransactionIdDidAbort(qe->xid))
- {
- /*
- * If the source transaction aborted, we just ignore its
- * notifications.
- */
- }
else
{
/*
- * The transaction has neither committed nor aborted so far,
- * so we can't process its message yet. Break out of the
- * loop, but first back up *current so we will reprocess the
- * message next time. (Note: it is unlikely but not
- * impossible for TransactionIdDidCommit to fail, so we can't
- * really avoid this advance-then-back-up behavior when
- * dealing with an uncommitted message.)
+ * The source transaction aborted or crashed, so we just
+ * ignore its notifications.
*/
- *current = thisentry;
- reachedStop = true;
- break;
}
}
/*
* ProcessIncomingNotify
*
- * Deal with arriving NOTIFYs from other backends.
- * This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
- * signal handler, or the next time control reaches the outer idle loop.
+ * Deal with arriving NOTIFYs from other backends as soon as it's safe to
+ * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
+ * signal handler, but isn't anymore.
+ *
* Scan the queue for arriving notifications and report them to my front
* end.
*
static void
ProcessIncomingNotify(void)
{
- bool catchup_enabled;
-
/* We *must* reset the flag */
- notifyInterruptOccurred = 0;
+ notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
if (listenChannels == NIL)
return;
- /* Must prevent catchup interrupt while I am running */
- catchup_enabled = DisableCatchupInterrupt();
-
if (Trace_notify)
elog(DEBUG1, "ProcessIncomingNotify");
if (Trace_notify)
elog(DEBUG1, "ProcessIncomingNotify: done");
-
- if (catchup_enabled)
- EnableCatchupInterrupt();
}
/*
pq_endmessage(&buf);
/*
- * NOTE: we do not do pq_flush() here. For a self-notify, it will
+ * NOTE: we do not do pq_flush() here. For a self-notify, it will
* happen at the end of the transaction, and for incoming notifies
* ProcessIncomingNotify will do it after finding all the notifies.
*/