* async.c
* Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
*
- * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.61 2000/05/28 17:55:54 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.84 2002/05/05 00:03:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
*-------------------------------------------------------------------------
*/
+#include "postgres.h"
+
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <sys/types.h>
#include <netinet/in.h>
-#include "postgres.h"
-
#include "access/heapam.h"
#include "catalog/catname.h"
-#include "catalog/indexing.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
-#include "lib/dllist.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "tcop/tcopprot.h"
#include "utils/fmgroids.h"
#include "utils/ps_status.h"
#include "utils/syscache.h"
-#include "utils/trace.h"
+
/* stuff that we really ought not be touching directly :-( */
extern TransactionState CurrentTransactionState;
-extern CommandDest whereToSendOutput;
+
/*
* State for outbound notifies consists of a list of all relnames NOTIFYed
* in the current transaction. We do not actually perform a NOTIFY until
- * and unless the transaction commits. pendingNotifies is NULL if no
- * NOTIFYs have been done in the current transaction.
+ * and unless the transaction commits. pendingNotifies is NIL if no
+ * NOTIFYs have been done in the current transaction. The List nodes and
+ * referenced strings are all palloc'd in TopTransactionContext.
*/
-static Dllist *pendingNotifies = NULL;
+static List *pendingNotifies = NIL;
/*
* State for inbound notifies consists of two flags: one saying whether
static volatile int notifyInterruptEnabled = 0;
static volatile int notifyInterruptOccurred = 0;
-/* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
-static int unlistenExitRegistered = 0;
+/* True if we've registered an on_shmem_exit cleanup */
+static bool unlistenExitRegistered = false;
+
+bool Trace_notify = false;
static void Async_UnlistenAll(void);
static void Async_UnlistenOnExit(void);
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
-static int AsyncExistsPendingNotify(char *relname);
+static bool AsyncExistsPendingNotify(const char *relname);
static void ClearPendingNotifies(void);
void
Async_Notify(char *relname)
{
- char *notifyName;
-
- TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
+ if (Trace_notify)
+ elog(LOG, "Async_Notify: %s", relname);
- if (!pendingNotifies)
- pendingNotifies = DLNewList();
/* no point in making duplicate entries in the list ... */
if (!AsyncExistsPendingNotify(relname))
{
-
/*
- * We allocate list memory from the global malloc pool to ensure
- * that it will live until we want to use it. This is probably
- * not necessary any longer, since we will use it before the end
- * of the transaction. DLList only knows how to use malloc()
- * anyway, but we could probably palloc() the strings...
+ * The name list needs to live until end of transaction, so store
+ * it in the top transaction context.
*/
- notifyName = strdup(relname);
- DLAddHead(pendingNotifies, DLNewElem(notifyName));
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
+
+ MemoryContextSwitchTo(oldcontext);
}
}
Async_Listen(char *relname, int pid)
{
Relation lRel;
- TupleDesc tdesc;
HeapScanDesc scan;
- HeapTuple tuple,
- newtup;
+ HeapTuple tuple;
Datum values[Natts_pg_listener];
char nulls[Natts_pg_listener];
- Datum d;
int i;
- bool isnull;
- int alreadyListener = 0;
- TupleDesc tupDesc;
+ bool alreadyListener = false;
- TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
+ if (Trace_notify)
+ elog(LOG, "Async_Listen: %s", relname);
lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
- tdesc = RelationGetDescr(lRel);
/* Detect whether we are already listening on this relname */
scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
{
- d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
- if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+ if (listener->listenerpid == pid &&
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
{
- d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
- if (DatumGetInt32(d) == pid)
- {
- alreadyListener = 1;
- /* No need to scan the rest of the table */
- break;
- }
+ alreadyListener = true;
+ /* No need to scan the rest of the table */
+ break;
}
}
heap_endscan(scan);
if (alreadyListener)
{
heap_close(lRel, AccessExclusiveLock);
- elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
+ elog(WARNING, "Async_Listen: We are already listening on %s", relname);
return;
}
values[i++] = (Datum) pid;
values[i++] = (Datum) 0; /* no notifies pending */
- tupDesc = lRel->rd_att;
- newtup = heap_formtuple(tupDesc, values, nulls);
- heap_insert(lRel, newtup);
+ tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
+ heap_insert(lRel, tuple);
+
+#ifdef NOT_USED /* currently there are no indexes */
if (RelationGetForm(lRel)->relhasindex)
{
Relation idescs[Num_pg_listener_indices];
CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, newtup);
+ CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, tuple);
CatalogCloseIndices(Num_pg_listener_indices, idescs);
}
+#endif
- heap_freetuple(newtup);
+ heap_freetuple(tuple);
heap_close(lRel, AccessExclusiveLock);
*/
if (!unlistenExitRegistered)
{
- if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
- elog(NOTICE, "Async_Listen: out of shmem_exit slots");
- unlistenExitRegistered = 1;
+ on_shmem_exit(Async_UnlistenOnExit, 0);
+ unlistenExitRegistered = true;
}
}
Async_Unlisten(char *relname, int pid)
{
Relation lRel;
- HeapTuple lTuple;
+ HeapScanDesc scan;
+ HeapTuple tuple;
/* Handle specially the `unlisten "*"' command */
if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
return;
}
- TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
+ if (Trace_notify)
+ elog(LOG, "Async_Unlisten %s", relname);
lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
- /* Note we assume there can be only one matching tuple. */
- lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
- Int32GetDatum(pid),
- 0, 0);
- if (lTuple != NULL)
- heap_delete(lRel, &lTuple->t_self, NULL);
+
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+ while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
+ {
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
+
+ if (listener->listenerpid == pid &&
+ strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
+ {
+ /* Found the matching tuple, delete it */
+ simple_heap_delete(lRel, &tuple->t_self);
+
+ /*
+ * We assume there can be only one match, so no need to scan
+ * the rest of the table
+ */
+ break;
+ }
+ }
+ heap_endscan(scan);
+
heap_close(lRel, AccessExclusiveLock);
/*
*--------------------------------------------------------------
*/
static void
-Async_UnlistenAll()
+Async_UnlistenAll(void)
{
Relation lRel;
TupleDesc tdesc;
- HeapScanDesc sRel;
+ HeapScanDesc scan;
HeapTuple lTuple;
ScanKeyData key[1];
- TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
+ if (Trace_notify)
+ elog(LOG, "Async_UnlistenAll");
lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
tdesc = RelationGetDescr(lRel);
Anum_pg_listener_pid,
F_INT4EQ,
Int32GetDatum(MyProcPid));
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
- heap_delete(lRel, &lTuple->t_self, NULL);
+ while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
+ simple_heap_delete(lRel, &lTuple->t_self);
- heap_endscan(sRel);
+ heap_endscan(scan);
heap_close(lRel, AccessExclusiveLock);
}
*--------------------------------------------------------------
*/
static void
-Async_UnlistenOnExit()
+Async_UnlistenOnExit(void)
{
-
/*
* We need to start/commit a transaction for the unlisten, but if
* there is already an active transaction we had better abort that one
*--------------------------------------------------------------
*/
void
-AtCommit_Notify()
+AtCommit_Notify(void)
{
Relation lRel;
TupleDesc tdesc;
- HeapScanDesc sRel;
+ HeapScanDesc scan;
HeapTuple lTuple,
rTuple;
- Datum d,
- value[Natts_pg_listener];
+ Datum value[Natts_pg_listener];
char repl[Natts_pg_listener],
nulls[Natts_pg_listener];
- bool isnull;
- char *relname;
- int32 listenerPID;
- if (!pendingNotifies)
+ if (pendingNotifies == NIL)
return; /* no NOTIFY statements in this
* transaction */
return;
}
- TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
-
- lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
- tdesc = RelationGetDescr(lRel);
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+ if (Trace_notify)
+ elog(LOG, "AtCommit_Notify");
/* preset data to update notify column to MyProcPid */
nulls[0] = nulls[1] = nulls[2] = ' ';
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+ lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
+ tdesc = RelationGetDescr(lRel);
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
+
+ while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
{
- d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
- relname = (char *) DatumGetPointer(d);
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+ char *relname = NameStr(listener->relname);
+ int32 listenerPID = listener->listenerpid;
- if (AsyncExistsPendingNotify(relname))
+ if (!AsyncExistsPendingNotify(relname))
+ continue;
+
+ if (listenerPID == MyProcPid)
{
- d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
- listenerPID = DatumGetInt32(d);
+ /*
+ * Self-notify: no need to bother with table update. Indeed,
+ * we *must not* clear the notification field in this path, or
+ * we could lose an outside notify, which'd be bad for
+ * applications that ignore self-notify messages.
+ */
- if (listenerPID == MyProcPid)
- {
+ if (Trace_notify)
+ elog(LOG, "AtCommit_Notify: notifying self");
+ NotifyMyFrontEnd(relname, listenerPID);
+ }
+ else
+ {
+ if (Trace_notify)
+ elog(LOG, "AtCommit_Notify: notifying pid %d",
+ listenerPID);
+
+ /*
+ * If someone has already notified this listener, we don't
+ * bother modifying the table, but we do still send a SIGUSR2
+ * signal, just in case that backend missed the earlier signal
+ * for some reason. It's OK to send the signal first, because
+ * the other guy can't read pg_listener until we unlock it.
+ */
+ if (kill(listenerPID, SIGUSR2) < 0)
+ {
/*
- * Self-notify: no need to bother with table update.
- * Indeed, we *must not* clear the notification field in
- * this path, or we could lose an outside notify, which'd
- * be bad for applications that ignore self-notify
- * messages.
+ * Get rid of pg_listener entry if it refers to a PID that
+ * no longer exists. Presumably, that backend crashed
+ * without deleting its pg_listener entries. This code
+ * used to only delete the entry if errno==ESRCH, but as
+ * far as I can see we should just do it for any failure
+ * (certainly at least for EPERM too...)
*/
- TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
- NotifyMyFrontEnd(relname, listenerPID);
+ simple_heap_delete(lRel, &lTuple->t_self);
}
- else
+ else if (listener->notification == 0)
{
- TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
- listenerPID);
+ rTuple = heap_modifytuple(lTuple, lRel,
+ value, nulls, repl);
+ simple_heap_update(lRel, &lTuple->t_self, rTuple);
- /*
- * If someone has already notified this listener, we don't
- * bother modifying the table, but we do still send a
- * SIGUSR2 signal, just in case that backend missed the
- * earlier signal for some reason. It's OK to send the
- * signal first, because the other guy can't read
- * pg_listener until we unlock it.
- */
- if (kill(listenerPID, SIGUSR2) < 0)
+#ifdef NOT_USED /* currently there are no indexes */
+ if (RelationGetForm(lRel)->relhasindex)
{
+ Relation idescs[Num_pg_listener_indices];
- /*
- * Get rid of pg_listener entry if it refers to a PID
- * that no longer exists. Presumably, that backend
- * crashed without deleting its pg_listener entries.
- * This code used to only delete the entry if
- * errno==ESRCH, but as far as I can see we should
- * just do it for any failure (certainly at least for
- * EPERM too...)
- */
- heap_delete(lRel, &lTuple->t_self, NULL);
- }
- else
- {
- d = heap_getattr(lTuple, Anum_pg_listener_notify,
- tdesc, &isnull);
- if (DatumGetInt32(d) == 0)
- {
- rTuple = heap_modifytuple(lTuple, lRel,
- value, nulls, repl);
- heap_update(lRel, &lTuple->t_self, rTuple, NULL);
- if (RelationGetForm(lRel)->relhasindex)
- {
- Relation idescs[Num_pg_listener_indices];
-
- CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
- CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
- CatalogCloseIndices(Num_pg_listener_indices, idescs);
- }
- }
+ CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
+ CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
+ CatalogCloseIndices(Num_pg_listener_indices, idescs);
}
+#endif
}
}
}
- heap_endscan(sRel);
+ heap_endscan(scan);
/*
* We do NOT release the lock on pg_listener here; we need to hold it
ClearPendingNotifies();
- TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
+ if (Trace_notify)
+ elog(LOG, "AtCommit_Notify: done");
}
/*
*--------------------------------------------------------------
*/
void
-AtAbort_Notify()
+AtAbort_Notify(void)
{
ClearPendingNotifies();
}
* per above
*--------------------------------------------------------------
*/
-
void
Async_NotifyHandler(SIGNAL_ARGS)
{
+ int save_errno = errno;
/*
* Note: this is a SIGNAL HANDLER. You must be very wary what you do
if (notifyInterruptEnabled)
{
-
/*
* I'm not sure whether some flavors of Unix might allow another
* SIGUSR2 occurrence to recursively interrupt this routine. To
if (notifyInterruptOccurred)
{
/* Here, it is finally safe to do stuff. */
- TPRINTF(TRACE_NOTIFY,
- "Async_NotifyHandler: perform async notify");
+ if (Trace_notify)
+ elog(LOG, "Async_NotifyHandler: perform async notify");
+
ProcessIncomingNotify();
- TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
+
+ if (Trace_notify)
+ elog(LOG, "Async_NotifyHandler: done");
}
}
}
else
{
-
/*
* In this path it is NOT SAFE to do much of anything, except
* this:
*/
notifyInterruptOccurred = 1;
}
+
+ errno = save_errno;
}
/*
* PostgresMain calls this the first time.
* --------------------------------------------------------------
*/
-
void
EnableNotifyInterrupt(void)
{
notifyInterruptEnabled = 0;
if (notifyInterruptOccurred)
{
- TPRINTF(TRACE_NOTIFY,
- "EnableNotifyInterrupt: perform async notify");
+ if (Trace_notify)
+ elog(LOG, "EnableNotifyInterrupt: perform async notify");
+
ProcessIncomingNotify();
- TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
+
+ if (Trace_notify)
+ elog(LOG, "EnableNotifyInterrupt: done");
}
}
}
* is disabled until the next EnableNotifyInterrupt call.
* --------------------------------------------------------------
*/
-
void
DisableNotifyInterrupt(void)
{
Relation lRel;
TupleDesc tdesc;
ScanKeyData key[1];
- HeapScanDesc sRel;
+ HeapScanDesc scan;
HeapTuple lTuple,
rTuple;
- Datum d,
- value[Natts_pg_listener];
+ Datum value[Natts_pg_listener];
char repl[Natts_pg_listener],
nulls[Natts_pg_listener];
- bool isnull;
- char *relname;
- int32 sourcePID;
- TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
- PS_SET_STATUS("async_notify");
+ if (Trace_notify)
+ elog(LOG, "ProcessIncomingNotify");
+
+ set_ps_display("async_notify");
notifyInterruptOccurred = 0;
Anum_pg_listener_pid,
F_INT4EQ,
Int32GetDatum(MyProcPid));
- sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+ scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
/* Prepare data for rewriting 0 into notification field */
nulls[0] = nulls[1] = nulls[2] = ' ';
value[0] = value[1] = value[2] = (Datum) 0;
value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
- while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+ while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
{
- d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
- sourcePID = DatumGetInt32(d);
+ Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
+ char *relname = NameStr(listener->relname);
+ int32 sourcePID = listener->notification;
+
if (sourcePID != 0)
{
- d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
- relname = (char *) DatumGetPointer(d);
/* Notify the frontend */
- TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
- relname, (int) sourcePID);
+
+ if (Trace_notify)
+ elog(LOG, "ProcessIncomingNotify: received %s from %d",
+ relname, (int) sourcePID);
+
NotifyMyFrontEnd(relname, sourcePID);
/* Rewrite the tuple with 0 in notification column */
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
- heap_update(lRel, &lTuple->t_self, rTuple, NULL);
+ simple_heap_update(lRel, &lTuple->t_self, rTuple);
+
+#ifdef NOT_USED /* currently there are no indexes */
if (RelationGetForm(lRel)->relhasindex)
{
Relation idescs[Num_pg_listener_indices];
CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
CatalogCloseIndices(Num_pg_listener_indices, idescs);
}
+#endif
}
}
- heap_endscan(sRel);
+ heap_endscan(scan);
/*
* We do NOT release the lock on pg_listener here; we need to hold it
*/
pq_flush();
- PS_SET_STATUS("idle");
- TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
-}
+ set_ps_display("idle");
-/* Send NOTIFY message to my front end. */
+ if (Trace_notify)
+ elog(LOG, "ProcessIncomingNotify: done");
+}
+/*
+ * Send NOTIFY message to my front end.
+ */
static void
NotifyMyFrontEnd(char *relname, int32 listenerPID)
{
*/
}
else
- elog(NOTICE, "NOTIFY for %s", relname);
+ elog(INFO, "NOTIFY for %s", relname);
}
-/* Does pendingNotifies include the given relname?
- *
- * NB: not called unless pendingNotifies != NULL.
- */
-
-static int
-AsyncExistsPendingNotify(char *relname)
+/* Does pendingNotifies include the given relname? */
+static bool
+AsyncExistsPendingNotify(const char *relname)
{
- Dlelem *p;
+ List *p;
- for (p = DLGetHead(pendingNotifies);
- p != NULL;
- p = DLGetSucc(p))
+ foreach(p, pendingNotifies)
{
/* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
- if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
- return 1;
+ if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
+ return true;
}
- return 0;
+ return false;
}
/* Clear the pendingNotifies list. */
-
static void
-ClearPendingNotifies()
+ClearPendingNotifies(void)
{
- Dlelem *p;
-
- if (pendingNotifies)
- {
-
- /*
- * Since the referenced strings are malloc'd, we have to scan the
- * list and delete them individually. If we used palloc for the
- * strings then we could just do DLFreeList to get rid of both the
- * list nodes and the list base...
- */
- while ((p = DLRemHead(pendingNotifies)) != NULL)
- {
- free(DLE_VAL(p));
- DLFreeElem(p);
- }
- DLFreeList(pendingNotifies);
- pendingNotifies = NULL;
- }
+ /*
+ * We used to have to explicitly deallocate the list members and
+ * nodes, because they were malloc'd. Now, since we know they are
+ * palloc'd in TopTransactionContext, we need not do that --- they'll
+ * go away automatically at transaction exit. We need only reset the
+ * list head pointer.
+ */
+ pendingNotifies = NIL;
}