1 /*-------------------------------------------------------------------------
4 * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
6 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.82 2002/03/02 21:39:22 momjian Exp $
12 *-------------------------------------------------------------------------
15 /*-------------------------------------------------------------------------
16 * New Async Notification Model:
17 * 1. Multiple backends on same machine. Multiple backends listening on
18 * one relation. (Note: "listening on a relation" is not really the
19 * right way to think about it, since the notify names need not have
20 * anything to do with the names of relations actually in the database.
21 * But this terminology is all over the code and docs, and I don't feel
22 * like trying to replace it.)
24 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25 * ie, each relname/listenerPID pair. The "notification" field of the
26 * tuple is zero when no NOTIFY is pending for that listener, or the PID
27 * of the originating backend when a cross-backend NOTIFY is pending.
28 * (We skip writing to pg_listener when doing a self-NOTIFY, so the
29 * notification field should never be equal to the listenerPID field.)
31 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32 * relname to a list of outstanding NOTIFY requests. Actual processing
33 * happens if and only if we reach transaction commit. At that time (in
34 * routine AtCommit_Notify) we scan pg_listener for matching relnames.
35 * If the listenerPID in a matching tuple is ours, we just send a notify
36 * message to our own front end. If it is not ours, and "notification"
37 * is not already nonzero, we set notification to our own PID and send a
38 * SIGUSR2 signal to the receiving process (indicated by listenerPID).
39 * BTW: if the signal operation fails, we presume that the listener backend
40 * crashed without removing this tuple, and remove the tuple for it.
42 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43 * notify processing immediately if this backend is idle (ie, it is
44 * waiting for a frontend command and is not within a transaction block).
45 * Otherwise the handler may only set a flag, which will cause the
46 * processing to occur just before we next go idle.
 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
 *	  matching our own listenerPID and having nonzero notification fields.
 *	  For each such tuple, we send a message to our frontend and clear the
 *	  notification field.  BTW: this routine has to start/commit its own
 *	  transaction, since by assumption it is only called from outside any
 *	  transaction.
55 * Although we grab AccessExclusiveLock on pg_listener for any operation,
56 * the lock is never held very long, so it shouldn't cause too much of
57 * a performance problem.
59 * An application that listens on the same relname it notifies will get
60 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
61 * by comparing be_pid in the NOTIFY message to the application's own backend's
62 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
63 * frontend during startup.) The above design guarantees that notifies from
64 * other backends will never be missed by ignoring self-notifies. Note,
65 * however, that we do *not* guarantee that a separate frontend message will
66 * be sent for every outside NOTIFY. Since there is only room for one
67 * originating PID in pg_listener, outside notifies occurring at about the
68 * same time may be collapsed into a single message bearing the PID of the
69 * first outside backend to perform the NOTIFY.
70 *-------------------------------------------------------------------------
78 #include <sys/types.h>
79 #include <netinet/in.h>
81 #include "access/heapam.h"
82 #include "catalog/catname.h"
83 #include "catalog/pg_listener.h"
84 #include "commands/async.h"
85 #include "libpq/libpq.h"
86 #include "libpq/pqformat.h"
87 #include "miscadmin.h"
88 #include "tcop/tcopprot.h"
89 #include "utils/fmgroids.h"
90 #include "utils/ps_status.h"
91 #include "utils/syscache.h"
94 /* stuff that we really ought not be touching directly :-( */
95 extern TransactionState CurrentTransactionState;
99 * State for outbound notifies consists of a list of all relnames NOTIFYed
100 * in the current transaction. We do not actually perform a NOTIFY until
101 * and unless the transaction commits. pendingNotifies is NIL if no
102 * NOTIFYs have been done in the current transaction. The List nodes and
103 * referenced strings are all palloc'd in TopTransactionContext.
105 static List *pendingNotifies = NIL;
108 * State for inbound notifies consists of two flags: one saying whether
109 * the signal handler is currently allowed to call ProcessIncomingNotify
110 * directly, and one saying whether the signal has occurred but the handler
111 * was not allowed to call ProcessIncomingNotify at the time.
113 * NB: the "volatile" on these declarations is critical! If your compiler
114 * does not grok "volatile", you'd be best advised to compile this file
115 * with all optimization turned off.
117 static volatile int notifyInterruptEnabled = 0;
118 static volatile int notifyInterruptOccurred = 0;
120 /* True if we've registered an on_shmem_exit cleanup */
121 static bool unlistenExitRegistered = false;
123 bool Trace_notify = false;
126 static void Async_UnlistenAll(void);
127 static void Async_UnlistenOnExit(void);
128 static void ProcessIncomingNotify(void);
129 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
130 static bool AsyncExistsPendingNotify(const char *relname);
131 static void ClearPendingNotifies(void);
135 *--------------------------------------------------------------
138 * This is executed by the SQL notify command.
140 * Adds the relation to the list of pending notifies.
141 * Actual notification happens during transaction commit.
142 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
147 *--------------------------------------------------------------
150 Async_Notify(char *relname)
153 elog(LOG, "Async_Notify: %s", relname);
155 /* no point in making duplicate entries in the list ... */
156 if (!AsyncExistsPendingNotify(relname))
159 * The name list needs to live until end of transaction, so store
160 * it in the top transaction context.
162 MemoryContext oldcontext;
164 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
166 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
168 MemoryContextSwitchTo(oldcontext);
173 *--------------------------------------------------------------
176 * This is executed by the SQL listen command.
178 * Register a backend (identified by its Unix PID) as listening
179 * on the specified relation.
185 * pg_listener is updated.
187 *--------------------------------------------------------------
190 Async_Listen(char *relname, int pid)
195 Datum values[Natts_pg_listener];
196 char nulls[Natts_pg_listener];
198 bool alreadyListener = false;
201 elog(LOG, "Async_Listen: %s", relname);
203 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
205 /* Detect whether we are already listening on this relname */
206 scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
207 while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
209 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
211 if (listener->listenerpid == pid &&
212 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
214 alreadyListener = true;
215 /* No need to scan the rest of the table */
223 heap_close(lRel, AccessExclusiveLock);
224 elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
229 * OK to insert a new tuple
232 for (i = 0; i < Natts_pg_listener; i++)
235 values[i] = PointerGetDatum(NULL);
239 values[i++] = (Datum) relname;
240 values[i++] = (Datum) pid;
241 values[i++] = (Datum) 0; /* no notifies pending */
243 tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
244 heap_insert(lRel, tuple);
246 #ifdef NOT_USED /* currently there are no indexes */
247 if (RelationGetForm(lRel)->relhasindex)
249 Relation idescs[Num_pg_listener_indices];
251 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
252 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, tuple);
253 CatalogCloseIndices(Num_pg_listener_indices, idescs);
257 heap_freetuple(tuple);
259 heap_close(lRel, AccessExclusiveLock);
262 * now that we are listening, make sure we will unlisten before dying.
264 if (!unlistenExitRegistered)
266 on_shmem_exit(Async_UnlistenOnExit, 0);
267 unlistenExitRegistered = true;
272 *--------------------------------------------------------------
275 * This is executed by the SQL unlisten command.
277 * Remove the backend from the list of listening backends
278 * for the specified relation.
284 * pg_listener is updated.
286 *--------------------------------------------------------------
289 Async_Unlisten(char *relname, int pid)
295 /* Handle specially the `unlisten "*"' command */
296 if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
303 elog(LOG, "Async_Unlisten %s", relname);
305 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
307 scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
308 while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
310 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
312 if (listener->listenerpid == pid &&
313 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
315 /* Found the matching tuple, delete it */
316 simple_heap_delete(lRel, &tuple->t_self);
319 * We assume there can be only one match, so no need to scan
320 * the rest of the table
327 heap_close(lRel, AccessExclusiveLock);
330 * We do not complain about unlistening something not being listened;
336 *--------------------------------------------------------------
339 * Unlisten all relations for this backend.
341 * This is invoked by UNLISTEN "*" command, and also at backend exit.
347 * pg_listener is updated.
349 *--------------------------------------------------------------
352 Async_UnlistenAll(void)
361 elog(LOG, "Async_UnlistenAll");
363 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
364 tdesc = RelationGetDescr(lRel);
366 /* Find and delete all entries with my listenerPID */
367 ScanKeyEntryInitialize(&key[0], 0,
368 Anum_pg_listener_pid,
370 Int32GetDatum(MyProcPid));
371 scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
373 while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
374 simple_heap_delete(lRel, &lTuple->t_self);
377 heap_close(lRel, AccessExclusiveLock);
/*
 *--------------------------------------------------------------
 * Async_UnlistenOnExit
 *
 *		Clean up the pg_listener table at backend exit.
 *
 *		This is executed if we have done any LISTENs in this backend.
 *		It might not be necessary anymore, if the user UNLISTENed everything,
 *		but we don't try to detect that case.
 *
 * Side effects:
 *		pg_listener is updated if necessary.  Any transaction that is
 *		still open is aborted first.
 *--------------------------------------------------------------
 */
static void
Async_UnlistenOnExit(void)
{
	/*
	 * We need to start/commit a transaction for the unlisten, but if
	 * there is already an active transaction we had better abort that one
	 * first.  Otherwise we'd end up committing changes that probably
	 * ought to be discarded.
	 */
	AbortOutOfAnyTransaction();
	/* Now we can do the unlisten */
	StartTransactionCommand();
	Async_UnlistenAll();
	CommitTransactionCommand();
}

415 *--------------------------------------------------------------
418 * This is called at transaction commit.
420 * If there are outbound notify requests in the pendingNotifies list,
421 * scan pg_listener for matching tuples, and either signal the other
422 * backend or send a message to our own frontend.
424 * NOTE: we are still inside the current transaction, therefore can
425 * piggyback on its committing of changes.
431 * Tuples in pg_listener that have matching relnames and other peoples'
432 * listenerPIDs are updated with a nonzero notification field.
434 *--------------------------------------------------------------
437 AtCommit_Notify(void)
444 Datum value[Natts_pg_listener];
445 char repl[Natts_pg_listener],
446 nulls[Natts_pg_listener];
448 if (pendingNotifies == NIL)
449 return; /* no NOTIFY statements in this
453 * NOTIFY is disabled if not normal processing mode. This test used to
454 * be in xact.c, but it seems cleaner to do it here.
456 if (!IsNormalProcessingMode())
458 ClearPendingNotifies();
463 elog(LOG, "AtCommit_Notify");
465 /* preset data to update notify column to MyProcPid */
466 nulls[0] = nulls[1] = nulls[2] = ' ';
467 repl[0] = repl[1] = repl[2] = ' ';
468 repl[Anum_pg_listener_notify - 1] = 'r';
469 value[0] = value[1] = value[2] = (Datum) 0;
470 value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
472 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
473 tdesc = RelationGetDescr(lRel);
474 scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
476 while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
478 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
479 char *relname = NameStr(listener->relname);
480 int32 listenerPID = listener->listenerpid;
482 if (!AsyncExistsPendingNotify(relname))
485 if (listenerPID == MyProcPid)
488 * Self-notify: no need to bother with table update. Indeed,
489 * we *must not* clear the notification field in this path, or
490 * we could lose an outside notify, which'd be bad for
491 * applications that ignore self-notify messages.
495 elog(LOG, "AtCommit_Notify: notifying self");
497 NotifyMyFrontEnd(relname, listenerPID);
502 elog(LOG, "AtCommit_Notify: notifying pid %d",
506 * If someone has already notified this listener, we don't
507 * bother modifying the table, but we do still send a SIGUSR2
508 * signal, just in case that backend missed the earlier signal
509 * for some reason. It's OK to send the signal first, because
510 * the other guy can't read pg_listener until we unlock it.
512 if (kill(listenerPID, SIGUSR2) < 0)
515 * Get rid of pg_listener entry if it refers to a PID that
516 * no longer exists. Presumably, that backend crashed
517 * without deleting its pg_listener entries. This code
518 * used to only delete the entry if errno==ESRCH, but as
519 * far as I can see we should just do it for any failure
520 * (certainly at least for EPERM too...)
522 simple_heap_delete(lRel, &lTuple->t_self);
524 else if (listener->notification == 0)
526 rTuple = heap_modifytuple(lTuple, lRel,
528 simple_heap_update(lRel, &lTuple->t_self, rTuple);
530 #ifdef NOT_USED /* currently there are no indexes */
531 if (RelationGetForm(lRel)->relhasindex)
533 Relation idescs[Num_pg_listener_indices];
535 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
536 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
537 CatalogCloseIndices(Num_pg_listener_indices, idescs);
547 * We do NOT release the lock on pg_listener here; we need to hold it
548 * until end of transaction (which is about to happen, anyway) to
549 * ensure that notified backends see our tuple updates when they look.
550 * Else they might disregard the signal, which would make the
551 * application programmer very unhappy.
553 heap_close(lRel, NoLock);
555 ClearPendingNotifies();
558 elog(LOG, "AtCommit_Notify: done");
/*
 *--------------------------------------------------------------
 * AtAbort_Notify
 *
 *		This is called at transaction abort.
 *
 *		Gets rid of pending outbound notifies that we would have executed
 *		if the transaction got committed.
 *
 * NOTE(review): the signature line was not visible in the reviewed text;
 * the name and (void) signature are inferred by analogy with
 * AtCommit_Notify.  Confirm against the prototype in commands/async.h.
 *--------------------------------------------------------------
 */
void
AtAbort_Notify(void)
{
	ClearPendingNotifies();
}

582 *--------------------------------------------------------------
583 * Async_NotifyHandler
585 * This is the signal handler for SIGUSR2.
587 * If we are idle (notifyInterruptEnabled is set), we can safely invoke
588 * ProcessIncomingNotify directly. Otherwise, just set a flag
596 *--------------------------------------------------------------
599 Async_NotifyHandler(SIGNAL_ARGS)
601 int save_errno = errno;
604 * Note: this is a SIGNAL HANDLER. You must be very wary what you do
605 * here. Some helpful soul had this routine sprinkled with TPRINTFs,
606 * which would likely lead to corruption of stdio buffers if they were
610 if (notifyInterruptEnabled)
613 * I'm not sure whether some flavors of Unix might allow another
614 * SIGUSR2 occurrence to recursively interrupt this routine. To
615 * cope with the possibility, we do the same sort of dance that
616 * EnableNotifyInterrupt must do --- see that routine for
619 notifyInterruptEnabled = 0; /* disable any recursive signal */
620 notifyInterruptOccurred = 1; /* do at least one iteration */
623 notifyInterruptEnabled = 1;
624 if (!notifyInterruptOccurred)
626 notifyInterruptEnabled = 0;
627 if (notifyInterruptOccurred)
629 /* Here, it is finally safe to do stuff. */
631 elog(LOG, "Async_NotifyHandler: perform async notify");
633 ProcessIncomingNotify();
636 elog(LOG, "Async_NotifyHandler: done");
643 * In this path it is NOT SAFE to do much of anything, except
646 notifyInterruptOccurred = 1;
653 * --------------------------------------------------------------
654 * EnableNotifyInterrupt
656 * This is called by the PostgresMain main loop just before waiting
657 * for a frontend command. If we are truly idle (ie, *not* inside
658 * a transaction block), then process any pending inbound notifies,
659 * and enable the signal handler to process future notifies directly.
661 * NOTE: the signal handler starts out disabled, and stays so until
662 * PostgresMain calls this the first time.
663 * --------------------------------------------------------------
666 EnableNotifyInterrupt(void)
668 if (CurrentTransactionState->blockState != TRANS_DEFAULT)
669 return; /* not really idle */
672 * This code is tricky because we are communicating with a signal
673 * handler that could interrupt us at any point. If we just checked
674 * notifyInterruptOccurred and then set notifyInterruptEnabled, we
675 * could fail to respond promptly to a signal that happens in between
676 * those two steps. (A very small time window, perhaps, but Murphy's
677 * Law says you can hit it...) Instead, we first set the enable flag,
678 * then test the occurred flag. If we see an unserviced interrupt has
679 * occurred, we re-clear the enable flag before going off to do the
680 * service work. (That prevents re-entrant invocation of
681 * ProcessIncomingNotify() if another interrupt occurs.) If an
682 * interrupt comes in between the setting and clearing of
683 * notifyInterruptEnabled, then it will have done the service work and
684 * left notifyInterruptOccurred zero, so we have to check again after
685 * clearing enable. The whole thing has to be in a loop in case
686 * another interrupt occurs while we're servicing the first. Once we
687 * get out of the loop, enable is set and we know there is no
688 * unserviced interrupt.
690 * NB: an overenthusiastic optimizing compiler could easily break this
691 * code. Hopefully, they all understand what "volatile" means these
696 notifyInterruptEnabled = 1;
697 if (!notifyInterruptOccurred)
699 notifyInterruptEnabled = 0;
700 if (notifyInterruptOccurred)
703 elog(LOG, "EnableNotifyInterrupt: perform async notify");
705 ProcessIncomingNotify();
708 elog(LOG, "EnableNotifyInterrupt: done");
714 * --------------------------------------------------------------
715 * DisableNotifyInterrupt
717 * This is called by the PostgresMain main loop just after receiving
718 * a frontend command. Signal handler execution of inbound notifies
719 * is disabled until the next EnableNotifyInterrupt call.
720 * --------------------------------------------------------------
723 DisableNotifyInterrupt(void)
725 notifyInterruptEnabled = 0;
729 * --------------------------------------------------------------
730 * ProcessIncomingNotify
732 * Deal with arriving NOTIFYs from other backends.
733 * This is called either directly from the SIGUSR2 signal handler,
734 * or the next time control reaches the outer idle loop.
735 * Scan pg_listener for arriving notifies, report them to my front end,
736 * and clear the notification field in pg_listener until next time.
738 * NOTE: since we are outside any transaction, we must create our own.
743 * --------------------------------------------------------------
746 ProcessIncomingNotify(void)
754 Datum value[Natts_pg_listener];
755 char repl[Natts_pg_listener],
756 nulls[Natts_pg_listener];
759 elog(LOG, "ProcessIncomingNotify");
761 set_ps_display("async_notify");
763 notifyInterruptOccurred = 0;
765 StartTransactionCommand();
767 lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
768 tdesc = RelationGetDescr(lRel);
770 /* Scan only entries with my listenerPID */
771 ScanKeyEntryInitialize(&key[0], 0,
772 Anum_pg_listener_pid,
774 Int32GetDatum(MyProcPid));
775 scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
777 /* Prepare data for rewriting 0 into notification field */
778 nulls[0] = nulls[1] = nulls[2] = ' ';
779 repl[0] = repl[1] = repl[2] = ' ';
780 repl[Anum_pg_listener_notify - 1] = 'r';
781 value[0] = value[1] = value[2] = (Datum) 0;
782 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
784 while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
786 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
787 char *relname = NameStr(listener->relname);
788 int32 sourcePID = listener->notification;
792 /* Notify the frontend */
795 elog(LOG, "ProcessIncomingNotify: received %s from %d",
796 relname, (int) sourcePID);
798 NotifyMyFrontEnd(relname, sourcePID);
799 /* Rewrite the tuple with 0 in notification column */
800 rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
801 simple_heap_update(lRel, &lTuple->t_self, rTuple);
803 #ifdef NOT_USED /* currently there are no indexes */
804 if (RelationGetForm(lRel)->relhasindex)
806 Relation idescs[Num_pg_listener_indices];
808 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
809 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
810 CatalogCloseIndices(Num_pg_listener_indices, idescs);
818 * We do NOT release the lock on pg_listener here; we need to hold it
819 * until end of transaction (which is about to happen, anyway) to
820 * ensure that other backends see our tuple updates when they look.
821 * Otherwise, a transaction started after this one might mistakenly
822 * think it doesn't need to send this backend a new NOTIFY.
824 heap_close(lRel, NoLock);
826 CommitTransactionCommand();
829 * Must flush the notify messages to ensure frontend gets them
834 set_ps_display("idle");
837 elog(LOG, "ProcessIncomingNotify: done");
841 * Send NOTIFY message to my front end.
844 NotifyMyFrontEnd(char *relname, int32 listenerPID)
846 if (whereToSendOutput == Remote)
850 pq_beginmessage(&buf);
851 pq_sendbyte(&buf, 'A');
852 pq_sendint(&buf, listenerPID, sizeof(int32));
853 pq_sendstring(&buf, relname);
857 * NOTE: we do not do pq_flush() here. For a self-notify, it will
858 * happen at the end of the transaction, and for incoming notifies
859 * ProcessIncomingNotify will do it after finding all the
864 elog(INFO, "NOTIFY for %s", relname);
867 /* Does pendingNotifies include the given relname? */
869 AsyncExistsPendingNotify(const char *relname)
873 foreach(p, pendingNotifies)
875 /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
876 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
883 /* Clear the pendingNotifies list. */
885 ClearPendingNotifies(void)
888 * We used to have to explicitly deallocate the list members and
889 * nodes, because they were malloc'd. Now, since we know they are
890 * palloc'd in TopTransactionContext, we need not do that --- they'll
891 * go away automatically at transaction exit. We need only reset the
894 pendingNotifies = NIL;