/*-------------------------------------------------------------------------
*
- * async.c--
- * Asynchronous notification
- *
- * Copyright (c) 1994, Regents of the University of California
+ * async.c
+ * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
*
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.34 1998/06/27 04:53:29 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.113 2004/07/01 00:50:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
-/* New Async Notification Model:
+
+/*-------------------------------------------------------------------------
+ * New Async Notification Model:
* 1. Multiple backends on same machine. Multiple backends listening on
- * one relation.
- *
- * 2. One of the backend does a 'notify <relname>'. For all backends that
- * are listening to this relation (all notifications take place at the
- * end of commit),
- * 2.a If the process is the same as the backend process that issued
- * notification (we are notifying something that we are listening),
- * signal the corresponding frontend over the comm channel.
- * 2.b For all other listening processes, we send kill(2) to wake up
- * the listening backend.
- * 3. Upon receiving a kill(2) signal from another backend process notifying
- * that one of the relation that we are listening is being notified,
- * we can be in either of two following states:
- * 3.a We are sleeping, wake up and signal our frontend.
- * 3.b We are in middle of another transaction, wait until the end of
- * of the current transaction and signal our frontend.
- * 4. Each frontend receives this notification and processes accordingly.
- *
- * -- jw, 12/28/93
- *
- */
-/*
- * The following is the old model which does not work.
- */
-/*
- * Model is:
- * 1. Multiple backends on same machine.
- *
- * 2. Query on one backend sends stuff over an asynchronous portal by
- * appending to a relation, and then doing an async. notification
- * (which takes place after commit) to all listeners on this relation.
- *
- * 3. Async. notification results in all backends listening on relation
- * to be woken up, by a process signal kill(2), with name of relation
- * passed in shared memory.
- *
- * 4. Each backend notifies its respective frontend over the comm
- * channel using the out-of-band channel.
- *
- * 5. Each frontend receives this notification and processes accordingly.
- *
- * #4,#5 are changing soon with pending rewrite of portal/protocol.
- *
+ * one relation. (Note: "listening on a relation" is not really the
+ * right way to think about it, since the notify names need not have
+ * anything to do with the names of relations actually in the database.
+ * But this terminology is all over the code and docs, and I don't feel
+ * like trying to replace it.)
+ *
+ * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
+ * ie, each relname/listenerPID pair. The "notification" field of the
+ * tuple is zero when no NOTIFY is pending for that listener, or the PID
+ * of the originating backend when a cross-backend NOTIFY is pending.
+ * (We skip writing to pg_listener when doing a self-NOTIFY, so the
+ * notification field should never be equal to the listenerPID field.)
+ *
+ * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
+ * relname to a list of outstanding NOTIFY requests. Actual processing
+ * happens if and only if we reach transaction commit. At that time (in
+ * routine AtCommit_Notify) we scan pg_listener for matching relnames.
+ * If the listenerPID in a matching tuple is ours, we just send a notify
+ * message to our own front end. If it is not ours, and "notification"
+ * is not already nonzero, we set notification to our own PID and send a
+ * SIGUSR2 signal to the receiving process (indicated by listenerPID).
+ * BTW: if the signal operation fails, we presume that the listener backend
+ * crashed without removing this tuple, and remove the tuple for it.
+ *
+ * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
+ * notify processing immediately if this backend is idle (ie, it is
+ * waiting for a frontend command and is not within a transaction block).
+ * Otherwise the handler may only set a flag, which will cause the
+ * processing to occur just before we next go idle.
+ *
+ * 5. Inbound-notify processing consists of scanning pg_listener for tuples
+ * matching our own listenerPID and having nonzero notification fields.
+ * For each such tuple, we send a message to our frontend and clear the
+ * notification field. BTW: this routine has to start/commit its own
+ * transaction, since by assumption it is only called from outside any
+ * transaction.
+ *
+ * Although we grab ExclusiveLock on pg_listener for any operation,
+ * the lock is never held very long, so it shouldn't cause too much of
+ * a performance problem. (Previously we used AccessExclusiveLock, but
+ * there's no real reason to forbid concurrent reads.)
+ *
+ * An application that listens on the same relname it notifies will get
+ * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
+ * by comparing be_pid in the NOTIFY message to the application's own backend's
+ * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
+ * frontend during startup.) The above design guarantees that notifies from
+ * other backends will never be missed by ignoring self-notifies. Note,
+ * however, that we do *not* guarantee that a separate frontend message will
+ * be sent for every outside NOTIFY. Since there is only room for one
+ * originating PID in pg_listener, outside notifies occurring at about the
+ * same time may be collapsed into a single message bearing the PID of the
+ * first outside backend to perform the NOTIFY.
+ *-------------------------------------------------------------------------
*/
+
+#include "postgres.h"
+
#include <unistd.h>
#include <signal.h>
-#include <string.h>
#include <errno.h>
-#include <sys/types.h> /* Needed by in.h on Ultrix */
#include <netinet/in.h>
-#include "postgres.h"
-
#include "access/heapam.h"
-#include "access/relscan.h"
-#include "access/xact.h"
#include "catalog/catname.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
-#include "fmgr.h"
-#include "lib/dllist.h"
#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
#include "miscadmin.h"
-#include "nodes/memnodes.h"
-#include "storage/bufmgr.h"
-#include "storage/lmgr.h"
-#include "tcop/dest.h"
-#include "utils/mcxt.h"
+#include "storage/ipc.h"
+#include "storage/sinval.h"
+#include "tcop/tcopprot.h"
+#include "utils/fmgroids.h"
+#include "utils/ps_status.h"
#include "utils/syscache.h"
-static int notifyFrontEndPending = 0;
-static int notifyIssued = 0;
-static Dllist *pendingNotifies = NULL;
+/*
+ * State for outbound notifies consists of a list of all relnames NOTIFYed
+ * in the current transaction. We do not actually perform a NOTIFY until
+ * and unless the transaction commits. pendingNotifies is NIL if no
+ * NOTIFYs have been done in the current transaction.
+ *
+ * The list is kept in CurTransactionContext. In subtransactions, each
+ * subtransaction has its own list in its own CurTransactionContext, but
+ * successful subtransactions attach their lists to their parent's list.
+ * Failed subtransactions simply discard their lists.
+ */
+static List *pendingNotifies = NIL;
+
+static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
+
+/*
+ * State for inbound notifies consists of two flags: one saying whether
+ * the signal handler is currently allowed to call ProcessIncomingNotify
+ * directly, and one saying whether the signal has occurred but the handler
+ * was not allowed to call ProcessIncomingNotify at the time.
+ *
+ * NB: the "volatile" on these declarations is critical! If your compiler
+ * does not grok "volatile", you'd be best advised to compile this file
+ * with all optimization turned off.
+ */
+static volatile int notifyInterruptEnabled = 0;
+static volatile int notifyInterruptOccurred = 0;
+
+/* True if we've registered an on_shmem_exit cleanup */
+static bool unlistenExitRegistered = false;
+
+bool Trace_notify = false;
+
+
+static void Async_UnlistenAll(void);
+static void Async_UnlistenOnExit(int code, Datum arg);
+static void ProcessIncomingNotify(void);
+static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
+static bool AsyncExistsPendingNotify(const char *relname);
+static void ClearPendingNotifies(void);
-static int AsyncExistsPendingNotify(char *);
-static void ClearPendingNotify(void);
-static void Async_NotifyFrontEnd(void);
-void Async_Unlisten(char *relname, int pid);
-static void Async_UnlistenOnExit(int code, char *relname);
/*
*--------------------------------------------------------------
- * Async_NotifyHandler --
+ * Async_Notify
*
- * This is the signal handler for SIGUSR2. When the backend
- * is signaled, the backend can be in two states.
- * 1. If the backend is in the middle of another transaction,
- * we set the flag, notifyFrontEndPending, and wait until
- * the end of the transaction to notify the front end.
- * 2. If the backend is not in the middle of another transaction,
- * we notify the front end immediately.
+ * This is executed by the SQL notify command.
+ *
+ * Adds the relation to the list of pending notifies.
+ * Actual notification happens during transaction commit.
+ * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*
- * -- jw, 12/28/93
* Results:
- * none
+ * XXX
*
- * Side effects:
- * none
+ *--------------------------------------------------------------
*/
void
-Async_NotifyHandler(SIGNAL_ARGS)
+Async_Notify(char *relname)
{
- extern TransactionState CurrentTransactionState;
+ if (Trace_notify)
+ elog(DEBUG1, "Async_Notify(%s)", relname);
- if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
- (CurrentTransactionState->blockState == TRANS_DEFAULT))
+ /* no point in making duplicate entries in the list ... */
+ if (!AsyncExistsPendingNotify(relname))
{
+ /*
+ * The name list needs to live until end of transaction, so store
+ * it in the transaction context.
+ */
+ MemoryContext oldcontext;
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Waking up sleeping backend process");
-#endif
- Async_NotifyFrontEnd();
+ oldcontext = MemoryContextSwitchTo(CurTransactionContext);
- }
- else
- {
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
- CurrentTransactionState->state,
- CurrentTransactionState->blockState);
-#endif
- notifyFrontEndPending = 1;
+ pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
+
+ MemoryContextSwitchTo(oldcontext);
}
}
/*
*--------------------------------------------------------------
- * Async_Notify --
- *
- * Adds the relation to the list of pending notifies.
- * All notification happens at end of commit.
- * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+ * Async_Listen
*
- * All notification of backend processes happens here,
- * then each backend notifies its corresponding front end at
- * the end of commit.
+ * This is executed by the SQL listen command.
*
- * This correspond to 'notify <relname>' command
- * -- jw, 12/28/93
+ * Register a backend (identified by its Unix PID) as listening
+ * on the specified relation.
*
* Results:
* XXX
*
* Side effects:
- * All tuples for relname in pg_listener are updated.
+ * pg_listener is updated.
*
*--------------------------------------------------------------
*/
void
-Async_Notify(char *relname)
+Async_Listen(char *relname, int pid)
{
-
- HeapTuple lTuple,
- rTuple;
Relation lRel;
- HeapScanDesc sRel;
- TupleDesc tdesc;
- ScanKeyData key;
- Buffer b;
- Datum d,
- value[3];
- bool isnull;
- char repl[3],
- nulls[3];
-
- char *notifyName;
-
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_Notify: %s", relname);
-#endif
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ Datum values[Natts_pg_listener];
+ char nulls[Natts_pg_listener];
+ int i;
+ bool alreadyListener = false;
- if (!pendingNotifies)
- pendingNotifies = DLNewList();
+ if (Trace_notify)
+ elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
+
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+
+ /* Detect whether we are already listening on this relname */
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+ if (listener->listenerpid == pid &&
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ {
+ alreadyListener = true;
+ /* No need to scan the rest of the table */
+ break;
+ }
+ }
+ heap_endscan(scan);
+
+ if (alreadyListener)
+ {
+ heap_close(lRel, ExclusiveLock);
+ return;
+ }
/*
- * Allocate memory from the global malloc pool because it needs to be
- * referenced also when the transaction is finished. DZ - 26-08-1996
+ * OK to insert a new tuple
*/
- notifyName = strdup(relname);
- DLAddHead(pendingNotifies, DLNewElem(notifyName));
- ScanKeyEntryInitialize(&key, 0,
- Anum_pg_listener_relname,
- F_NAMEEQ,
- PointerGetDatum(notifyName));
+ for (i = 0; i < Natts_pg_listener; i++)
+ {
+ nulls[i] = ' ';
+ values[i] = PointerGetDatum(NULL);
+ }
- lRel = heap_openr(ListenerRelationName);
- tdesc = RelationGetTupleDescriptor(lRel);
- RelationSetLockForWrite(lRel);
- sRel = heap_beginscan(lRel, 0, false, 1, &key);
+ i = 0;
+ values[i++] = (Datum) relname;
+ values[i++] = (Datum) pid;
+ values[i++] = (Datum) 0; /* no notifies pending */
- nulls[0] = nulls[1] = nulls[2] = ' ';
- repl[0] = repl[1] = repl[2] = ' ';
- repl[Anum_pg_listener_notify - 1] = 'r';
- value[0] = value[1] = value[2] = (Datum) 0;
- value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
+ tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
+ simple_heap_insert(lRel, tuple);
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
+#ifdef NOT_USED /* currently there are no indexes */
+ CatalogUpdateIndexes(lRel, tuple);
+#endif
+
+ heap_freetuple(tuple);
+
+ heap_close(lRel, ExclusiveLock);
+
+ /*
+ * now that we are listening, make sure we will unlisten before dying.
+ */
+ if (!unlistenExitRegistered)
{
- d = heap_getattr(lTuple, Anum_pg_listener_notify,
- tdesc, &isnull);
- if (!DatumGetInt32(d))
- {
- rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
- heap_replace(lRel, &lTuple->t_ctid, rTuple);
- }
- ReleaseBuffer(b);
+ on_shmem_exit(Async_UnlistenOnExit, 0);
+ unlistenExitRegistered = true;
}
- heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
- heap_close(lRel);
- notifyIssued = 1;
}
/*
*--------------------------------------------------------------
- * Async_NotifyAtCommit --
+ * Async_Unlisten
*
- * Signal our corresponding frontend process on relations that
- * were notified. Signal all other backend process that
- * are listening also.
+ * This is executed by the SQL unlisten command.
*
- * -- jw, 12/28/93
+ * Remove the backend from the list of listening backends
+ * for the specified relation.
*
* Results:
* XXX
*
* Side effects:
- * Tuples in pg_listener that has our listenerpid are updated so
- * that the notification is 0. We do not want to notify frontend
- * more than once.
- *
- * -- jw, 12/28/93
+ * pg_listener is updated.
*
*--------------------------------------------------------------
*/
void
-Async_NotifyAtCommit()
+Async_Unlisten(char *relname, int pid)
{
- HeapTuple lTuple;
Relation lRel;
- HeapScanDesc sRel;
- TupleDesc tdesc;
- ScanKeyData key;
- Datum d;
- bool isnull;
- Buffer b;
- extern TransactionState CurrentTransactionState;
+ HeapScanDesc scan;
+ HeapTuple tuple;
- if (!pendingNotifies)
- pendingNotifies = DLNewList();
-
- if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
- (CurrentTransactionState->blockState == TRANS_DEFAULT))
+ /* Handle specially the `unlisten "*"' command */
+ if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
{
+ Async_UnlistenAll();
+ return;
+ }
- if (notifyIssued)
- { /* 'notify <relname>' issued by us */
- notifyIssued = 0;
- StartTransactionCommand();
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_NotifyAtCommit.");
-#endif
- ScanKeyEntryInitialize(&key, 0,
- Anum_pg_listener_notify,
- F_INT4EQ,
- Int32GetDatum(1));
- lRel = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lRel);
- sRel = heap_beginscan(lRel, 0, false, 1, &key);
- tdesc = RelationGetTupleDescriptor(lRel);
-
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
- {
- d = heap_getattr(lTuple, Anum_pg_listener_relname,
- tdesc, &isnull);
-
- if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
- {
- d = heap_getattr(lTuple, Anum_pg_listener_pid,
- tdesc, &isnull);
+ if (Trace_notify)
+ elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
- if (MyProcPid == DatumGetInt32(d))
- {
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
-#endif
- notifyFrontEndPending = 1;
- }
- else
- {
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Notifying others");
-#endif
-#ifdef HAVE_KILL
- if (kill(DatumGetInt32(d), SIGUSR2) < 0)
- {
- if (errno == ESRCH)
- heap_delete(lRel, &lTuple->t_ctid);
- }
-#endif
- }
- }
- ReleaseBuffer(b);
- }
- heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
- heap_close(lRel);
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
- CommitTransactionCommand();
- ClearPendingNotify();
- }
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
- if (notifyFrontEndPending)
- { /* we need to notify the frontend of all
- * pending notifies. */
- notifyFrontEndPending = 1;
- Async_NotifyFrontEnd();
+ if (listener->listenerpid == pid &&
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ {
+ /* Found the matching tuple, delete it */
+ simple_heap_delete(lRel, &tuple->t_self);
+
+ /*
+ * We assume there can be only one match, so no need to scan
+ * the rest of the table
+ */
+ break;
}
}
+ heap_endscan(scan);
+
+ heap_close(lRel, ExclusiveLock);
+
+ /*
+ * We do not complain about unlistening something not being listened;
+ * should we?
+ */
}
/*
*--------------------------------------------------------------
- * Async_NotifyAtAbort --
+ * Async_UnlistenAll
*
- * Gets rid of pending notifies. List elements are automatically
- * freed through memory context.
+ * Unlisten all relations for this backend.
*
+ * This is invoked by UNLISTEN "*" command, and also at backend exit.
*
* Results:
* XXX
*
* Side effects:
- * XXX
+ * pg_listener is updated.
*
*--------------------------------------------------------------
*/
-void
-Async_NotifyAtAbort()
+static void
+Async_UnlistenAll(void)
{
- extern TransactionState CurrentTransactionState;
+ Relation lRel;
+ TupleDesc tdesc;
+ HeapScanDesc scan;
+ HeapTuple lTuple;
+ ScanKeyData key[1];
- if (notifyIssued)
- ClearPendingNotify();
- notifyIssued = 0;
- if (pendingNotifies)
- DLFreeList(pendingNotifies);
- pendingNotifies = DLNewList();
+ if (Trace_notify)
+ elog(DEBUG1, "Async_UnlistenAll");
- if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
- (CurrentTransactionState->blockState == TRANS_DEFAULT))
- {
- if (notifyFrontEndPending)
- { /* don't forget to notify front end */
- Async_NotifyFrontEnd();
- }
- }
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+ tdesc = RelationGetDescr(lRel);
+
+ /* Find and delete all entries with my listenerPID */
+ ScanKeyInit(&key[0],
+ Anum_pg_listener_pid,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(MyProcPid));
+ scan = heap_beginscan(lRel, SnapshotNow, 1, key);
+
+ while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ simple_heap_delete(lRel, &lTuple->t_self);
+
+ heap_endscan(scan);
+ heap_close(lRel, ExclusiveLock);
}
/*
*--------------------------------------------------------------
- * Async_Listen --
- *
- * Register a backend (identified by its Unix PID) as listening
- * on the specified relation.
+ * Async_UnlistenOnExit
*
- * This corresponds to the 'listen <relation>' command in SQL
+ * Clean up the pg_listener table at backend exit.
*
- * One listener per relation, pg_listener relation is keyed
- * on (relname,pid) to provide multiple listeners in future.
+ * This is executed if we have done any LISTENs in this backend.
+ * It might not be necessary anymore, if the user UNLISTENed everything,
+ * but we don't try to detect that case.
*
* Results:
- * pg_listeners is updated.
+ * XXX
*
* Side effects:
+ * pg_listener is updated if necessary.
+ *
+ *--------------------------------------------------------------
+ */
+static void
+Async_UnlistenOnExit(int code, Datum arg)
+{
+ /*
+ * We need to start/commit a transaction for the unlisten, but if
+ * there is already an active transaction we had better abort that one
+ * first. Otherwise we'd end up committing changes that probably
+ * ought to be discarded.
+ */
+ AbortOutOfAnyTransaction();
+ /* Now we can do the unlisten */
+ StartTransactionCommand();
+ Async_UnlistenAll();
+ CommitTransactionCommand();
+}
+
+/*
+ *--------------------------------------------------------------
+ * AtCommit_Notify
+ *
+ * This is called at transaction commit.
+ *
+ * If there are outbound notify requests in the pendingNotifies list,
+ * scan pg_listener for matching tuples, and either signal the other
+ * backend or send a message to our own frontend.
+ *
+ * NOTE: we are still inside the current transaction, therefore can
+ * piggyback on its committing of changes.
+ *
+ * Results:
* XXX
*
+ * Side effects:
+ * Tuples in pg_listener that have matching relnames and other peoples'
+ * listenerPIDs are updated with a nonzero notification field.
+ *
*--------------------------------------------------------------
*/
void
-Async_Listen(char *relname, int pid)
+AtCommit_Notify(void)
{
- Datum values[Natts_pg_listener];
- char nulls[Natts_pg_listener];
+ Relation lRel;
TupleDesc tdesc;
- HeapScanDesc s;
- HeapTuple htup,
- tup;
- Relation lDesc;
- Buffer b;
- Datum d;
- int i;
- bool isnull;
- int alreadyListener = 0;
- char *relnamei;
- TupleDesc tupDesc;
+ HeapScanDesc scan;
+ HeapTuple lTuple,
+ rTuple;
+ Datum value[Natts_pg_listener];
+ char repl[Natts_pg_listener],
+ nulls[Natts_pg_listener];
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_Listen: %s", relname);
-#endif
- for (i = 0; i < Natts_pg_listener; i++)
+ if (pendingNotifies == NIL)
+ return; /* no NOTIFY statements in this
+ * transaction */
+
+ /*
+ * NOTIFY is disabled if not normal processing mode. This test used to
+ * be in xact.c, but it seems cleaner to do it here.
+ */
+ if (!IsNormalProcessingMode())
{
- nulls[i] = ' ';
- values[i] = PointerGetDatum(NULL);
+ ClearPendingNotifies();
+ return;
}
- i = 0;
- values[i++] = (Datum) relname;
- values[i++] = (Datum) pid;
- values[i++] = (Datum) 0; /* no notifies pending */
+ if (Trace_notify)
+ elog(DEBUG1, "AtCommit_Notify");
- lDesc = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lDesc);
+ /* preset data to update notify column to MyProcPid */
+ nulls[0] = nulls[1] = nulls[2] = ' ';
+ repl[0] = repl[1] = repl[2] = ' ';
+ repl[Anum_pg_listener_notify - 1] = 'r';
+ value[0] = value[1] = value[2] = (Datum) 0;
+ value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
- /* is someone already listening. One listener per relation */
- tdesc = RelationGetTupleDescriptor(lDesc);
- s = heap_beginscan(lDesc, 0, false, 0, (ScanKey) NULL);
- while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b)))
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+ tdesc = RelationGetDescr(lRel);
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+
+ while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
- d = heap_getattr(htup, Anum_pg_listener_relname, tdesc,
- &isnull);
- relnamei = DatumGetPointer(d);
- if (!strncmp(relnamei, relname, NAMEDATALEN))
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+ char *relname = NameStr(listener->relname);
+ int32 listenerPID = listener->listenerpid;
+
+ if (!AsyncExistsPendingNotify(relname))
+ continue;
+
+ if (listenerPID == MyProcPid)
{
- d = heap_getattr(htup, Anum_pg_listener_pid, tdesc, &isnull);
- pid = DatumGetInt32(d);
- if (pid == MyProcPid)
- alreadyListener = 1;
+ /*
+ * Self-notify: no need to bother with table update. Indeed,
+ * we *must not* clear the notification field in this path, or
+ * we could lose an outside notify, which'd be bad for
+ * applications that ignore self-notify messages.
+ */
+
+ if (Trace_notify)
+ elog(DEBUG1, "AtCommit_Notify: notifying self");
+
+ NotifyMyFrontEnd(relname, listenerPID);
}
- ReleaseBuffer(b);
- }
- heap_endscan(s);
+ else
+ {
+ if (Trace_notify)
+ elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
+ listenerPID);
+
+ /*
+ * If someone has already notified this listener, we don't
+ * bother modifying the table, but we do still send a SIGUSR2
+ * signal, just in case that backend missed the earlier signal
+ * for some reason. It's OK to send the signal first, because
+ * the other guy can't read pg_listener until we unlock it.
+ */
+ if (kill(listenerPID, SIGUSR2) < 0)
+ {
+ /*
+ * Get rid of pg_listener entry if it refers to a PID that
+ * no longer exists. Presumably, that backend crashed
+ * without deleting its pg_listener entries. This code
+ * used to only delete the entry if errno==ESRCH, but as
+ * far as I can see we should just do it for any failure
+ * (certainly at least for EPERM too...)
+ */
+ simple_heap_delete(lRel, &lTuple->t_self);
+ }
+ else if (listener->notification == 0)
+ {
+ ItemPointerData ctid;
+ int result;
+
+ rTuple = heap_modifytuple(lTuple, lRel,
+ value, nulls, repl);
+ /*
+ * We cannot use simple_heap_update here because the tuple
+ * could have been modified by an uncommitted transaction;
+ * specifically, since UNLISTEN releases exclusive lock on
+ * the table before commit, the other guy could already have
+ * tried to unlisten. There are no other cases where we
+ * should be able to see an uncommitted update or delete.
+ * Therefore, our response to a HeapTupleBeingUpdated result
+ * is just to ignore it. We do *not* wait for the other
+ * guy to commit --- that would risk deadlock, and we don't
+ * want to block while holding the table lock anyway for
+ * performance reasons. We also ignore HeapTupleUpdated,
+ * which could occur if the other guy commits between our
+ * heap_getnext and heap_update calls.
+ */
+ result = heap_update(lRel, &lTuple->t_self, rTuple,
+ &ctid,
+ GetCurrentCommandId(), SnapshotAny,
+ false /* no wait for commit */);
+ switch (result)
+ {
+ case HeapTupleSelfUpdated:
+ /* Tuple was already updated in current command? */
+ elog(ERROR, "tuple already updated by self");
+ break;
- if (alreadyListener)
- {
- elog(NOTICE, "Async_Listen: We are already listening on %s",
- relname);
- return;
- }
+ case HeapTupleMayBeUpdated:
+ /* done successfully */
+
+#ifdef NOT_USED /* currently there are no indexes */
+ CatalogUpdateIndexes(lRel, rTuple);
+#endif
+ break;
- tupDesc = lDesc->rd_att;
- tup = heap_formtuple(tupDesc,
- values,
- nulls);
- heap_insert(lDesc, tup);
+ case HeapTupleBeingUpdated:
+ /* ignore uncommitted tuples */
+ break;
- pfree(tup);
+ case HeapTupleUpdated:
+ /* ignore just-committed tuples */
+ break;
+
+ default:
+ elog(ERROR, "unrecognized heap_update status: %u",
+ result);
+ break;
+ }
+ }
+ }
+ }
+
+ heap_endscan(scan);
/*
- * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
- * listener on %s (possibly dead)",relname); }
+ * We do NOT release the lock on pg_listener here; we need to hold it
+ * until end of transaction (which is about to happen, anyway) to
+ * ensure that notified backends see our tuple updates when they look.
+ * Else they might disregard the signal, which would make the
+ * application programmer very unhappy.
*/
+ heap_close(lRel, NoLock);
+
+ ClearPendingNotifies();
- RelationUnsetLockForWrite(lDesc);
- heap_close(lDesc);
+ if (Trace_notify)
+ elog(DEBUG1, "AtCommit_Notify: done");
+}
+/*
+ *--------------------------------------------------------------
+ * AtAbort_Notify
+ *
+ * This is called at transaction abort.
+ *
+ * Gets rid of pending outbound notifies that we would have executed
+ * if the transaction got committed.
+ *
+ * Results:
+ * XXX
+ *
+ *--------------------------------------------------------------
+ */
+void
+AtAbort_Notify(void)
+{
+ ClearPendingNotifies();
+}
+
+/*
+ * AtSubStart_Notify() --- Take care of subtransaction start.
+ *
+ * Push empty state for the new subtransaction.
+ */
+void
+AtSubStart_Notify(void)
+{
+ MemoryContext old_cxt;
+
+ /* Keep the list-of-lists in TopTransactionContext for simplicity */
+ old_cxt = MemoryContextSwitchTo(TopTransactionContext);
+
+ upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
+
+ pendingNotifies = NIL;
+
+ MemoryContextSwitchTo(old_cxt);
+}
+
+/*
+ * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ *
+ * Reassign all items in the pending notifies list to the parent transaction.
+ */
+void
+AtSubCommit_Notify(void)
+{
+ List *parentPendingNotifies;
+
+ parentPendingNotifies = (List *) linitial(upperPendingNotifies);
+ upperPendingNotifies = list_delete_first(upperPendingNotifies);
+
+ /*
+ * We could try to eliminate duplicates here, but it seems not worthwhile.
+ */
+ pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+}
+
+/*
+ * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ */
+void
+AtSubAbort_Notify(void)
+{
/*
- * now that we are listening, we should make a note to ourselves to
- * unlisten prior to dying.
+ * All we have to do is pop the stack --- the notifies made in this
+ * subxact are no longer interesting, and the space will be freed when
+ * CurTransactionContext is recycled.
*/
- relnamei = malloc(NAMEDATALEN); /* persists to process exit */
- StrNCpy(relnamei, relname, NAMEDATALEN);
- on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
+ pendingNotifies = (List *) linitial(upperPendingNotifies);
+ upperPendingNotifies = list_delete_first(upperPendingNotifies);
}
/*
*--------------------------------------------------------------
- * Async_Unlisten --
+ * NotifyInterruptHandler
*
- * Remove the backend from the list of listening backends
- * for the specified relation.
+ * This is the signal handler for SIGUSR2.
*
- * This would correspond to the 'unlisten <relation>'
- * command, but there isn't one yet.
+ * If we are idle (notifyInterruptEnabled is set), we can safely invoke
+ * ProcessIncomingNotify directly. Otherwise, just set a flag
+ * to do it later.
*
* Results:
- * pg_listeners is updated.
+ * none
*
* Side effects:
- * XXX
- *
+ * per above
*--------------------------------------------------------------
*/
void
-Async_Unlisten(char *relname, int pid)
+NotifyInterruptHandler(SIGNAL_ARGS)
{
- Relation lDesc;
- HeapTuple lTuple;
+ int save_errno = errno;
+
+ /*
+ * Note: this is a SIGNAL HANDLER. You must be very wary what you do
+ * here. Some helpful soul had this routine sprinkled with TPRINTFs,
+ * which would likely lead to corruption of stdio buffers if they were
+ * ever turned on.
+ */
- lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
- Int32GetDatum(pid),
- 0, 0);
- lDesc = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lDesc);
+ /* Don't joggle the elbow of proc_exit */
+ if (proc_exit_inprogress)
+ return;
- if (lTuple != NULL)
- heap_delete(lDesc, &lTuple->t_ctid);
+ if (notifyInterruptEnabled)
+ {
+ bool save_ImmediateInterruptOK = ImmediateInterruptOK;
+
+ /*
+ * We may be called while ImmediateInterruptOK is true; turn it
+ * off while messing with the NOTIFY state. (We would have to
+ * save and restore it anyway, because PGSemaphore operations
+ * inside ProcessIncomingNotify() might reset it.)
+ */
+ ImmediateInterruptOK = false;
+
+ /*
+ * I'm not sure whether some flavors of Unix might allow another
+ * SIGUSR2 occurrence to recursively interrupt this routine. To
+ * cope with the possibility, we do the same sort of dance that
+ * EnableNotifyInterrupt must do --- see that routine for
+ * comments.
+ */
+ notifyInterruptEnabled = 0; /* disable any recursive signal */
+ notifyInterruptOccurred = 1; /* do at least one iteration */
+ for (;;)
+ {
+ notifyInterruptEnabled = 1;
+ if (!notifyInterruptOccurred)
+ break;
+ notifyInterruptEnabled = 0;
+ if (notifyInterruptOccurred)
+ {
+ /* Here, it is finally safe to do stuff. */
+ if (Trace_notify)
+ elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
- RelationUnsetLockForWrite(lDesc);
- heap_close(lDesc);
+ ProcessIncomingNotify();
+
+ if (Trace_notify)
+ elog(DEBUG1, "NotifyInterruptHandler: done");
+ }
+ }
+
+ /*
+ * Restore ImmediateInterruptOK, and check for interrupts if
+ * needed.
+ */
+ ImmediateInterruptOK = save_ImmediateInterruptOK;
+ if (save_ImmediateInterruptOK)
+ CHECK_FOR_INTERRUPTS();
+ }
+ else
+ {
+ /*
+ * In this path it is NOT SAFE to do much of anything, except
+ * this:
+ */
+ notifyInterruptOccurred = 1;
+ }
+
+ errno = save_errno;
}
-static void
-Async_UnlistenOnExit(int code, /* from exitpg */
- char *relname)
+/*
+ * --------------------------------------------------------------
+ * EnableNotifyInterrupt
+ *
+ * This is called by the PostgresMain main loop just before waiting
+ * for a frontend command. If we are truly idle (ie, *not* inside
+ * a transaction block), then process any pending inbound notifies,
+ * and enable the signal handler to process future notifies directly.
+ *
+ * NOTE: the signal handler starts out disabled, and stays so until
+ * PostgresMain calls this the first time.
+ * --------------------------------------------------------------
+ */
+void
+EnableNotifyInterrupt(void)
{
- Async_Unlisten((char *) relname, MyProcPid);
+ if (IsTransactionOrTransactionBlock())
+ return; /* not really idle */
+
+ /*
+ * This code is tricky because we are communicating with a signal
+ * handler that could interrupt us at any point. If we just checked
+ * notifyInterruptOccurred and then set notifyInterruptEnabled, we
+ * could fail to respond promptly to a signal that happens in between
+ * those two steps. (A very small time window, perhaps, but Murphy's
+ * Law says you can hit it...) Instead, we first set the enable flag,
+ * then test the occurred flag. If we see an unserviced interrupt has
+ * occurred, we re-clear the enable flag before going off to do the
+ * service work. (That prevents re-entrant invocation of
+ * ProcessIncomingNotify() if another interrupt occurs.) If an
+ * interrupt comes in between the setting and clearing of
+ * notifyInterruptEnabled, then it will have done the service work and
+ * left notifyInterruptOccurred zero, so we have to check again after
+ * clearing enable. The whole thing has to be in a loop in case
+ * another interrupt occurs while we're servicing the first. Once we
+ * get out of the loop, enable is set and we know there is no
+ * unserviced interrupt.
+ *
+ * NB: an overenthusiastic optimizing compiler could easily break this
+ * code. Hopefully, they all understand what "volatile" means these
+ * days.
+ */
+ for (;;)
+ {
+ notifyInterruptEnabled = 1;
+ if (!notifyInterruptOccurred)
+ break;
+ notifyInterruptEnabled = 0;
+ if (notifyInterruptOccurred)
+ {
+ if (Trace_notify)
+ elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
+
+ ProcessIncomingNotify();
+
+ if (Trace_notify)
+ elog(DEBUG1, "EnableNotifyInterrupt: done");
+ }
+ }
}
/*
* --------------------------------------------------------------
- * Async_NotifyFrontEnd --
+ * DisableNotifyInterrupt
*
- * Perform an asynchronous notification to front end over
- * portal comm channel. The name of the relation which contains the
- * data is sent to the front end.
+ * This is called by the PostgresMain main loop just after receiving
+ * a frontend command. Signal handler execution of inbound notifies
+ * is disabled until the next EnableNotifyInterrupt call.
*
- * We remove the notification flag from the pg_listener tuple
- * associated with our process.
+ * The SIGUSR1 signal handler also needs to call this, so as to
+ * prevent conflicts if one signal interrupts the other. So we
+ * must return the previous state of the flag.
+ * --------------------------------------------------------------
+ */
+bool
+DisableNotifyInterrupt(void)
+{
+ bool result = (notifyInterruptEnabled != 0);
+
+ notifyInterruptEnabled = 0;
+
+ return result;
+}
+
+/*
+ * --------------------------------------------------------------
+ * ProcessIncomingNotify
*
- * Results:
- * XXX
+ * Deal with arriving NOTIFYs from other backends.
+ * This is called either directly from the SIGUSR2 signal handler,
+ * or the next time control reaches the outer idle loop.
+ * Scan pg_listener for arriving notifies, report them to my front end,
+ * and clear the notification field in pg_listener until next time.
*
+ * NOTE: since we are outside any transaction, we must create our own.
* --------------------------------------------------------------
*/
-GlobalMemory notifyContext = NULL;
-
static void
-Async_NotifyFrontEnd()
+ProcessIncomingNotify(void)
{
- extern CommandDest whereToSendOutput;
- HeapTuple lTuple,
- rTuple;
Relation lRel;
- HeapScanDesc sRel;
TupleDesc tdesc;
- ScanKeyData key[2];
- Datum d,
- value[3];
- char repl[3],
- nulls[3];
- Buffer b;
- bool isnull;
-
- notifyFrontEndPending = 0;
-
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
-#endif
+ ScanKeyData key[1];
+ HeapScanDesc scan;
+ HeapTuple lTuple,
+ rTuple;
+ Datum value[Natts_pg_listener];
+ char repl[Natts_pg_listener],
+ nulls[Natts_pg_listener];
+ bool catchup_enabled;
+
+ /* Must prevent SIGUSR1 interrupt while I am running */
+ catchup_enabled = DisableCatchupInterrupt();
+
+ if (Trace_notify)
+ elog(DEBUG1, "ProcessIncomingNotify");
+
+ set_ps_display("notify interrupt");
+
+ notifyInterruptOccurred = 0;
StartTransactionCommand();
- ScanKeyEntryInitialize(&key[0], 0,
- Anum_pg_listener_notify,
- F_INT4EQ,
- Int32GetDatum(1));
- ScanKeyEntryInitialize(&key[1], 0,
- Anum_pg_listener_pid,
- F_INT4EQ,
- Int32GetDatum(MyProcPid));
- lRel = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lRel);
- tdesc = RelationGetTupleDescriptor(lRel);
- sRel = heap_beginscan(lRel, 0, false, 2, key);
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+ tdesc = RelationGetDescr(lRel);
+
+ /* Scan only entries with my listenerPID */
+ ScanKeyInit(&key[0],
+ Anum_pg_listener_pid,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(MyProcPid));
+ scan = heap_beginscan(lRel, SnapshotNow, 1, key);
+
+ /* Prepare data for rewriting 0 into notification field */
nulls[0] = nulls[1] = nulls[2] = ' ';
repl[0] = repl[1] = repl[2] = ' ';
repl[Anum_pg_listener_notify - 1] = 'r';
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
+ while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
- d = heap_getattr(lTuple, Anum_pg_listener_relname,
- tdesc, &isnull);
- rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
- heap_replace(lRel, &lTuple->t_ctid, rTuple);
-
- /* notifying the front end */
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+ char *relname = NameStr(listener->relname);
+ int32 sourcePID = listener->notification;
- if (whereToSendOutput == Remote)
+ if (sourcePID != 0)
{
- pq_putnchar("A", 1);
- pq_putint((int32) MyProcPid, sizeof(int32));
- pq_putstr(DatumGetName(d)->data);
- pq_flush();
+ /* Notify the frontend */
+
+ if (Trace_notify)
+ elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
+ relname, (int) sourcePID);
+
+ NotifyMyFrontEnd(relname, sourcePID);
+ /*
+ * Rewrite the tuple with 0 in notification column.
+ *
+ * simple_heap_update is safe here because no one else would
+ * have tried to UNLISTEN us, so there can be no uncommitted
+ * changes.
+ */
+ rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
+ simple_heap_update(lRel, &lTuple->t_self, rTuple);
+
+#ifdef NOT_USED /* currently there are no indexes */
+ CatalogUpdateIndexes(lRel, rTuple);
+#endif
}
- else
- elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
- ReleaseBuffer(b);
}
+ heap_endscan(scan);
+
+ /*
+ * We do NOT release the lock on pg_listener here; we need to hold it
+ * until end of transaction (which is about to happen, anyway) to
+ * ensure that other backends see our tuple updates when they look.
+ * Otherwise, a transaction started after this one might mistakenly
+ * think it doesn't need to send this backend a new NOTIFY.
+ */
+ heap_close(lRel, NoLock);
+
CommitTransactionCommand();
+
+ /*
+ * Must flush the notify messages to ensure frontend gets them
+ * promptly.
+ */
+ pq_flush();
+
+ set_ps_display("idle");
+
+ if (Trace_notify)
+ elog(DEBUG1, "ProcessIncomingNotify: done");
+
+ if (catchup_enabled)
+ EnableCatchupInterrupt();
+}
+
+/*
+ * Send NOTIFY message to my front end.
+ */
+static void
+NotifyMyFrontEnd(char *relname, int32 listenerPID)
+{
+ if (whereToSendOutput == Remote)
+ {
+ StringInfoData buf;
+
+ pq_beginmessage(&buf, 'A');
+ pq_sendint(&buf, listenerPID, sizeof(int32));
+ pq_sendstring(&buf, relname);
+ if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
+ {
+ /* XXX Add parameter string here later */
+ pq_sendstring(&buf, "");
+ }
+ pq_endmessage(&buf);
+
+ /*
+ * NOTE: we do not do pq_flush() here. For a self-notify, it will
+ * happen at the end of the transaction, and for incoming notifies
+ * ProcessIncomingNotify will do it after finding all the
+ * notifies.
+ */
+ }
+ else
+ elog(INFO, "NOTIFY for %s", relname);
}
-static int
-AsyncExistsPendingNotify(char *relname)
+/* Does pendingNotifies include the given relname? */
+static bool
+AsyncExistsPendingNotify(const char *relname)
{
- Dlelem *p;
+ ListCell *p;
- for (p = DLGetHead(pendingNotifies);
- p != NULL;
- p = DLGetSucc(p))
+ foreach(p, pendingNotifies)
{
/* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
- if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
- return 1;
+ if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
+ return true;
}
- return 0;
+ return false;
}
+/* Clear the pendingNotifies list. */
static void
-ClearPendingNotify()
+ClearPendingNotifies(void)
{
- Dlelem *p;
-
- while ((p = DLRemHead(pendingNotifies)) != NULL)
- free(DLE_VAL(p));
+ /*
+ * We used to have to explicitly deallocate the list members and
+ * nodes, because they were malloc'd. Now, since we know they are
+ * palloc'd in CurTransactionContext, we need not do that --- they'll
+ * go away automatically at transaction exit. We need only reset the
+ * list head pointer.
+ */
+ pendingNotifies = NIL;
}