1 /*-------------------------------------------------------------------------
4 * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
6 * Copyright (c) 1994, Regents of the University of California
9 * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.56 1999/11/24 00:44:29 momjian Exp $
11 *-------------------------------------------------------------------------
14 /*-------------------------------------------------------------------------
15 * New Async Notification Model:
16 * 1. Multiple backends on same machine. Multiple backends listening on
17 * one relation. (Note: "listening on a relation" is not really the
18 * right way to think about it, since the notify names need not have
19 * anything to do with the names of relations actually in the database.
20 * But this terminology is all over the code and docs, and I don't feel
21 * like trying to replace it.)
23 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
24 * ie, each relname/listenerPID pair. The "notification" field of the
25 * tuple is zero when no NOTIFY is pending for that listener, or the PID
26 * of the originating backend when a cross-backend NOTIFY is pending.
27 * (We skip writing to pg_listener when doing a self-NOTIFY, so the
28 * notification field should never be equal to the listenerPID field.)
30 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
31 * relname to a list of outstanding NOTIFY requests. Actual processing
32 * happens if and only if we reach transaction commit. At that time (in
33 * routine AtCommit_Notify) we scan pg_listener for matching relnames.
34 * If the listenerPID in a matching tuple is ours, we just send a notify
35 * message to our own front end. If it is not ours, and "notification"
36 * is not already nonzero, we set notification to our own PID and send a
37 * SIGUSR2 signal to the receiving process (indicated by listenerPID).
38 * BTW: if the signal operation fails, we presume that the listener backend
39 * crashed without removing this tuple, and remove the tuple for it.
41 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
42 * notify processing immediately if this backend is idle (ie, it is
43 * waiting for a frontend command and is not within a transaction block).
44 * Otherwise the handler may only set a flag, which will cause the
45 * processing to occur just before we next go idle.
47 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
48 * matching our own listenerPID and having nonzero notification fields.
49 * For each such tuple, we send a message to our frontend and clear the
50 * notification field. BTW: this routine has to start/commit its own
51 * transaction, since by assumption it is only called from outside any
52 * transaction.
54 * Although we grab AccessExclusiveLock on pg_listener for any operation,
55 * the lock is never held very long, so it shouldn't cause too much of
56 * a performance problem.
58 * An application that listens on the same relname it notifies will get
59 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
60 * by comparing be_pid in the NOTIFY message to the application's own backend's
61 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
62 * frontend during startup.) The above design guarantees that notifies from
63 * other backends will never be missed by ignoring self-notifies. Note,
64 * however, that we do *not* guarantee that a separate frontend message will
65 * be sent for every outside NOTIFY. Since there is only room for one
66 * originating PID in pg_listener, outside notifies occurring at about the
67 * same time may be collapsed into a single message bearing the PID of the
68 * first outside backend to perform the NOTIFY.
69 *-------------------------------------------------------------------------
75 #include <sys/types.h>
76 #include <netinet/in.h>
80 #include "access/heapam.h"
81 #include "catalog/catname.h"
82 #include "catalog/indexing.h"
83 #include "catalog/pg_listener.h"
84 #include "commands/async.h"
85 #include "lib/dllist.h"
86 #include "libpq/libpq.h"
87 #include "libpq/pqformat.h"
88 #include "miscadmin.h"
89 #include "utils/ps_status.h"
90 #include "utils/syscache.h"
91 #include "utils/trace.h"
93 /* stuff that we really ought not be touching directly :-( */
/*
 * Globals defined outside this file.  In this file, the blockState field of
 * CurrentTransactionState is examined by EnableNotifyInterrupt, and
 * whereToSendOutput is compared against Remote by NotifyMyFrontEnd.
 */
94 extern TransactionState CurrentTransactionState;
95 extern CommandDest whereToSendOutput;
98 * State for outbound notifies consists of a list of all relnames NOTIFYed
99 * in the current transaction. We do not actually perform a NOTIFY until
100 * and unless the transaction commits. pendingNotifies is NULL if no
101 * NOTIFYs have been done in the current transaction.
/*
 * List of relnames queued by Async_Notify.  Created with DLNewList on first
 * use; ClearPendingNotifies frees it and resets the pointer to NULL.
 */
103 static Dllist *pendingNotifies = NULL;
106 * State for inbound notifies consists of two flags: one saying whether
107 * the signal handler is currently allowed to call ProcessIncomingNotify
108 * directly, and one saying whether the signal has occurred but the handler
109 * was not allowed to call ProcessIncomingNotify at the time.
111 * NB: the "volatile" on these declarations is critical! If your compiler
112 * does not grok "volatile", you'd be best advised to compile this file
113 * with all optimization turned off.
/*
 * Nonzero while Async_NotifyHandler is allowed to call ProcessIncomingNotify
 * itself.  Set by EnableNotifyInterrupt, cleared by DisableNotifyInterrupt,
 * and toggled inside the handler while it services a signal.
 */
115 static volatile int notifyInterruptEnabled = 0;
/*
 * Set to 1 by Async_NotifyHandler when SIGUSR2 arrives; reset to 0 at the
 * start of ProcessIncomingNotify.
 */
116 static volatile int notifyInterruptOccurred = 0;
118 /* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
/* Tested and set to 1 by Async_Listen around its on_shmem_exit call. */
119 static int unlistenExitRegistered = 0;
/* Prototypes for the static (file-local) routines of this file. */
122 static void Async_UnlistenAll(void);
123 static void Async_UnlistenOnExit(void);
124 static void ProcessIncomingNotify(void);
125 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
126 static int AsyncExistsPendingNotify(char *relname);
127 static void ClearPendingNotifies(void);
131 *--------------------------------------------------------------
134 * This is executed by the SQL notify command.
136 * Adds the relation to the list of pending notifies.
137 * Actual notification happens during transaction commit.
138 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
143 *--------------------------------------------------------------
/*
 * Async_Notify: queue relname for notification.
 *
 * relname: the notify name to queue.
 *
 * Creates the pendingNotifies list on first use.  Unless the name is already
 * queued (AsyncExistsPendingNotify), a strdup() copy of relname is wrapped in
 * a new list element and added at the head of pendingNotifies.
 *
 * NOTE(review): no NULL check on the strdup() result is visible before it is
 * handed to DLNewElem; confirm how allocation failure is handled.
 */
146 Async_Notify(char *relname)
150 TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
152 if (!pendingNotifies)
153 pendingNotifies = DLNewList();
154 /* no point in making duplicate entries in the list ... */
155 if (!AsyncExistsPendingNotify(relname))
158 * We allocate list memory from the global malloc pool to ensure
159 * that it will live until we want to use it. This is probably not
160 * necessary any longer, since we will use it before the end of the
161 * transaction. DLList only knows how to use malloc() anyway, but we
162 * could probably palloc() the strings...
164 notifyName = strdup(relname);
165 DLAddHead(pendingNotifies, DLNewElem(notifyName));
170 *--------------------------------------------------------------
173 * This is executed by the SQL listen command.
175 * Register a backend (identified by its Unix PID) as listening
176 * on the specified relation.
182 * pg_listener is updated.
184 *--------------------------------------------------------------
/*
 * Async_Listen: record in pg_listener that backend pid listens on relname.
 *
 * relname: the notify name to listen on.
 * pid: PID of the listening backend.
 *
 * Opens pg_listener with AccessExclusiveLock and scans every tuple looking
 * for one whose relname and pid both match.  The already-listening path
 * closes the relation and emits a NOTICE.  The insert path forms a tuple
 * from relname, pid and a zero notification value, and inserts it with
 * heap_insert.
 * Finally, the first time through, Async_UnlistenOnExit is registered with
 * on_shmem_exit.
 *
 * NOTE(review): the statements that choose between the two paths are not
 * visible in this excerpt; presumably the already-listening path returns
 * without inserting.
 */
187 Async_Listen(char *relname, int pid)
194 Datum values[Natts_pg_listener];
195 char nulls[Natts_pg_listener];
199 int alreadyListener = 0;
202 TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
204 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
205 tdesc = RelationGetDescr(lRel);
207 /* Detect whether we are already listening on this relname */
208 scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
209 while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
211 d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
/* strncmp() == 0: the stored name and relname agree within NAMEDATALEN bytes */
212 if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
214 d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
215 if (DatumGetInt32(d) == pid)
218 /* No need to scan the rest of the table */
227 heap_close(lRel, AccessExclusiveLock);
228 elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
233 * OK to insert a new tuple
236 for (i = 0; i < Natts_pg_listener; i++)
239 values[i] = PointerGetDatum(NULL);
/* the three column values, in order: relname, listener pid, notification */
243 values[i++] = (Datum) relname;
244 values[i++] = (Datum) pid;
245 values[i++] = (Datum) 0; /* no notifies pending */
247 tupDesc = lRel->rd_att;
248 newtup = heap_formtuple(tupDesc, values, nulls);
249 heap_insert(lRel, newtup);
252 heap_close(lRel, AccessExclusiveLock);
255 * now that we are listening, make sure we will unlisten before dying.
257 if (!unlistenExitRegistered)
/* a failed registration is only reported as a NOTICE */
259 if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
260 elog(NOTICE, "Async_Listen: out of shmem_exit slots");
261 unlistenExitRegistered = 1;
266 *--------------------------------------------------------------
269 * This is executed by the SQL unlisten command.
271 * Remove the backend from the list of listening backends
272 * for the specified relation.
278 * pg_listener is updated.
280 *--------------------------------------------------------------
/*
 * Async_Unlisten: remove the pg_listener entry for (relname, pid).
 *
 * relname: the notify name to stop listening on.
 * pid: PID of the listening backend.
 *
 * A NULL, empty, or "*" relname is detected first and handled as the
 * UNLISTEN "*" case.  Otherwise pg_listener is opened with
 * AccessExclusiveLock, the tuple is looked up through the LISTENREL syscache
 * and removed with heap_delete, and the relation is closed.
 *
 * NOTE(review): the remaining arguments of the syscache lookup and any test
 * of its result are not visible in this excerpt.
 */
283 Async_Unlisten(char *relname, int pid)
288 /* Handle specially the `unlisten "*"' command */
289 if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
295 TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
297 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
298 /* Note we assume there can be only one matching tuple. */
299 lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
303 heap_delete(lRel, &lTuple->t_self, NULL);
304 heap_close(lRel, AccessExclusiveLock);
307 * We do not complain about unlistening something not being listened;
313 *--------------------------------------------------------------
316 * Unlisten all relations for this backend.
318 * This is invoked by UNLISTEN "*" command, and also at backend exit.
324 * pg_listener is updated.
326 *--------------------------------------------------------------
/*
 * Body of Async_UnlistenAll (per the trace message below).
 *
 * Opens pg_listener with AccessExclusiveLock, starts a one-key scan on the
 * listener PID column with MyProcPid as the key value, and calls heap_delete
 * on every tuple the scan returns.  The relation is then closed, passing
 * AccessExclusiveLock to heap_close.
 */
337 TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
339 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
340 tdesc = RelationGetDescr(lRel);
342 /* Find and delete all entries with my listenerPID */
343 ScanKeyEntryInitialize(&key[0], 0,
344 Anum_pg_listener_pid,
346 Int32GetDatum(MyProcPid));
347 sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
349 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
350 heap_delete(lRel, &lTuple->t_self, NULL);
353 heap_close(lRel, AccessExclusiveLock);
357 *--------------------------------------------------------------
358 * Async_UnlistenOnExit
360 * Clean up the pg_listener table at backend exit.
362 * This is executed if we have done any LISTENs in this backend.
363 * It might not be necessary anymore, if the user UNLISTENed everything,
364 * but we don't try to detect that case.
370 * pg_listener is updated if necessary.
372 *--------------------------------------------------------------
/*
 * Async_UnlistenOnExit: exit-time cleanup that Async_Listen registers through
 * on_shmem_exit.
 *
 * Takes no arguments.  Calls AbortOutOfAnyTransaction first, then brackets
 * the unlisten work with StartTransactionCommand and
 * CommitTransactionCommand.
 */
375 Async_UnlistenOnExit()
379 * We need to start/commit a transaction for the unlisten, but if
380 * there is already an active transaction we had better abort that one
381 * first. Otherwise we'd end up committing changes that probably
382 * ought to be discarded.
384 AbortOutOfAnyTransaction();
385 /* Now we can do the unlisten */
386 StartTransactionCommand();
388 CommitTransactionCommand();
392 *--------------------------------------------------------------
395 * This is called at transaction commit.
397 * If there are outbound notify requests in the pendingNotifies list,
398 * scan pg_listener for matching tuples, and either signal the other
399 * backend or send a message to our own frontend.
401 * NOTE: we are still inside the current transaction, therefore can
402 * piggyback on its committing of changes.
408 * Tuples in pg_listener that have matching relnames and other peoples'
409 * listenerPIDs are updated with a nonzero notification field.
411 *--------------------------------------------------------------
/*
 * Body of AtCommit_Notify (per the trace messages below): commit-time
 * delivery of the NOTIFY names queued in pendingNotifies.
 *
 * With no pending list there is nothing to do.  Outside normal processing
 * mode the pending list is simply cleared.  Otherwise pg_listener is opened
 * with AccessExclusiveLock and scanned in full.  For each tuple whose relname
 * is in the pending list:
 *   - if its listener PID equals MyProcPid, NotifyMyFrontEnd is called and
 *     the table is left alone;
 *   - otherwise SIGUSR2 is sent to the listener with kill().  A failed kill()
 *     leads to heap_delete of the tuple.  When the notification column is
 *     still 0, the tuple is rewritten with MyProcPid in that column, and the
 *     catalog indexes are updated if the relation has any.
 * The relation is closed with NoLock, and the pending list is cleared.
 *
 * NOTE(review): some branch keywords are not visible in this excerpt; the
 * branch structure above follows the comments embedded in the code.
 */
422 value[Natts_pg_listener];
423 char repl[Natts_pg_listener],
424 nulls[Natts_pg_listener];
429 if (!pendingNotifies)
430 return; /* no NOTIFY statements in this
434 * NOTIFY is disabled if not normal processing mode. This test used to
435 * be in xact.c, but it seems cleaner to do it here.
437 if (!IsNormalProcessingMode())
439 ClearPendingNotifies();
443 TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
445 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
446 tdesc = RelationGetDescr(lRel);
447 sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
449 /* preset data to update notify column to MyProcPid */
450 nulls[0] = nulls[1] = nulls[2] = ' ';
451 repl[0] = repl[1] = repl[2] = ' ';
452 repl[Anum_pg_listener_notify - 1] = 'r';
453 value[0] = value[1] = value[2] = (Datum) 0;
454 value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
456 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
458 d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
459 relname = (char *) DatumGetPointer(d);
/* only tuples whose relname was NOTIFYed in this transaction matter */
461 if (AsyncExistsPendingNotify(relname))
463 d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
464 listenerPID = DatumGetInt32(d);
466 if (listenerPID == MyProcPid)
469 * Self-notify: no need to bother with table update.
470 * Indeed, we *must not* clear the notification field in
471 * this path, or we could lose an outside notify, which'd
472 * be bad for applications that ignore self-notify
475 TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
476 NotifyMyFrontEnd(relname, listenerPID);
480 TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
484 * If someone has already notified this listener, we don't
485 * bother modifying the table, but we do still send a
486 * SIGUSR2 signal, just in case that backend missed the
487 * earlier signal for some reason. It's OK to send the
488 * signal first, because the other guy can't read
489 * pg_listener until we unlock it.
491 if (kill(listenerPID, SIGUSR2) < 0)
494 * Get rid of pg_listener entry if it refers to a PID
495 * that no longer exists. Presumably, that backend
496 * crashed without deleting its pg_listener entries.
497 * This code used to only delete the entry if
498 * errno==ESRCH, but as far as I can see we should
499 * just do it for any failure (certainly at least for
502 heap_delete(lRel, &lTuple->t_self, NULL);
506 d = heap_getattr(lTuple, Anum_pg_listener_notify,
/* a zero notification column means no notify is pending for this listener yet */
508 if (DatumGetInt32(d) == 0)
510 rTuple = heap_modifytuple(lTuple, lRel,
512 heap_update(lRel, &lTuple->t_self, rTuple, NULL);
513 if (RelationGetForm(lRel)->relhasindex)
515 Relation idescs[Num_pg_listener_indices];
517 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
518 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
519 CatalogCloseIndices(Num_pg_listener_indices, idescs);
530 * We do NOT release the lock on pg_listener here; we need to hold it
531 * until end of transaction (which is about to happen, anyway) to
532 * ensure that notified backends see our tuple updates when they look.
533 * Else they might disregard the signal, which would make the
534 * application programmer very unhappy.
536 heap_close(lRel, NoLock);
538 ClearPendingNotifies();
540 TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
544 *--------------------------------------------------------------
547 * This is called at transaction abort.
549 * Gets rid of pending outbound notifies that we would have executed
550 * if the transaction got committed.
555 *--------------------------------------------------------------
/*
 * Abort-time cleanup: discard the NOTIFY names queued by the transaction
 * that is being aborted, without notifying anyone.
 */
560 ClearPendingNotifies();
564 *--------------------------------------------------------------
565 * Async_NotifyHandler
567 * This is the signal handler for SIGUSR2.
569 * If we are idle (notifyInterruptEnabled is set), we can safely invoke
570 * ProcessIncomingNotify directly. Otherwise, just set a flag
571 * to do it later.
578 *--------------------------------------------------------------
/*
 * Async_NotifyHandler: signal handler for SIGUSR2.
 *
 * When notifyInterruptEnabled is set, the handler clears the enable flag,
 * sets notifyInterruptOccurred, and then services the interrupt by calling
 * ProcessIncomingNotify.  It uses the same sequence of setting the enable
 * flag, testing the occurred flag, clearing the enable flag and testing
 * again that EnableNotifyInterrupt uses.
 * When notifyInterruptEnabled is clear, the handler only sets
 * notifyInterruptOccurred.
 */
582 Async_NotifyHandler(SIGNAL_ARGS)
586 * Note: this is a SIGNAL HANDLER. You must be very wary what you do
587 * here. Some helpful soul had this routine sprinkled with TPRINTFs,
588 * which would likely lead to corruption of stdio buffers if they were
592 if (notifyInterruptEnabled)
596 * I'm not sure whether some flavors of Unix might allow another
597 * SIGUSR2 occurrence to recursively interrupt this routine. To
598 * cope with the possibility, we do the same sort of dance that
599 * EnableNotifyInterrupt must do --- see that routine for
602 notifyInterruptEnabled = 0; /* disable any recursive signal */
603 notifyInterruptOccurred = 1; /* do at least one iteration */
606 notifyInterruptEnabled = 1;
607 if (!notifyInterruptOccurred)
609 notifyInterruptEnabled = 0;
610 if (notifyInterruptOccurred)
612 /* Here, it is finally safe to do stuff. */
613 TPRINTF(TRACE_NOTIFY,
614 "Async_NotifyHandler: perform async notify");
615 ProcessIncomingNotify();
616 TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
624 * In this path it is NOT SAFE to do much of anything, except
627 notifyInterruptOccurred = 1;
632 * --------------------------------------------------------------
633 * EnableNotifyInterrupt
635 * This is called by the PostgresMain main loop just before waiting
636 * for a frontend command. If we are truly idle (ie, *not* inside
637 * a transaction block), then process any pending inbound notifies,
638 * and enable the signal handler to process future notifies directly.
640 * NOTE: the signal handler starts out disabled, and stays so until
641 * PostgresMain calls this the first time.
642 * --------------------------------------------------------------
/*
 * EnableNotifyInterrupt: let the SIGUSR2 handler process notifies directly.
 *
 * Takes no arguments.  Returns immediately when the blockState of
 * CurrentTransactionState is not TRANS_DEFAULT.  Otherwise it sets
 * notifyInterruptEnabled and tests notifyInterruptOccurred.  Whenever an
 * unserviced interrupt is seen, it clears the enable flag, tests the
 * occurred flag again, and calls ProcessIncomingNotify.
 */
646 EnableNotifyInterrupt(void)
648 if (CurrentTransactionState->blockState != TRANS_DEFAULT)
649 return; /* not really idle */
652 * This code is tricky because we are communicating with a signal
653 * handler that could interrupt us at any point. If we just checked
654 * notifyInterruptOccurred and then set notifyInterruptEnabled, we
655 * could fail to respond promptly to a signal that happens in between
656 * those two steps. (A very small time window, perhaps, but Murphy's
657 * Law says you can hit it...) Instead, we first set the enable flag,
658 * then test the occurred flag. If we see an unserviced interrupt has
659 * occurred, we re-clear the enable flag before going off to do the
660 * service work. (That prevents re-entrant invocation of
661 * ProcessIncomingNotify() if another interrupt occurs.) If an
662 * interrupt comes in between the setting and clearing of
663 * notifyInterruptEnabled, then it will have done the service work and
664 * left notifyInterruptOccurred zero, so we have to check again after
665 * clearing enable. The whole thing has to be in a loop in case
666 * another interrupt occurs while we're servicing the first. Once we
667 * get out of the loop, enable is set and we know there is no
668 * unserviced interrupt.
670 * NB: an overenthusiastic optimizing compiler could easily break this
671 * code. Hopefully, they all understand what "volatile" means these
676 notifyInterruptEnabled = 1;
677 if (!notifyInterruptOccurred)
679 notifyInterruptEnabled = 0;
680 if (notifyInterruptOccurred)
682 TPRINTF(TRACE_NOTIFY,
683 "EnableNotifyInterrupt: perform async notify");
684 ProcessIncomingNotify();
685 TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
691 * --------------------------------------------------------------
692 * DisableNotifyInterrupt
694 * This is called by the PostgresMain main loop just after receiving
695 * a frontend command. Signal handler execution of inbound notifies
696 * is disabled until the next EnableNotifyInterrupt call.
697 * --------------------------------------------------------------
/*
 * DisableNotifyInterrupt: clear notifyInterruptEnabled.  While the flag is
 * clear, Async_NotifyHandler only sets notifyInterruptOccurred instead of
 * calling ProcessIncomingNotify.
 */
701 DisableNotifyInterrupt(void)
703 notifyInterruptEnabled = 0;
707 * --------------------------------------------------------------
708 * ProcessIncomingNotify
710 * Deal with arriving NOTIFYs from other backends.
711 * This is called either directly from the SIGUSR2 signal handler,
712 * or the next time control reaches the outer idle loop.
713 * Scan pg_listener for arriving notifies, report them to my front end,
714 * and clear the notification field in pg_listener until next time.
716 * NOTE: since we are outside any transaction, we must create our own.
721 * --------------------------------------------------------------
/*
 * ProcessIncomingNotify: report notifies recorded for this backend in
 * pg_listener to the frontend.
 *
 * Takes no arguments.  Sets the ps status to "async_notify" and resets
 * notifyInterruptOccurred.  Inside its own StartTransactionCommand /
 * CommitTransactionCommand pair, it opens pg_listener with
 * AccessExclusiveLock and scans for tuples whose listener PID column equals
 * MyProcPid.  For a reported tuple, the relname and the source PID are
 * passed to NotifyMyFrontEnd, the tuple is rewritten with 0 in the
 * notification column, and the catalog indexes are updated if the relation
 * has any.  The relation is closed with NoLock before the commit.  Finally
 * the ps status is set to "idle".
 *
 * NOTE(review): the test that decides which scanned tuples are reported
 * (presumably a nonzero notification value) is not visible in this excerpt.
 */
724 ProcessIncomingNotify(void)
733 value[Natts_pg_listener];
734 char repl[Natts_pg_listener],
735 nulls[Natts_pg_listener];
740 TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
741 PS_SET_STATUS("async_notify");
/* reset the flag before scanning pg_listener */
743 notifyInterruptOccurred = 0;
745 StartTransactionCommand();
747 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
748 tdesc = RelationGetDescr(lRel);
750 /* Scan only entries with my listenerPID */
751 ScanKeyEntryInitialize(&key[0], 0,
752 Anum_pg_listener_pid,
754 Int32GetDatum(MyProcPid));
755 sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
757 /* Prepare data for rewriting 0 into notification field */
758 nulls[0] = nulls[1] = nulls[2] = ' ';
759 repl[0] = repl[1] = repl[2] = ' ';
760 repl[Anum_pg_listener_notify - 1] = 'r';
761 value[0] = value[1] = value[2] = (Datum) 0;
762 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
764 while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
766 d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
767 sourcePID = DatumGetInt32(d);
770 d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
771 relname = (char *) DatumGetPointer(d);
772 /* Notify the frontend */
773 TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
774 relname, (int) sourcePID);
775 NotifyMyFrontEnd(relname, sourcePID);
776 /* Rewrite the tuple with 0 in notification column */
777 rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
778 heap_update(lRel, &lTuple->t_self, rTuple, NULL);
779 if (RelationGetForm(lRel)->relhasindex)
781 Relation idescs[Num_pg_listener_indices];
783 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
784 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
785 CatalogCloseIndices(Num_pg_listener_indices, idescs);
792 * We do NOT release the lock on pg_listener here; we need to hold it
793 * until end of transaction (which is about to happen, anyway) to
794 * ensure that other backends see our tuple updates when they look.
795 * Otherwise, a transaction started after this one might mistakenly
796 * think it doesn't need to send this backend a new NOTIFY.
798 heap_close(lRel, NoLock);
800 CommitTransactionCommand();
803 * Must flush the notify messages to ensure frontend gets them
808 PS_SET_STATUS("idle");
809 TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
812 /* Send NOTIFY message to my front end. */
/*
 * NotifyMyFrontEnd: report one notification to the frontend of this backend.
 *
 * relname: the notify name to report.
 * listenerPID: the PID placed in the message.  AtCommit_Notify passes the
 *   PID of this backend for a self-notify; ProcessIncomingNotify passes the
 *   PID read from the notification column.
 *
 * When whereToSendOutput is Remote, a message is built from the byte 'A',
 * the PID sent as an integer of sizeof(int32) bytes, and the relname string.
 *
 * NOTE(review): the elog(NOTICE) at the end is presumably the path taken
 * when output is not Remote; the branch keyword is not visible here.
 */
815 NotifyMyFrontEnd(char *relname, int32 listenerPID)
817 if (whereToSendOutput == Remote)
821 pq_beginmessage(&buf);
822 pq_sendbyte(&buf, 'A');
823 pq_sendint(&buf, listenerPID, sizeof(int32));
824 pq_sendstring(&buf, relname);
828 * NOTE: we do not do pq_flush() here. For a self-notify, it will
829 * happen at the end of the transaction, and for incoming notifies
830 * ProcessIncomingNotify will do it after finding all the
835 elog(NOTICE, "NOTIFY for %s", relname);
838 /* Does pendingNotifies include the given relname?
840 * NB: not called unless pendingNotifies != NULL.
844 AsyncExistsPendingNotify(char *relname)
848 for (p = DLGetHead(pendingNotifies);
852 /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
/*
 * Each list element holds a queued name (stored by Async_Notify).
 * strncmp() == 0 means that name and relname agree within the first
 * NAMEDATALEN bytes, which is the match condition of this search.
 */
853 if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
860 /* Clear the pendingNotifies list. */
/*
 * ClearPendingNotifies: empty and free the pendingNotifies list.
 *
 * Takes no arguments.  Elements are removed from the head of the list with
 * DLRemHead until it returns NULL, the list base is freed with DLFreeList,
 * and pendingNotifies is reset to NULL.
 *
 * NOTE(review): the per-element cleanup inside the loop is not visible in
 * this excerpt; per the comment below, the malloc'd strings have to be
 * released individually.
 */
863 ClearPendingNotifies()
871 * Since the referenced strings are malloc'd, we have to scan the
872 * list and delete them individually. If we used palloc for the
873 * strings then we could just do DLFreeList to get rid of both the
874 * list nodes and the list base...
876 while ((p = DLRemHead(pendingNotifies)) != NULL)
881 DLFreeList(pendingNotifies);
882 pendingNotifies = NULL;