* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.102.2.2 2005/08/25 22:07:15 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.102.2.3 2008/03/12 20:12:48 tgl Exp $
*
*-------------------------------------------------------------------------
*/
* transaction, since by assumption it is only called from outside any
* transaction.
*
+ * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
+ * of pending actions. If we reach transaction commit, the changes are
+ * applied to pg_listener just before executing any pending NOTIFYs. This
+ * method is necessary because to avoid race conditions, we must hold lock
+ * on pg_listener from when we insert a new listener tuple until we commit.
+ * To do that and not create undue hazard of deadlock, we don't want to
+ * touch pg_listener until we are otherwise done with the transaction;
+ * in particular it'd be uncool to still be taking user-commanded locks
+ * while holding the pg_listener lock.
+ *
* Although we grab ExclusiveLock on pg_listener for any operation,
* the lock is never held very long, so it shouldn't cause too much of
* a performance problem. (Previously we used AccessExclusiveLock, but
#include <unistd.h>
#include <signal.h>
#include <errno.h>
-#include <netinet/in.h>
#include "access/heapam.h"
#include "catalog/catname.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/memutils.h"
#include "utils/ps_status.h"
-#include "utils/syscache.h"
+/*
+ * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
+ * all actions requested in the current transaction. As explained above,
+ * we don't actually modify pg_listener until we reach transaction commit.
+ * The List nodes and structs are all palloc'd in TopTransactionContext.
+ */
+typedef enum
+{
+ LISTEN_LISTEN,
+ LISTEN_UNLISTEN,
+ LISTEN_UNLISTEN_ALL
+} ListenActionKind;
+
+typedef struct
+{
+ ListenActionKind action;
+ char condname[1]; /* actually, as long as needed */
+} ListenAction;
+
+static List *pendingActions = NIL; /* list of ListenAction */
+
/*
* State for outbound notifies consists of a list of all relnames NOTIFYed
* in the current transaction. We do not actually perform a NOTIFY until
* and unless the transaction commits. pendingNotifies is NIL if no
* NOTIFYs have been done in the current transaction. The List nodes and
* referenced strings are all palloc'd in TopTransactionContext.
+ *
+ * Note: the action and notify lists do not interact within a transaction.
+ * In particular, if a transaction does NOTIFY and then LISTEN on the same
+ * condition name, it will get a self-notify at commit. This is a bit odd
+ * but is consistent with our historical behavior.
*/
-static List *pendingNotifies = NIL;
+static List *pendingNotifies = NIL; /* list of C strings */
/*
* State for inbound notifies consists of two flags: one saying whether
* does not grok "volatile", you'd be best advised to compile this file
* with all optimization turned off.
*/
-static volatile int notifyInterruptEnabled = 0;
-static volatile int notifyInterruptOccurred = 0;
+static volatile sig_atomic_t notifyInterruptEnabled = 0;
+static volatile sig_atomic_t notifyInterruptOccurred = 0;
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;
bool Trace_notify = false;
+static void queue_listen(ListenActionKind action, const char *condname);
static void Async_UnlistenAll(void);
static void Async_UnlistenOnExit(void);
+static void Exec_Listen(Relation lRel, const char *relname);
+static void Exec_Unlisten(Relation lRel, const char *relname);
+static void Exec_UnlistenAll(Relation lRel);
+static void Send_Notify(Relation lRel);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
static bool AsyncExistsPendingNotify(const char *relname);
-static void ClearPendingNotifies(void);
+static void ClearPendingActionsAndNotifies(void);
/*
- *--------------------------------------------------------------
* Async_Notify
*
* This is executed by the SQL notify command.
* Adds the relation to the list of pending notifies.
* Actual notification happens during transaction commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- *
- * Results:
- * XXX
- *
- *--------------------------------------------------------------
*/
void
Async_Notify(char *relname)
if (!AsyncExistsPendingNotify(relname))
{
/*
- * The name list needs to live until end of transaction, so store
- * it in the top transaction context.
+ * The name list needs to live until end of transaction, so store it
+ * in the top transaction context.
*/
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+ /*
+ * Ordering of the list isn't important. We choose to put new
+ * entries on the front, as this might make duplicate-elimination
+ * a tad faster when the same condition is signaled many times in
+ * a row.
+ */
pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
MemoryContextSwitchTo(oldcontext);
}
/*
- *--------------------------------------------------------------
+ * queue_listen
+ * Common code for listen, unlisten, unlisten all commands.
+ *
+ * Adds the request to the list of pending actions.
+ * Actual update of pg_listener happens during transaction commit.
+ * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+ */
+static void
+queue_listen(ListenActionKind action, const char *condname)
+{
+ MemoryContext oldcontext;
+ ListenAction *actrec;
+
+ /*
+ * Unlike Async_Notify, we don't try to collapse out duplicates.
+ * It would be too complicated to ensure we get the right interactions
+ * of conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that
+ * there would be any performance benefit anyway in sane applications.
+ */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* space for terminating null is included in sizeof(ListenAction) */
+ actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname));
+ actrec->action = action;
+ strcpy(actrec->condname, condname);
+
+ pendingActions = lappend(pendingActions, actrec);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
* Async_Listen
*
* This is executed by the SQL listen command.
+ */
+void
+Async_Listen(char *relname, int pid)
+{
+ Assert(pid == MyProcPid);
+ if (Trace_notify)
+ elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
+
+ queue_listen(LISTEN_LISTEN, relname);
+}
+
+/*
+ * Async_Unlisten
*
- * Register a backend (identified by its Unix PID) as listening
- * on the specified relation.
+ * This is executed by the SQL unlisten command.
+ */
+void
+Async_Unlisten(char *relname, int pid)
+{
+ Assert(pid == MyProcPid);
+ /* Handle specially the `unlisten "*"' command */
+ if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
+ {
+ Async_UnlistenAll();
+ }
+ else
+ {
+ if (Trace_notify)
+ elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
+
+ queue_listen(LISTEN_UNLISTEN, relname);
+ }
+}
+
+/*
+ * Async_UnlistenAll
*
- * Results:
- * XXX
+ * This is invoked by UNLISTEN "*" command, and also at backend exit.
+ */
+static void
+Async_UnlistenAll(void)
+{
+ if (Trace_notify)
+ elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
+
+ queue_listen(LISTEN_UNLISTEN_ALL, "");
+}
+
+/*
+ * Async_UnlistenOnExit
*
- * Side effects:
- * pg_listener is updated.
+ * Clean up the pg_listener table at backend exit.
*
- *--------------------------------------------------------------
+ * This is executed if we have done any LISTENs in this backend.
+ * It might not be necessary anymore, if the user UNLISTENed everything,
+ * but we don't try to detect that case.
+ */
+static void
+Async_UnlistenOnExit(void)
+{
+ /*
+ * We need to start/commit a transaction for the unlisten, but if there is
+ * already an active transaction we had better abort that one first.
+ * Otherwise we'd end up committing changes that probably ought to be
+ * discarded.
+ */
+ AbortOutOfAnyTransaction();
+ /* Now we can do the unlisten */
+ StartTransactionCommand();
+ Async_UnlistenAll();
+ CommitTransactionCommand();
+}
+
+/*
+ * AtCommit_Notify
+ *
+ * This is called at transaction commit.
+ *
+ * If there are pending LISTEN/UNLISTEN actions, insert or delete
+ * tuples in pg_listener accordingly.
+ *
+ * If there are outbound notify requests in the pendingNotifies list,
+ * scan pg_listener for matching tuples, and either signal the other
+ * backend or send a message to our own frontend.
+ *
+ * NOTE: we are still inside the current transaction, therefore can
+ * piggyback on its committing of changes.
*/
void
-Async_Listen(char *relname, int pid)
+AtCommit_Notify(void)
{
Relation lRel;
+ List *p;
+
+ if (pendingActions == NIL && pendingNotifies == NIL)
+ return; /* no relevant statements in this xact */
+
+ /*
+ * NOTIFY is disabled if not normal processing mode. This test used to be
+ * in xact.c, but it seems cleaner to do it here.
+ */
+ if (!IsNormalProcessingMode())
+ {
+ ClearPendingActionsAndNotifies();
+ return;
+ }
+
+ if (Trace_notify)
+ elog(DEBUG1, "AtCommit_Notify");
+
+ /* Acquire ExclusiveLock on pg_listener */
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+
+ /* Perform any pending listen/unlisten actions */
+ foreach(p, pendingActions)
+ {
+ ListenAction *actrec = (ListenAction *) lfirst(p);
+
+ switch (actrec->action)
+ {
+ case LISTEN_LISTEN:
+ Exec_Listen(lRel, actrec->condname);
+ break;
+ case LISTEN_UNLISTEN:
+ Exec_Unlisten(lRel, actrec->condname);
+ break;
+ case LISTEN_UNLISTEN_ALL:
+ Exec_UnlistenAll(lRel);
+ break;
+ }
+
+ /* We must CCI after each action in case of conflicting actions */
+ CommandCounterIncrement();
+ }
+
+ /* Perform any pending notifies */
+ if (pendingNotifies)
+ Send_Notify(lRel);
+
+ /*
+ * We do NOT release the lock on pg_listener here; we need to hold it
+ * until end of transaction (which is about to happen, anyway) to ensure
+ * that notified backends see our tuple updates when they look. Else they
+ * might disregard the signal, which would make the application programmer
+ * very unhappy. Also, this prevents race conditions when we have just
+ * inserted a listening tuple.
+ */
+ heap_close(lRel, NoLock);
+
+ ClearPendingActionsAndNotifies();
+
+ if (Trace_notify)
+ elog(DEBUG1, "AtCommit_Notify: done");
+}
+
+/*
+ * Exec_Listen --- subroutine for AtCommit_Notify
+ *
+ * Register the current backend as listening on the specified relation.
+ */
+static void
+Exec_Listen(Relation lRel, const char *relname)
+{
HeapScanDesc scan;
HeapTuple tuple;
Datum values[Natts_pg_listener];
char nulls[Natts_pg_listener];
- int i;
+ NameData condname;
bool alreadyListener = false;
if (Trace_notify)
- elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
-
- lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+ elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
/* Detect whether we are already listening on this relname */
- scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
- if (listener->listenerpid == pid &&
- strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ if (listener->listenerpid == MyProcPid &&
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
{
alreadyListener = true;
/* No need to scan the rest of the table */
heap_endscan(scan);
if (alreadyListener)
- {
- heap_close(lRel, ExclusiveLock);
return;
- }
/*
* OK to insert a new tuple
*/
+ memset(nulls, ' ', sizeof(nulls));
- for (i = 0; i < Natts_pg_listener; i++)
- {
- nulls[i] = ' ';
- values[i] = PointerGetDatum(NULL);
- }
-
- i = 0;
- values[i++] = (Datum) relname;
- values[i++] = (Datum) pid;
- values[i++] = (Datum) 0; /* no notifies pending */
+ namestrcpy(&condname, relname);
+ values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
+ values[Anum_pg_listener_pid - 1] = Int32GetDatum(MyProcPid);
+ values[Anum_pg_listener_notify - 1] = Int32GetDatum(0); /* no notifies pending */
tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
+
simple_heap_insert(lRel, tuple);
#ifdef NOT_USED /* currently there are no indexes */
heap_freetuple(tuple);
- heap_close(lRel, ExclusiveLock);
-
/*
* now that we are listening, make sure we will unlisten before dying.
*/
}
/*
- *--------------------------------------------------------------
- * Async_Unlisten
- *
- * This is executed by the SQL unlisten command.
+ * Exec_Unlisten --- subroutine for AtCommit_Notify
*
- * Remove the backend from the list of listening backends
+ * Remove the current backend from the list of listening backends
* for the specified relation.
- *
- * Results:
- * XXX
- *
- * Side effects:
- * pg_listener is updated.
- *
- *--------------------------------------------------------------
*/
-void
-Async_Unlisten(char *relname, int pid)
+static void
+Exec_Unlisten(Relation lRel, const char *relname)
{
- Relation lRel;
HeapScanDesc scan;
HeapTuple tuple;
- /* Handle specially the `unlisten "*"' command */
- if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
- {
- Async_UnlistenAll();
- return;
- }
-
if (Trace_notify)
- elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
-
- lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+ elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
- scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
- if (listener->listenerpid == pid &&
- strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ if (listener->listenerpid == MyProcPid &&
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
{
/* Found the matching tuple, delete it */
simple_heap_delete(lRel, &tuple->t_self);
/*
- * We assume there can be only one match, so no need to scan
- * the rest of the table
+ * We assume there can be only one match, so no need to scan the
+ * rest of the table
*/
break;
}
}
heap_endscan(scan);
- heap_close(lRel, ExclusiveLock);
-
/*
* We do not complain about unlistening something not being listened;
* should we?
}
/*
- *--------------------------------------------------------------
- * Async_UnlistenAll
- *
- * Unlisten all relations for this backend.
- *
- * This is invoked by UNLISTEN "*" command, and also at backend exit.
- *
- * Results:
- * XXX
+ * Exec_UnlistenAll --- subroutine for AtCommit_Notify
*
- * Side effects:
- * pg_listener is updated.
- *
- *--------------------------------------------------------------
+ * Update pg_listener to unlisten all relations for this backend.
*/
static void
-Async_UnlistenAll(void)
+Exec_UnlistenAll(Relation lRel)
{
- Relation lRel;
- TupleDesc tdesc;
HeapScanDesc scan;
HeapTuple lTuple;
ScanKeyData key[1];
if (Trace_notify)
- elog(DEBUG1, "Async_UnlistenAll");
-
- lRel = heap_openr(ListenerRelationName, ExclusiveLock);
- tdesc = RelationGetDescr(lRel);
+ elog(DEBUG1, "Exec_UnlistenAll");
/* Find and delete all entries with my listenerPID */
ScanKeyEntryInitialize(&key[0], 0,
simple_heap_delete(lRel, &lTuple->t_self);
heap_endscan(scan);
- heap_close(lRel, ExclusiveLock);
}
/*
- *--------------------------------------------------------------
- * Async_UnlistenOnExit
+ * Send_Notify --- subroutine for AtCommit_Notify
*
- * Clean up the pg_listener table at backend exit.
- *
- * This is executed if we have done any LISTENs in this backend.
- * It might not be necessary anymore, if the user UNLISTENed everything,
- * but we don't try to detect that case.
- *
- * Results:
- * XXX
- *
- * Side effects:
- * pg_listener is updated if necessary.
- *
- *--------------------------------------------------------------
+ * Scan pg_listener for tuples matching our pending notifies, and
+ * either signal the other backend or send a message to our own frontend.
*/
static void
-Async_UnlistenOnExit(void)
+Send_Notify(Relation lRel)
{
- /*
- * We need to start/commit a transaction for the unlisten, but if
- * there is already an active transaction we had better abort that one
- * first. Otherwise we'd end up committing changes that probably
- * ought to be discarded.
- */
- AbortOutOfAnyTransaction();
- /* Now we can do the unlisten */
- StartTransactionCommand();
- Async_UnlistenAll();
- CommitTransactionCommand();
-}
-
-/*
- *--------------------------------------------------------------
- * AtCommit_Notify
- *
- * This is called at transaction commit.
- *
- * If there are outbound notify requests in the pendingNotifies list,
- * scan pg_listener for matching tuples, and either signal the other
- * backend or send a message to our own frontend.
- *
- * NOTE: we are still inside the current transaction, therefore can
- * piggyback on its committing of changes.
- *
- * Results:
- * XXX
- *
- * Side effects:
- * Tuples in pg_listener that have matching relnames and other peoples'
- * listenerPIDs are updated with a nonzero notification field.
- *
- *--------------------------------------------------------------
- */
-void
-AtCommit_Notify(void)
-{
- Relation lRel;
- TupleDesc tdesc;
HeapScanDesc scan;
HeapTuple lTuple,
rTuple;
char repl[Natts_pg_listener],
nulls[Natts_pg_listener];
- if (pendingNotifies == NIL)
- return; /* no NOTIFY statements in this
- * transaction */
-
- /*
- * NOTIFY is disabled if not normal processing mode. This test used to
- * be in xact.c, but it seems cleaner to do it here.
- */
- if (!IsNormalProcessingMode())
- {
- ClearPendingNotifies();
- return;
- }
-
- if (Trace_notify)
- elog(DEBUG1, "AtCommit_Notify");
-
/* preset data to update notify column to MyProcPid */
nulls[0] = nulls[1] = nulls[2] = ' ';
repl[0] = repl[1] = repl[2] = ' ';
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
- lRel = heap_openr(ListenerRelationName, ExclusiveLock);
- tdesc = RelationGetDescr(lRel);
- scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
if (listenerPID == MyProcPid)
{
/*
- * Self-notify: no need to bother with table update. Indeed,
- * we *must not* clear the notification field in this path, or
- * we could lose an outside notify, which'd be bad for
- * applications that ignore self-notify messages.
+ * Self-notify: no need to bother with table update. Indeed, we
+ * *must not* clear the notification field in this path, or we
+ * could lose an outside notify, which'd be bad for applications
+ * that ignore self-notify messages.
*/
-
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify: notifying self");
listenerPID);
/*
- * If someone has already notified this listener, we don't
- * bother modifying the table, but we do still send a SIGUSR2
- * signal, just in case that backend missed the earlier signal
- * for some reason. It's OK to send the signal first, because
- * the other guy can't read pg_listener until we unlock it.
+ * If someone has already notified this listener, we don't bother
+ * modifying the table, but we do still send a SIGUSR2 signal,
+ * just in case that backend missed the earlier signal for some
+ * reason. It's OK to send the signal first, because the other
+ * guy can't read pg_listener until we unlock it.
*/
if (kill(listenerPID, SIGUSR2) < 0)
{
/*
- * Get rid of pg_listener entry if it refers to a PID that
- * no longer exists. Presumably, that backend crashed
- * without deleting its pg_listener entries. This code
- * used to only delete the entry if errno==ESRCH, but as
- * far as I can see we should just do it for any failure
- * (certainly at least for EPERM too...)
+ * Get rid of pg_listener entry if it refers to a PID that no
+ * longer exists. Presumably, that backend crashed without
+ * deleting its pg_listener entries. This code used to only
+ * delete the entry if errno==ESRCH, but as far as I can see
+ * we should just do it for any failure (certainly at least
+ * for EPERM too...)
*/
simple_heap_delete(lRel, &lTuple->t_self);
}
else if (listener->notification == 0)
{
- int result;
- ItemPointerData update_ctid;
- TransactionId update_xmax;
-
- rTuple = heap_modifytuple(lTuple, lRel,
- value, nulls, repl);
- /*
- * We cannot use simple_heap_update here because the tuple
- * could have been modified by an uncommitted transaction;
- * specifically, since UNLISTEN releases exclusive lock on
- * the table before commit, the other guy could already have
- * tried to unlisten. There are no other cases where we
- * should be able to see an uncommitted update or delete.
- * Therefore, our response to a HeapTupleBeingUpdated result
- * is just to ignore it. We do *not* wait for the other
- * guy to commit --- that would risk deadlock, and we don't
- * want to block while holding the table lock anyway for
- * performance reasons. We also ignore HeapTupleUpdated,
- * which could occur if the other guy commits between our
- * heap_getnext and heap_update calls.
- */
- result = heap_update(lRel, &lTuple->t_self, rTuple,
- &update_ctid, &update_xmax,
- GetCurrentCommandId(), SnapshotAny,
- false /* no wait for commit */);
- switch (result)
- {
- case HeapTupleSelfUpdated:
- /* Tuple was already updated in current command? */
- elog(ERROR, "tuple already updated by self");
- break;
-
- case HeapTupleMayBeUpdated:
- /* done successfully */
+ /* Rewrite the tuple with my PID in notification column */
+ rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
+ simple_heap_update(lRel, &lTuple->t_self, rTuple);
#ifdef NOT_USED /* currently there are no indexes */
- CatalogUpdateIndexes(lRel, rTuple);
+ CatalogUpdateIndexes(lRel, rTuple);
#endif
- break;
-
- case HeapTupleBeingUpdated:
- /* ignore uncommitted tuples */
- break;
-
- case HeapTupleUpdated:
- /* ignore just-committed tuples */
- break;
-
- default:
- elog(ERROR, "unrecognized heap_update status: %u",
- result);
- break;
- }
}
}
}
heap_endscan(scan);
-
- /*
- * We do NOT release the lock on pg_listener here; we need to hold it
- * until end of transaction (which is about to happen, anyway) to
- * ensure that notified backends see our tuple updates when they look.
- * Else they might disregard the signal, which would make the
- * application programmer very unhappy.
- */
- heap_close(lRel, NoLock);
-
- ClearPendingNotifies();
-
- if (Trace_notify)
- elog(DEBUG1, "AtCommit_Notify: done");
}
/*
- *--------------------------------------------------------------
* AtAbort_Notify
*
* This is called at transaction abort.
*
- * Gets rid of pending outbound notifies that we would have executed
- * if the transaction got committed.
- *
- * Results:
- * XXX
- *
- *--------------------------------------------------------------
+ * Gets rid of pending actions and outbound notifies that we would have
+ * executed if the transaction got committed.
*/
void
AtAbort_Notify(void)
{
- ClearPendingNotifies();
+ ClearPendingActionsAndNotifies();
}
/*
- *--------------------------------------------------------------
* Async_NotifyHandler
*
* This is the signal handler for SIGUSR2.
* If we are idle (notifyInterruptEnabled is set), we can safely invoke
* ProcessIncomingNotify directly. Otherwise, just set a flag
* to do it later.
- *
- * Results:
- * none
- *
- * Side effects:
- * per above
- *--------------------------------------------------------------
*/
void
Async_NotifyHandler(SIGNAL_ARGS)
/*
* Note: this is a SIGNAL HANDLER. You must be very wary what you do
- * here. Some helpful soul had this routine sprinkled with TPRINTFs,
- * which would likely lead to corruption of stdio buffers if they were
- * ever turned on.
+ * here. Some helpful soul had this routine sprinkled with TPRINTFs, which
+ * would likely lead to corruption of stdio buffers if they were ever
+ * turned on.
*/
/* Don't joggle the elbow of proc_exit */
bool save_ImmediateInterruptOK = ImmediateInterruptOK;
/*
- * We may be called while ImmediateInterruptOK is true; turn it
- * off while messing with the NOTIFY state. (We would have to
- * save and restore it anyway, because PGSemaphore operations
- * inside ProcessIncomingNotify() might reset it.)
+ * We may be called while ImmediateInterruptOK is true; turn it off
+ * while messing with the NOTIFY state. (We would have to save and
+ * restore it anyway, because PGSemaphore operations inside
+ * ProcessIncomingNotify() might reset it.)
*/
ImmediateInterruptOK = false;
/*
* I'm not sure whether some flavors of Unix might allow another
- * SIGUSR2 occurrence to recursively interrupt this routine. To
- * cope with the possibility, we do the same sort of dance that
- * EnableNotifyInterrupt must do --- see that routine for
- * comments.
+ * SIGUSR2 occurrence to recursively interrupt this routine. To cope
+ * with the possibility, we do the same sort of dance that
+ * EnableNotifyInterrupt must do --- see that routine for comments.
*/
notifyInterruptEnabled = 0; /* disable any recursive signal */
notifyInterruptOccurred = 1; /* do at least one iteration */
}
/*
- * Restore ImmediateInterruptOK, and check for interrupts if
- * needed.
+ * Restore ImmediateInterruptOK, and check for interrupts if needed.
*/
ImmediateInterruptOK = save_ImmediateInterruptOK;
if (save_ImmediateInterruptOK)
else
{
/*
- * In this path it is NOT SAFE to do much of anything, except
- * this:
+ * In this path it is NOT SAFE to do much of anything, except this:
*/
notifyInterruptOccurred = 1;
}
}
/*
- * --------------------------------------------------------------
* EnableNotifyInterrupt
*
* This is called by the PostgresMain main loop just before waiting
*
* NOTE: the signal handler starts out disabled, and stays so until
* PostgresMain calls this the first time.
- * --------------------------------------------------------------
*/
void
EnableNotifyInterrupt(void)
return; /* not really idle */
/*
- * This code is tricky because we are communicating with a signal
- * handler that could interrupt us at any point. If we just checked
- * notifyInterruptOccurred and then set notifyInterruptEnabled, we
- * could fail to respond promptly to a signal that happens in between
- * those two steps. (A very small time window, perhaps, but Murphy's
- * Law says you can hit it...) Instead, we first set the enable flag,
- * then test the occurred flag. If we see an unserviced interrupt has
- * occurred, we re-clear the enable flag before going off to do the
- * service work. (That prevents re-entrant invocation of
- * ProcessIncomingNotify() if another interrupt occurs.) If an
- * interrupt comes in between the setting and clearing of
- * notifyInterruptEnabled, then it will have done the service work and
- * left notifyInterruptOccurred zero, so we have to check again after
- * clearing enable. The whole thing has to be in a loop in case
- * another interrupt occurs while we're servicing the first. Once we
- * get out of the loop, enable is set and we know there is no
- * unserviced interrupt.
+ * This code is tricky because we are communicating with a signal handler
+ * that could interrupt us at any point. If we just checked
+ * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
+ * fail to respond promptly to a signal that happens in between those two
+ * steps. (A very small time window, perhaps, but Murphy's Law says you
+ * can hit it...) Instead, we first set the enable flag, then test the
+ * occurred flag. If we see an unserviced interrupt has occurred, we
+ * re-clear the enable flag before going off to do the service work. (That
+ * prevents re-entrant invocation of ProcessIncomingNotify() if another
+ * interrupt occurs.) If an interrupt comes in between the setting and
+ * clearing of notifyInterruptEnabled, then it will have done the service
+ * work and left notifyInterruptOccurred zero, so we have to check again
+ * after clearing enable. The whole thing has to be in a loop in case
+ * another interrupt occurs while we're servicing the first. Once we get
+ * out of the loop, enable is set and we know there is no unserviced
+ * interrupt.
*
* NB: an overenthusiastic optimizing compiler could easily break this
- * code. Hopefully, they all understand what "volatile" means these
- * days.
+ * code. Hopefully, they all understand what "volatile" means these days.
*/
for (;;)
{
}
/*
- * --------------------------------------------------------------
* DisableNotifyInterrupt
*
* This is called by the PostgresMain main loop just after receiving
* a frontend command. Signal handler execution of inbound notifies
* is disabled until the next EnableNotifyInterrupt call.
- * --------------------------------------------------------------
*/
void
DisableNotifyInterrupt(void)
}
/*
- * --------------------------------------------------------------
* ProcessIncomingNotify
*
* Deal with arriving NOTIFYs from other backends.
* and clear the notification field in pg_listener until next time.
*
* NOTE: since we are outside any transaction, we must create our own.
- *
- * Results:
- * XXX
- *
- * --------------------------------------------------------------
*/
static void
ProcessIncomingNotify(void)
{
Relation lRel;
- TupleDesc tdesc;
ScanKeyData key[1];
HeapScanDesc scan;
HeapTuple lTuple,
StartTransactionCommand();
lRel = heap_openr(ListenerRelationName, ExclusiveLock);
- tdesc = RelationGetDescr(lRel);
/* Scan only entries with my listenerPID */
ScanKeyEntryInitialize(&key[0], 0,
relname, (int) sourcePID);
NotifyMyFrontEnd(relname, sourcePID);
+
/*
* Rewrite the tuple with 0 in notification column.
- *
- * simple_heap_update is safe here because no one else would
- * have tried to UNLISTEN us, so there can be no uncommitted
- * changes.
*/
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
simple_heap_update(lRel, &lTuple->t_self, rTuple);
/*
* We do NOT release the lock on pg_listener here; we need to hold it
- * until end of transaction (which is about to happen, anyway) to
- * ensure that other backends see our tuple updates when they look.
- * Otherwise, a transaction started after this one might mistakenly
- * think it doesn't need to send this backend a new NOTIFY.
+ * until end of transaction (which is about to happen, anyway) to ensure
+ * that other backends see our tuple updates when they look. Otherwise, a
+ * transaction started after this one might mistakenly think it doesn't
+ * need to send this backend a new NOTIFY.
*/
heap_close(lRel, NoLock);
CommitTransactionCommand();
/*
- * Must flush the notify messages to ensure frontend gets them
- * promptly.
+ * Must flush the notify messages to ensure frontend gets them promptly.
*/
pq_flush();
/*
* NOTE: we do not do pq_flush() here. For a self-notify, it will
* happen at the end of the transaction, and for incoming notifies
- * ProcessIncomingNotify will do it after finding all the
- * notifies.
+ * ProcessIncomingNotify will do it after finding all the notifies.
*/
}
else
foreach(p, pendingNotifies)
{
- /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
- if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
+ const char *prelname = (const char *) lfirst(p);
+
+ if (strcmp(prelname, relname) == 0)
return true;
}
return false;
}
-/* Clear the pendingNotifies list. */
+/* Clear the pendingActions and pendingNotifies lists. */
static void
-ClearPendingNotifies(void)
+ClearPendingActionsAndNotifies(void)
{
/*
- * We used to have to explicitly deallocate the list members and
- * nodes, because they were malloc'd. Now, since we know they are
- * palloc'd in TopTransactionContext, we need not do that --- they'll
- * go away automatically at transaction exit. We need only reset the
- * list head pointer.
+ * We used to have to explicitly deallocate the list members and nodes,
+ * because they were malloc'd. Now, since we know they are palloc'd in
+ * TopTransactionContext, we need not do that --- they'll go away
+ * automatically at transaction exit. We need only reset the list head
+ * pointers.
*/
+ pendingActions = NIL;
pendingNotifies = NIL;
}