1 /*-------------------------------------------------------------------------
4 * Asynchronous notification
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.36 1998/07/27 19:37:50 vadim Exp $
12 *-------------------------------------------------------------------------
14 /* New Async Notification Model:
15 * 1. Multiple backends on same machine. Multiple backends listening on
18 * 2. One of the backends does a 'notify <relname>'. For all backends that
19 * are listening to this relation (all notifications take place at the
21 * 2.a If the process is the same as the backend process that issued
22 * notification (we are notifying something that we are listening),
23 * signal the corresponding frontend over the comm channel.
24 * 2.b For all other listening processes, we send kill(SIGUSR2) to wake up
25 * the listening backend.
26 * 3. Upon receiving a kill(SIGUSR2) signal from another backend process
27 * notifying that one of the relations that we are listening on is being
28 * notified, we can be in either of two following states:
29 * 3.a We are sleeping, wake up and signal our frontend.
30 * 3.b We are in the middle of another transaction; wait until the end
31 * of the current transaction and signal our frontend.
32 * 4. Each frontend receives this notification and processes accordingly.
38 * The following is the old model which does not work.
42 * 1. Multiple backends on same machine.
44 * 2. Query on one backend sends stuff over an asynchronous portal by
45 * appending to a relation, and then doing an async. notification
46 * (which takes place after commit) to all listeners on this relation.
48 * 3. Async. notification results in all backends listening on relation
49 * to be woken up, by a process signal kill(SIGUSR2), with name of relation
50 * passed in shared memory.
52 * 4. Each backend notifies its respective frontend over the comm
53 * channel using the out-of-band channel.
55 * 5. Each frontend receives this notification and processes accordingly.
57 * #4,#5 are changing soon with pending rewrite of portal/protocol.
64 #include <sys/types.h> /* Needed by in.h on Ultrix */
65 #include <netinet/in.h>
69 #include "access/heapam.h"
70 #include "access/relscan.h"
71 #include "access/xact.h"
72 #include "catalog/catname.h"
73 #include "catalog/pg_listener.h"
74 #include "commands/async.h"
76 #include "lib/dllist.h"
77 #include "libpq/libpq.h"
78 #include "miscadmin.h"
79 #include "nodes/memnodes.h"
80 #include "storage/bufmgr.h"
81 #include "storage/lmgr.h"
82 #include "tcop/dest.h"
83 #include "utils/mcxt.h"
84 #include "utils/syscache.h"
/*
 * Backend-local notification state.
 *
 * notifyFrontEndPending is set to 1 when a notification still has to be
 * forwarded to our frontend, and is reset to 0 at the start of
 * Async_NotifyFrontEnd.
 */
86 static int notifyFrontEndPending = 0;
/* NOTE(review): initialised here; no other use is visible in this listing. */
87 static int notifyIssued = 0;
/* Relation names queued by Async_Notify (strdup'ed copies); created lazily. */
88 static Dllist *pendingNotifies = NULL;
/* Forward declarations for routines of this file. */
91 static int AsyncExistsPendingNotify(char *);
92 static void ClearPendingNotify(void);
93 static void Async_NotifyFrontEnd(void);
/* Not static, unlike its neighbours. */
94 void Async_Unlisten(char *relname, int pid);
95 static void Async_UnlistenOnExit(int code, char *relname);
98 *--------------------------------------------------------------
99 * Async_NotifyHandler --
101 * This is the signal handler for SIGUSR2. When the backend
102 * is signaled, the backend can be in two states.
103 * 1. If the backend is in the middle of another transaction,
104 * we set the flag, notifyFrontEndPending, and wait until
105 * the end of the transaction to notify the front end.
106 * 2. If the backend is not in the middle of another transaction,
107 * we notify the front end immediately.
/*
 * NOTE(review): Async_NotifyFrontEnd starts a transaction and scans
 * pg_listener; doing that (and calling elog) from signal context looks
 * unsafe with respect to async-signal-safety -- confirm.
 */
117 Async_NotifyHandler(SIGNAL_ARGS)
119 extern TransactionState CurrentTransactionState;
/* Both state fields at TRANS_DEFAULT: treated as "no transaction running". */
121 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
122 (CurrentTransactionState->blockState == TRANS_DEFAULT))
126 elog(DEBUG, "Waking up sleeping backend process");
/* Idle case: forward the notification to the frontend right away. */
128 Async_NotifyFrontEnd();
/*
 * Busy case (per the message text; the branch keyword itself is not
 * visible here): only record that the frontend still has to be notified.
 */
134 elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
135 CurrentTransactionState->state,
136 CurrentTransactionState->blockState);
138 notifyFrontEndPending = 1;
143 *--------------------------------------------------------------
146 * Adds the relation to the list of pending notifies.
147 * All notification happens at end of commit.
148 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
150 * All notification of backend processes happens here,
151 * then each backend notifies its corresponding front end at
154 * This corresponds to the 'notify <relname>' command
161 * All tuples for relname in pg_listener are updated.
163 *--------------------------------------------------------------
/*
 * Queue relname on pendingNotifies and set the notify column to 1 in the
 * pg_listener tuples matching that relation name.
 */
166 Async_Notify(char *relname)
185 elog(DEBUG, "Async_Notify: %s", relname);
/* The pending list is created on first use. */
188 if (!pendingNotifies)
189 pendingNotifies = DLNewList();
192 * Allocate memory from the global malloc pool because it needs to be
193 * referenced also when the transaction is finished. DZ - 26-08-1996
/* NOTE(review): no NULL check of the strdup result is visible here. */
195 notifyName = strdup(relname);
196 DLAddHead(pendingNotifies, DLNewElem(notifyName));
/* Scan key on the relname column, using the private copy of the name. */
198 ScanKeyEntryInitialize(&key, 0,
199 Anum_pg_listener_relname,
201 PointerGetDatum(notifyName));
/* Write-lock pg_listener for the duration of the scan and updates. */
203 lRel = heap_openr(ListenerRelationName);
204 tdesc = RelationGetTupleDescriptor(lRel);
205 RelationSetLockForWrite(lRel);
206 sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
/*
 * Replacement template: every slot starts as ' ', then only the notify
 * column is flagged 'r' and given the value 1.
 */
208 nulls[0] = nulls[1] = nulls[2] = ' ';
209 repl[0] = repl[1] = repl[2] = ' ';
210 repl[Anum_pg_listener_notify - 1] = 'r';
211 value[0] = value[1] = value[2] = (Datum) 0;
212 value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
214 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
216 d = heap_getattr(lTuple, Anum_pg_listener_notify,
/* Only tuples whose notify column is still 0 are rewritten. */
218 if (!DatumGetInt32(d))
220 rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
221 heap_replace(lRel, &lTuple->t_ctid, rTuple);
226 RelationUnsetLockForWrite(lRel);
232 *--------------------------------------------------------------
233 * Async_NotifyAtCommit --
235 * Signal our corresponding frontend process on relations that
236 * were notified. Signal all other backend processes that
237 * are listening also.
245 * Tuples in pg_listener that have our listenerpid are updated so
246 * that the notification is 0. We do not want to notify frontend
251 *--------------------------------------------------------------
/*
 * Walk pg_listener and, for each tuple whose relation name is on
 * pendingNotifies, either flag ourselves or send SIGUSR2 to the listening
 * pid; then clear the pending list and, if needed, notify our frontend.
 */
254 Async_NotifyAtCommit()
264 extern TransactionState CurrentTransactionState;
/* Make sure the pending list exists before it is consulted below. */
266 if (!pendingNotifies)
267 pendingNotifies = DLNewList();
/* Both state fields at TRANS_DEFAULT: no transaction is in progress. */
269 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
270 (CurrentTransactionState->blockState == TRANS_DEFAULT))
274 { /* 'notify <relname>' issued by us */
/* The catalog scan runs inside its own transaction. */
276 StartTransactionCommand();
278 elog(DEBUG, "Async_NotifyAtCommit.");
/* Scan key on the notify column (the comparison value is not visible here). */
280 ScanKeyEntryInitialize(&key, 0,
281 Anum_pg_listener_notify,
284 lRel = heap_openr(ListenerRelationName);
285 RelationSetLockForWrite(lRel);
286 sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
287 tdesc = RelationGetTupleDescriptor(lRel);
289 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
291 d = heap_getattr(lTuple, Anum_pg_listener_relname,
/* Act only on relations that this backend queued via Async_Notify. */
294 if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
296 d = heap_getattr(lTuple, Anum_pg_listener_pid,
/* Our own pid: just remember that our frontend must be told. */
299 if (MyProcPid == DatumGetInt32(d))
302 elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
304 notifyFrontEndPending = 1;
309 elog(DEBUG, "Notifying others");
/*
 * Another backend: wake it with SIGUSR2.  When kill() fails the listener
 * tuple is deleted; NOTE(review): lines between the two statements are
 * not visible, so an additional condition may apply -- confirm.
 */
312 if (kill(DatumGetInt32(d), SIGUSR2) < 0)
315 heap_delete(lRel, &lTuple->t_ctid);
323 RelationUnsetLockForWrite(lRel);
326 CommitTransactionCommand();
327 ClearPendingNotify();
330 if (notifyFrontEndPending)
331 { /* we need to notify the frontend of all
332 * pending notifies. */
/*
 * NOTE(review): the flag is already non-zero here, and
 * Async_NotifyFrontEnd resets it to 0; this assignment looks redundant.
 */
333 notifyFrontEndPending = 1;
334 Async_NotifyFrontEnd();
340 *--------------------------------------------------------------
341 * Async_NotifyAtAbort --
343 * Gets rid of pending notifies. List elements are automatically
344 * freed through memory context.
353 *--------------------------------------------------------------
/*
 * Discard the queued notifies, replace the list with a fresh one and, when
 * no transaction is in progress, deliver any notification owed to our
 * frontend.
 */
356 Async_NotifyAtAbort()
358 extern TransactionState CurrentTransactionState;
/* NOTE(review): any guards around the next three calls are not visible here. */
361 ClearPendingNotify();
364 DLFreeList(pendingNotifies);
365 pendingNotifies = DLNewList();
/* Both state fields at TRANS_DEFAULT: no transaction is in progress. */
367 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
368 (CurrentTransactionState->blockState == TRANS_DEFAULT))
370 if (notifyFrontEndPending)
371 { /* don't forget to notify front end */
372 Async_NotifyFrontEnd();
378 *--------------------------------------------------------------
381 * Register a backend (identified by its Unix PID) as listening
382 * on the specified relation.
384 * This corresponds to the 'listen <relation>' command in SQL
386 * One listener per relation, pg_listener relation is keyed
387 * on (relname,pid) to provide multiple listeners in future.
390 * pg_listener is updated.
395 *--------------------------------------------------------------
/*
 * Scan pg_listener for an entry with this relation name and our pid, insert
 * a (relname, pid, 0) tuple, and register an exit callback that unlistens.
 */
398 Async_Listen(char *relname, int pid)
400 Datum values[Natts_pg_listener];
401 char nulls[Natts_pg_listener];
411 int alreadyListener = 0;
416 elog(DEBUG, "Async_Listen: %s", relname);
/* Start from an all-NULL-pointer value array. */
418 for (i = 0; i < Natts_pg_listener; i++)
421 values[i] = PointerGetDatum(NULL);
/*
 * Fill three consecutive columns: relation name, pid, notify flag.
 * NOTE(review): the reset of i before these stores is not visible here.
 */
425 values[i++] = (Datum) relname;
426 values[i++] = (Datum) pid;
427 values[i++] = (Datum) 0; /* no notifies pending */
429 lDesc = heap_openr(ListenerRelationName);
430 RelationSetLockForWrite(lDesc);
432 /* is someone already listening. One listener per relation */
433 tdesc = RelationGetTupleDescriptor(lDesc);
/* Unkeyed scan: every pg_listener tuple is examined. */
434 s = heap_beginscan(lDesc, 0, SnapshotNow, 0, (ScanKey) NULL);
435 while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b)))
437 d = heap_getattr(htup, Anum_pg_listener_relname, tdesc,
439 relnamei = DatumGetPointer(d);
/* Names are compared over at most NAMEDATALEN bytes. */
440 if (!strncmp(relnamei, relname, NAMEDATALEN))
442 d = heap_getattr(htup, Anum_pg_listener_pid, tdesc, &isnull);
/* NOTE(review): this overwrites the pid parameter with the tuple's pid. */
443 pid = DatumGetInt32(d);
444 if (pid == MyProcPid)
453 elog(NOTICE, "Async_Listen: We are already listening on %s",
/* NOTE(review): the condition selecting notice vs. insert is not visible here. */
458 tupDesc = lDesc->rd_att;
459 tup = heap_formtuple(tupDesc,
462 heap_insert(lDesc, tup);
467 * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
468 * listener on %s (possibly dead)",relname); }
471 RelationUnsetLockForWrite(lDesc);
475 * now that we are listening, we should make a note to ourselves to
476 * unlisten prior to dying.
/* NOTE(review): no NULL check of the malloc result is visible here. */
478 relnamei = malloc(NAMEDATALEN); /* persists to process exit */
479 StrNCpy(relnamei, relname, NAMEDATALEN);
/* The copied name is handed to the exit callback as its argument. */
480 on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
484 *--------------------------------------------------------------
487 * Remove the backend from the list of listening backends
488 * for the specified relation.
490 * This would correspond to the 'unlisten <relation>'
491 * command, but there isn't one yet.
494 * pg_listener is updated.
499 *--------------------------------------------------------------
/*
 * Look up the listener tuple through the LISTENREL system cache and delete
 * it from pg_listener while holding the write lock.
 */
502 Async_Unlisten(char *relname, int pid)
/* The remaining lookup arguments are not visible in this listing. */
507 lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
510 lDesc = heap_openr(ListenerRelationName);
511 RelationSetLockForWrite(lDesc);
/*
 * NOTE(review): lTuple is dereferenced here; whether a NULL check guards
 * this call cannot be seen in the visible lines -- confirm.
 */
514 heap_delete(lDesc, &lTuple->t_ctid);
516 RelationUnsetLockForWrite(lDesc);
/*
 * Exit callback registered by Async_Listen through on_shmem_exit: removes
 * this backend's listen entry for the given relation name.
 */
521 Async_UnlistenOnExit(int code, /* from exitpg */
/* Always unlistens on behalf of the current backend (MyProcPid). */
524 Async_Unlisten((char *) relname, MyProcPid);
528 * --------------------------------------------------------------
529 * Async_NotifyFrontEnd --
531 * Perform an asynchronous notification to front end over
532 * portal comm channel. The name of the relation which contains the
533 * data is sent to the front end.
535 * We remove the notification flag from the pg_listener tuple
536 * associated with our process.
541 * --------------------------------------------------------------
/* NOTE(review): not referenced anywhere in the visible part of this file. */
543 GlobalMemory notifyContext = NULL;
/*
 * Scan pg_listener for tuples selected on the notify and pid columns, set
 * their notify column to 0, and report each relation name to the frontend.
 */
546 Async_NotifyFrontEnd()
548 extern CommandDest whereToSendOutput;
/* Clear the pending flag before doing the work. */
562 notifyFrontEndPending = 0;
565 elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
568 StartTransactionCommand();
/* First key: the notify column (comparison value not visible here). */
569 ScanKeyEntryInitialize(&key[0], 0,
570 Anum_pg_listener_notify,
/* Second key: the pid column, matched against our own pid. */
573 ScanKeyEntryInitialize(&key[1], 0,
574 Anum_pg_listener_pid,
576 Int32GetDatum(MyProcPid));
577 lRel = heap_openr(ListenerRelationName);
578 RelationSetLockForWrite(lRel);
579 tdesc = RelationGetTupleDescriptor(lRel);
580 sRel = heap_beginscan(lRel, 0, SnapshotNow, 2, key);
/*
 * Replacement template: every slot starts as ' ', then only the notify
 * column is flagged 'r' and given the value 0.
 */
582 nulls[0] = nulls[1] = nulls[2] = ' ';
583 repl[0] = repl[1] = repl[2] = ' ';
584 repl[Anum_pg_listener_notify - 1] = 'r';
585 value[0] = value[1] = value[2] = (Datum) 0;
586 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
588 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
/* Fetch the relation name before the tuple is replaced. */
590 d = heap_getattr(lTuple, Anum_pg_listener_relname,
592 rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
593 heap_replace(lRel, &lTuple->t_ctid, rTuple);
595 /* notifying the front end */
/* With a remote destination, our pid and the relation name are sent. */
597 if (whereToSendOutput == Remote)
600 pq_putint((int32) MyProcPid, sizeof(int32));
601 pq_putstr(DatumGetName(d)->data);
/* Presumably the non-remote branch; the branch keyword is not visible here. */
605 elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
608 CommitTransactionCommand();
/*
 * Search pendingNotifies for an entry equal to relname.  The return
 * statements are not visible in this listing; the prototype declares an
 * int result, and callers use it as a truth value.
 */
612 AsyncExistsPendingNotify(char *relname)
/* Walk the list starting from its head element. */
616 for (p = DLGetHead(pendingNotifies);
620 /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
621 if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
633 while ((p = DLRemHead(pendingNotifies)) != NULL)