1 /*-------------------------------------------------------------------------
4 * Asynchronous notification
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.29 1998/02/26 04:30:47 momjian Exp $
12 *-------------------------------------------------------------------------
14 /* New Async Notification Model:
15 * 1. Multiple backends on same machine. Multiple backends listening on
18 * 2. One of the backends does a 'notify <relname>'. For all backends that
19 * are listening to this relation (all notifications take place at the
21 * 2.a If the process is the same as the backend process that issued
22 * notification (we are notifying something that we are listening),
23 * signal the corresponding frontend over the comm channel using the
24 * out-of-band channel.
25 * 2.b For all other listening processes, we send kill(2) to wake up
26 * the listening backend.
27 * 3. Upon receiving a kill(2) signal from another backend process notifying
28 * that one of the relations we are listening on is being notified,
29 * we can be in either of two following states:
30 * 3.a We are sleeping, wake up and signal our frontend.
31 * 3.b We are in the middle of another transaction: wait until the end
32 * of the current transaction and then signal our frontend.
33 * 4. Each frontend receives this notification and processes it accordingly.
39 * The following is the old model which does not work.
43 * 1. Multiple backends on same machine.
45 * 2. Query on one backend sends stuff over an asynchronous portal by
46 * appending to a relation, and then doing an async. notification
47 * (which takes place after commit) to all listeners on this relation.
49 * 3. Async. notification results in all backends listening on relation
50 * to be woken up, by a process signal kill(2), with name of relation
51 * passed in shared memory.
53 * 4. Each backend notifies its respective frontend over the comm
54 * channel using the out-of-band channel.
56 * 5. Each frontend receives this notification and processes accordingly.
58 * #4,#5 are changing soon with pending rewrite of portal/protocol.
65 #include <sys/types.h> /* Needed by in.h on Ultrix */
66 #include <netinet/in.h>
70 #include <miscadmin.h>
71 #include <utils/syscache.h>
72 #include <access/relscan.h>
73 #include <access/xact.h>
74 #include <lib/dllist.h>
75 #include <tcop/dest.h>
76 #include <catalog/pg_proc.h>
77 #include <catalog/catname.h>
78 #include <catalog/pg_listener.h>
79 #include <access/heapam.h>
80 #include <storage/bufmgr.h>
81 #include <nodes/memnodes.h>
82 #include <utils/mcxt.h>
83 #include <commands/async.h>
84 #include <libpq/libpq.h>
87 #include <port-protos.h> /* for strdup() */
90 #include <storage/lmgr.h>
/*
 * File-scope notification state and forward declarations.
 */
/*
 * Nonzero while a frontend notification is still owed.  Set by the SIGUSR2
 * handler and by Async_NotifyAtCommit; reset at the top of
 * Async_NotifyFrontEnd.
 */
92 static int notifyFrontEndPending = 0;
/*
 * NOTE(review): not read or written in any line of this file visible in
 * this view -- confirm where it is maintained.
 */
93 static int notifyIssued = 0;
/*
 * List of relation names handed to Async_Notify; created lazily with
 * DLNewList() and searched by AsyncExistsPendingNotify.
 */
94 static Dllist *pendingNotifies = NULL;
/* Forward declarations of routines defined later in this file. */
97 static int AsyncExistsPendingNotify(char *);
98 static void ClearPendingNotify(void);
99 static void Async_NotifyFrontEnd(void);
/* Declared without static, unlike its neighbours. */
100 void Async_Unlisten(char *relname, int pid);
101 static void Async_UnlistenOnExit(int code, char *relname);
104 *--------------------------------------------------------------
105 * Async_NotifyHandler --
107 * This is the signal handler for SIGUSR2. When the backend
108 * is signaled, the backend can be in two states.
109 * 1. If the backend is in the middle of another transaction,
110 * we set the flag, notifyFrontEndPending, and wait until
111 * the end of the transaction to notify the front end.
112 * 2. If the backend is not in the middle of another transaction,
113 * we notify the front end immediately.
/*
 * Async_NotifyHandler: body of the SIGUSR2 handler.
 *
 * When both the transaction state and the block state are TRANS_DEFAULT the
 * frontend is notified immediately through Async_NotifyFrontEnd().  The
 * other path logs the current states and sets notifyFrontEndPending so the
 * notification can be delivered later.
 *
 * NOTE(review): elog() and Async_NotifyFrontEnd() (which runs a transaction
 * and touches pg_listener) are called from signal-handler context; these
 * are unlikely to be async-signal-safe -- confirm this is acceptable.
 */
123 Async_NotifyHandler(SIGNAL_ARGS)
125 extern TransactionState CurrentTransactionState;
/* Idle check: neither a transaction nor a transaction block is active. */
127 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
128 (CurrentTransactionState->blockState == TRANS_DEFAULT))
132 elog(DEBUG, "Waking up sleeping backend process");
134 Async_NotifyFrontEnd();
/* Busy path: report the states and defer the frontend notification. */
140 elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
141 CurrentTransactionState->state,
142 CurrentTransactionState->blockState);
144 notifyFrontEndPending = 1;
149 *--------------------------------------------------------------
152 * Adds the relation to the list of pending notifies.
153 * All notification happens at end of commit.
154 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
156 * All notification of backend processes happens here,
157 * then each backend notifies its corresponding front end at
160 * This corresponds to the 'notify <relname>' command
167 * All tuples for relname in pg_listener are updated.
169 *--------------------------------------------------------------
/*
 * Async_Notify: record a pending notification for relname and flag the
 * matching pg_listener tuples.
 *
 * relname: name of the relation being notified.
 *
 * A private copy of relname is pushed onto pendingNotifies (the list is
 * created on first use).  pg_listener is then scanned under a write lock
 * with a key on the relname attribute, and each tuple whose notify
 * attribute is currently zero is rewritten via heap_modifytuple() and
 * heap_replace() with notify set to 1.
 */
172 Async_Notify(char *relname)
191 elog(DEBUG, "Async_Notify: %s", relname);
/* Lazily create the pending-notify list. */
194 if (!pendingNotifies)
195 pendingNotifies = DLNewList();
198 * Allocate memory from the global malloc pool because it needs to be
199 * referenced also when the transaction is finished. DZ - 26-08-1996
/*
 * NOTE(review): the strdup() result is used without a NULL check in the
 * lines visible here.
 */
201 notifyName = strdup(relname);
202 DLAddHead(pendingNotifies, DLNewElem(notifyName));
/* Scan key: pg_listener.relname equal to the notified name. */
204 ScanKeyEntryInitialize(&key, 0,
205 Anum_pg_listener_relname,
206 NameEqualRegProcedure,
207 PointerGetDatum(notifyName));
209 lRel = heap_openr(ListenerRelationName);
210 tdesc = RelationGetTupleDescriptor(lRel);
211 RelationSetLockForWrite(lRel);
212 sRel = heap_beginscan(lRel, 0, false, 1, &key);
/*
 * Replacement template: every slot starts as ' ' and only the notify
 * column is marked 'r' (presumably "replace" -- confirm against
 * heap_modifytuple) with a new value of 1.
 */
214 nulls[0] = nulls[1] = nulls[2] = ' ';
215 repl[0] = repl[1] = repl[2] = ' ';
216 repl[Anum_pg_listener_notify - 1] = 'r';
217 value[0] = value[1] = value[2] = (Datum) 0;
218 value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
/* Visit every listener tuple matching the key. */
220 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
222 d = heap_getattr(lTuple, Anum_pg_listener_notify,
/* Only tuples not already flagged are rewritten. */
224 if (!DatumGetInt32(d))
226 rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
227 heap_replace(lRel, &lTuple->t_ctid, rTuple);
/* NOTE(review): ending the scan and closing lRel are not visible in this view. */
232 RelationUnsetLockForWrite(lRel);
238 *--------------------------------------------------------------
239 * Async_NotifyAtCommit --
241 * Signal our corresponding frontend process on relations that
242 * were notified. Signal all other backend process that
243 * are listening also.
251 * Tuples in pg_listener that have our listenerpid are updated so
252 * that the notification is 0. We do not want to notify frontend
257 *--------------------------------------------------------------
/*
 * Async_NotifyAtCommit: deliver the notifications queued by Async_Notify.
 *
 * When the backend is idle (transaction state and block state both
 * TRANS_DEFAULT) a new transaction command is started and pg_listener is
 * scanned under a write lock with a key on the notify attribute.  For each
 * tuple whose relname is in pendingNotifies:
 *   - if the listener pid is MyProcPid, notifyFrontEndPending is set;
 *   - otherwise SIGUSR2 is sent to that pid with kill().
 * Afterwards the transaction command is committed, the pending list is
 * cleared, and the frontend is notified if notifyFrontEndPending is set.
 */
260 Async_NotifyAtCommit()
270 extern TransactionState CurrentTransactionState;
/* Make sure the list exists so later lookups have something to walk. */
272 if (!pendingNotifies)
273 pendingNotifies = DLNewList();
275 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
276 (CurrentTransactionState->blockState == TRANS_DEFAULT))
/* NOTE(review): the condition guarding the brace below is not visible in this view. */
280 { /* 'notify <relname>' issued by us */
282 StartTransactionCommand();
284 elog(DEBUG, "Async_NotifyAtCommit.");
/* Scan key on pg_listener.notify; the comparison datum is not visible here. */
286 ScanKeyEntryInitialize(&key, 0,
287 Anum_pg_listener_notify,
288 Integer32EqualRegProcedure,
290 lRel = heap_openr(ListenerRelationName);
291 RelationSetLockForWrite(lRel);
292 sRel = heap_beginscan(lRel, 0, false, 1, &key);
293 tdesc = RelationGetTupleDescriptor(lRel);
295 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
297 d = heap_getattr(lTuple, Anum_pg_listener_relname,
/* Act only on relations this backend queued a notify for. */
300 if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
302 d = heap_getattr(lTuple, Anum_pg_listener_pid,
/* The listener is this backend: no signal needed, just remember to tell the frontend. */
305 if (MyProcPid == DatumGetInt32(d))
308 elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
310 notifyFrontEndPending = 1;
315 elog(DEBUG, "Notifying others");
/* The listener is another backend: wake it with SIGUSR2. */
318 if (kill(DatumGetInt32(d), SIGUSR2) < 0)
/*
 * NOTE(review): presumably the listener tuple is dropped when the signal
 * cannot be delivered; the exact guard around this delete is not visible.
 */
322 heap_delete(lRel, &lTuple->t_ctid);
331 RelationUnsetLockForWrite(lRel);
334 CommitTransactionCommand();
335 ClearPendingNotify();
338 if (notifyFrontEndPending)
339 { /* we need to notify the frontend of all
340 * pending notifies. */
/*
 * NOTE(review): this assignment sits inside a test that the flag is already
 * nonzero, and Async_NotifyFrontEnd resets the flag first thing, so it
 * appears redundant.
 */
341 notifyFrontEndPending = 1;
342 Async_NotifyFrontEnd();
348 *--------------------------------------------------------------
349 * Async_NotifyAtAbort --
351 * Gets rid of pending notifies. List elements are automatically
352 * freed through memory context.
361 *--------------------------------------------------------------
/*
 * Async_NotifyAtAbort: discard notifications queued by the aborted
 * transaction.
 *
 * The pending list is emptied with ClearPendingNotify(), freed, and replaced
 * by a fresh empty list.  If the backend is idle (transaction state and
 * block state both TRANS_DEFAULT) and a frontend notification is still
 * owed, it is delivered now.
 */
364 Async_NotifyAtAbort()
366 extern TransactionState CurrentTransactionState;
370 ClearPendingNotify();
/* NOTE(review): any guard on pendingNotifies before this free is not visible in this view. */
374 DLFreeList(pendingNotifies);
375 pendingNotifies = DLNewList();
377 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
378 (CurrentTransactionState->blockState == TRANS_DEFAULT))
380 if (notifyFrontEndPending)
381 { /* don't forget to notify front end */
382 Async_NotifyFrontEnd();
388 *--------------------------------------------------------------
391 * Register a backend (identified by its Unix PID) as listening
392 * on the specified relation.
394 * This corresponds to the 'listen <relation>' command in SQL
396 * One listener per relation, pg_listener relation is keyed
397 * on (relname,pid) to provide multiple listeners in future.
400 * pg_listeners is updated.
405 *--------------------------------------------------------------
/*
 * Async_Listen: register this backend as a listener on relname.
 *
 * relname: relation to listen on.
 * pid:     process id stored in the new pg_listener tuple.
 *
 * A tuple (relname, pid, 0) is prepared, pg_listener is scanned in full
 * under a write lock for an existing entry with the same relname, and the
 * tuple is formed and inserted.  Finally a malloc'ed copy of relname is
 * registered with on_exitpg() so Async_UnlistenOnExit runs at process exit.
 */
408 Async_Listen(char *relname, int pid)
410 Datum values[Natts_pg_listener];
411 char nulls[Natts_pg_listener];
421 int alreadyListener = 0;
426 elog(DEBUG, "Async_Listen: %s", relname);
/* Start with every attribute slot cleared. */
428 for (i = 0; i < Natts_pg_listener; i++)
431 values[i] = PointerGetDatum(NULL);
/* Fill in relname, pid and notify, in that order. */
435 values[i++] = (Datum) relname;
436 values[i++] = (Datum) pid;
437 values[i++] = (Datum) 0; /* no notifies pending */
439 lDesc = heap_openr(ListenerRelationName);
440 RelationSetLockForWrite(lDesc);
442 /* is someone already listening. One listener per relation */
443 tdesc = RelationGetTupleDescriptor(lDesc);
/* Unkeyed scan: every pg_listener tuple is examined. */
444 s = heap_beginscan(lDesc, 0, false, 0, (ScanKey) NULL);
445 while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b)))
447 d = heap_getattr(htup, Anum_pg_listener_relname, tdesc,
449 relnamei = DatumGetPointer(d);
/* Names are compared over at most NAMEDATALEN bytes. */
450 if (!strncmp(relnamei, relname, NAMEDATALEN))
452 d = heap_getattr(htup, Anum_pg_listener_pid, tdesc, &isnull);
/*
 * NOTE(review): this overwrites the pid parameter with the pid stored in
 * the scanned tuple.  values[] captured the caller's pid before the scan,
 * so the tuple to insert is unaffected, but a separate local would be
 * clearer.
 */
453 pid = DatumGetInt32(d);
454 if (pid == MyProcPid)
465 elog(NOTICE, "Async_Listen: We are already listening on %s",
/* NOTE(review): the condition guarding the insert below is not visible in this view. */
470 tupDesc = lDesc->rd_att;
471 tup = heap_formtuple(tupDesc,
474 heap_insert(lDesc, tup);
479 * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
480 * listener on %s (possibly dead)",relname); }
483 RelationUnsetLockForWrite(lDesc);
487 * now that we are listening, we should make a note to ourselves to
488 * unlisten prior to dying.
/*
 * NOTE(review): the malloc() result is passed to StrNCpy without a NULL
 * check in the lines visible here.
 */
490 relnamei = malloc(NAMEDATALEN); /* persists to process exit */
491 StrNCpy(relnamei, relname, NAMEDATALEN);
492 on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei);
496 *--------------------------------------------------------------
499 * Remove the backend from the list of listening backends
500 * for the specified relation.
502 * This would correspond to the 'unlisten <relation>'
503 * command, but there isn't one yet.
506 * pg_listeners is updated.
511 *--------------------------------------------------------------
/*
 * Async_Unlisten: remove a listener entry from pg_listener.
 *
 * relname: relation name used as the first syscache lookup key.
 * pid:     listener process id (its use in the lookup is on a line not
 *          visible in this view).
 *
 * The tuple is located through the LISTENREL syscache, then pg_listener is
 * opened, write-locked, the tuple is deleted by its t_ctid, and the lock is
 * released.
 */
514 Async_Unlisten(char *relname, int pid)
519 lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
522 lDesc = heap_openr(ListenerRelationName);
523 RelationSetLockForWrite(lDesc);
/* NOTE(review): any validity check on lTuple before this delete is not visible here. */
527 heap_delete(lDesc, &lTuple->t_ctid);
530 RelationUnsetLockForWrite(lDesc);
/*
 * Async_UnlistenOnExit: exit callback registered by Async_Listen through
 * on_exitpg().  Forwards the saved relation name to Async_Unlisten for the
 * current process (MyProcPid); the code argument is not used in the lines
 * visible here.
 */
535 Async_UnlistenOnExit(int code, /* from exitpg */
538 Async_Unlisten((char *) relname, MyProcPid);
542 * --------------------------------------------------------------
543 * Async_NotifyFrontEnd --
545 * Perform an asynchronous notification to front end over
546 * portal comm channel. The name of the relation which contains the
547 * data is sent to the front end.
549 * We remove the notification flag from the pg_listener tuple
550 * associated with our process.
557 * We make use of the out-of-band channel to transmit the
558 * notification to the front end. The actual data transfer takes
559 * place at the front end's request.
561 * --------------------------------------------------------------
/*
 * NOTE(review): notifyContext is not referenced in any line of this file
 * visible in this view.
 */
563 GlobalMemory notifyContext = NULL;
/*
 * Async_NotifyFrontEnd: tell our frontend about pending notifications and
 * clear them in pg_listener.
 *
 * Resets notifyFrontEndPending, then inside its own transaction command
 * scans pg_listener under a write lock with two keys (notify attribute, and
 * pid equal to MyProcPid).  Each matching tuple is rewritten with notify
 * set to 0.  When output goes to a remote frontend, MyProcPid and the
 * relation name are written with pq_putint()/pq_putstr(); otherwise a
 * NOTICE is logged.
 */
566 Async_NotifyFrontEnd()
568 extern CommandDest whereToSendOutput;
/* Clear the flag first. */
582 notifyFrontEndPending = 0;
585 elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
588 StartTransactionCommand();
/* Key 0: the notify attribute; its comparison datum is not visible here. */
589 ScanKeyEntryInitialize(&key[0], 0,
590 Anum_pg_listener_notify,
591 Integer32EqualRegProcedure,
/* Key 1: only tuples registered by this backend. */
593 ScanKeyEntryInitialize(&key[1], 0,
594 Anum_pg_listener_pid,
595 Integer32EqualRegProcedure,
596 Int32GetDatum(MyProcPid));
597 lRel = heap_openr(ListenerRelationName);
598 RelationSetLockForWrite(lRel);
599 tdesc = RelationGetTupleDescriptor(lRel);
600 sRel = heap_beginscan(lRel, 0, false, 2, key);
/*
 * Replacement template: only the notify column is marked 'r' (presumably
 * "replace" -- confirm against heap_modifytuple) with a new value of 0.
 */
602 nulls[0] = nulls[1] = nulls[2] = ' ';
603 repl[0] = repl[1] = repl[2] = ' ';
604 repl[Anum_pg_listener_notify - 1] = 'r';
605 value[0] = value[1] = value[2] = (Datum) 0;
606 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
608 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
/* Fetch the relation name before the tuple is replaced; it is sent below. */
610 d = heap_getattr(lTuple, Anum_pg_listener_relname,
612 rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
613 heap_replace(lRel, &lTuple->t_ctid, rTuple);
615 /* notifying the front end */
617 if (whereToSendOutput == Remote)
/* Message body: this backend's pid followed by the relation name. */
620 pq_putint((int32) MyProcPid, sizeof(int32));
621 pq_putstr(DatumGetName(d)->data);
626 elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
630 CommitTransactionCommand();
/*
 * AsyncExistsPendingNotify: look for relname in pendingNotifies.
 *
 * Walks the list from its head and compares each stored name with relname
 * over at most NAMEDATALEN bytes.  The prototype declares an int result;
 * the return statements themselves are not visible in this view
 * (presumably nonzero on a match -- confirm).
 */
634 AsyncExistsPendingNotify(char *relname)
638 for (p = DLGetHead(pendingNotifies);
642 /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
643 if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
655 while ((p = DLRemHead(pendingNotifies)) != NULL)