]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/async.c
Nested transactions. There is still much left to do, especially on the
[postgresql] / src / backend / commands / async.c
index 3d5cf92f7d1299d61e78e698eeba8dd48e7cd732..8e53d6af7d79d57b183e12e66bf3086a32ef0537 100644 (file)
@@ -3,10 +3,11 @@
  * async.c
  *       Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
- * Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.45 1999/04/25 03:19:08 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.113 2004/07/01 00:50:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
  *       relname to a list of outstanding NOTIFY requests.  Actual processing
  *       happens if and only if we reach transaction commit.  At that time (in
  *       routine AtCommit_Notify) we scan pg_listener for matching relnames.
- *    If the listenerPID in a matching tuple is ours, we just send a notify
+ *       If the listenerPID in a matching tuple is ours, we just send a notify
  *       message to our own front end.  If it is not ours, and "notification"
  *       is not already nonzero, we set notification to our own PID and send a
  *       SIGUSR2 signal to the receiving process (indicated by listenerPID).
  *       BTW: if the signal operation fails, we presume that the listener backend
- *    crashed without removing this tuple, and remove the tuple for it.
+ *       crashed without removing this tuple, and remove the tuple for it.
  *
  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
  *       notify processing immediately if this backend is idle (ie, it is
  *       waiting for a frontend command and is not within a transaction block).
- *    Otherwise the handler may only set a flag, which will cause the
+ *       Otherwise the handler may only set a flag, which will cause the
  *       processing to occur just before we next go idle.
  *
  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
  *       transaction, since by assumption it is only called from outside any
  *       transaction.
  *
- * Note that the system's use of pg_listener is confined to very short
- * intervals at the end of a transaction that contains NOTIFY statements,
- * or during the transaction caused by an inbound SIGUSR2.  So the fact that
- * pg_listener is a global resource shouldn't cause too much performance
- * problem.  But application authors ought to be discouraged from doing
- * LISTEN or UNLISTEN near the start of a long transaction --- that would
- * result in holding the pg_listener write lock for a long time, possibly
- * blocking unrelated activity.  It could even lead to deadlock against another
- * transaction that touches the same user tables and then tries to NOTIFY.
- * Probably best to do LISTEN or UNLISTEN outside of transaction blocks.
+ * Although we grab ExclusiveLock on pg_listener for any operation,
+ * the lock is never held very long, so it shouldn't cause too much of
+ * a performance problem.  (Previously we used AccessExclusiveLock, but
+ * there's no real reason to forbid concurrent reads.)
  *
  * An application that listens on the same relname it notifies will get
  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
  *-------------------------------------------------------------------------
  */
 
+#include "postgres.h"
+
 #include <unistd.h>
 #include <signal.h>
-#include <string.h>
 #include <errno.h>
-#include <sys/types.h>                 /* Needed by in.h on Ultrix */
 #include <netinet/in.h>
 
-#include "postgres.h"
-
-#include "commands/async.h"
 #include "access/heapam.h"
-#include "access/relscan.h"
-#include "access/xact.h"
 #include "catalog/catname.h"
 #include "catalog/pg_listener.h"
-#include "fmgr.h"
-#include "lib/dllist.h"
+#include "commands/async.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
-#include "storage/bufmgr.h"
-#include "storage/lmgr.h"
-#include "tcop/dest.h"
+#include "storage/ipc.h"
+#include "storage/sinval.h"
+#include "tcop/tcopprot.h"
+#include "utils/fmgroids.h"
+#include "utils/ps_status.h"
 #include "utils/syscache.h"
-#include <utils/trace.h>
-#include <utils/ps_status.h>
 
-/* stuff that we really ought not be touching directly :-( */
-extern TransactionState CurrentTransactionState;
-extern CommandDest whereToSendOutput;
 
 /*
  * State for outbound notifies consists of a list of all relnames NOTIFYed
- * in the current transaction.  We do not actually perform a NOTIFY until
- * and unless the transaction commits.  pendingNotifies is NULL if no
+ * in the current transaction. We do not actually perform a NOTIFY until
+ * and unless the transaction commits. pendingNotifies is NIL if no
  * NOTIFYs have been done in the current transaction.
+ *
+ * The list is kept in CurTransactionContext.  In subtransactions, each
+ * subtransaction has its own list in its own CurTransactionContext, but
+ * successful subtransactions attach their lists to their parent's list.
+ * Failed subtransactions simply discard their lists.
  */
-static Dllist *pendingNotifies = NULL;
+static List *pendingNotifies = NIL;
+
+static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
 
 /*
  * State for inbound notifies consists of two flags: one saying whether
@@ -125,24 +118,26 @@ static Dllist *pendingNotifies = NULL;
  * does not grok "volatile", you'd be best advised to compile this file
  * with all optimization turned off.
  */
-static volatile int    notifyInterruptEnabled = 0;
-static volatile int    notifyInterruptOccurred = 0;
+static volatile int notifyInterruptEnabled = 0;
+static volatile int notifyInterruptOccurred = 0;
 
-/* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
-static int     unlistenExitRegistered = 0;
+/* True if we've registered an on_shmem_exit cleanup */
+static bool unlistenExitRegistered = false;
+
+bool           Trace_notify = false;
 
 
 static void Async_UnlistenAll(void);
-static void Async_UnlistenOnExit(void);
+static void Async_UnlistenOnExit(int code, Datum arg);
 static void ProcessIncomingNotify(void);
 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
-static int     AsyncExistsPendingNotify(char *relname);
+static bool AsyncExistsPendingNotify(const char *relname);
 static void ClearPendingNotifies(void);
 
 
 /*
  *--------------------------------------------------------------
- * Async_Notify 
+ * Async_Notify
  *
  *             This is executed by the SQL notify command.
  *
@@ -158,34 +153,29 @@ static void ClearPendingNotifies(void);
 void
 Async_Notify(char *relname)
 {
-       char       *notifyName;
+       if (Trace_notify)
+               elog(DEBUG1, "Async_Notify(%s)", relname);
+
+       /* no point in making duplicate entries in the list ... */
+       if (!AsyncExistsPendingNotify(relname))
+       {
+               /*
+                * The name list needs to live until end of transaction, so store
+                * it in the transaction context.
+                */
+               MemoryContext oldcontext;
 
-       TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
+               oldcontext = MemoryContextSwitchTo(CurTransactionContext);
 
-       /*
-        * We allocate list memory from the global malloc pool to ensure that
-        * it will live until we want to use it.  This is probably not necessary
-        * any longer, since we will use it before the end of the transaction.
-        * DLList only knows how to use malloc() anyway, but we could probably
-        * palloc() the strings...
-        */
-       if (!pendingNotifies)
-               pendingNotifies = DLNewList();
-       notifyName = strdup(relname);
-       DLAddHead(pendingNotifies, DLNewElem(notifyName));
-       /*
-        * NOTE: we could check to see if pendingNotifies already has an entry
-        * for relname, and thus avoid making duplicate entries.  However, most
-        * apps probably don't notify the same name multiple times per transaction,
-        * so we'd likely just be wasting cycles to make such a check.
-        * AsyncExistsPendingNotify() doesn't really care whether the list
-        * contains duplicates...
-        */
+               pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
+
+               MemoryContextSwitchTo(oldcontext);
+       }
 }
 
 /*
  *--------------------------------------------------------------
- * Async_Listen 
+ * Async_Listen
  *
  *             This is executed by the SQL listen command.
  *
@@ -204,47 +194,37 @@ void
 Async_Listen(char *relname, int pid)
 {
        Relation        lRel;
-       TupleDesc       tdesc;
        HeapScanDesc scan;
-       HeapTuple       tuple,
-                               newtup;
+       HeapTuple       tuple;
        Datum           values[Natts_pg_listener];
        char            nulls[Natts_pg_listener];
-       Datum           d;
        int                     i;
-       bool            isnull;
-       int                     alreadyListener = 0;
-       TupleDesc       tupDesc;
+       bool            alreadyListener = false;
 
-       TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
+       if (Trace_notify)
+               elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
 
-       lRel = heap_openr(ListenerRelationName);
-       LockRelation(lRel, AccessExclusiveLock);
-       tdesc = RelationGetDescr(lRel);
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
 
        /* Detect whether we are already listening on this relname */
-       scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
-       while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
-               if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
+               Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+               if (listener->listenerpid == pid &&
+                 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
                {
-                       d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
-                       if (DatumGetInt32(d) == pid)
-                       {
-                               alreadyListener = 1;
-                               /* No need to scan the rest of the table */
-                               break;
-                       }
+                       alreadyListener = true;
+                       /* No need to scan the rest of the table */
+                       break;
                }
        }
        heap_endscan(scan);
 
        if (alreadyListener)
        {
-               elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
-               UnlockRelation(lRel, AccessExclusiveLock);
-               heap_close(lRel);
+               heap_close(lRel, ExclusiveLock);
                return;
        }
 
@@ -263,28 +243,30 @@ Async_Listen(char *relname, int pid)
        values[i++] = (Datum) pid;
        values[i++] = (Datum) 0;        /* no notifies pending */
 
-       tupDesc = lRel->rd_att;
-       newtup = heap_formtuple(tupDesc, values, nulls);
-       heap_insert(lRel, newtup);
-       pfree(newtup);
+       tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
+       simple_heap_insert(lRel, tuple);
 
-       UnlockRelation(lRel, AccessExclusiveLock);
-       heap_close(lRel);
+#ifdef NOT_USED                                        /* currently there are no indexes */
+       CatalogUpdateIndexes(lRel, tuple);
+#endif
+
+       heap_freetuple(tuple);
+
+       heap_close(lRel, ExclusiveLock);
 
        /*
         * now that we are listening, make sure we will unlisten before dying.
         */
-       if (! unlistenExitRegistered)
+       if (!unlistenExitRegistered)
        {
-               if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
-                       elog(NOTICE, "Async_Listen: out of shmem_exit slots");
-               unlistenExitRegistered = 1;
+               on_shmem_exit(Async_UnlistenOnExit, 0);
+               unlistenExitRegistered = true;
        }
 }
 
 /*
  *--------------------------------------------------------------
- * Async_Unlisten 
+ * Async_Unlisten
  *
  *             This is executed by the SQL unlisten command.
  *
@@ -303,7 +285,8 @@ void
 Async_Unlisten(char *relname, int pid)
 {
        Relation        lRel;
-       HeapTuple       lTuple;
+       HeapScanDesc scan;
+       HeapTuple       tuple;
 
        /* Handle specially the `unlisten "*"' command */
        if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
@@ -312,28 +295,42 @@ Async_Unlisten(char *relname, int pid)
                return;
        }
 
-       TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
+       if (Trace_notify)
+               elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
 
-       /* Note we assume there can be only one matching tuple. */
-       lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
-                                                                Int32GetDatum(pid),
-                                                                0, 0);
-       if (lTuple != NULL)
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               lRel = heap_openr(ListenerRelationName);
-               LockRelation(lRel, AccessExclusiveLock);
-               heap_delete(lRel, &lTuple->t_self, NULL);
-               UnlockRelation(lRel, AccessExclusiveLock);
-               heap_close(lRel);
+               Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+               if (listener->listenerpid == pid &&
+                 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+               {
+                       /* Found the matching tuple, delete it */
+                       simple_heap_delete(lRel, &tuple->t_self);
+
+                       /*
+                        * We assume there can be only one match, so no need to scan
+                        * the rest of the table
+                        */
+                       break;
+               }
        }
-       /* We do not complain about unlistening something not being listened;
+       heap_endscan(scan);
+
+       heap_close(lRel, ExclusiveLock);
+
+       /*
+        * We do not complain about unlistening something not being listened;
         * should we?
         */
 }
 
 /*
  *--------------------------------------------------------------
- * Async_UnlistenAll 
+ * Async_UnlistenAll
  *
  *             Unlisten all relations for this backend.
  *
@@ -348,38 +345,37 @@ Async_Unlisten(char *relname, int pid)
  *--------------------------------------------------------------
  */
 static void
-Async_UnlistenAll()
+Async_UnlistenAll(void)
 {
        Relation        lRel;
        TupleDesc       tdesc;
-       HeapScanDesc sRel;
+       HeapScanDesc scan;
        HeapTuple       lTuple;
        ScanKeyData key[1];
 
-       TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
+       if (Trace_notify)
+               elog(DEBUG1, "Async_UnlistenAll");
 
-       lRel = heap_openr(ListenerRelationName);
-       LockRelation(lRel, AccessExclusiveLock);
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
        tdesc = RelationGetDescr(lRel);
 
        /* Find and delete all entries with my listenerPID */
-       ScanKeyEntryInitialize(&key[0], 0,
-                                                  Anum_pg_listener_pid,
-                                                  F_INT4EQ,
-                                                  Int32GetDatum(MyProcPid));
-       sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
-
-       while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
-               heap_delete(lRel, &lTuple->t_self, NULL);
-
-       heap_endscan(sRel);
-       UnlockRelation(lRel, AccessExclusiveLock);
-       heap_close(lRel);
+       ScanKeyInit(&key[0],
+                               Anum_pg_listener_pid,
+                               BTEqualStrategyNumber, F_INT4EQ,
+                               Int32GetDatum(MyProcPid));
+       scan = heap_beginscan(lRel, SnapshotNow, 1, key);
+
+       while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+               simple_heap_delete(lRel, &lTuple->t_self);
+
+       heap_endscan(scan);
+       heap_close(lRel, ExclusiveLock);
 }
 
 /*
  *--------------------------------------------------------------
- * Async_UnlistenOnExit 
+ * Async_UnlistenOnExit
  *
  *             Clean up the pg_listener table at backend exit.
  *
@@ -396,13 +392,13 @@ Async_UnlistenAll()
  *--------------------------------------------------------------
  */
 static void
-Async_UnlistenOnExit()
+Async_UnlistenOnExit(int code, Datum arg)
 {
        /*
-        * We need to start/commit a transaction for the unlisten,
-        * but if there is already an active transaction we had better
-        * abort that one first.  Otherwise we'd end up committing changes
-        * that probably ought to be discarded.
+        * We need to start/commit a transaction for the unlisten, but if
+        * there is already an active transaction we had better abort that one
+        * first.  Otherwise we'd end up committing changes that probably
+        * ought to be discarded.
         */
        AbortOutOfAnyTransaction();
        /* Now we can do the unlisten */
@@ -413,7 +409,7 @@ Async_UnlistenOnExit()
 
 /*
  *--------------------------------------------------------------
- * AtCommit_Notify 
+ * AtCommit_Notify
  *
  *             This is called at transaction commit.
  *
@@ -434,39 +430,33 @@ Async_UnlistenOnExit()
  *--------------------------------------------------------------
  */
 void
-AtCommit_Notify()
+AtCommit_Notify(void)
 {
        Relation        lRel;
        TupleDesc       tdesc;
-       HeapScanDesc sRel;
+       HeapScanDesc scan;
        HeapTuple       lTuple,
                                rTuple;
-       Datum           d,
-                               value[Natts_pg_listener];
+       Datum           value[Natts_pg_listener];
        char            repl[Natts_pg_listener],
                                nulls[Natts_pg_listener];
-       bool            isnull;
-       char       *relname;
-       int32           listenerPID;
 
-       if (!pendingNotifies)
-               return;                                 /* no NOTIFY statements in this transaction */
+       if (pendingNotifies == NIL)
+               return;                                 /* no NOTIFY statements in this
+                                                                * transaction */
 
-       /* NOTIFY is disabled if not normal processing mode.
-        * This test used to be in xact.c, but it seems cleaner to do it here.
+       /*
+        * NOTIFY is disabled if not normal processing mode. This test used to
+        * be in xact.c, but it seems cleaner to do it here.
         */
-       if (! IsNormalProcessingMode())
+       if (!IsNormalProcessingMode())
        {
                ClearPendingNotifies();
                return;
        }
 
-       TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
-
-       lRel = heap_openr(ListenerRelationName);
-       LockRelation(lRel, AccessExclusiveLock);
-       tdesc = RelationGetDescr(lRel);
-       sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+       if (Trace_notify)
+               elog(DEBUG1, "AtCommit_Notify");
 
        /* preset data to update notify column to MyProcPid */
        nulls[0] = nulls[1] = nulls[2] = ' ';
@@ -475,81 +465,136 @@ AtCommit_Notify()
        value[0] = value[1] = value[2] = (Datum) 0;
        value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
 
-       while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
+       tdesc = RelationGetDescr(lRel);
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+
+       while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
-               relname = (char *) DatumGetPointer(d);
+               Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+               char       *relname = NameStr(listener->relname);
+               int32           listenerPID = listener->listenerpid;
+
+               if (!AsyncExistsPendingNotify(relname))
+                       continue;
 
-               if (AsyncExistsPendingNotify(relname))
+               if (listenerPID == MyProcPid)
                {
-                       d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
-                       listenerPID = DatumGetInt32(d);
+                       /*
+                        * Self-notify: no need to bother with table update. Indeed,
+                        * we *must not* clear the notification field in this path, or
+                        * we could lose an outside notify, which'd be bad for
+                        * applications that ignore self-notify messages.
+                        */
+
+                       if (Trace_notify)
+                               elog(DEBUG1, "AtCommit_Notify: notifying self");
 
-                       if (listenerPID == MyProcPid)
+                       NotifyMyFrontEnd(relname, listenerPID);
+               }
+               else
+               {
+                       if (Trace_notify)
+                               elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
+                                        listenerPID);
+
+                       /*
+                        * If someone has already notified this listener, we don't
+                        * bother modifying the table, but we do still send a SIGUSR2
+                        * signal, just in case that backend missed the earlier signal
+                        * for some reason.  It's OK to send the signal first, because
+                        * the other guy can't read pg_listener until we unlock it.
+                        */
+                       if (kill(listenerPID, SIGUSR2) < 0)
                        {
-                               /* Self-notify: no need to bother with table update.
-                                * Indeed, we *must not* clear the notification field in
-                                * this path, or we could lose an outside notify, which'd be
-                                * bad for applications that ignore self-notify messages.
+                               /*
+                                * Get rid of pg_listener entry if it refers to a PID that
+                                * no longer exists.  Presumably, that backend crashed
+                                * without deleting its pg_listener entries. This code
+                                * used to only delete the entry if errno==ESRCH, but as
+                                * far as I can see we should just do it for any failure
+                                * (certainly at least for EPERM too...)
                                 */
-                               TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
-                               NotifyMyFrontEnd(relname, listenerPID);
+                               simple_heap_delete(lRel, &lTuple->t_self);
                        }
-                       else
+                       else if (listener->notification == 0)
                        {
-                               TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
-                                               listenerPID);
+                               ItemPointerData ctid;
+                               int                     result;
+
+                               rTuple = heap_modifytuple(lTuple, lRel,
+                                                                                 value, nulls, repl);
                                /*
-                                * If someone has already notified this listener,
-                                * we don't bother modifying the table, but we do still send
-                                * a SIGUSR2 signal, just in case that backend missed the
-                                * earlier signal for some reason.  It's OK to send the signal
-                                * first, because the other guy can't read pg_listener until
-                                * we unlock it.
+                                * We cannot use simple_heap_update here because the tuple
+                                * could have been modified by an uncommitted transaction;
+                                * specifically, since UNLISTEN releases exclusive lock on
+                                * the table before commit, the other guy could already have
+                                * tried to unlisten.  There are no other cases where we
+                                * should be able to see an uncommitted update or delete.
+                                * Therefore, our response to a HeapTupleBeingUpdated result
+                                * is just to ignore it.  We do *not* wait for the other
+                                * guy to commit --- that would risk deadlock, and we don't
+                                * want to block while holding the table lock anyway for
+                                * performance reasons.  We also ignore HeapTupleUpdated,
+                                * which could occur if the other guy commits between our
+                                * heap_getnext and heap_update calls.
                                 */
-#ifdef HAVE_KILL
-                               if (kill(listenerPID, SIGUSR2) < 0)
+                               result = heap_update(lRel, &lTuple->t_self, rTuple,
+                                                                        &ctid,
+                                                                        GetCurrentCommandId(), SnapshotAny,
+                                                                        false /* no wait for commit */);
+                               switch (result)
                                {
-                                       /* Get rid of pg_listener entry if it refers to a PID
-                                        * that no longer exists.  Presumably, that backend
-                                        * crashed without deleting its pg_listener entries.
-                                        * This code used to only delete the entry if errno==ESRCH,
-                                        * but as far as I can see we should just do it for any
-                                        * failure (certainly at least for EPERM too...)
-                                        */
-                                       heap_delete(lRel, &lTuple->t_self, NULL);
-                               }
-                               else
+                                       case HeapTupleSelfUpdated:
+                                               /* Tuple was already updated in current command? */
+                                               elog(ERROR, "tuple already updated by self");
+                                               break;
+
+                                       case HeapTupleMayBeUpdated:
+                                               /* done successfully */
+
+#ifdef NOT_USED                                        /* currently there are no indexes */
+                                               CatalogUpdateIndexes(lRel, rTuple);
 #endif
-                               {
-                                       d = heap_getattr(lTuple, Anum_pg_listener_notify,
-                                                                        tdesc, &isnull);
-                                       if (DatumGetInt32(d) == 0)
-                                       {
-                                               rTuple = heap_modifytuple(lTuple, lRel,
-                                                                                                 value, nulls, repl);
-                                               heap_replace(lRel, &lTuple->t_self, rTuple, NULL);
-                                       }
+                                               break;
+
+                                       case HeapTupleBeingUpdated:
+                                               /* ignore uncommitted tuples */
+                                               break;
+
+                                       case HeapTupleUpdated:
+                                               /* ignore just-committed tuples */
+                                               break;
+
+                                       default:
+                                               elog(ERROR, "unrecognized heap_update status: %u",
+                                                        result);
+                                               break;
                                }
                        }
                }
        }
 
-       heap_endscan(sRel);
+       heap_endscan(scan);
+
        /*
-        * We do not do RelationUnsetLockForWrite(lRel) here, because the
-        * transaction is about to be committed anyway.
+        * We do NOT release the lock on pg_listener here; we need to hold it
+        * until end of transaction (which is about to happen, anyway) to
+        * ensure that notified backends see our tuple updates when they look.
+        * Else they might disregard the signal, which would make the
+        * application programmer very unhappy.
         */
-       heap_close(lRel);
+       heap_close(lRel, NoLock);
 
        ClearPendingNotifies();
 
-       TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
+       if (Trace_notify)
+               elog(DEBUG1, "AtCommit_Notify: done");
 }
 
 /*
  *--------------------------------------------------------------
- * AtAbort_Notify 
+ * AtAbort_Notify
  *
  *             This is called at transaction abort.
  *
@@ -562,14 +607,68 @@ AtCommit_Notify()
  *--------------------------------------------------------------
  */
 void
-AtAbort_Notify()
+AtAbort_Notify(void)
 {
        ClearPendingNotifies();
 }
 
+/*
+ * AtSubStart_Notify() --- Take care of subtransaction start.
+ *
+ * Push empty state for the new subtransaction.
+ */
+void
+AtSubStart_Notify(void)
+{
+       MemoryContext   old_cxt;
+
+       /* Keep the list-of-lists in TopTransactionContext for simplicity */
+       old_cxt = MemoryContextSwitchTo(TopTransactionContext);
+
+       upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
+
+       pendingNotifies = NIL;
+
+       MemoryContextSwitchTo(old_cxt);
+}
+
+/*
+ * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ *
+ * Reassign all items in the pending notifies list to the parent transaction.
+ */
+void
+AtSubCommit_Notify(void)
+{
+       List    *parentPendingNotifies;
+
+       parentPendingNotifies = (List *) linitial(upperPendingNotifies);
+       upperPendingNotifies = list_delete_first(upperPendingNotifies);
+
+       /*
+        * We could try to eliminate duplicates here, but it seems not worthwhile.
+        */
+       pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+}
+
+/*
+ * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ */
+void
+AtSubAbort_Notify(void)
+{
+       /*
+        * All we have to do is pop the stack --- the notifies made in this
+        * subxact are no longer interesting, and the space will be freed when
+        * CurTransactionContext is recycled.
+        */
+       pendingNotifies = (List *) linitial(upperPendingNotifies);
+       upperPendingNotifies = list_delete_first(upperPendingNotifies);
+}
+
 /*
  *--------------------------------------------------------------
- * Async_NotifyHandler 
+ * NotifyInterruptHandler
  *
  *             This is the signal handler for SIGUSR2.
  *
@@ -584,51 +683,85 @@ AtAbort_Notify()
  *             per above
  *--------------------------------------------------------------
  */
-
 void
-Async_NotifyHandler(SIGNAL_ARGS)
+NotifyInterruptHandler(SIGNAL_ARGS)
 {
+       int                     save_errno = errno;
+
        /*
-        * Note: this is a SIGNAL HANDLER.  You must be very wary what you do here.
-        * Some helpful soul had this routine sprinkled with TPRINTFs, which would
-        * likely lead to corruption of stdio buffers if they were ever turned on.
+        * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
+        * here. Some helpful soul had this routine sprinkled with TPRINTFs,
+        * which would likely lead to corruption of stdio buffers if they were
+        * ever turned on.
         */
 
+       /* Don't joggle the elbow of proc_exit */
+       if (proc_exit_inprogress)
+               return;
+
        if (notifyInterruptEnabled)
        {
-               /* I'm not sure whether some flavors of Unix might allow another
-                * SIGUSR2 occurrence to recursively interrupt this routine.
-                * To cope with the possibility, we do the same sort of dance that
-                * EnableNotifyInterrupt must do --- see that routine for comments.
+               bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
+
+               /*
+                * We may be called while ImmediateInterruptOK is true; turn it
+                * off while messing with the NOTIFY state.  (We would have to
+                * save and restore it anyway, because PGSemaphore operations
+                * inside ProcessIncomingNotify() might reset it.)
+                */
+               ImmediateInterruptOK = false;
+
+               /*
+                * I'm not sure whether some flavors of Unix might allow another
+                * SIGUSR2 occurrence to recursively interrupt this routine. To
+                * cope with the possibility, we do the same sort of dance that
+                * EnableNotifyInterrupt must do --- see that routine for
+                * comments.
                 */
                notifyInterruptEnabled = 0;             /* disable any recursive signal */
                notifyInterruptOccurred = 1;    /* do at least one iteration */
                for (;;)
                {
                        notifyInterruptEnabled = 1;
-                       if (! notifyInterruptOccurred)
+                       if (!notifyInterruptOccurred)
                                break;
                        notifyInterruptEnabled = 0;
                        if (notifyInterruptOccurred)
                        {
                                /* Here, it is finally safe to do stuff. */
-                               TPRINTF(TRACE_NOTIFY,
-                                               "Async_NotifyHandler: perform async notify");
+                               if (Trace_notify)
+                                       elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
+
                                ProcessIncomingNotify();
-                               TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
+
+                               if (Trace_notify)
+                                       elog(DEBUG1, "NotifyInterruptHandler: done");
                        }
                }
+
+               /*
+                * Restore ImmediateInterruptOK, and check for interrupts if
+                * needed.
+                */
+               ImmediateInterruptOK = save_ImmediateInterruptOK;
+               if (save_ImmediateInterruptOK)
+                       CHECK_FOR_INTERRUPTS();
        }
        else
        {
-               /* In this path it is NOT SAFE to do much of anything, except this: */
+               /*
+                * In this path it is NOT SAFE to do much of anything, except
+                * this:
+                */
                notifyInterruptOccurred = 1;
        }
+
+       errno = save_errno;
 }
 
 /*
  * --------------------------------------------------------------
- * EnableNotifyInterrupt 
+ * EnableNotifyInterrupt
  *
  *             This is called by the PostgresMain main loop just before waiting
  *             for a frontend command.  If we are truly idle (ie, *not* inside
@@ -639,11 +772,10 @@ Async_NotifyHandler(SIGNAL_ARGS)
  *             PostgresMain calls this the first time.
  * --------------------------------------------------------------
  */
-
 void
 EnableNotifyInterrupt(void)
 {
-       if (CurrentTransactionState->blockState != TRANS_DEFAULT)
+       if (IsTransactionOrTransactionBlock())
                return;                                 /* not really idle */
 
        /*
@@ -652,57 +784,68 @@ EnableNotifyInterrupt(void)
         * notifyInterruptOccurred and then set notifyInterruptEnabled, we
         * could fail to respond promptly to a signal that happens in between
         * those two steps.  (A very small time window, perhaps, but Murphy's
-        * Law says you can hit it...)  Instead, we first set the enable flag,
-        * then test the occurred flag.  If we see an unserviced interrupt
-        * has occurred, we re-clear the enable flag before going off to do
-        * the service work.  (That prevents re-entrant invocation of
-        * ProcessIncomingNotify() if another interrupt occurs.)
-        * If an interrupt comes in between the setting and clearing of
-        * notifyInterruptEnabled, then it will have done the service
-        * work and left notifyInterruptOccurred zero, so we have to check
-        * again after clearing enable.  The whole thing has to be in a loop
-        * in case another interrupt occurs while we're servicing the first.
-        * Once we get out of the loop, enable is set and we know there is no
+        * Law says you can hit it...)  Instead, we first set the enable flag,
+        * then test the occurred flag.  If we see an unserviced interrupt has
+        * occurred, we re-clear the enable flag before going off to do the
+        * service work.  (That prevents re-entrant invocation of
+        * ProcessIncomingNotify() if another interrupt occurs.) If an
+        * interrupt comes in between the setting and clearing of
+        * notifyInterruptEnabled, then it will have done the service work and
+        * left notifyInterruptOccurred zero, so we have to check again after
+        * clearing enable.  The whole thing has to be in a loop in case
+        * another interrupt occurs while we're servicing the first. Once we
+        * get out of the loop, enable is set and we know there is no
         * unserviced interrupt.
         *
         * NB: an overenthusiastic optimizing compiler could easily break this
-        * code.  Hopefully, they all understand what "volatile" means these days.
+        * code.  Hopefully, they all understand what "volatile" means these
+        * days.
         */
        for (;;)
        {
                notifyInterruptEnabled = 1;
-               if (! notifyInterruptOccurred)
+               if (!notifyInterruptOccurred)
                        break;
                notifyInterruptEnabled = 0;
                if (notifyInterruptOccurred)
                {
-                       TPRINTF(TRACE_NOTIFY,
-                                       "EnableNotifyInterrupt: perform async notify");
+                       if (Trace_notify)
+                               elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
+
                        ProcessIncomingNotify();
-                       TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
+
+                       if (Trace_notify)
+                               elog(DEBUG1, "EnableNotifyInterrupt: done");
                }
        }
 }
 
 /*
  * --------------------------------------------------------------
- * DisableNotifyInterrupt 
+ * DisableNotifyInterrupt
  *
  *             This is called by the PostgresMain main loop just after receiving
  *             a frontend command.  Signal handler execution of inbound notifies
  *             is disabled until the next EnableNotifyInterrupt call.
+ *
+ *             The SIGUSR1 signal handler also needs to call this, so as to
+ *             prevent conflicts if one signal interrupts the other.  So we
+ *             must return the previous state of the flag.
  * --------------------------------------------------------------
  */
-
-void
+bool
 DisableNotifyInterrupt(void)
 {
+       bool    result = (notifyInterruptEnabled != 0);
+
        notifyInterruptEnabled = 0;
+
+       return result;
 }
 
 /*
  * --------------------------------------------------------------
- * ProcessIncomingNotify 
+ * ProcessIncomingNotify
  *
  *             Deal with arriving NOTIFYs from other backends.
  *             This is called either directly from the SIGUSR2 signal handler,
@@ -711,10 +854,6 @@ DisableNotifyInterrupt(void)
  *             and clear the notification field in pg_listener until next time.
  *
  *             NOTE: since we are outside any transaction, we must create our own.
- *
- * Results:
- *             XXX
- *
  * --------------------------------------------------------------
  */
 static void
@@ -723,34 +862,35 @@ ProcessIncomingNotify(void)
        Relation        lRel;
        TupleDesc       tdesc;
        ScanKeyData key[1];
-       HeapScanDesc sRel;
+       HeapScanDesc scan;
        HeapTuple       lTuple,
                                rTuple;
-       Datum           d,
-                               value[Natts_pg_listener];
+       Datum           value[Natts_pg_listener];
        char            repl[Natts_pg_listener],
                                nulls[Natts_pg_listener];
-       bool            isnull;
-       char       *relname;
-       int32           sourcePID;
+       bool            catchup_enabled;
+
+       /* Must prevent SIGUSR1 interrupt while I am running */
+       catchup_enabled = DisableCatchupInterrupt();
 
-       TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
-       PS_SET_STATUS("async_notify");
+       if (Trace_notify)
+               elog(DEBUG1, "ProcessIncomingNotify");
+
+       set_ps_display("notify interrupt");
 
        notifyInterruptOccurred = 0;
 
        StartTransactionCommand();
 
-       lRel = heap_openr(ListenerRelationName);
-       LockRelation(lRel, AccessExclusiveLock);
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
        tdesc = RelationGetDescr(lRel);
 
        /* Scan only entries with my listenerPID */
-       ScanKeyEntryInitialize(&key[0], 0,
-                                                  Anum_pg_listener_pid,
-                                                  F_INT4EQ,
-                                                  Int32GetDatum(MyProcPid));
-       sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+       ScanKeyInit(&key[0],
+                               Anum_pg_listener_pid,
+                               BTEqualStrategyNumber, F_INT4EQ,
+                               Int32GetDatum(MyProcPid));
+       scan = heap_beginscan(lRel, SnapshotNow, 1, key);
 
        /* Prepare data for rewriting 0 into notification field */
        nulls[0] = nulls[1] = nulls[2] = ' ';
@@ -759,105 +899,121 @@ ProcessIncomingNotify(void)
        value[0] = value[1] = value[2] = (Datum) 0;
        value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
 
-       while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+       while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
-               sourcePID = DatumGetInt32(d);
+               Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+               char       *relname = NameStr(listener->relname);
+               int32           sourcePID = listener->notification;
+
                if (sourcePID != 0)
                {
-                       d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
-                       relname = (char *) DatumGetPointer(d);
                        /* Notify the frontend */
-                       TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
-                                       relname, (int) sourcePID);
+
+                       if (Trace_notify)
+                               elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
+                                        relname, (int) sourcePID);
+
                        NotifyMyFrontEnd(relname, sourcePID);
-                       /* Rewrite the tuple with 0 in notification column */
+                       /*
+                        * Rewrite the tuple with 0 in notification column.
+                        *
+                        * simple_heap_update is safe here because no one else would
+                        * have tried to UNLISTEN us, so there can be no uncommitted
+                        * changes.
+                        */
                        rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
-                       heap_replace(lRel, &lTuple->t_self, rTuple, NULL);
+                       simple_heap_update(lRel, &lTuple->t_self, rTuple);
+
+#ifdef NOT_USED                                        /* currently there are no indexes */
+                       CatalogUpdateIndexes(lRel, rTuple);
+#endif
                }
        }
-       heap_endscan(sRel);
+       heap_endscan(scan);
+
        /*
-        * We do not do RelationUnsetLockForWrite(lRel) here, because the
-        * transaction is about to be committed anyway.
+        * We do NOT release the lock on pg_listener here; we need to hold it
+        * until end of transaction (which is about to happen, anyway) to
+        * ensure that other backends see our tuple updates when they look.
+        * Otherwise, a transaction started after this one might mistakenly
+        * think it doesn't need to send this backend a new NOTIFY.
         */
-       heap_close(lRel);
+       heap_close(lRel, NoLock);
 
        CommitTransactionCommand();
 
-       /* Must flush the notify messages to ensure frontend gets them promptly. */
+       /*
+        * Must flush the notify messages to ensure frontend gets them
+        * promptly.
+        */
        pq_flush();
 
-       PS_SET_STATUS("idle");
-       TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
-}
+       set_ps_display("idle");
+
+       if (Trace_notify)
+               elog(DEBUG1, "ProcessIncomingNotify: done");
 
-/* Send NOTIFY message to my front end. */
+       if (catchup_enabled)
+               EnableCatchupInterrupt();
+}
 
+/*
+ * Send NOTIFY message to my front end.
+ */
 static void
 NotifyMyFrontEnd(char *relname, int32 listenerPID)
 {
        if (whereToSendOutput == Remote)
        {
                StringInfoData buf;
-               pq_beginmessage(&buf);
-               pq_sendbyte(&buf, 'A');
+
+               pq_beginmessage(&buf, 'A');
                pq_sendint(&buf, listenerPID, sizeof(int32));
-               pq_sendstring(&buf, relname, strlen(relname));
+               pq_sendstring(&buf, relname);
+               if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
+               {
+                       /* XXX Add parameter string here later */
+                       pq_sendstring(&buf, "");
+               }
                pq_endmessage(&buf);
-               /* NOTE: we do not do pq_flush() here.  For a self-notify, it will
+
+               /*
+                * NOTE: we do not do pq_flush() here.  For a self-notify, it will
                 * happen at the end of the transaction, and for incoming notifies
-                * ProcessIncomingNotify will do it after finding all the notifies.
+                * ProcessIncomingNotify will do it after finding all the
+                * notifies.
                 */
        }
        else
-       {
-               elog(NOTICE, "NOTIFY for %s", relname);
-       }
+               elog(INFO, "NOTIFY for %s", relname);
 }
 
-/* Does pendingNotifies include the given relname?
- *
- * NB: not called unless pendingNotifies != NULL.
- */
-
-static int
-AsyncExistsPendingNotify(char *relname)
+/* Does pendingNotifies include the given relname? */
+static bool
+AsyncExistsPendingNotify(const char *relname)
 {
-       Dlelem     *p;
+       ListCell   *p;
 
-       for (p = DLGetHead(pendingNotifies);
-                p != NULL;
-                p = DLGetSucc(p))
+       foreach(p, pendingNotifies)
        {
                /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
-               if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
-                       return 1;
+               if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
+                       return true;
        }
 
-       return 0;
+       return false;
 }
 
 /* Clear the pendingNotifies list. */
-
 static void
-ClearPendingNotifies()
+ClearPendingNotifies(void)
 {
-       Dlelem     *p;
-
-       if (pendingNotifies)
-       {
-               /* Since the referenced strings are malloc'd, we have to scan the
-                * list and delete them individually.  If we used palloc for the
-                * strings then we could just do DLFreeList to get rid of both
-                * the list nodes and the list base...
-                */
-               while ((p = DLRemHead(pendingNotifies)) != NULL)
-               {
-                       free(DLE_VAL(p));
-                       DLFreeElem(p);
-               }
-               DLFreeList(pendingNotifies);
-               pendingNotifies = NULL;
-       }
+       /*
+        * We used to have to explicitly deallocate the list members and
+        * nodes, because they were malloc'd.  Now, since we know they are
+        * palloc'd in CurTransactionContext, we need not do that --- they'll
+        * go away automatically at transaction exit.  We need only reset the
+        * list head pointer.
+        */
+       pendingNotifies = NIL;
 }