]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/async.c
Nested transactions. There is still much left to do, especially on the
[postgresql] / src / backend / commands / async.c
index 7d24261c644fc2cacdd7b108b4438fd54ab65a1f..8e53d6af7d79d57b183e12e66bf3086a32ef0537 100644 (file)
 /*-------------------------------------------------------------------------
  *
- * async.c--
- *    Asynchronous notification
- *
- * Copyright (c) 1994, Regents of the University of California
+ * async.c
+ *       Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.15 1997/08/12 20:15:08 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.113 2004/07/01 00:50:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
-/* New Async Notification Model:
+
+/*-------------------------------------------------------------------------
+ * New Async Notification Model:
  * 1. Multiple backends on same machine.  Multiple backends listening on
- *    one relation.
- *
- * 2. One of the backend does a 'notify <relname>'.  For all backends that
- *    are listening to this relation (all notifications take place at the
- *    end of commit),
- *    2.a  If the process is the same as the backend process that issued
- *         notification (we are notifying something that we are listening),
- *         signal the corresponding frontend over the comm channel using the
- *         out-of-band channel.
- *    2.b  For all other listening processes, we send kill(2) to wake up
- *         the listening backend.
- * 3. Upon receiving a kill(2) signal from another backend process notifying
- *    that one of the relation that we are listening is being notified,
- *    we can be in either of two following states:
- *    3.a  We are sleeping, wake up and signal our frontend.
- *    3.b  We are in middle of another transaction, wait until the end of
- *         of the current transaction and signal our frontend.
- * 4. Each frontend receives this notification and prcesses accordingly.
- *
- * -- jw, 12/28/93
- *
- */
-/*
- * The following is the old model which does not work.
- */
-/*
- * Model is:
- * 1. Multiple backends on same machine.
- *
- * 2. Query on one backend sends stuff over an asynchronous portal by 
- *    appending to a relation, and then doing an async. notification
- *    (which takes place after commit) to all listeners on this relation.
- *
- * 3. Async. notification results in all backends listening on relation 
- *    to be woken up, by a process signal kill(2), with name of relation
- *    passed in shared memory.
- *
- * 4. Each backend notifies its respective frontend over the comm
- *    channel using the out-of-band channel.
- *
- * 5. Each frontend receives this notification and processes accordingly.
- *
- * #4,#5 are changing soon with pending rewrite of portal/protocol.
- *
+ *       one relation.  (Note: "listening on a relation" is not really the
+ *       right way to think about it, since the notify names need not have
+ *       anything to do with the names of relations actually in the database.
+ *       But this terminology is all over the code and docs, and I don't feel
+ *       like trying to replace it.)
+ *
+ * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
+ *       ie, each relname/listenerPID pair.  The "notification" field of the
+ *       tuple is zero when no NOTIFY is pending for that listener, or the PID
+ *       of the originating backend when a cross-backend NOTIFY is pending.
+ *       (We skip writing to pg_listener when doing a self-NOTIFY, so the
+ *       notification field should never be equal to the listenerPID field.)
+ *
+ * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
+ *       relname to a list of outstanding NOTIFY requests.  Actual processing
+ *       happens if and only if we reach transaction commit.  At that time (in
+ *       routine AtCommit_Notify) we scan pg_listener for matching relnames.
+ *       If the listenerPID in a matching tuple is ours, we just send a notify
+ *       message to our own front end.  If it is not ours, and "notification"
+ *       is not already nonzero, we set notification to our own PID and send a
+ *       SIGUSR2 signal to the receiving process (indicated by listenerPID).
+ *       BTW: if the signal operation fails, we presume that the listener backend
+ *       crashed without removing this tuple, and remove the tuple for it.
+ *
+ * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
+ *       notify processing immediately if this backend is idle (ie, it is
+ *       waiting for a frontend command and is not within a transaction block).
+ *       Otherwise the handler may only set a flag, which will cause the
+ *       processing to occur just before we next go idle.
+ *
+ * 5. Inbound-notify processing consists of scanning pg_listener for tuples
+ *       matching our own listenerPID and having nonzero notification fields.
+ *       For each such tuple, we send a message to our frontend and clear the
+ *       notification field.  BTW: this routine has to start/commit its own
+ *       transaction, since by assumption it is only called from outside any
+ *       transaction.
+ *
+ * Although we grab ExclusiveLock on pg_listener for any operation,
+ * the lock is never held very long, so it shouldn't cause too much of
+ * a performance problem.  (Previously we used AccessExclusiveLock, but
+ * there's no real reason to forbid concurrent reads.)
+ *
+ * An application that listens on the same relname it notifies will get
+ * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
+ * by comparing be_pid in the NOTIFY message to the application's own backend's
+ * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
+ * frontend during startup.)  The above design guarantees that notifies from
+ * other backends will never be missed by ignoring self-notifies.  Note,
+ * however, that we do *not* guarantee that a separate frontend message will
+ * be sent for every outside NOTIFY.  Since there is only room for one
+ * originating PID in pg_listener, outside notifies occurring at about the
+ * same time may be collapsed into a single message bearing the PID of the
+ * first outside backend to perform the NOTIFY.
+ *-------------------------------------------------------------------------
  */
+
+#include "postgres.h"
+
 #include <unistd.h>
 #include <signal.h>
-#include <string.h>
 #include <errno.h>
-#include <sys/types.h>    /* Needed by in.h on Ultrix */
 #include <netinet/in.h>
 
-#include <postgres.h>
+#include "access/heapam.h"
+#include "catalog/catname.h"
+#include "catalog/pg_listener.h"
+#include "commands/async.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/sinval.h"
+#include "tcop/tcopprot.h"
+#include "utils/fmgroids.h"
+#include "utils/ps_status.h"
+#include "utils/syscache.h"
+
+
+/*
+ * State for outbound notifies consists of a list of all relnames NOTIFYed
+ * in the current transaction. We do not actually perform a NOTIFY until
+ * and unless the transaction commits. pendingNotifies is NIL if no
+ * NOTIFYs have been done in the current transaction.
+ *
+ * The list is kept in CurTransactionContext.  In subtransactions, each
+ * subtransaction has its own list in its own CurTransactionContext, but
+ * successful subtransactions attach their lists to their parent's list.
+ * Failed subtransactions simply discard their lists.
+ */
+static List *pendingNotifies = NIL;
+
+static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
 
-#include <utils/syscache.h>
-#include <access/relscan.h>
-#include <access/xact.h>
-#include <lib/dllist.h>
-#include <tcop/dest.h>
-#include <catalog/pg_proc.h>
-#include <catalog/catname.h>
-#include <catalog/pg_listener.h>
-#include <access/heapam.h> 
-#include <storage/bufmgr.h>
-#include <nodes/memnodes.h>
-#include <utils/mcxt.h>
-#include <commands/async.h>
-#include <libpq/libpq.h>
+/*
+ * State for inbound notifies consists of two flags: one saying whether
+ * the signal handler is currently allowed to call ProcessIncomingNotify
+ * directly, and one saying whether the signal has occurred but the handler
+ * was not allowed to call ProcessIncomingNotify at the time.
+ *
+ * NB: the "volatile" on these declarations is critical!  If your compiler
+ * does not grok "volatile", you'd be best advised to compile this file
+ * with all optimization turned off.
+ */
+static volatile int notifyInterruptEnabled = 0;
+static volatile int notifyInterruptOccurred = 0;
 
-#include <port-protos.h>  /* for strdup() */
+/* True if we've registered an on_shmem_exit cleanup */
+static bool unlistenExitRegistered = false;
 
-#include <storage/lmgr.h>
+bool           Trace_notify = false;
 
-static int notifyFrontEndPending = 0;
-static int notifyIssued = 0;
-static Dllist *pendingNotifies = NULL;
+
+static void Async_UnlistenAll(void);
+static void Async_UnlistenOnExit(int code, Datum arg);
+static void ProcessIncomingNotify(void);
+static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
+static bool AsyncExistsPendingNotify(const char *relname);
+static void ClearPendingNotifies(void);
 
 
-static int AsyncExistsPendingNotify(char *);
-static void ClearPendingNotify(void);
-     
 /*
  *--------------------------------------------------------------
- * Async_NotifyHandler --
+ * Async_Notify
+ *
+ *             This is executed by the SQL notify command.
  *
- *      This is the signal handler for SIGUSR2.  When the backend
- *      is signaled, the backend can be in two states.
- *      1. If the backend is in the middle of another transaction,
- *         we set the flag, notifyFrontEndPending, and wait until
- *         the end of the transaction to notify the front end.
- *      2. If the backend is not in the middle of another transaction,
- *         we notify the front end immediately.
+ *             Adds the relation to the list of pending notifies.
+ *             Actual notification happens during transaction commit.
+ *             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  *
- *      -- jw, 12/28/93
  * Results:
- *      none
+ *             None (the function returns void).
  *
- * Side effects:
- *      none
+ *--------------------------------------------------------------
  */
 void
-Async_NotifyHandler(SIGNAL_ARGS)
+Async_Notify(char *relname)
 {
-    extern TransactionState CurrentTransactionState;
-    
-    if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-       (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
+       if (Trace_notify)
+               elog(DEBUG1, "Async_Notify(%s)", relname);
 
-#ifdef ASYNC_DEBUG
-       elog(DEBUG, "Waking up sleeping backend process");
-#endif
-       Async_NotifyFrontEnd();
+       /* no point in making duplicate entries in the list ... */
+       if (!AsyncExistsPendingNotify(relname))
+       {
+               /*
+                * The name list needs to live until end of transaction, so store
+                * it in the transaction context.
+                */
+               MemoryContext oldcontext;
 
-    }else {
-#ifdef ASYNC_DEBUG
-       elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
-            CurrentTransactionState->state,
-            CurrentTransactionState->blockState);
-#endif
-       notifyFrontEndPending = 1;
-    }
+               oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+
+               pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
+
+               MemoryContextSwitchTo(oldcontext);
+       }
 }
 
 /*
  *--------------------------------------------------------------
- * Async_Notify --
- *
- *      Adds the relation to the list of pending notifies.
- *      All notification happens at end of commit.
- *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+ * Async_Listen
  *
- *      All notification of backend processes happens here,
- *      then each backend notifies its corresponding front end at
- *      the end of commit.
+ *             This is executed by the SQL listen command.
  *
- *      This correspond to 'notify <relname>' command
- *      -- jw, 12/28/93
+ *             Register a backend (identified by its Unix PID) as listening
+ *             on the specified relation.
  *
  * Results:
- *      XXX
+ *             None (the function returns void).
  *
  * Side effects:
- *      All tuples for relname in pg_listener are updated.
+ *             pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 void
-Async_Notify(char *relname)
+Async_Listen(char *relname, int pid)
 {
-    
-    HeapTuple lTuple, rTuple;
-    Relation lRel;
-    HeapScanDesc sRel;
-    TupleDesc tdesc;
-    ScanKeyData key;
-    Buffer b;
-    Datum d, value[3];
-    bool isnull;
-    char repl[3], nulls[3];
-    
-    char *notifyName;
-    
-#ifdef ASYNC_DEBUG
-    elog(DEBUG,"Async_Notify: %s",relname);
+       Relation        lRel;
+       HeapScanDesc scan;
+       HeapTuple       tuple;
+       Datum           values[Natts_pg_listener];
+       char            nulls[Natts_pg_listener];
+       int                     i;
+       bool            alreadyListener = false;
+
+       if (Trace_notify)
+               elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
+
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+
+       /* Detect whether we are already listening on this relname */
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+               if (listener->listenerpid == pid &&
+                 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+               {
+                       alreadyListener = true;
+                       /* No need to scan the rest of the table */
+                       break;
+               }
+       }
+       heap_endscan(scan);
+
+       if (alreadyListener)
+       {
+               heap_close(lRel, ExclusiveLock);
+               return;
+       }
+
+       /*
+        * OK to insert a new tuple
+        */
+
+       for (i = 0; i < Natts_pg_listener; i++)
+       {
+               nulls[i] = ' ';
+               values[i] = PointerGetDatum(NULL);
+       }
+
+       i = 0;
+       values[i++] = (Datum) relname;
+       values[i++] = (Datum) pid;
+       values[i++] = (Datum) 0;        /* no notifies pending */
+
+       tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
+       simple_heap_insert(lRel, tuple);
+
+#ifdef NOT_USED                                        /* currently there are no indexes */
+       CatalogUpdateIndexes(lRel, tuple);
 #endif
-    
-    if (!pendingNotifies) 
-      pendingNotifies = DLNewList();
-
-    /* 
-     * Allocate memory from the global malloc pool because it needs to be
-     * referenced also when the transaction is finished.  DZ - 26-08-1996
-     */
-    notifyName = strdup(relname);
-    DLAddHead(pendingNotifies, DLNewElem(notifyName));
-    
-    ScanKeyEntryInitialize(&key, 0,
-                          Anum_pg_listener_relname,
-                          NameEqualRegProcedure,
-                          PointerGetDatum(notifyName));
-    
-    lRel = heap_openr(ListenerRelationName);
-    tdesc = RelationGetTupleDescriptor(lRel);
-    RelationSetLockForWrite(lRel);
-    sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
-    
-    nulls[0] = nulls[1] = nulls[2] = ' ';
-    repl[0] = repl[1] = repl[2] = ' ';
-    repl[Anum_pg_listener_notify - 1] = 'r';
-    value[0] = value[1] = value[2] = (Datum) 0;
-    value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
-    
-    while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) {
-       d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_notify,
-                                tdesc, &isnull);
-       if (!DatumGetInt32(d)) {
-           rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
-           (void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
+
+       heap_freetuple(tuple);
+
+       heap_close(lRel, ExclusiveLock);
+
+       /*
+        * now that we are listening, make sure we will unlisten before dying.
+        */
+       if (!unlistenExitRegistered)
+       {
+               on_shmem_exit(Async_UnlistenOnExit, 0);
+               unlistenExitRegistered = true;
        }
-       ReleaseBuffer(b);
-    }
-    heap_endscan(sRel);
-    RelationUnsetLockForWrite(lRel);
-    heap_close(lRel);
-    notifyIssued = 1;
 }
 
 /*
  *--------------------------------------------------------------
- * Async_NotifyAtCommit --
+ * Async_Unlisten
  *
- *      Signal our corresponding frontend process on relations that
- *      were notified.  Signal all other backend process that
- *      are listening also.
+ *             This is executed by the SQL unlisten command.
  *
- *      -- jw, 12/28/93
+ *             Remove the backend from the list of listening backends
+ *             for the specified relation.
  *
  * Results:
- *      XXX
+ *             None (the function returns void).
  *
  * Side effects:
- *      Tuples in pg_listener that has our listenerpid are updated so
- *      that the notification is 0.  We do not want to notify frontend
- *      more than once.
- *
- *      -- jw, 12/28/93
+ *             pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 void
-Async_NotifyAtCommit()
+Async_Unlisten(char *relname, int pid)
 {
-    HeapTuple lTuple;
-    Relation lRel;
-    HeapScanDesc sRel;
-    TupleDesc tdesc;
-    ScanKeyData key;
-    Datum d;
-    int ourpid;
-    bool isnull;
-    Buffer b;
-    extern TransactionState CurrentTransactionState;
-    
-    if (!pendingNotifies)
-      pendingNotifies = DLNewList();
-    
-    if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-       (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
-       
-       if (notifyIssued) {             /* 'notify <relname>' issued by us */
-           notifyIssued = 0;
-           StartTransactionCommand();
-#ifdef ASYNC_DEBUG
-           elog(DEBUG, "Async_NotifyAtCommit.");
-#endif
-           ScanKeyEntryInitialize(&key, 0,
-                                  Anum_pg_listener_notify,
-                                  Integer32EqualRegProcedure,
-                                  Int32GetDatum(1));
-           lRel = heap_openr(ListenerRelationName);
-           RelationSetLockForWrite(lRel);
-           sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
-           tdesc = RelationGetTupleDescriptor(lRel);
-           ourpid = getpid();
-           
-           while (HeapTupleIsValid(lTuple = heap_getnext(sRel,0, &b))) {
-               d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
-                                        tdesc, &isnull);
-               
-               if (AsyncExistsPendingNotify((char *) DatumGetPointer(d))) {
-                   d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_pid,
-                                            tdesc, &isnull);
-                   
-                   if (ourpid == DatumGetInt32(d)) {
-#ifdef ASYNC_DEBUG
-                       elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
-#endif
-                       notifyFrontEndPending = 1;
-                   } else {
-#ifdef ASYNC_DEBUG
-                       elog(DEBUG, "Notifying others");
-#endif
-#ifdef HAVE_KILL
-                       if (kill(DatumGetInt32(d), SIGUSR2) < 0) {
-                           if (errno == ESRCH) {
-                               heap_delete(lRel, &lTuple->t_ctid);
-                           }
-                       }
-#endif 
-                   }
-               }
-               ReleaseBuffer(b);
-           }
-           heap_endscan(sRel);
-           RelationUnsetLockForWrite(lRel);
-           heap_close(lRel);
-
-           CommitTransactionCommand();
-           ClearPendingNotify();
+       Relation        lRel;
+       HeapScanDesc scan;
+       HeapTuple       tuple;
+
+       /* Handle specially the `unlisten "*"' command */
+       if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
+       {
+               Async_UnlistenAll();
+               return;
        }
-       
-       if (notifyFrontEndPending) {    /* we need to notify the frontend of
-                                          all pending notifies. */
-           notifyFrontEndPending = 1;
-           Async_NotifyFrontEnd();
+
+       if (Trace_notify)
+               elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
+
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+               if (listener->listenerpid == pid &&
+                 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+               {
+                       /* Found the matching tuple, delete it */
+                       simple_heap_delete(lRel, &tuple->t_self);
+
+                       /*
+                        * We assume there can be only one match, so no need to scan
+                        * the rest of the table
+                        */
+                       break;
+               }
        }
-    }
+       heap_endscan(scan);
+
+       heap_close(lRel, ExclusiveLock);
+
+       /*
+        * We do not complain about unlistening something not being listened;
+        * should we?
+        */
 }
 
 /*
  *--------------------------------------------------------------
- * Async_NotifyAtAbort --
+ * Async_UnlistenAll
  *
- *      Gets rid of pending notifies.  List elements are automatically
- *      freed through memory context.
- *      
+ *             Unlisten all relations for this backend.
+ *
+ *             This is invoked by the UNLISTEN "*" command, and also at backend exit.
  *
  * Results:
- *      XXX
+ *             None (the function returns void).
  *
  * Side effects:
- *      XXX
+ *             pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
-void
-Async_NotifyAtAbort()
+static void
+Async_UnlistenAll(void)
 {
-    extern TransactionState CurrentTransactionState;
-    
-    if (notifyIssued) {
-       ClearPendingNotify();
-    }
-    notifyIssued = 0;
-    if (pendingNotifies)
-       DLFreeList(pendingNotifies);
-    pendingNotifies = DLNewList();
-
-    if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-       (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
-       if (notifyFrontEndPending) { /* don't forget to notify front end */
-           Async_NotifyFrontEnd();
-       }
-    }
+       Relation        lRel;
+       TupleDesc       tdesc;
+       HeapScanDesc scan;
+       HeapTuple       lTuple;
+       ScanKeyData key[1];
+
+       if (Trace_notify)
+               elog(DEBUG1, "Async_UnlistenAll");
+
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+       tdesc = RelationGetDescr(lRel);
+
+       /* Find and delete all entries with my listenerPID */
+       ScanKeyInit(&key[0],
+                               Anum_pg_listener_pid,
+                               BTEqualStrategyNumber, F_INT4EQ,
+                               Int32GetDatum(MyProcPid));
+       scan = heap_beginscan(lRel, SnapshotNow, 1, key);
+
+       while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+               simple_heap_delete(lRel, &lTuple->t_self);
+
+       heap_endscan(scan);
+       heap_close(lRel, ExclusiveLock);
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_UnlistenOnExit
+ *
+ *             Clean up the pg_listener table at backend exit.
+ *
+ *             This is executed if we have done any LISTENs in this backend.
+ *             It might not be necessary anymore, if the user UNLISTENed everything,
+ *             but we don't try to detect that case.
+ *
+ * Results:
+ *             None (the function returns void).
+ *
+ * Side effects:
+ *             pg_listener is updated if necessary.
+ *
+ *--------------------------------------------------------------
+ */
+static void
+Async_UnlistenOnExit(int code, Datum arg)
+{
+       /*
+        * We need to start/commit a transaction for the unlisten, but if
+        * there is already an active transaction we had better abort that one
+        * first.  Otherwise we'd end up committing changes that probably
+        * ought to be discarded.
+        */
+       AbortOutOfAnyTransaction();
+       /* Now we can do the unlisten */
+       StartTransactionCommand();
+       Async_UnlistenAll();
+       CommitTransactionCommand();
 }
 
 /*
  *--------------------------------------------------------------
- * Async_Listen --
+ * AtCommit_Notify
  *
- *      Register a backend (identified by its Unix PID) as listening
- *      on the specified relation.  
+ *             This is called at transaction commit.
  *
- *      This corresponds to the 'listen <relation>' command in SQL
+ *             If there are outbound notify requests in the pendingNotifies list,
+ *             scan pg_listener for matching tuples, and either signal the other
+ *             backend or send a message to our own frontend.
  *
- *      One listener per relation, pg_listener relation is keyed
- *      on (relname,pid) to provide multiple listeners in future.
+ *             NOTE: we are still inside the current transaction, therefore can
+ *             piggyback on its committing of changes.
  *
  * Results:
- *      pg_listeners is updated.
+ *             None (the function returns void).
  *
  * Side effects:
- *      XXX
+ *             Tuples in pg_listener that have matching relnames and other backends'
+ *             listenerPIDs are updated with a nonzero notification field.
  *
  *--------------------------------------------------------------
  */
 void
-Async_Listen(char *relname, int pid)
+AtCommit_Notify(void)
 {
-    Datum values[Natts_pg_listener];
-    char nulls[Natts_pg_listener];
-    TupleDesc tdesc;
-    HeapScanDesc s;
-    HeapTuple htup,tup;
-    Relation lDesc;
-    Buffer b;
-    Datum d;
-    int i;
-    bool isnull;
-    int alreadyListener = 0;
-    int ourPid = getpid();
-    char *relnamei;
-    TupleDesc tupDesc;
-    
-#ifdef ASYNC_DEBUG
-    elog(DEBUG,"Async_Listen: %s",relname);
+       Relation        lRel;
+       TupleDesc       tdesc;
+       HeapScanDesc scan;
+       HeapTuple       lTuple,
+                               rTuple;
+       Datum           value[Natts_pg_listener];
+       char            repl[Natts_pg_listener],
+                               nulls[Natts_pg_listener];
+
+       if (pendingNotifies == NIL)
+               return;                                 /* no NOTIFY statements in this
+                                                                * transaction */
+
+       /*
+        * NOTIFY is disabled if not normal processing mode. This test used to
+        * be in xact.c, but it seems cleaner to do it here.
+        */
+       if (!IsNormalProcessingMode())
+       {
+               ClearPendingNotifies();
+               return;
+       }
+
+       if (Trace_notify)
+               elog(DEBUG1, "AtCommit_Notify");
+
+       /* preset data to update notify column to MyProcPid */
+       nulls[0] = nulls[1] = nulls[2] = ' ';
+       repl[0] = repl[1] = repl[2] = ' ';
+       repl[Anum_pg_listener_notify - 1] = 'r';
+       value[0] = value[1] = value[2] = (Datum) 0;
+       value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
+
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+       tdesc = RelationGetDescr(lRel);
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+
+       while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+               char       *relname = NameStr(listener->relname);
+               int32           listenerPID = listener->listenerpid;
+
+               if (!AsyncExistsPendingNotify(relname))
+                       continue;
+
+               if (listenerPID == MyProcPid)
+               {
+                       /*
+                        * Self-notify: no need to bother with table update. Indeed,
+                        * we *must not* clear the notification field in this path, or
+                        * we could lose an outside notify, which'd be bad for
+                        * applications that ignore self-notify messages.
+                        */
+
+                       if (Trace_notify)
+                               elog(DEBUG1, "AtCommit_Notify: notifying self");
+
+                       NotifyMyFrontEnd(relname, listenerPID);
+               }
+               else
+               {
+                       if (Trace_notify)
+                               elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
+                                        listenerPID);
+
+                       /*
+                        * If someone has already notified this listener, we don't
+                        * bother modifying the table, but we do still send a SIGUSR2
+                        * signal, just in case that backend missed the earlier signal
+                        * for some reason.  It's OK to send the signal first, because
+                        * the other guy can't read pg_listener until we unlock it.
+                        */
+                       if (kill(listenerPID, SIGUSR2) < 0)
+                       {
+                               /*
+                                * Get rid of pg_listener entry if it refers to a PID that
+                                * no longer exists.  Presumably, that backend crashed
+                                * without deleting its pg_listener entries. This code
+                                * used to only delete the entry if errno==ESRCH, but as
+                                * far as I can see we should just do it for any failure
+                                * (certainly at least for EPERM too...)
+                                */
+                               simple_heap_delete(lRel, &lTuple->t_self);
+                       }
+                       else if (listener->notification == 0)
+                       {
+                               ItemPointerData ctid;
+                               int                     result;
+
+                               rTuple = heap_modifytuple(lTuple, lRel,
+                                                                                 value, nulls, repl);
+                               /*
+                                * We cannot use simple_heap_update here because the tuple
+                                * could have been modified by an uncommitted transaction;
+                                * specifically, since UNLISTEN releases exclusive lock on
+                                * the table before commit, the other guy could already have
+                                * tried to unlisten.  There are no other cases where we
+                                * should be able to see an uncommitted update or delete.
+                                * Therefore, our response to a HeapTupleBeingUpdated result
+                                * is just to ignore it.  We do *not* wait for the other
+                                * guy to commit --- that would risk deadlock, and we don't
+                                * want to block while holding the table lock anyway for
+                                * performance reasons.  We also ignore HeapTupleUpdated,
+                                * which could occur if the other guy commits between our
+                                * heap_getnext and heap_update calls.
+                                */
+                               result = heap_update(lRel, &lTuple->t_self, rTuple,
+                                                                        &ctid,
+                                                                        GetCurrentCommandId(), SnapshotAny,
+                                                                        false /* no wait for commit */);
+                               switch (result)
+                               {
+                                       case HeapTupleSelfUpdated:
+                                               /* Tuple was already updated in current command? */
+                                               elog(ERROR, "tuple already updated by self");
+                                               break;
+
+                                       case HeapTupleMayBeUpdated:
+                                               /* done successfully */
+
+#ifdef NOT_USED                                        /* currently there are no indexes */
+                                               CatalogUpdateIndexes(lRel, rTuple);
 #endif
-    for (i = 0 ; i < Natts_pg_listener; i++) {
-        nulls[i] = ' ';
-        values[i] = PointerGetDatum(NULL);
-    }
-    
-    i = 0;
-    values[i++] = (Datum) relname;
-    values[i++] = (Datum) pid;
-    values[i++] = (Datum) 0;   /* no notifies pending */
-    
-    lDesc = heap_openr(ListenerRelationName);
-    RelationSetLockForWrite(lDesc);
-    
-    /* is someone already listening.  One listener per relation */
-    tdesc = RelationGetTupleDescriptor(lDesc);
-    s = heap_beginscan(lDesc,0,NowTimeQual,0,(ScanKey)NULL);
-    while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
-       d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,
-                                &isnull);
-       relnamei = DatumGetPointer(d);
-       if (!strncmp(relnamei,relname, NAMEDATALEN)) {
-           d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
-           pid = DatumGetInt32(d);
-           if (pid == ourPid) {
-               alreadyListener = 1;
-           }
+                                               break;
+
+                                       case HeapTupleBeingUpdated:
+                                               /* ignore uncommitted tuples */
+                                               break;
+
+                                       case HeapTupleUpdated:
+                                               /* ignore just-committed tuples */
+                                               break;
+
+                                       default:
+                                               elog(ERROR, "unrecognized heap_update status: %u",
+                                                        result);
+                                               break;
+                               }
+                       }
+               }
        }
-       ReleaseBuffer(b);
-    }
-    heap_endscan(s);
-    
-    if (alreadyListener) {
-       elog(NOTICE, "Async_Listen: We are already listening on %s",
-            relname);
-       return;
-    }
-    
-    tupDesc = lDesc->rd_att;
-    tup = heap_formtuple(tupDesc,
-                        values,
-                        nulls);
-    heap_insert(lDesc, tup);
-    
-    pfree(tup);
-    /*    if (alreadyListener) {
-         elog(NOTICE,"Async_Listen: already one listener on %s (possibly dead)",relname);
-         }*/
-
-    RelationUnsetLockForWrite(lDesc);
-    heap_close(lDesc);
-    
-    /*
-     * now that we are listening, we should make a note to ourselves 
-     * to unlisten prior to dying.
-     */
-    relnamei = malloc(NAMEDATALEN); /* persists to process exit */
-    strNcpy(relnamei, relname, NAMEDATALEN-1);
-    on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei);
+
+       heap_endscan(scan);
+
+       /*
+        * We do NOT release the lock on pg_listener here; we need to hold it
+        * until end of transaction (which is about to happen, anyway) to
+        * ensure that notified backends see our tuple updates when they look.
+        * Else they might disregard the signal, which would make the
+        * application programmer very unhappy.
+        */
+       heap_close(lRel, NoLock);
+
+       ClearPendingNotifies();
+
+       if (Trace_notify)
+               elog(DEBUG1, "AtCommit_Notify: done");
 }
 
 /*
  *--------------------------------------------------------------
- * Async_Unlisten --
+ * AtAbort_Notify
  *
- *      Remove the backend from the list of listening backends
- *      for the specified relation.
- *    
- *      This would correspond to the 'unlisten <relation>' 
- *      command, but there isn't one yet.
+ *             This is called at transaction abort.
  *
- * Results:
- *      pg_listeners is updated.
+ *             Gets rid of pending outbound notifies that we would have executed
+ *             if the transaction got committed.
  *
- * Side effects:
- *      XXX
+ * Results:
+ *             none (the function returns void)
+ *
+ * Side effects:
+ *             pendingNotifies is reset to NIL by ClearPendingNotifies(); the
+ *             list cells themselves are not freed here.
  *
  *--------------------------------------------------------------
  */
 void
-Async_Unlisten(char *relname, int pid)
+AtAbort_Notify(void)
 {
-    Relation lDesc;
-    HeapTuple lTuple;
-    
-    lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
-                                Int32GetDatum(pid),
-                                0,0);
-    lDesc = heap_openr(ListenerRelationName);
-    RelationSetLockForWrite(lDesc);
-
-    if (lTuple != NULL) {
-       heap_delete(lDesc,&lTuple->t_ctid);
-    }
-
-    RelationUnsetLockForWrite(lDesc);
-    heap_close(lDesc);
+       ClearPendingNotifies();         /* forget every notify queued by this transaction */
 }
 
+/*
+ * AtSubStart_Notify() --- Take care of subtransaction start.
+ *
+ * Push empty state for the new subtransaction.
+ *
+ * The list that was current for the parent (pendingNotifies) is pushed onto
+ * the front of upperPendingNotifies, and pendingNotifies is then reset to
+ * NIL so the subtransaction starts with no pending notifies of its own.
+ * The push is done with TopTransactionContext as the current memory
+ * context; the previous context is restored before returning.
+ */
 void
-Async_UnlistenOnExit(int code, /* from exitpg */
-                    char *relname)
+AtSubStart_Notify(void)
 {
-    Async_Unlisten((char *) relname, getpid());
+       MemoryContext   old_cxt;
+
+       /* Keep the list-of-lists in TopTransactionContext for simplicity */
+       old_cxt = MemoryContextSwitchTo(TopTransactionContext);
+
+       upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);    /* save parent list */
+
+       pendingNotifies = NIL;          /* subtransaction starts empty */
+
+       MemoryContextSwitchTo(old_cxt);
 }
 
 /*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd --
+ * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ *
+ * Reassign all items in the pending notifies list to the parent transaction.
+ *
+ * The first entry of upperPendingNotifies (the list saved for the parent)
+ * is taken and removed from that stack.  pendingNotifies then becomes the
+ * concatenation of the parent list followed by the items queued in this
+ * subtransaction.
+ *
+ * NOTE(review): linitial() is applied without checking that
+ * upperPendingNotifies is non-empty; presumably AtSubStart_Notify has
+ * always pushed an entry before this runs --- confirm against callers.
+ */
+void
+AtSubCommit_Notify(void)
+{
+       List    *parentPendingNotifies;
+
+       parentPendingNotifies = (List *) linitial(upperPendingNotifies);
+       upperPendingNotifies = list_delete_first(upperPendingNotifies);
+
+       /*
+        * We could try to eliminate duplicates here, but it seems not worthwhile.
+        * Parent items come first, followed by the items of this subtransaction.
+        */
+       pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+}
+
+/*
+ * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ *
+ * pendingNotifies is overwritten with the first entry of
+ * upperPendingNotifies (the list saved for the parent), and that entry is
+ * removed from the stack.  The list built by the aborted subtransaction is
+ * simply dropped; nothing is freed explicitly here.
+ *
+ * NOTE(review): linitial() is applied without checking that
+ * upperPendingNotifies is non-empty; presumably AtSubStart_Notify has
+ * always pushed an entry before this runs --- confirm against callers.
+ */
+void
+AtSubAbort_Notify(void)
+{
+       /*
+        * All we have to do is pop the stack --- the notifies made in this
+        * subxact are no longer interesting, and the space will be freed when
+        * CurTransactionContext is recycled.
+        */
+       pendingNotifies = (List *) linitial(upperPendingNotifies);
+       upperPendingNotifies = list_delete_first(upperPendingNotifies);
+}
+
+/*
+ *--------------------------------------------------------------
+ * NotifyInterruptHandler
  *
- *      Perform an asynchronous notification to front end over
- *      portal comm channel.  The name of the relation which contains the
- *      data is sent to the front end.
+ *             This is the signal handler for SIGUSR2.
  *
- *      We remove the notification flag from the pg_listener tuple
- *      associated with our process.
+ *             If we are idle (notifyInterruptEnabled is set), we can safely invoke
+ *             ProcessIncomingNotify directly.  Otherwise, just set a flag
+ *             to do it later.
+ *
+ *             errno is saved on entry and restored just before the normal
+ *             return.  When proc_exit_inprogress is set the handler returns
+ *             immediately, before any notify state has been touched.
  *
  * Results:
- *      XXX
+ *             none
  *
  * Side effects:
+ *             per above
+ *--------------------------------------------------------------
+ */
+void
+NotifyInterruptHandler(SIGNAL_ARGS)
+{
+       int                     save_errno = errno;
+
+       /*
+        * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
+        * here. Some helpful soul had this routine sprinkled with TPRINTFs,
+        * which would likely lead to corruption of stdio buffers if they were
+        * ever turned on.
+        */
+
+       /*
+        * Don't joggle the elbow of proc_exit: if process exit is already in
+        * progress, leave both notify flags exactly as they are.
+        */
+       if (proc_exit_inprogress)
+               return;
+
+       if (notifyInterruptEnabled)
+       {
+               bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
+
+               /*
+                * We may be called while ImmediateInterruptOK is true; turn it
+                * off while messing with the NOTIFY state.  (We would have to
+                * save and restore it anyway, because PGSemaphore operations
+                * inside ProcessIncomingNotify() might reset it.)
+                */
+               ImmediateInterruptOK = false;
+
+               /*
+                * I'm not sure whether some flavors of Unix might allow another
+                * SIGUSR2 occurrence to recursively interrupt this routine. To
+                * cope with the possibility, we do the same sort of dance that
+                * EnableNotifyInterrupt must do --- see that routine for
+                * comments.
+                *
+                * The loop exits only with notifyInterruptEnabled set and
+                * notifyInterruptOccurred observed as zero.
+                */
+               notifyInterruptEnabled = 0;             /* disable any recursive signal */
+               notifyInterruptOccurred = 1;    /* do at least one iteration */
+               for (;;)
+               {
+                       notifyInterruptEnabled = 1;
+                       if (!notifyInterruptOccurred)
+                               break;
+                       notifyInterruptEnabled = 0;
+                       if (notifyInterruptOccurred)
+                       {
+                               /* Here, it is finally safe to do stuff. */
+                               if (Trace_notify)
+                                       elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
+
+                               ProcessIncomingNotify();
+
+                               if (Trace_notify)
+                                       elog(DEBUG1, "NotifyInterruptHandler: done");
+                       }
+               }
+
+               /*
+                * Restore ImmediateInterruptOK, and check for interrupts if
+                * needed.
+                */
+               ImmediateInterruptOK = save_ImmediateInterruptOK;
+               if (save_ImmediateInterruptOK)
+                       CHECK_FOR_INTERRUPTS();
+       }
+       else
+       {
+               /*
+                * In this path it is NOT SAFE to do much of anything, except
+                * this:
+                */
+               notifyInterruptOccurred = 1;
+       }
+
+       errno = save_errno;
+}
+
+/*
+ * --------------------------------------------------------------
+ * EnableNotifyInterrupt
  *
- *      We make use of the out-of-band channel to transmit the
- *      notification to the front end.  The actual data transfer takes
- *      place at the front end's request.
+ *             This is called by the PostgresMain main loop just before waiting
+ *             for a frontend command.  If we are truly idle (ie, *not* inside
+ *             a transaction block), then process any pending inbound notifies,
+ *             and enable the signal handler to process future notifies directly.
  *
+ *             NOTE: the signal handler starts out disabled, and stays so until
+ *             PostgresMain calls this the first time.
+ *
+ *             When IsTransactionOrTransactionBlock() reports true the function
+ *             returns at once and leaves both notify flags untouched.
  * --------------------------------------------------------------
  */
-GlobalMemory notifyContext = NULL;
-
 void
-Async_NotifyFrontEnd()
+EnableNotifyInterrupt(void)
 {
-    extern CommandDest whereToSendOutput;
-    HeapTuple lTuple, rTuple;
-    Relation lRel;
-    HeapScanDesc sRel;
-    TupleDesc tdesc;
-    ScanKeyData key[2];
-    Datum d, value[3];
-    char repl[3], nulls[3];
-    Buffer b;
-    int ourpid;
-    bool isnull;
-    
-    notifyFrontEndPending = 0;
-    
-#ifdef ASYNC_DEBUG
-    elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
-#endif
-    
-    StartTransactionCommand();
-    ourpid = getpid();
-    ScanKeyEntryInitialize(&key[0], 0,
-                          Anum_pg_listener_notify,
-                          Integer32EqualRegProcedure,
-                          Int32GetDatum(1));
-    ScanKeyEntryInitialize(&key[1], 0,
-                          Anum_pg_listener_pid,
-                          Integer32EqualRegProcedure,
-                          Int32GetDatum(ourpid));
-    lRel = heap_openr(ListenerRelationName);
-    RelationSetLockForWrite(lRel);
-    tdesc = RelationGetTupleDescriptor(lRel);
-    sRel = heap_beginscan(lRel, 0, NowTimeQual, 2, key);
-    
-    nulls[0] = nulls[1] = nulls[2] = ' ';
-    repl[0] = repl[1] = repl[2] = ' ';
-    repl[Anum_pg_listener_notify - 1] = 'r';
-    value[0] = value[1] = value[2] = (Datum) 0;
-    value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
-    
-    while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0,&b))) {
-       d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
-                                tdesc, &isnull);
-       rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
-       (void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
-       
-       /* notifying the front end */
-       
-       if (whereToSendOutput == Remote) {
-           pq_putnchar("A", 1);
-           pq_putint(ourpid, 4);
-           pq_putstr(DatumGetName(d)->data);
-           pq_flush();
-       } else {
-           elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
+       if (IsTransactionOrTransactionBlock())
+               return;                                 /* not really idle */
+
+       /*
+        * This code is tricky because we are communicating with a signal
+        * handler that could interrupt us at any point.  If we just checked
+        * notifyInterruptOccurred and then set notifyInterruptEnabled, we
+        * could fail to respond promptly to a signal that happens in between
+        * those two steps.  (A very small time window, perhaps, but Murphy's
+        * Law says you can hit it...)  Instead, we first set the enable flag,
+        * then test the occurred flag.  If we see an unserviced interrupt has
+        * occurred, we re-clear the enable flag before going off to do the
+        * service work.  (That prevents re-entrant invocation of
+        * ProcessIncomingNotify() if another interrupt occurs.) If an
+        * interrupt comes in between the setting and clearing of
+        * notifyInterruptEnabled, then it will have done the service work and
+        * left notifyInterruptOccurred zero, so we have to check again after
+        * clearing enable.  The whole thing has to be in a loop in case
+        * another interrupt occurs while we're servicing the first. Once we
+        * get out of the loop, enable is set and we know there is no
+        * unserviced interrupt.
+        *
+        * NB: an overenthusiastic optimizing compiler could easily break this
+        * code.  Hopefully, they all understand what "volatile" means these
+        * days.
+        *
+        * NotifyInterruptHandler runs the same loop on its enabled path, so
+        * the statement order here must not be rearranged.
+        */
+       for (;;)
+       {
+               notifyInterruptEnabled = 1;
+               if (!notifyInterruptOccurred)
+                       break;
+               notifyInterruptEnabled = 0;
+               if (notifyInterruptOccurred)
+               {
+                       if (Trace_notify)
+                               elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
+
+                       ProcessIncomingNotify();
+
+                       if (Trace_notify)
+                               elog(DEBUG1, "EnableNotifyInterrupt: done");
+               }
        }
-       ReleaseBuffer(b);
-    }
-    CommitTransactionCommand();
 }
 
-static int
-AsyncExistsPendingNotify(char *relname)
+/*
+ * --------------------------------------------------------------
+ * DisableNotifyInterrupt
+ *
+ *             This is called by the PostgresMain main loop just after receiving
+ *             a frontend command.  Signal handler execution of inbound notifies
+ *             is disabled until the next EnableNotifyInterrupt call.
+ *
+ *             The SIGUSR1 signal handler also needs to call this, so as to
+ *             prevent conflicts if one signal interrupts the other.  So we
+ *             must return the previous state of the flag.
+ *
+ *             Returns true if notifyInterruptEnabled was nonzero on entry,
+ *             false otherwise.  The flag is always zero on return.
+ * --------------------------------------------------------------
+ */
+bool
+DisableNotifyInterrupt(void)
 {
-    Dlelem* p;
-    for (p = DLGetHead(pendingNotifies); 
-        p != NULL;
-        p = DLGetSucc(p)) {
-      /* Use NAMEDATALEN for relname comparison.    DZ - 26-08-1996 */
-      if (!strncmp(DLE_VAL(p), relname, NAMEDATALEN))
-       return 1;
-    }
-
-    return 0;
+       bool    result = (notifyInterruptEnabled != 0);         /* remember prior state */
+
+       notifyInterruptEnabled = 0;
+
+       return result;
 }
 
+/*
+ * --------------------------------------------------------------
+ * ProcessIncomingNotify
+ *
+ *             Deal with arriving NOTIFYs from other backends.
+ *             This is called either directly from the SIGUSR2 signal handler,
+ *             or the next time control reaches the outer idle loop.
+ *             Scan pg_listener for arriving notifies, report them to my front end,
+ *             and clear the notification field in pg_listener until next time.
+ *
+ *             NOTE: since we are outside any transaction, we must create our own.
+ *
+ *             The catchup (SIGUSR1) interrupt is disabled on entry and
+ *             re-enabled on exit only if it was enabled before.
+ *             notifyInterruptOccurred is cleared before the scan starts.
+ *             Only tuples whose pid column equals MyProcPid are scanned, and
+ *             only those with a nonzero notification column are reported
+ *             and rewritten.
+ * --------------------------------------------------------------
+ */
 static void
-ClearPendingNotify()
+ProcessIncomingNotify(void)
 {
-    Dlelem* p;
-    while ( (p = DLRemHead(pendingNotifies)) != NULL)
-       free(DLE_VAL(p));
+       Relation        lRel;
+       TupleDesc       tdesc;
+       ScanKeyData key[1];
+       HeapScanDesc scan;
+       HeapTuple       lTuple,
+                               rTuple;
+       Datum           value[Natts_pg_listener];
+       char            repl[Natts_pg_listener],
+                               nulls[Natts_pg_listener];
+       bool            catchup_enabled;
+
+       /* Must prevent SIGUSR1 interrupt while I am running */
+       catchup_enabled = DisableCatchupInterrupt();
+
+       if (Trace_notify)
+               elog(DEBUG1, "ProcessIncomingNotify");
+
+       set_ps_display("notify interrupt");
+
+       notifyInterruptOccurred = 0;
+
+       StartTransactionCommand();
+
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+       tdesc = RelationGetDescr(lRel);
+
+       /* Scan only entries with my listenerPID */
+       ScanKeyInit(&key[0],
+                               Anum_pg_listener_pid,
+                               BTEqualStrategyNumber, F_INT4EQ,
+                               Int32GetDatum(MyProcPid));
+       scan = heap_beginscan(lRel, SnapshotNow, 1, key);
+
+       /*
+        * Prepare data for rewriting 0 into notification field.
+        *
+        * NOTE(review): the arrays are sized by Natts_pg_listener but only
+        * indexes 0..2 are initialized here; this presumably relies on
+        * pg_listener having exactly three columns --- confirm.
+        */
+       nulls[0] = nulls[1] = nulls[2] = ' ';
+       repl[0] = repl[1] = repl[2] = ' ';
+       repl[Anum_pg_listener_notify - 1] = 'r';
+       value[0] = value[1] = value[2] = (Datum) 0;
+       value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
+
+       while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+               char       *relname = NameStr(listener->relname);
+               int32           sourcePID = listener->notification;
+
+               if (sourcePID != 0)
+               {
+                       /* Notify the frontend */
+
+                       if (Trace_notify)
+                               elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
+                                        relname, (int) sourcePID);
+
+                       NotifyMyFrontEnd(relname, sourcePID);
+                       /*
+                        * Rewrite the tuple with 0 in notification column.
+                        *
+                        * simple_heap_update is safe here because no one else would
+                        * have tried to UNLISTEN us, so there can be no uncommitted
+                        * changes.
+                        */
+                       rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
+                       simple_heap_update(lRel, &lTuple->t_self, rTuple);
+
+#ifdef NOT_USED                                        /* currently there are no indexes */
+                       CatalogUpdateIndexes(lRel, rTuple);
+#endif
+               }
+       }
+       heap_endscan(scan);
+
+       /*
+        * We do NOT release the lock on pg_listener here; we need to hold it
+        * until end of transaction (which is about to happen, anyway) to
+        * ensure that other backends see our tuple updates when they look.
+        * Otherwise, a transaction started after this one might mistakenly
+        * think it doesn't need to send this backend a new NOTIFY.
+        */
+       heap_close(lRel, NoLock);
+
+       CommitTransactionCommand();
+
+       /*
+        * Must flush the notify messages to ensure frontend gets them
+        * promptly.
+        */
+       pq_flush();
+
+       set_ps_display("idle");
+
+       if (Trace_notify)
+               elog(DEBUG1, "ProcessIncomingNotify: done");
+
+       if (catchup_enabled)
+               EnableCatchupInterrupt();
+}
+
+/*
+ * Send NOTIFY message to my front end.
+ *
+ * relname is the name reported to the client.  listenerPID is the PID
+ * written into the message; note that ProcessIncomingNotify passes the
+ * value it read from the notification column (its sourcePID variable)
+ * for this argument.
+ *
+ * When whereToSendOutput is Remote, an A message is built from the PID
+ * (sent as an int32), relname, and, for frontend protocol major version 3
+ * or later, an empty string.  Otherwise the notify is reported through
+ * elog(INFO).
+ */
+static void
+NotifyMyFrontEnd(char *relname, int32 listenerPID)
+{
+       if (whereToSendOutput == Remote)
+       {
+               StringInfoData buf;
+
+               pq_beginmessage(&buf, 'A');
+               pq_sendint(&buf, listenerPID, sizeof(int32));
+               pq_sendstring(&buf, relname);
+               if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
+               {
+                       /* XXX Add parameter string here later */
+                       pq_sendstring(&buf, "");
+               }
+               pq_endmessage(&buf);
+
+               /*
+                * NOTE: we do not do pq_flush() here.  For a self-notify, it will
+                * happen at the end of the transaction, and for incoming notifies
+                * ProcessIncomingNotify will do it after finding all the
+                * notifies.
+                */
+       }
+       else
+               elog(INFO, "NOTIFY for %s", relname);
+}
+
+/*
+ * Does pendingNotifies include the given relname?
+ *
+ * Each list element is read as a C string and compared with relname using
+ * strncmp limited to NAMEDATALEN bytes.  Returns true on the first match,
+ * false if no element matches.
+ */
+static bool
+AsyncExistsPendingNotify(const char *relname)
+{
+       ListCell   *p;
+
+       foreach(p, pendingNotifies)
+       {
+               /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
+               if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
+                       return true;
+       }
+
+       return false;
 }
 
+/*
+ * Clear the pendingNotifies list.
+ *
+ * Only the list head pointer is reset to NIL; no list cell or string is
+ * freed by this function.
+ */
+static void
+ClearPendingNotifies(void)
+{
+       /*
+        * We used to have to explicitly deallocate the list members and
+        * nodes, because they were malloc'd.  Now, since we know they are
+        * palloc'd in CurTransactionContext, we need not do that --- they'll
+        * go away automatically at transaction exit.  We need only reset the
+        * list head pointer.
+        */
+       pendingNotifies = NIL;
+}