1 /*-------------------------------------------------------------------------
4 * Asynchronous notification
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.1.1.1 1996/07/09 06:21:19 scrappy Exp $
12 *-------------------------------------------------------------------------
14 /* New Async Notification Model:
15 * 1. Multiple backends on same machine. Multiple backends listening on
18 * 2. One of the backend does a 'notify <relname>'. For all backends that
19 * are listening to this relation (all notifications take place at the
21 * 2.a If the process is the same as the backend process that issued
22 * notification (we are notifying something that we are listening),
23 * signal the corresponding frontend over the comm channel using the
24 * out-of-band channel.
25 * 2.b For all other listening processes, we send kill(2) to wake up
26 * the listening backend.
27 * 3. Upon receiving a kill(2) signal from another backend process notifying
28 * that one of the relation that we are listening is being notified,
29 * we can be in either of two following states:
30 * 3.a We are sleeping, wake up and signal our frontend.
31 * 3.b We are in middle of another transaction, wait until the end of
32 * of the current transaction and signal our frontend.
33 * 4. Each frontend receives this notification and prcesses accordingly.
39 * The following is the old model which does not work.
43 * 1. Multiple backends on same machine.
45 * 2. Query on one backend sends stuff over an asynchronous portal by
46 * appending to a relation, and then doing an async. notification
47 * (which takes place after commit) to all listeners on this relation.
49 * 3. Async. notification results in all backends listening on relation
50 * to be woken up, by a process signal kill(2), with name of relation
51 * passed in shared memory.
53 * 4. Each backend notifies its respective frontend over the comm
54 * channel using the out-of-band channel.
56 * 5. Each frontend receives this notification and processes accordingly.
58 * #4,#5 are changing soon with pending rewrite of portal/protocol.
67 #include "access/attnum.h"
68 #include "access/heapam.h"
69 #include "access/htup.h"
70 #include "access/relscan.h"
71 #include "access/skey.h"
72 #include "utils/builtins.h"
73 #include "utils/tqual.h"
74 #include "access/xact.h"
76 #include "commands/async.h"
77 #include "commands/copy.h"
78 #include "storage/buf.h"
79 #include "storage/itemptr.h"
80 #include "miscadmin.h"
81 #include "utils/portal.h"
82 #include "utils/excid.h"
83 #include "utils/elog.h"
84 #include "utils/mcxt.h"
85 #include "utils/palloc.h"
86 #include "utils/rel.h"
88 #include "nodes/pg_list.h"
89 #include "tcop/dest.h"
90 #include "commands/command.h"
92 #include "catalog/catname.h"
93 #include "utils/syscache.h"
94 #include "catalog/pg_attribute.h"
95 #include "catalog/pg_proc.h"
96 #include "catalog/pg_class.h"
97 #include "catalog/pg_type.h"
98 #include "catalog/pg_listener.h"
100 #include "executor/execdefs.h"
101 /* #include "executor/execdesc.h"*/
103 #include "storage/bufmgr.h"
104 #include "lib/dllist.h"
105 #include "libpq/libpq.h"
108 static int notifyFrontEndPending = 0;
109 static int notifyIssued = 0;
110 static Dllist *pendingNotifies = NULL;
113 static int AsyncExistsPendingNotify(char *);
114 static void ClearPendingNotify(void);
117 *--------------------------------------------------------------
118 * Async_NotifyHandler --
120 * This is the signal handler for SIGUSR2. When the backend
121 * is signaled, the backend can be in two states.
122 * 1. If the backend is in the middle of another transaction,
123 * we set the flag, notifyFrontEndPending, and wait until
124 * the end of the transaction to notify the front end.
125 * 2. If the backend is not in the middle of another transaction,
126 * we notify the front end immediately.
136 #if defined(PORTNAME_linux)
137 Async_NotifyHandler(int i)
139 Async_NotifyHandler()
142 extern TransactionState CurrentTransactionState;
144 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
145 (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
147 elog(DEBUG, "Waking up sleeping backend process");
148 Async_NotifyFrontEnd();
151 elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
152 CurrentTransactionState->state,
153 CurrentTransactionState->blockState);
154 notifyFrontEndPending = 1;
159 *--------------------------------------------------------------
162 * Adds the relation to the list of pending notifies.
163 * All notification happens at end of commit.
164 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
166 * All notification of backend processes happens here,
167 * then each backend notifies its corresponding front end at
170 * This correspond to 'notify <relname>' command
177 * All tuples for relname in pg_listener are updated.
179 *--------------------------------------------------------------
182 Async_Notify(char *relname)
185 HeapTuple lTuple, rTuple;
193 char repl[3], nulls[3];
197 elog(DEBUG,"Async_Notify: %s",relname);
199 if (!pendingNotifies)
200 pendingNotifies = DLNewList();
202 notifyName = pstrdup(relname);
203 DLAddHead(pendingNotifies, DLNewElem(notifyName));
205 ScanKeyEntryInitialize(&key, 0,
206 Anum_pg_listener_relname,
207 NameEqualRegProcedure,
208 PointerGetDatum(notifyName));
210 lRel = heap_openr(ListenerRelationName);
211 tdesc = RelationGetTupleDescriptor(lRel);
212 sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
214 nulls[0] = nulls[1] = nulls[2] = ' ';
215 repl[0] = repl[1] = repl[2] = ' ';
216 repl[Anum_pg_listener_notify - 1] = 'r';
217 value[0] = value[1] = value[2] = (Datum) 0;
218 value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
220 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) {
221 d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_notify,
223 if (!DatumGetInt32(d)) {
224 rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
225 (void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
235 *--------------------------------------------------------------
236 * Async_NotifyAtCommit --
238 * Signal our corresponding frontend process on relations that
239 * were notified. Signal all other backend process that
240 * are listening also.
248 * Tuples in pg_listener that has our listenerpid are updated so
249 * that the notification is 0. We do not want to notify frontend
254 *--------------------------------------------------------------
257 Async_NotifyAtCommit()
268 extern TransactionState CurrentTransactionState;
270 if (!pendingNotifies)
271 pendingNotifies = DLNewList();
273 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
274 (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
276 if (notifyIssued) { /* 'notify <relname>' issued by us */
278 StartTransactionCommand();
279 elog(DEBUG, "Async_NotifyAtCommit.");
280 ScanKeyEntryInitialize(&key, 0,
281 Anum_pg_listener_notify,
282 Integer32EqualRegProcedure,
284 lRel = heap_openr(ListenerRelationName);
285 sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
286 tdesc = RelationGetTupleDescriptor(lRel);
289 while (HeapTupleIsValid(lTuple = heap_getnext(sRel,0, &b))) {
290 d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
293 if (AsyncExistsPendingNotify((char *) DatumGetPointer(d))) {
294 d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_pid,
297 if (ourpid == DatumGetInt32(d)) {
298 elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
299 notifyFrontEndPending = 1;
301 elog(DEBUG, "Notifying others");
303 if (kill(DatumGetInt32(d), SIGUSR2) < 0) {
304 if (errno == ESRCH) {
305 heap_delete(lRel, &lTuple->t_ctid);
313 CommitTransactionCommand();
314 ClearPendingNotify();
317 if (notifyFrontEndPending) { /* we need to notify the frontend of
318 all pending notifies. */
319 notifyFrontEndPending = 1;
320 Async_NotifyFrontEnd();
326 *--------------------------------------------------------------
327 * Async_NotifyAtAbort --
329 * Gets rid of pending notifies. List elements are automatically
330 * freed through memory context.
339 *--------------------------------------------------------------
342 Async_NotifyAtAbort()
344 extern TransactionState CurrentTransactionState;
347 ClearPendingNotify();
351 DLFreeList(pendingNotifies);
352 pendingNotifies = DLNewList();
354 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
355 (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
356 if (notifyFrontEndPending) { /* don't forget to notify front end */
357 Async_NotifyFrontEnd();
363 *--------------------------------------------------------------
366 * Register a backend (identified by its Unix PID) as listening
367 * on the specified relation.
369 * This corresponds to the 'listen <relation>' command in SQL
371 * One listener per relation, pg_listener relation is keyed
372 * on (relname,pid) to provide multiple listeners in future.
375 * pg_listeners is updated.
380 *--------------------------------------------------------------
383 Async_Listen(char *relname, int pid)
385 Datum values[Natts_pg_listener];
386 char nulls[Natts_pg_listener];
395 int alreadyListener = 0;
396 int ourPid = getpid();
400 elog(DEBUG,"Async_Listen: %s",relname);
401 for (i = 0 ; i < Natts_pg_listener; i++) {
403 values[i] = PointerGetDatum(NULL);
407 values[i++] = (Datum) relname;
408 values[i++] = (Datum) pid;
409 values[i++] = (Datum) 0; /* no notifies pending */
411 lDesc = heap_openr(ListenerRelationName);
413 /* is someone already listening. One listener per relation */
414 tdesc = RelationGetTupleDescriptor(lDesc);
415 s = heap_beginscan(lDesc,0,NowTimeQual,0,(ScanKey)NULL);
416 while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
417 d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,
419 relnamei = DatumGetPointer(d);
420 if (!strncmp(relnamei,relname, NAMEDATALEN)) {
421 d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
422 pid = DatumGetInt32(d);
431 if (alreadyListener) {
432 elog(NOTICE, "Async_Listen: We are already listening on %s",
437 tupDesc = lDesc->rd_att;
438 tup = heap_formtuple(tupDesc,
441 heap_insert(lDesc, tup);
444 /* if (alreadyListener) {
445 elog(NOTICE,"Async_Listen: already one listener on %s (possibly dead)",relname);
450 * now that we are listening, we should make a note to ourselves
451 * to unlisten prior to dying.
453 relnamei = malloc(NAMEDATALEN); /* persists to process exit */
454 memset (relnamei, 0, NAMEDATALEN);
455 strncpy(relnamei, relname, NAMEDATALEN);
456 on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei);
460 *--------------------------------------------------------------
463 * Remove the backend from the list of listening backends
464 * for the specified relation.
466 * This would correspond to the 'unlisten <relation>'
467 * command, but there isn't one yet.
470 * pg_listeners is updated.
475 *--------------------------------------------------------------
478 Async_Unlisten(char *relname, int pid)
483 lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
486 lDesc = heap_openr(ListenerRelationName);
487 if (lTuple != NULL) {
488 heap_delete(lDesc,&lTuple->t_ctid);
494 Async_UnlistenOnExit(int code, /* from exitpg */
497 Async_Unlisten((char *) relname, getpid());
501 * --------------------------------------------------------------
502 * Async_NotifyFrontEnd --
504 * Perform an asynchronous notification to front end over
505 * portal comm channel. The name of the relation which contains the
506 * data is sent to the front end.
508 * We remove the notification flag from the pg_listener tuple
509 * associated with our process.
516 * We make use of the out-of-band channel to transmit the
517 * notification to the front end. The actual data transfer takes
518 * place at the front end's request.
520 * --------------------------------------------------------------
522 GlobalMemory notifyContext = NULL;
525 Async_NotifyFrontEnd()
527 extern CommandDest whereToSendOutput;
528 HeapTuple lTuple, rTuple;
534 char repl[3], nulls[3];
539 notifyFrontEndPending = 0;
541 elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
543 StartTransactionCommand();
545 ScanKeyEntryInitialize(&key[0], 0,
546 Anum_pg_listener_notify,
547 Integer32EqualRegProcedure,
549 ScanKeyEntryInitialize(&key[1], 0,
550 Anum_pg_listener_pid,
551 Integer32EqualRegProcedure,
552 Int32GetDatum(ourpid));
553 lRel = heap_openr(ListenerRelationName);
554 tdesc = RelationGetTupleDescriptor(lRel);
555 sRel = heap_beginscan(lRel, 0, NowTimeQual, 2, key);
557 nulls[0] = nulls[1] = nulls[2] = ' ';
558 repl[0] = repl[1] = repl[2] = ' ';
559 repl[Anum_pg_listener_notify - 1] = 'r';
560 value[0] = value[1] = value[2] = (Datum) 0;
561 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
563 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0,&b))) {
564 d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
566 rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
567 (void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
569 /* notifying the front end */
571 if (whereToSendOutput == Remote) {
573 pq_putint(ourpid, 4);
574 pq_putstr(DatumGetName(d)->data);
577 elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
581 CommitTransactionCommand();
585 AsyncExistsPendingNotify(char *relname)
588 for (p = DLGetHead(pendingNotifies);
591 if (!strcmp(DLE_VAL(p), relname))
602 while ( (p = DLRemHead(pendingNotifies)) != NULL)