* async.c
* Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
*
- * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.80 2001/09/08 01:10:20 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.113 2004/07/01 00:50:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
* transaction, since by assumption it is only called from outside any
* transaction.
*
- * Although we grab AccessExclusiveLock on pg_listener for any operation,
+ * Although we grab ExclusiveLock on pg_listener for any operation,
* the lock is never held very long, so it shouldn't cause too much of
- * a performance problem.
+ * a performance problem. (Previously we used AccessExclusiveLock, but
+ * there's no real reason to forbid concurrent reads.)
*
* An application that listens on the same relname it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
#include <unistd.h>
#include <signal.h>
#include <errno.h>
-#include <sys/types.h>
#include <netinet/in.h>
#include "access/heapam.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/fmgroids.h"
#include "utils/ps_status.h"
#include "utils/syscache.h"
-/* stuff that we really ought not be touching directly :-( */
-extern TransactionState CurrentTransactionState;
-
-
/*
* State for outbound notifies consists of a list of all relnames NOTIFYed
* in the current transaction. We do not actually perform a NOTIFY until
* and unless the transaction commits. pendingNotifies is NIL if no
- * NOTIFYs have been done in the current transaction. The List nodes and
- * referenced strings are all palloc'd in TopTransactionContext.
+ * NOTIFYs have been done in the current transaction.
+ *
+ * The list is kept in CurTransactionContext. In subtransactions, each
+ * subtransaction has its own list in its own CurTransactionContext, but
+ * successful subtransactions attach their lists to their parent's list.
+ * Failed subtransactions simply discard their lists.
*/
static List *pendingNotifies = NIL;
+static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
+
/*
* State for inbound notifies consists of two flags: one saying whether
* the signal handler is currently allowed to call ProcessIncomingNotify
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;
-bool Trace_notify = false;
+bool Trace_notify = false;
static void Async_UnlistenAll(void);
-static void Async_UnlistenOnExit(void);
+static void Async_UnlistenOnExit(int code, Datum arg);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
static bool AsyncExistsPendingNotify(const char *relname);
Async_Notify(char *relname)
{
if (Trace_notify)
- elog(DEBUG, "Async_Notify: %s", relname);
+ elog(DEBUG1, "Async_Notify(%s)", relname);
/* no point in making duplicate entries in the list ... */
- if (! AsyncExistsPendingNotify(relname))
+ if (!AsyncExistsPendingNotify(relname))
{
/*
- * The name list needs to live until end of transaction, so
- * store it in the top transaction context.
+ * The name list needs to live until end of transaction, so store
+ * it in the transaction context.
*/
- MemoryContext oldcontext;
+ MemoryContext oldcontext;
- oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+ oldcontext = MemoryContextSwitchTo(CurTransactionContext);
pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
bool alreadyListener = false;
if (Trace_notify)
- elog(DEBUG, "Async_Listen: %s", relname);
+ elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
- lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
/* Detect whether we are already listening on this relname */
- scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
- while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
if (listener->listenerpid == pid &&
- strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
{
alreadyListener = true;
/* No need to scan the rest of the table */
if (alreadyListener)
{
- heap_close(lRel, AccessExclusiveLock);
- elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
+ heap_close(lRel, ExclusiveLock);
return;
}
values[i++] = (Datum) 0; /* no notifies pending */
tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
- heap_insert(lRel, tuple);
+ simple_heap_insert(lRel, tuple);
#ifdef NOT_USED /* currently there are no indexes */
- if (RelationGetForm(lRel)->relhasindex)
- {
- Relation idescs[Num_pg_listener_indices];
-
- CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, tuple);
- CatalogCloseIndices(Num_pg_listener_indices, idescs);
- }
+ CatalogUpdateIndexes(lRel, tuple);
#endif
heap_freetuple(tuple);
- heap_close(lRel, AccessExclusiveLock);
+ heap_close(lRel, ExclusiveLock);
/*
* now that we are listening, make sure we will unlisten before dying.
}
if (Trace_notify)
- elog(DEBUG, "Async_Unlisten %s", relname);
+ elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
- lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
- scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
- while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
if (listener->listenerpid == pid &&
- strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
{
/* Found the matching tuple, delete it */
simple_heap_delete(lRel, &tuple->t_self);
+
/*
- * We assume there can be only one match, so no need
- * to scan the rest of the table
+ * We assume there can be only one match, so no need to scan
+ * the rest of the table
*/
break;
}
}
heap_endscan(scan);
- heap_close(lRel, AccessExclusiveLock);
+ heap_close(lRel, ExclusiveLock);
/*
* We do not complain about unlistening something not being listened;
ScanKeyData key[1];
if (Trace_notify)
- elog(DEBUG, "Async_UnlistenAll");
+ elog(DEBUG1, "Async_UnlistenAll");
- lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
tdesc = RelationGetDescr(lRel);
/* Find and delete all entries with my listenerPID */
- ScanKeyEntryInitialize(&key[0], 0,
- Anum_pg_listener_pid,
- F_INT4EQ,
- Int32GetDatum(MyProcPid));
- scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+ ScanKeyInit(&key[0],
+ Anum_pg_listener_pid,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(MyProcPid));
+ scan = heap_beginscan(lRel, SnapshotNow, 1, key);
- while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
+ while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
simple_heap_delete(lRel, &lTuple->t_self);
heap_endscan(scan);
- heap_close(lRel, AccessExclusiveLock);
+ heap_close(lRel, ExclusiveLock);
}
/*
*--------------------------------------------------------------
*/
static void
-Async_UnlistenOnExit(void)
+Async_UnlistenOnExit(int code, Datum arg)
{
/*
* We need to start/commit a transaction for the unlisten, but if
}
if (Trace_notify)
- elog(DEBUG, "AtCommit_Notify");
+ elog(DEBUG1, "AtCommit_Notify");
/* preset data to update notify column to MyProcPid */
nulls[0] = nulls[1] = nulls[2] = ' ';
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
- lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
tdesc = RelationGetDescr(lRel);
- scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
- while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
+ while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
char *relname = NameStr(listener->relname);
int32 listenerPID = listener->listenerpid;
- if (! AsyncExistsPendingNotify(relname))
+ if (!AsyncExistsPendingNotify(relname))
continue;
if (listenerPID == MyProcPid)
{
/*
- * Self-notify: no need to bother with table update.
- * Indeed, we *must not* clear the notification field in
- * this path, or we could lose an outside notify, which'd
- * be bad for applications that ignore self-notify messages.
+ * Self-notify: no need to bother with table update. Indeed,
+ * we *must not* clear the notification field in this path, or
+ * we could lose an outside notify, which'd be bad for
+ * applications that ignore self-notify messages.
*/
if (Trace_notify)
- elog(DEBUG, "AtCommit_Notify: notifying self");
+ elog(DEBUG1, "AtCommit_Notify: notifying self");
NotifyMyFrontEnd(relname, listenerPID);
}
else
{
if (Trace_notify)
- elog(DEBUG, "AtCommit_Notify: notifying pid %d",
+ elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
listenerPID);
/*
* If someone has already notified this listener, we don't
- * bother modifying the table, but we do still send a
- * SIGUSR2 signal, just in case that backend missed the
- * earlier signal for some reason. It's OK to send the
- * signal first, because the other guy can't read
- * pg_listener until we unlock it.
+ * bother modifying the table, but we do still send a SIGUSR2
+ * signal, just in case that backend missed the earlier signal
+ * for some reason. It's OK to send the signal first, because
+ * the other guy can't read pg_listener until we unlock it.
*/
if (kill(listenerPID, SIGUSR2) < 0)
{
/*
- * Get rid of pg_listener entry if it refers to a PID
- * that no longer exists. Presumably, that backend
- * crashed without deleting its pg_listener entries.
- * This code used to only delete the entry if
- * errno==ESRCH, but as far as I can see we should
- * just do it for any failure (certainly at least for
- * EPERM too...)
+ * Get rid of pg_listener entry if it refers to a PID that
+ * no longer exists. Presumably, that backend crashed
+ * without deleting its pg_listener entries. This code
+ * used to only delete the entry if errno==ESRCH, but as
+ * far as I can see we should just do it for any failure
+ * (certainly at least for EPERM too...)
*/
simple_heap_delete(lRel, &lTuple->t_self);
}
else if (listener->notification == 0)
{
+ ItemPointerData ctid;
+ int result;
+
rTuple = heap_modifytuple(lTuple, lRel,
value, nulls, repl);
- simple_heap_update(lRel, &lTuple->t_self, rTuple);
+ /*
+ * We cannot use simple_heap_update here because the tuple
+ * could have been modified by an uncommitted transaction;
+ * specifically, since UNLISTEN releases exclusive lock on
+ * the table before commit, the other guy could already have
+ * tried to unlisten. There are no other cases where we
+ * should be able to see an uncommitted update or delete.
+ * Therefore, our response to a HeapTupleBeingUpdated result
+ * is just to ignore it. We do *not* wait for the other
+ * guy to commit --- that would risk deadlock, and we don't
+ * want to block while holding the table lock anyway for
+ * performance reasons. We also ignore HeapTupleUpdated,
+ * which could occur if the other guy commits between our
+ * heap_getnext and heap_update calls.
+ */
+ result = heap_update(lRel, &lTuple->t_self, rTuple,
+ &ctid,
+ GetCurrentCommandId(), SnapshotAny,
+ false /* no wait for commit */);
+ switch (result)
+ {
+ case HeapTupleSelfUpdated:
+ /* Tuple was already updated in current command? */
+ elog(ERROR, "tuple already updated by self");
+ break;
+
+ case HeapTupleMayBeUpdated:
+ /* done successfully */
#ifdef NOT_USED /* currently there are no indexes */
- if (RelationGetForm(lRel)->relhasindex)
- {
- Relation idescs[Num_pg_listener_indices];
+ CatalogUpdateIndexes(lRel, rTuple);
+#endif
+ break;
- CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
- CatalogCloseIndices(Num_pg_listener_indices, idescs);
+ case HeapTupleBeingUpdated:
+ /* ignore uncommitted tuples */
+ break;
+
+ case HeapTupleUpdated:
+ /* ignore just-committed tuples */
+ break;
+
+ default:
+ elog(ERROR, "unrecognized heap_update status: %u",
+ result);
+ break;
}
-#endif
}
}
}
ClearPendingNotifies();
if (Trace_notify)
- elog(DEBUG, "AtCommit_Notify: done");
+ elog(DEBUG1, "AtCommit_Notify: done");
}
/*
ClearPendingNotifies();
}
+/*
+ * AtSubStart_Notify() --- Take care of subtransaction start.
+ *
+ * Push empty state for the new subtransaction.
+ *
+ * The enclosing level's pendingNotifies list is saved on the front of
+ * upperPendingNotifies, and pendingNotifies is then reset to NIL so that
+ * the subtransaction accumulates its own list. Takes no arguments and
+ * returns nothing; it only updates those two file-level list heads.
+ */
+void
+AtSubStart_Notify(void)
+{
+ MemoryContext old_cxt;
+
+ /*
+  * Keep the list-of-lists in TopTransactionContext for simplicity.
+  * NOTE(review): the switch presumably matters because lcons allocates
+  * the new stack cell in whatever memory context is current --- confirm
+  * against the List implementation.
+  */
+ old_cxt = MemoryContextSwitchTo(TopTransactionContext);
+
+ upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies); /* push the parent's list */
+
+ pendingNotifies = NIL; /* the subtransaction starts with nothing pending */
+
+ MemoryContextSwitchTo(old_cxt); /* restore the caller's memory context */
+}
+
+/*
+ * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ *
+ * Reassign all items in the pending notifies list to the parent transaction.
+ *
+ * Pops the parent's saved list off upperPendingNotifies and makes
+ * pendingNotifies the concatenation of that list with the list built by
+ * the committing subtransaction (the parent's list is passed first).
+ *
+ * NOTE(review): there is no check that upperPendingNotifies is non-empty;
+ * this presumably relies on a matching AtSubStart_Notify() call having
+ * pushed an entry --- confirm against the callers.
+ */
+void
+AtSubCommit_Notify(void)
+{
+ List *parentPendingNotifies;
+
+ parentPendingNotifies = (List *) linitial(upperPendingNotifies); /* parent's saved list */
+ upperPendingNotifies = list_delete_first(upperPendingNotifies); /* pop the stack */
+
+ /*
+ * We could try to eliminate duplicates here, but it seems not worthwhile.
+ */
+ pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+}
+
+/*
+ * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ *
+ * Restores pendingNotifies to the parent's list saved on the front of
+ * upperPendingNotifies and removes that entry from the stack. The list
+ * built by the aborted subtransaction is simply dropped, not merged.
+ */
+void
+AtSubAbort_Notify(void)
+{
+ /*
+ * All we have to do is pop the stack --- the notifies made in this
+ * subxact are no longer interesting, and the space will be freed when
+ * CurTransactionContext is recycled.
+ *
+ * NOTE(review): as in the subcommit path, nothing here guards against
+ * an empty upperPendingNotifies; presumably a matching
+ * AtSubStart_Notify() call always precedes this one --- confirm.
+ */
+ pendingNotifies = (List *) linitial(upperPendingNotifies);
+ upperPendingNotifies = list_delete_first(upperPendingNotifies);
+}
+
/*
*--------------------------------------------------------------
- * Async_NotifyHandler
+ * NotifyInterruptHandler
*
* This is the signal handler for SIGUSR2.
*
*--------------------------------------------------------------
*/
void
-Async_NotifyHandler(SIGNAL_ARGS)
+NotifyInterruptHandler(SIGNAL_ARGS)
{
int save_errno = errno;
* ever turned on.
*/
+ /* Don't joggle the elbow of proc_exit */
+ if (proc_exit_inprogress)
+ return;
+
if (notifyInterruptEnabled)
{
+ bool save_ImmediateInterruptOK = ImmediateInterruptOK;
+
+ /*
+ * We may be called while ImmediateInterruptOK is true; turn it
+ * off while messing with the NOTIFY state. (We would have to
+ * save and restore it anyway, because PGSemaphore operations
+ * inside ProcessIncomingNotify() might reset it.)
+ */
+ ImmediateInterruptOK = false;
/*
* I'm not sure whether some flavors of Unix might allow another
{
/* Here, it is finally safe to do stuff. */
if (Trace_notify)
- elog(DEBUG, "Async_NotifyHandler: perform async notify");
+ elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
ProcessIncomingNotify();
if (Trace_notify)
- elog(DEBUG, "Async_NotifyHandler: done");
+ elog(DEBUG1, "NotifyInterruptHandler: done");
}
}
+
+ /*
+ * Restore ImmediateInterruptOK, and check for interrupts if
+ * needed.
+ */
+ ImmediateInterruptOK = save_ImmediateInterruptOK;
+ if (save_ImmediateInterruptOK)
+ CHECK_FOR_INTERRUPTS();
}
else
{
-
/*
* In this path it is NOT SAFE to do much of anything, except
* this:
void
EnableNotifyInterrupt(void)
{
- if (CurrentTransactionState->blockState != TRANS_DEFAULT)
+ if (IsTransactionOrTransactionBlock())
return; /* not really idle */
/*
if (notifyInterruptOccurred)
{
if (Trace_notify)
- elog(DEBUG, "EnableNotifyInterrupt: perform async notify");
+ elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
ProcessIncomingNotify();
if (Trace_notify)
- elog(DEBUG, "EnableNotifyInterrupt: done");
+ elog(DEBUG1, "EnableNotifyInterrupt: done");
}
}
}
* This is called by the PostgresMain main loop just after receiving
* a frontend command. Signal handler execution of inbound notifies
* is disabled until the next EnableNotifyInterrupt call.
+ *
+ * The SIGUSR1 signal handler also needs to call this, so as to
+ * prevent conflicts if one signal interrupts the other. So we
+ * must return the previous state of the flag.
* --------------------------------------------------------------
*/
-void
+bool
DisableNotifyInterrupt(void)
{
+ bool result = (notifyInterruptEnabled != 0); /* capture the prior state before the flag is cleared */
+
notifyInterruptEnabled = 0;
+
+ return result; /* true if notify interrupts were enabled on entry, so the caller can restore that state */
}
/*
* and clear the notification field in pg_listener until next time.
*
* NOTE: since we are outside any transaction, we must create our own.
- *
- * Results:
- * XXX
- *
* --------------------------------------------------------------
*/
static void
Datum value[Natts_pg_listener];
char repl[Natts_pg_listener],
nulls[Natts_pg_listener];
+ bool catchup_enabled;
+
+ /* Must prevent SIGUSR1 interrupt while I am running */
+ catchup_enabled = DisableCatchupInterrupt();
if (Trace_notify)
- elog(DEBUG, "ProcessIncomingNotify");
+ elog(DEBUG1, "ProcessIncomingNotify");
- set_ps_display("async_notify");
+ set_ps_display("notify interrupt");
notifyInterruptOccurred = 0;
StartTransactionCommand();
- lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+ lRel = heap_openr(ListenerRelationName, ExclusiveLock);
tdesc = RelationGetDescr(lRel);
/* Scan only entries with my listenerPID */
- ScanKeyEntryInitialize(&key[0], 0,
- Anum_pg_listener_pid,
- F_INT4EQ,
- Int32GetDatum(MyProcPid));
- scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+ ScanKeyInit(&key[0],
+ Anum_pg_listener_pid,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum(MyProcPid));
+ scan = heap_beginscan(lRel, SnapshotNow, 1, key);
/* Prepare data for rewriting 0 into notification field */
nulls[0] = nulls[1] = nulls[2] = ' ';
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
- while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
+ while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
char *relname = NameStr(listener->relname);
/* Notify the frontend */
if (Trace_notify)
- elog(DEBUG, "ProcessIncomingNotify: received %s from %d",
+ elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
relname, (int) sourcePID);
NotifyMyFrontEnd(relname, sourcePID);
- /* Rewrite the tuple with 0 in notification column */
+ /*
+ * Rewrite the tuple with 0 in notification column.
+ *
+ * simple_heap_update is safe here because no one else would
+ * have tried to UNLISTEN us, so there can be no uncommitted
+ * changes.
+ */
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
simple_heap_update(lRel, &lTuple->t_self, rTuple);
#ifdef NOT_USED /* currently there are no indexes */
- if (RelationGetForm(lRel)->relhasindex)
- {
- Relation idescs[Num_pg_listener_indices];
-
- CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
- CatalogCloseIndices(Num_pg_listener_indices, idescs);
- }
+ CatalogUpdateIndexes(lRel, rTuple);
#endif
}
}
set_ps_display("idle");
if (Trace_notify)
- elog(DEBUG, "ProcessIncomingNotify: done");
+ elog(DEBUG1, "ProcessIncomingNotify: done");
+
+ if (catchup_enabled)
+ EnableCatchupInterrupt();
}
/*
{
StringInfoData buf;
- pq_beginmessage(&buf);
- pq_sendbyte(&buf, 'A');
+ pq_beginmessage(&buf, 'A');
pq_sendint(&buf, listenerPID, sizeof(int32));
pq_sendstring(&buf, relname);
+ if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
+ {
+ /* XXX Add parameter string here later */
+ pq_sendstring(&buf, "");
+ }
pq_endmessage(&buf);
/*
*/
}
else
- elog(NOTICE, "NOTIFY for %s", relname);
+ elog(INFO, "NOTIFY for %s", relname);
}
/* Does pendingNotifies include the given relname? */
static bool
AsyncExistsPendingNotify(const char *relname)
{
- List *p;
+ ListCell *p;
foreach(p, pendingNotifies)
{
ClearPendingNotifies(void)
{
/*
- * We used to have to explicitly deallocate the list members and nodes,
- * because they were malloc'd. Now, since we know they are palloc'd
- * in TopTransactionContext, we need not do that --- they'll go away
- * automatically at transaction exit. We need only reset the list head
- * pointer.
+ * We used to have to explicitly deallocate the list members and
+ * nodes, because they were malloc'd. Now, since we know they are
+ * palloc'd in CurTransactionContext, we need not do that --- they'll
+ * go away automatically at transaction exit. We need only reset the
+ * list head pointer.
*/
pendingNotifies = NIL;
}