*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.37 1998/08/19 02:01:39 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.38 1998/08/30 21:04:43 scrappy Exp $
*
*-------------------------------------------------------------------------
*/
* -- jw, 12/28/93
*
*/
-/*
- * The following is the old model which does not work.
- */
-/*
- * Model is:
- * 1. Multiple backends on same machine.
- *
- * 2. Query on one backend sends stuff over an asynchronous portal by
- * appending to a relation, and then doing an async. notification
- * (which takes place after commit) to all listeners on this relation.
- *
- * 3. Async. notification results in all backends listening on relation
- * to be woken up, by a process signal kill(SIGUSR2), with name of relation
- * passed in shared memory.
- *
- * 4. Each backend notifies its respective frontend over the comm
- * channel using the out-of-band channel.
- *
- * 5. Each frontend receives this notification and processes accordingly.
- *
- * #4,#5 are changing soon with pending rewrite of portal/protocol.
- *
- */
+
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include "tcop/dest.h"
#include "utils/mcxt.h"
#include "utils/syscache.h"
+#include <utils/trace.h>
+#include <utils/ps_status.h>
+
+#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK]
+#define NotifyHack pg_options[OPT_NOTIFYHACK]
+
+extern TransactionState CurrentTransactionState;
+extern CommandDest whereToSendOutput;
+
+GlobalMemory notifyContext = NULL;
static int notifyFrontEndPending = 0;
static int notifyIssued = 0;
static Dllist *pendingNotifies = NULL;
-
static int AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
+static void Async_NotifyFrontEnd_Aux(void);
void Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);
+static void Async_UnlistenAll(void);
/*
*--------------------------------------------------------------
void
Async_NotifyHandler(SIGNAL_ARGS)
{
- extern TransactionState CurrentTransactionState;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
-
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Waking up sleeping backend process");
-#endif
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
+ "waking up sleeping backend process");
+ PS_SET_STATUS("async_notify");
Async_NotifyFrontEnd();
-
+ PS_SET_STATUS("idle");
}
else
{
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
- CurrentTransactionState->state,
- CurrentTransactionState->blockState);
-#endif
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
+ "process in middle of transaction, state=%d, blockstate=%d",
+ CurrentTransactionState->state,
+ CurrentTransactionState->blockState);
notifyFrontEndPending = 1;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
}
+
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
}
/*
*--------------------------------------------------------------
* Async_Notify --
*
+ * This is executed by the SQL notify command.
+ *
* Adds the relation to the list of pending notifies.
* All notification happens at end of commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* then each backend notifies its corresponding front end at
* the end of commit.
*
- * This correspond to 'notify <relname>' command
* -- jw, 12/28/93
*
* Results:
char *notifyName;
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_Notify: %s", relname);
-#endif
+ TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
if (!pendingNotifies)
pendingNotifies = DLNewList();
{
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
+ /* notify is really issued only if a tuple has been changed */
+ notifyIssued = 1;
}
}
heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
+
+ /*
+ * Note: if the write lock is unset we can get multiple tuples
+ * with same oid if other backends notify the same relation.
+ * Use this option at your own risk.
+ */
+ if (NotifyUnlock) {
+ RelationUnsetLockForWrite(lRel);
+ }
+
heap_close(lRel);
- notifyIssued = 1;
+
+ TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
}
/*
*--------------------------------------------------------------
* Async_NotifyAtCommit --
*
+ * This is called at transaction commit.
+ *
* Signal our corresponding frontend process on relations that
* were notified. Signal all other backend process that
* are listening also.
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
-
if (notifyIssued)
- { /* 'notify <relname>' issued by us */
+ {
+ /* 'notify <relname>' issued by us */
notifyIssued = 0;
StartTransactionCommand();
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_NotifyAtCommit.");
-#endif
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_notify,
F_INT4EQ,
if (MyProcPid == DatumGetInt32(d))
{
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
-#endif
notifyFrontEndPending = 1;
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyAtCommit: notifying self");
}
else
{
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Notifying others");
-#endif
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyAtCommit: notifying pid %d",
+ DatumGetInt32(d));
#ifdef HAVE_KILL
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
{
}
}
heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
heap_close(lRel);
+ /*
+ * Notify the frontend inside the current transaction while
+ * we still have a valid write lock on pg_listeners. This
+ * avoid waiting until all other backends have finished
+ * with pg_listener.
+ */
+ if (notifyFrontEndPending) {
+ /* The aux version is called inside transaction */
+ Async_NotifyFrontEnd_Aux();
+ }
+
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
CommitTransactionCommand();
- ClearPendingNotify();
}
-
- if (notifyFrontEndPending)
- { /* we need to notify the frontend of all
- * pending notifies. */
- notifyFrontEndPending = 1;
- Async_NotifyFrontEnd();
+ else
+ {
+ /*
+ * No notifies issued by us. If notifyFrontEndPending has been set
+ * by Async_NotifyHandler notify the frontend of pending notifies
+ * from other backends.
+ */
+ if (notifyFrontEndPending) {
+ Async_NotifyFrontEnd();
+ }
}
+
+ ClearPendingNotify();
}
}
*--------------------------------------------------------------
* Async_NotifyAtAbort --
*
+ * This is called at transaction commit.
+ *
* Gets rid of pending notifies. List elements are automatically
* freed through memory context.
*
void
Async_NotifyAtAbort()
{
- extern TransactionState CurrentTransactionState;
-
- if (notifyIssued)
+ if (pendingNotifies) {
ClearPendingNotify();
- notifyIssued = 0;
- if (pendingNotifies)
DLFreeList(pendingNotifies);
+ }
pendingNotifies = DLNewList();
+ notifyIssued = 0;
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
+ /* don't forget to notify front end */
if (notifyFrontEndPending)
- { /* don't forget to notify front end */
+ {
Async_NotifyFrontEnd();
}
}
*--------------------------------------------------------------
* Async_Listen --
*
+ * This is executed by the SQL listen command.
+ *
* Register a backend (identified by its Unix PID) as listening
* on the specified relation.
*
- * This corresponds to the 'listen <relation>' command in SQL
- *
* One listener per relation, pg_listener relation is keyed
* on (relname,pid) to provide multiple listeners in future.
*
char *relnamei;
TupleDesc tupDesc;
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_Listen: %s", relname);
-#endif
+ if (whereToSendOutput != Remote) {
+ elog(NOTICE, "Async_Listen: "
+ "listen not available on interactive sessions");
+ return;
+ }
+
+ TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
for (i = 0; i < Natts_pg_listener; i++)
{
nulls[i] = ' ';
if (pid == MyProcPid)
alreadyListener = 1;
}
+ if (alreadyListener) {
+ /* No need to scan the rest of the table */
+ break;
+ }
}
heap_endscan(scan);
{
elog(NOTICE, "Async_Listen: We are already listening on %s",
relname);
+ RelationUnsetLockForWrite(lDesc);
+ heap_close(lDesc);
return;
}
tupDesc = lDesc->rd_att;
- newtup = heap_formtuple(tupDesc,
- values,
- nulls);
+ newtup = heap_formtuple(tupDesc, values, nulls);
heap_insert(lDesc, newtup);
-
pfree(newtup);
/*
*--------------------------------------------------------------
* Async_Unlisten --
*
+ * This is executed by the SQL unlisten command.
+ *
* Remove the backend from the list of listening backends
* for the specified relation.
*
- * This would correspond to the 'unlisten <relation>'
- * command, but there isn't one yet.
- *
* Results:
* pg_listeners is updated.
*
Relation lDesc;
HeapTuple lTuple;
- lTuple = SearchSysCacheTuple(LISTENREL,
- PointerGetDatum(relname),
+ /* Handle specially the `unlisten "*"' command */
+ if ((!relname) || (*relname == '\0') || (strcmp(relname,"*")==0)) {
+ Async_UnlistenAll();
+ return;
+ }
+
+ TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
+
+ lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid),
0, 0);
- lDesc = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lDesc);
-
if (lTuple != NULL)
+ {
+ lDesc = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lDesc);
heap_delete(lDesc, &lTuple->t_ctid);
-
- RelationUnsetLockForWrite(lDesc);
- heap_close(lDesc);
+ RelationUnsetLockForWrite(lDesc);
+ heap_close(lDesc);
+ }
}
+/*
+ *--------------------------------------------------------------
+ * Async_UnlistenAll --
+ *
+ * Unlisten all relations for this backend.
+ *
+ * Results:
+ * pg_listeners is updated.
+ *
+ * Side effects:
+ * XXX
+ *
+ *--------------------------------------------------------------
+ */
+static void
+Async_UnlistenAll()
+{
+ HeapTuple lTuple;
+ Relation lRel;
+ HeapScanDesc sRel;
+ TupleDesc tdesc;
+ ScanKeyData key[1];
+
+ TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
+ ScanKeyEntryInitialize(&key[0], 0,
+ Anum_pg_listener_pid,
+ F_INT4EQ,
+ Int32GetDatum(MyProcPid));
+ lRel = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lRel);
+ tdesc = RelationGetTupleDescriptor(lRel);
+ sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
+
+ while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+ {
+ heap_delete(lRel, &lTuple->t_ctid);
+ }
+ heap_endscan(sRel);
+ RelationUnsetLockForWrite(lRel);
+ heap_close(lRel);
+ TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
+}
+
+/*
+ * --------------------------------------------------------------
+ * Async_UnlistenOnExit --
+ *
+ * This is called at backend exit for each registered listen.
+ *
+ * Results:
+ * XXX
+ *
+ * --------------------------------------------------------------
+ */
static void
Async_UnlistenOnExit(int code, /* from exitpg */
char *relname)
* --------------------------------------------------------------
* Async_NotifyFrontEnd --
*
+ * This is called outside transactions. The real work is done
+ * by Async_NotifyFrontEnd_Aux().
+ *
+ * --------------------------------------------------------------
+ */
+static void
+Async_NotifyFrontEnd()
+{
+ StartTransactionCommand();
+ Async_NotifyFrontEnd_Aux();
+ CommitTransactionCommand();
+}
+
+/*
+ * --------------------------------------------------------------
+ * Async_NotifyFrontEnd_Aux --
+ *
+ * This must be called inside a transaction block.
+ *
* Perform an asynchronous notification to front end over
* portal comm channel. The name of the relation which contains the
* data is sent to the front end.
*
* --------------------------------------------------------------
*/
-GlobalMemory notifyContext = NULL;
-
static void
-Async_NotifyFrontEnd()
+Async_NotifyFrontEnd_Aux()
{
- extern CommandDest whereToSendOutput;
HeapTuple lTuple,
rTuple;
Relation lRel;
nulls[3];
bool isnull;
- notifyFrontEndPending = 0;
+#define MAX_DONE 64
-#ifdef ASYNC_DEBUG
- elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
-#endif
+ char *done[MAX_DONE];
+ int ndone = 0;
+ int i;
+
+ notifyFrontEndPending = 0;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
StartTransactionCommand();
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_notify,
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{
- d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+ d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc,
+ &isnull);
+
+ /*
+ * This hack deletes duplicate tuples which can be left
+ * in the table if the NotifyUnlock option is set.
+ * I'm further investigating this. -- dz
+ */
+ if (NotifyHack) {
+ for (i=0; i<ndone; i++) {
+ if (strcmp(DatumGetName(d)->data, done[i]) == 0) {
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyFrontEnd: duplicate %s",
+ DatumGetName(d)->data);
+ heap_delete(lRel, &lTuple->t_ctid);
+ continue;
+ }
+ }
+ if (ndone < MAX_DONE) {
+ done[ndone++] = pstrdup(DatumGetName(d)->data);
+ }
+ }
+
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
/* notifying the front end */
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s",
+ DatumGetName(d)->data);
if (whereToSendOutput == Remote)
{
pq_putstr(DatumGetName(d)->data);
pq_flush();
}
- else
- elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
}
heap_endscan(sRel);
+ RelationUnsetLockForWrite(lRel);
heap_close(lRel);
- CommitTransactionCommand();
+
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done");
}
static int