1 /*-------------------------------------------------------------------------
4 * Asynchronous notification
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.40 1998/09/01 04:27:42 momjian Exp $
12 *-------------------------------------------------------------------------
14 /* New Async Notification Model:
15 * 1. Multiple backends on same machine. Multiple backends listening on
18 * 2. One of the backends does a 'notify <relname>'. For all backends that
19 * are listening to this relation (all notifications take place at the
21 * 2.a If the process is the same as the backend process that issued
22 * the notification (we are notifying something that we are listening on),
23 * signal the corresponding frontend over the comm channel.
24 * 2.b For all other listening processes, we send kill(SIGUSR2) to wake up
25 * the listening backend.
26 * 3. Upon receiving a kill(SIGUSR2) signal from another backend process
27 * notifying that one of the relations we are listening on is being
28 * notified, we can be in either of the two following states:
29 * 3.a We are sleeping, wake up and signal our frontend.
30 * 3.b We are in the middle of another transaction, wait until the end
31 * of the current transaction and signal our frontend.
32 * 4. Each frontend receives this notification and processes accordingly.
42 #include <sys/types.h> /* Needed by in.h on Ultrix */
43 #include <netinet/in.h>
47 #include "access/heapam.h"
48 #include "access/relscan.h"
49 #include "access/xact.h"
50 #include "catalog/catname.h"
51 #include "catalog/pg_listener.h"
52 #include "commands/async.h"
54 #include "lib/dllist.h"
55 #include "libpq/libpq.h"
56 #include "miscadmin.h"
57 #include "nodes/memnodes.h"
58 #include "storage/bufmgr.h"
59 #include "storage/lmgr.h"
60 #include "tcop/dest.h"
61 #include "utils/mcxt.h"
62 #include "utils/syscache.h"
63 #include <utils/trace.h>
64 #include <utils/ps_status.h>
66 #define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK]
67 #define NotifyHack pg_options[OPT_NOTIFYHACK]
69 extern TransactionState CurrentTransactionState;
70 extern CommandDest whereToSendOutput;
72 GlobalMemory notifyContext = NULL;
74 static int notifyFrontEndPending = 0;
75 static int notifyIssued = 0;
76 static Dllist *pendingNotifies = NULL;
78 static int AsyncExistsPendingNotify(char *);
79 static void ClearPendingNotify(void);
80 static void Async_NotifyFrontEnd(void);
81 static void Async_NotifyFrontEnd_Aux(void);
82 void Async_Unlisten(char *relname, int pid);
83 static void Async_UnlistenOnExit(int code, char *relname);
84 static void Async_UnlistenAll(void);
87 *--------------------------------------------------------------
88 * Async_NotifyHandler --
90 * This is the signal handler for SIGUSR2. When the backend
91 * is signaled, the backend can be in two states.
92 * 1. If the backend is in the middle of another transaction,
93 * we set the flag, notifyFrontEndPending, and wait until
94 * the end of the transaction to notify the front end.
95 * 2. If the backend is not in the middle of another transaction,
96 * we notify the front end immediately.
106 Async_NotifyHandler(SIGNAL_ARGS)
108 TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
110 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
111 (CurrentTransactionState->blockState == TRANS_DEFAULT))
113 TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
114 "waking up sleeping backend process");
115 PS_SET_STATUS("async_notify");
116 Async_NotifyFrontEnd();
117 PS_SET_STATUS("idle");
121 TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
122 "process in middle of transaction, state=%d, blockstate=%d",
123 CurrentTransactionState->state,
124 CurrentTransactionState->blockState);
125 notifyFrontEndPending = 1;
126 TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
129 TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
133 *--------------------------------------------------------------
136 * This is executed by the SQL notify command.
138 * Adds the relation to the list of pending notifies.
139 * All notification happens at end of commit.
140 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
142 * All notification of backend processes happens here,
143 * then each backend notifies its corresponding front end at
152 * All tuples for relname in pg_listener are updated.
154 *--------------------------------------------------------------
157 Async_Notify(char *relname)
174 TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
176 if (!pendingNotifies)
177 pendingNotifies = DLNewList();
180 * Allocate memory from the global malloc pool because it needs to be
181 * referenced also when the transaction is finished. DZ - 26-08-1996
183 notifyName = strdup(relname);
184 DLAddHead(pendingNotifies, DLNewElem(notifyName));
186 ScanKeyEntryInitialize(&key, 0,
187 Anum_pg_listener_relname,
189 PointerGetDatum(notifyName));
191 lRel = heap_openr(ListenerRelationName);
192 tdesc = RelationGetDescr(lRel);
193 RelationSetLockForWrite(lRel);
194 sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
196 nulls[0] = nulls[1] = nulls[2] = ' ';
197 repl[0] = repl[1] = repl[2] = ' ';
198 repl[Anum_pg_listener_notify - 1] = 'r';
199 value[0] = value[1] = value[2] = (Datum) 0;
200 value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
202 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
204 d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
205 if (!DatumGetInt32(d))
207 rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
208 heap_replace(lRel, &lTuple->t_ctid, rTuple);
209 /* notify is really issued only if a tuple has been changed */
216 * Note: if the write lock is unset we can get multiple tuples with
217 * same oid if other backends notify the same relation. Use this
218 * option at your own risk.
221 RelationUnsetLockForWrite(lRel);
225 TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
229 *--------------------------------------------------------------
230 * Async_NotifyAtCommit --
232 * This is called at transaction commit.
234 * Signal our corresponding frontend process on relations that
235 * were notified. Signal all other backend processes that
236 * are listening also.
244 * Tuples in pg_listener that have our listenerpid are updated so
245 * that the notification is 0. We do not want to notify frontend
250 *--------------------------------------------------------------
253 Async_NotifyAtCommit()
262 extern TransactionState CurrentTransactionState;
264 if (!pendingNotifies)
265 pendingNotifies = DLNewList();
267 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
268 (CurrentTransactionState->blockState == TRANS_DEFAULT))
272 /* 'notify <relname>' issued by us */
274 StartTransactionCommand();
275 TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
276 ScanKeyEntryInitialize(&key, 0,
277 Anum_pg_listener_notify,
280 lRel = heap_openr(ListenerRelationName);
281 RelationSetLockForWrite(lRel);
282 sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
283 tdesc = RelationGetDescr(lRel);
285 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
287 d = heap_getattr(lTuple, Anum_pg_listener_relname,
290 if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
292 d = heap_getattr(lTuple, Anum_pg_listener_pid,
295 if (MyProcPid == DatumGetInt32(d))
297 notifyFrontEndPending = 1;
298 TPRINTF(TRACE_NOTIFY,
299 "Async_NotifyAtCommit: notifying self");
303 TPRINTF(TRACE_NOTIFY,
304 "Async_NotifyAtCommit: notifying pid %d",
307 if (kill(DatumGetInt32(d), SIGUSR2) < 0)
310 heap_delete(lRel, &lTuple->t_ctid);
320 * Notify the frontend inside the current transaction while we
321 * still have a valid write lock on pg_listener. This avoids
322 * waiting until all other backends have finished with
325 if (notifyFrontEndPending)
327 /* The aux version is called inside transaction */
328 Async_NotifyFrontEnd_Aux();
331 TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
332 CommitTransactionCommand();
338 * No notifies issued by us. If notifyFrontEndPending has been
339 * set by Async_NotifyHandler notify the frontend of pending
340 * notifies from other backends.
342 if (notifyFrontEndPending)
343 Async_NotifyFrontEnd();
346 ClearPendingNotify();
351 *--------------------------------------------------------------
352 * Async_NotifyAtAbort --
354 * This is called at transaction abort.
356 * Gets rid of pending notifies. List elements are automatically
357 * freed through memory context.
366 *--------------------------------------------------------------
369 Async_NotifyAtAbort()
373 ClearPendingNotify();
374 DLFreeList(pendingNotifies);
376 pendingNotifies = DLNewList();
379 if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
380 (CurrentTransactionState->blockState == TRANS_DEFAULT))
382 /* don't forget to notify front end */
383 if (notifyFrontEndPending)
384 Async_NotifyFrontEnd();
389 *--------------------------------------------------------------
392 * This is executed by the SQL listen command.
394 * Register a backend (identified by its Unix PID) as listening
395 * on the specified relation.
397 * One listener per relation, pg_listener relation is keyed
398 * on (relname,pid) to provide multiple listeners in future.
401 * pg_listener is updated.
406 *--------------------------------------------------------------
409 Async_Listen(char *relname, int pid)
411 Datum values[Natts_pg_listener];
412 char nulls[Natts_pg_listener];
421 int alreadyListener = 0;
425 if (whereToSendOutput != Remote)
427 elog(NOTICE, "Async_Listen: "
428 "listen not available on interactive sessions");
432 TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
433 for (i = 0; i < Natts_pg_listener; i++)
436 values[i] = PointerGetDatum(NULL);
440 values[i++] = (Datum) relname;
441 values[i++] = (Datum) pid;
442 values[i++] = (Datum) 0; /* no notifies pending */
444 lDesc = heap_openr(ListenerRelationName);
445 RelationSetLockForWrite(lDesc);
447 /* is someone already listening. One listener per relation */
448 tdesc = RelationGetDescr(lDesc);
449 scan = heap_beginscan(lDesc, 0, SnapshotNow, 0, (ScanKey) NULL);
450 while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
452 d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc,
454 relnamei = DatumGetPointer(d);
455 if (!strncmp(relnamei, relname, NAMEDATALEN))
457 d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
458 pid = DatumGetInt32(d);
459 if (pid == MyProcPid)
464 /* No need to scan the rest of the table */
472 elog(NOTICE, "Async_Listen: We are already listening on %s",
474 RelationUnsetLockForWrite(lDesc);
479 tupDesc = lDesc->rd_att;
480 newtup = heap_formtuple(tupDesc, values, nulls);
481 heap_insert(lDesc, newtup);
485 * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
486 * listener on %s (possibly dead)",relname); }
489 RelationUnsetLockForWrite(lDesc);
493 * now that we are listening, we should make a note to ourselves to
494 * unlisten prior to dying.
496 relnamei = malloc(NAMEDATALEN); /* persists to process exit */
497 StrNCpy(relnamei, relname, NAMEDATALEN);
498 on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
502 *--------------------------------------------------------------
505 * This is executed by the SQL unlisten command.
507 * Remove the backend from the list of listening backends
508 * for the specified relation.
511 * pg_listener is updated.
516 *--------------------------------------------------------------
519 Async_Unlisten(char *relname, int pid)
524 /* Handle specially the `unlisten "*"' command */
525 if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
531 TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
533 lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
538 lDesc = heap_openr(ListenerRelationName);
539 RelationSetLockForWrite(lDesc);
540 heap_delete(lDesc, &lTuple->t_ctid);
541 RelationUnsetLockForWrite(lDesc);
547 *--------------------------------------------------------------
548 * Async_UnlistenAll --
550 * Unlisten all relations for this backend.
553 * pg_listener is updated.
558 *--------------------------------------------------------------
569 TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
570 ScanKeyEntryInitialize(&key[0], 0,
571 Anum_pg_listener_pid,
573 Int32GetDatum(MyProcPid));
574 lRel = heap_openr(ListenerRelationName);
575 RelationSetLockForWrite(lRel);
576 tdesc = RelationGetDescr(lRel);
577 sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
579 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
580 heap_delete(lRel, &lTuple->t_ctid);
582 RelationUnsetLockForWrite(lRel);
584 TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
588 * --------------------------------------------------------------
589 * Async_UnlistenOnExit --
591 * This is called at backend exit for each registered listen.
596 * --------------------------------------------------------------
599 Async_UnlistenOnExit(int code, /* from exitpg */
602 Async_Unlisten((char *) relname, MyProcPid);
606 * --------------------------------------------------------------
607 * Async_NotifyFrontEnd --
609 * This is called outside transactions. The real work is done
610 * by Async_NotifyFrontEnd_Aux().
612 * --------------------------------------------------------------
615 Async_NotifyFrontEnd()
617 StartTransactionCommand();
618 Async_NotifyFrontEnd_Aux();
619 CommitTransactionCommand();
623 * --------------------------------------------------------------
624 * Async_NotifyFrontEnd_Aux --
626 * This must be called inside a transaction block.
628 * Perform an asynchronous notification to front end over
629 * portal comm channel. The name of the relation which contains the
630 * data is sent to the front end.
632 * We remove the notification flag from the pg_listener tuple
633 * associated with our process.
638 * --------------------------------------------------------------
641 Async_NotifyFrontEnd_Aux()
657 char *done[MAX_DONE];
661 notifyFrontEndPending = 0;
663 TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
664 StartTransactionCommand();
665 ScanKeyEntryInitialize(&key[0], 0,
666 Anum_pg_listener_notify,
669 ScanKeyEntryInitialize(&key[1], 0,
670 Anum_pg_listener_pid,
672 Int32GetDatum(MyProcPid));
673 lRel = heap_openr(ListenerRelationName);
674 RelationSetLockForWrite(lRel);
675 tdesc = RelationGetDescr(lRel);
676 sRel = heap_beginscan(lRel, 0, SnapshotNow, 2, key);
678 nulls[0] = nulls[1] = nulls[2] = ' ';
679 repl[0] = repl[1] = repl[2] = ' ';
680 repl[Anum_pg_listener_notify - 1] = 'r';
681 value[0] = value[1] = value[2] = (Datum) 0;
682 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
684 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
686 d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc,
690 * This hack deletes duplicate tuples which can be left in the
691 * table if the NotifyUnlock option is set. I'm further
692 * investigating this. -- dz
696 for (i = 0; i < ndone; i++)
698 if (strcmp(DatumGetName(d)->data, done[i]) == 0)
700 TPRINTF(TRACE_NOTIFY,
701 "Async_NotifyFrontEnd: duplicate %s",
702 DatumGetName(d)->data);
703 heap_delete(lRel, &lTuple->t_ctid);
707 if (ndone < MAX_DONE)
708 done[ndone++] = pstrdup(DatumGetName(d)->data);
711 rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
712 heap_replace(lRel, &lTuple->t_ctid, rTuple);
714 /* notifying the front end */
715 TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s",
716 DatumGetName(d)->data);
718 if (whereToSendOutput == Remote)
721 pq_putint((int32) MyProcPid, sizeof(int32));
722 pq_putstr(DatumGetName(d)->data);
727 RelationUnsetLockForWrite(lRel);
730 TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done");
734 AsyncExistsPendingNotify(char *relname)
738 for (p = DLGetHead(pendingNotifies);
742 /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
743 if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
755 while ((p = DLRemHead(pendingNotifies)) != NULL)