]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/async.c
Nested transactions. There is still much left to do, especially on the
[postgresql] / src / backend / commands / async.c
index 7ed3663a48f394a91fef77b3ed1b99a2b592964c..8e53d6af7d79d57b183e12e66bf3086a32ef0537 100644 (file)
@@ -3,11 +3,11 @@
  * async.c
  *       Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
- * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.86 2002/05/21 22:05:54 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.113 2004/07/01 00:50:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
  *       transaction, since by assumption it is only called from outside any
  *       transaction.
  *
- * Although we grab AccessExclusiveLock on pg_listener for any operation,
+ * Although we grab ExclusiveLock on pg_listener for any operation,
  * the lock is never held very long, so it shouldn't cause too much of
- * a performance problem.
+ * a performance problem.  (Previously we used AccessExclusiveLock, but
+ * there's no real reason to forbid concurrent reads.)
  *
  * An application that listens on the same relname it notifies will get
  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
@@ -75,7 +76,6 @@
 #include <unistd.h>
 #include <signal.h>
 #include <errno.h>
-#include <sys/types.h>
 #include <netinet/in.h>
 
 #include "access/heapam.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "storage/ipc.h"
+#include "storage/sinval.h"
 #include "tcop/tcopprot.h"
 #include "utils/fmgroids.h"
 #include "utils/ps_status.h"
 #include "utils/syscache.h"
 
 
-/* stuff that we really ought not be touching directly :-( */
-extern TransactionState CurrentTransactionState;
-
-
 /*
  * State for outbound notifies consists of a list of all relnames NOTIFYed
  * in the current transaction. We do not actually perform a NOTIFY until
  * and unless the transaction commits. pendingNotifies is NIL if no
- * NOTIFYs have been done in the current transaction.  The List nodes and
- * referenced strings are all palloc'd in TopTransactionContext.
+ * NOTIFYs have been done in the current transaction.
+ *
+ * The list is kept in CurTransactionContext.  In subtransactions, each
+ * subtransaction has its own list in its own CurTransactionContext, but
+ * successful subtransactions attach their lists to their parent's list.
+ * Failed subtransactions simply discard their lists.
  */
 static List *pendingNotifies = NIL;
 
+static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
+
 /*
  * State for inbound notifies consists of two flags: one saying whether
  * the signal handler is currently allowed to call ProcessIncomingNotify
@@ -125,7 +128,7 @@ bool                Trace_notify = false;
 
 
 static void Async_UnlistenAll(void);
-static void Async_UnlistenOnExit(void);
+static void Async_UnlistenOnExit(int code, Datum arg);
 static void ProcessIncomingNotify(void);
 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
 static bool AsyncExistsPendingNotify(const char *relname);
@@ -151,18 +154,18 @@ void
 Async_Notify(char *relname)
 {
        if (Trace_notify)
-               elog(LOG, "Async_Notify: %s", relname);
+               elog(DEBUG1, "Async_Notify(%s)", relname);
 
        /* no point in making duplicate entries in the list ... */
        if (!AsyncExistsPendingNotify(relname))
        {
                /*
                 * The name list needs to live until end of transaction, so store
-                * it in the top transaction context.
+                * it in the transaction context.
                 */
                MemoryContext oldcontext;
 
-               oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+               oldcontext = MemoryContextSwitchTo(CurTransactionContext);
 
                pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
 
@@ -199,12 +202,12 @@ Async_Listen(char *relname, int pid)
        bool            alreadyListener = false;
 
        if (Trace_notify)
-               elog(LOG, "Async_Listen: %s", relname);
+               elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
 
-       lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
 
        /* Detect whether we are already listening on this relname */
-       scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
                Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
@@ -221,8 +224,7 @@ Async_Listen(char *relname, int pid)
 
        if (alreadyListener)
        {
-               heap_close(lRel, AccessExclusiveLock);
-               elog(WARNING, "Async_Listen: We are already listening on %s", relname);
+               heap_close(lRel, ExclusiveLock);
                return;
        }
 
@@ -245,19 +247,12 @@ Async_Listen(char *relname, int pid)
        simple_heap_insert(lRel, tuple);
 
 #ifdef NOT_USED                                        /* currently there are no indexes */
-       if (RelationGetForm(lRel)->relhasindex)
-       {
-               Relation        idescs[Num_pg_listener_indices];
-
-               CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
-               CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, tuple);
-               CatalogCloseIndices(Num_pg_listener_indices, idescs);
-       }
+       CatalogUpdateIndexes(lRel, tuple);
 #endif
 
        heap_freetuple(tuple);
 
-       heap_close(lRel, AccessExclusiveLock);
+       heap_close(lRel, ExclusiveLock);
 
        /*
         * now that we are listening, make sure we will unlisten before dying.
@@ -301,11 +296,11 @@ Async_Unlisten(char *relname, int pid)
        }
 
        if (Trace_notify)
-               elog(LOG, "Async_Unlisten %s", relname);
+               elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
 
-       lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
 
-       scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
                Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
@@ -325,7 +320,7 @@ Async_Unlisten(char *relname, int pid)
        }
        heap_endscan(scan);
 
-       heap_close(lRel, AccessExclusiveLock);
+       heap_close(lRel, ExclusiveLock);
 
        /*
         * We do not complain about unlistening something not being listened;
@@ -359,23 +354,23 @@ Async_UnlistenAll(void)
        ScanKeyData key[1];
 
        if (Trace_notify)
-               elog(LOG, "Async_UnlistenAll");
+               elog(DEBUG1, "Async_UnlistenAll");
 
-       lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
        tdesc = RelationGetDescr(lRel);
 
        /* Find and delete all entries with my listenerPID */
-       ScanKeyEntryInitialize(&key[0], 0,
-                                                  Anum_pg_listener_pid,
-                                                  F_INT4EQ,
-                                                  Int32GetDatum(MyProcPid));
+       ScanKeyInit(&key[0],
+                               Anum_pg_listener_pid,
+                               BTEqualStrategyNumber, F_INT4EQ,
+                               Int32GetDatum(MyProcPid));
        scan = heap_beginscan(lRel, SnapshotNow, 1, key);
 
        while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
                simple_heap_delete(lRel, &lTuple->t_self);
 
        heap_endscan(scan);
-       heap_close(lRel, AccessExclusiveLock);
+       heap_close(lRel, ExclusiveLock);
 }
 
 /*
@@ -397,7 +392,7 @@ Async_UnlistenAll(void)
  *--------------------------------------------------------------
  */
 static void
-Async_UnlistenOnExit(void)
+Async_UnlistenOnExit(int code, Datum arg)
 {
        /*
         * We need to start/commit a transaction for the unlisten, but if
@@ -461,7 +456,7 @@ AtCommit_Notify(void)
        }
 
        if (Trace_notify)
-               elog(LOG, "AtCommit_Notify");
+               elog(DEBUG1, "AtCommit_Notify");
 
        /* preset data to update notify column to MyProcPid */
        nulls[0] = nulls[1] = nulls[2] = ' ';
@@ -470,9 +465,9 @@ AtCommit_Notify(void)
        value[0] = value[1] = value[2] = (Datum) 0;
        value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
 
-       lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
        tdesc = RelationGetDescr(lRel);
-       scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
+       scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
 
        while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
@@ -493,14 +488,14 @@ AtCommit_Notify(void)
                         */
 
                        if (Trace_notify)
-                               elog(LOG, "AtCommit_Notify: notifying self");
+                               elog(DEBUG1, "AtCommit_Notify: notifying self");
 
                        NotifyMyFrontEnd(relname, listenerPID);
                }
                else
                {
                        if (Trace_notify)
-                               elog(LOG, "AtCommit_Notify: notifying pid %d",
+                               elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
                                         listenerPID);
 
                        /*
@@ -524,20 +519,58 @@ AtCommit_Notify(void)
                        }
                        else if (listener->notification == 0)
                        {
+                               ItemPointerData ctid;
+                               int                     result;
+
                                rTuple = heap_modifytuple(lTuple, lRel,
                                                                                  value, nulls, repl);
-                               simple_heap_update(lRel, &lTuple->t_self, rTuple);
+                               /*
+                                * We cannot use simple_heap_update here because the tuple
+                                * could have been modified by an uncommitted transaction;
+                                * specifically, since UNLISTEN releases exclusive lock on
+                                * the table before commit, the other guy could already have
+                                * tried to unlisten.  There are no other cases where we
+                                * should be able to see an uncommitted update or delete.
+                                * Therefore, our response to a HeapTupleBeingUpdated result
+                                * is just to ignore it.  We do *not* wait for the other
+                                * guy to commit --- that would risk deadlock, and we don't
+                                * want to block while holding the table lock anyway for
+                                * performance reasons.  We also ignore HeapTupleUpdated,
+                                * which could occur if the other guy commits between our
+                                * heap_getnext and heap_update calls.
+                                */
+                               result = heap_update(lRel, &lTuple->t_self, rTuple,
+                                                                        &ctid,
+                                                                        GetCurrentCommandId(), SnapshotAny,
+                                                                        false /* no wait for commit */);
+                               switch (result)
+                               {
+                                       case HeapTupleSelfUpdated:
+                                               /* Tuple was already updated in current command? */
+                                               elog(ERROR, "tuple already updated by self");
+                                               break;
+
+                                       case HeapTupleMayBeUpdated:
+                                               /* done successfully */
 
 #ifdef NOT_USED                                        /* currently there are no indexes */
-                               if (RelationGetForm(lRel)->relhasindex)
-                               {
-                                       Relation        idescs[Num_pg_listener_indices];
+                                               CatalogUpdateIndexes(lRel, rTuple);
+#endif
+                                               break;
+
+                                       case HeapTupleBeingUpdated:
+                                               /* ignore uncommitted tuples */
+                                               break;
 
-                                       CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
-                                       CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
-                                       CatalogCloseIndices(Num_pg_listener_indices, idescs);
+                                       case HeapTupleUpdated:
+                                               /* ignore just-committed tuples */
+                                               break;
+
+                                       default:
+                                               elog(ERROR, "unrecognized heap_update status: %u",
+                                                        result);
+                                               break;
                                }
-#endif
                        }
                }
        }
@@ -556,7 +589,7 @@ AtCommit_Notify(void)
        ClearPendingNotifies();
 
        if (Trace_notify)
-               elog(LOG, "AtCommit_Notify: done");
+               elog(DEBUG1, "AtCommit_Notify: done");
 }
 
 /*
@@ -579,9 +612,63 @@ AtAbort_Notify(void)
        ClearPendingNotifies();
 }
 
+/*
+ * AtSubStart_Notify() --- Take care of subtransaction start.
+ *
+ * Push empty state for the new subtransaction.
+ */
+void
+AtSubStart_Notify(void)
+{
+       MemoryContext   old_cxt;
+
+       /* Keep the list-of-lists in TopTransactionContext for simplicity */
+       old_cxt = MemoryContextSwitchTo(TopTransactionContext);
+
+       upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
+
+       pendingNotifies = NIL;
+
+       MemoryContextSwitchTo(old_cxt);
+}
+
+/*
+ * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ *
+ * Reassign all items in the pending notifies list to the parent transaction.
+ */
+void
+AtSubCommit_Notify(void)
+{
+       List    *parentPendingNotifies;
+
+       parentPendingNotifies = (List *) linitial(upperPendingNotifies);
+       upperPendingNotifies = list_delete_first(upperPendingNotifies);
+
+       /*
+        * We could try to eliminate duplicates here, but it seems not worthwhile.
+        */
+       pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+}
+
+/*
+ * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ */
+void
+AtSubAbort_Notify(void)
+{
+       /*
+        * All we have to do is pop the stack --- the notifies made in this
+        * subxact are no longer interesting, and the space will be freed when
+        * CurTransactionContext is recycled.
+        */
+       pendingNotifies = (List *) linitial(upperPendingNotifies);
+       upperPendingNotifies = list_delete_first(upperPendingNotifies);
+}
+
 /*
  *--------------------------------------------------------------
- * Async_NotifyHandler
+ * NotifyInterruptHandler
  *
  *             This is the signal handler for SIGUSR2.
  *
@@ -597,7 +684,7 @@ AtAbort_Notify(void)
  *--------------------------------------------------------------
  */
 void
-Async_NotifyHandler(SIGNAL_ARGS)
+NotifyInterruptHandler(SIGNAL_ARGS)
 {
        int                     save_errno = errno;
 
@@ -608,8 +695,22 @@ Async_NotifyHandler(SIGNAL_ARGS)
         * ever turned on.
         */
 
+       /* Don't joggle the elbow of proc_exit */
+       if (proc_exit_inprogress)
+               return;
+
        if (notifyInterruptEnabled)
        {
+               bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
+
+               /*
+                * We may be called while ImmediateInterruptOK is true; turn it
+                * off while messing with the NOTIFY state.  (We would have to
+                * save and restore it anyway, because PGSemaphore operations
+                * inside ProcessIncomingNotify() might reset it.)
+                */
+               ImmediateInterruptOK = false;
+
                /*
                 * I'm not sure whether some flavors of Unix might allow another
                 * SIGUSR2 occurrence to recursively interrupt this routine. To
@@ -629,14 +730,22 @@ Async_NotifyHandler(SIGNAL_ARGS)
                        {
                                /* Here, it is finally safe to do stuff. */
                                if (Trace_notify)
-                                       elog(LOG, "Async_NotifyHandler: perform async notify");
+                                       elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
 
                                ProcessIncomingNotify();
 
                                if (Trace_notify)
-                                       elog(LOG, "Async_NotifyHandler: done");
+                                       elog(DEBUG1, "NotifyInterruptHandler: done");
                        }
                }
+
+               /*
+                * Restore ImmediateInterruptOK, and check for interrupts if
+                * needed.
+                */
+               ImmediateInterruptOK = save_ImmediateInterruptOK;
+               if (save_ImmediateInterruptOK)
+                       CHECK_FOR_INTERRUPTS();
        }
        else
        {
@@ -666,7 +775,7 @@ Async_NotifyHandler(SIGNAL_ARGS)
 void
 EnableNotifyInterrupt(void)
 {
-       if (CurrentTransactionState->blockState != TRANS_DEFAULT)
+       if (IsTransactionOrTransactionBlock())
                return;                                 /* not really idle */
 
        /*
@@ -701,12 +810,12 @@ EnableNotifyInterrupt(void)
                if (notifyInterruptOccurred)
                {
                        if (Trace_notify)
-                               elog(LOG, "EnableNotifyInterrupt: perform async notify");
+                               elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
 
                        ProcessIncomingNotify();
 
                        if (Trace_notify)
-                               elog(LOG, "EnableNotifyInterrupt: done");
+                               elog(DEBUG1, "EnableNotifyInterrupt: done");
                }
        }
 }
@@ -718,12 +827,20 @@ EnableNotifyInterrupt(void)
  *             This is called by the PostgresMain main loop just after receiving
  *             a frontend command.  Signal handler execution of inbound notifies
  *             is disabled until the next EnableNotifyInterrupt call.
+ *
+ *             The SIGUSR1 signal handler also needs to call this, so as to
+ *             prevent conflicts if one signal interrupts the other.  So we
+ *             must return the previous state of the flag.
  * --------------------------------------------------------------
  */
-void
+bool
 DisableNotifyInterrupt(void)
 {
+       bool    result = (notifyInterruptEnabled != 0);
+
        notifyInterruptEnabled = 0;
+
+       return result;
 }
 
 /*
@@ -737,10 +854,6 @@ DisableNotifyInterrupt(void)
  *             and clear the notification field in pg_listener until next time.
  *
  *             NOTE: since we are outside any transaction, we must create our own.
- *
- * Results:
- *             XXX
- *
  * --------------------------------------------------------------
  */
 static void
@@ -755,24 +868,28 @@ ProcessIncomingNotify(void)
        Datum           value[Natts_pg_listener];
        char            repl[Natts_pg_listener],
                                nulls[Natts_pg_listener];
+       bool            catchup_enabled;
+
+       /* Must prevent SIGUSR1 interrupt while I am running */
+       catchup_enabled = DisableCatchupInterrupt();
 
        if (Trace_notify)
-               elog(LOG, "ProcessIncomingNotify");
+               elog(DEBUG1, "ProcessIncomingNotify");
 
-       set_ps_display("async_notify");
+       set_ps_display("notify interrupt");
 
        notifyInterruptOccurred = 0;
 
        StartTransactionCommand();
 
-       lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+       lRel = heap_openr(ListenerRelationName, ExclusiveLock);
        tdesc = RelationGetDescr(lRel);
 
        /* Scan only entries with my listenerPID */
-       ScanKeyEntryInitialize(&key[0], 0,
-                                                  Anum_pg_listener_pid,
-                                                  F_INT4EQ,
-                                                  Int32GetDatum(MyProcPid));
+       ScanKeyInit(&key[0],
+                               Anum_pg_listener_pid,
+                               BTEqualStrategyNumber, F_INT4EQ,
+                               Int32GetDatum(MyProcPid));
        scan = heap_beginscan(lRel, SnapshotNow, 1, key);
 
        /* Prepare data for rewriting 0 into notification field */
@@ -793,23 +910,22 @@ ProcessIncomingNotify(void)
                        /* Notify the frontend */
 
                        if (Trace_notify)
-                               elog(LOG, "ProcessIncomingNotify: received %s from %d",
+                               elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
                                         relname, (int) sourcePID);
 
                        NotifyMyFrontEnd(relname, sourcePID);
-                       /* Rewrite the tuple with 0 in notification column */
+                       /*
+                        * Rewrite the tuple with 0 in notification column.
+                        *
+                        * simple_heap_update is safe here because no one else would
+                        * have tried to UNLISTEN us, so there can be no uncommitted
+                        * changes.
+                        */
                        rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
                        simple_heap_update(lRel, &lTuple->t_self, rTuple);
 
 #ifdef NOT_USED                                        /* currently there are no indexes */
-                       if (RelationGetForm(lRel)->relhasindex)
-                       {
-                               Relation        idescs[Num_pg_listener_indices];
-
-                               CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
-                               CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
-                               CatalogCloseIndices(Num_pg_listener_indices, idescs);
-                       }
+                       CatalogUpdateIndexes(lRel, rTuple);
 #endif
                }
        }
@@ -835,7 +951,10 @@ ProcessIncomingNotify(void)
        set_ps_display("idle");
 
        if (Trace_notify)
-               elog(LOG, "ProcessIncomingNotify: done");
+               elog(DEBUG1, "ProcessIncomingNotify: done");
+
+       if (catchup_enabled)
+               EnableCatchupInterrupt();
 }
 
 /*
@@ -848,10 +967,14 @@ NotifyMyFrontEnd(char *relname, int32 listenerPID)
        {
                StringInfoData buf;
 
-               pq_beginmessage(&buf);
-               pq_sendbyte(&buf, 'A');
+               pq_beginmessage(&buf, 'A');
                pq_sendint(&buf, listenerPID, sizeof(int32));
                pq_sendstring(&buf, relname);
+               if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
+               {
+                       /* XXX Add parameter string here later */
+                       pq_sendstring(&buf, "");
+               }
                pq_endmessage(&buf);
 
                /*
@@ -869,7 +992,7 @@ NotifyMyFrontEnd(char *relname, int32 listenerPID)
 static bool
 AsyncExistsPendingNotify(const char *relname)
 {
-       List       *p;
+       ListCell   *p;
 
        foreach(p, pendingNotifies)
        {
@@ -888,7 +1011,7 @@ ClearPendingNotifies(void)
        /*
         * We used to have to explicitly deallocate the list members and
         * nodes, because they were malloc'd.  Now, since we know they are
-        * palloc'd in TopTransactionContext, we need not do that --- they'll
+        * palloc'd in CurTransactionContext, we need not do that --- they'll
         * go away automatically at transaction exit.  We need only reset the
         * list head pointer.
         */