1 /*-------------------------------------------------------------------------
4 * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
6 * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.117 2004/09/11 18:28:33 tgl Exp $
12 *-------------------------------------------------------------------------
15 /*-------------------------------------------------------------------------
16 * New Async Notification Model:
17 * 1. Multiple backends on same machine. Multiple backends listening on
18 * one relation. (Note: "listening on a relation" is not really the
19 * right way to think about it, since the notify names need not have
20 * anything to do with the names of relations actually in the database.
21 * But this terminology is all over the code and docs, and I don't feel
22 * like trying to replace it.)
24 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25 * ie, each relname/listenerPID pair. The "notification" field of the
26 * tuple is zero when no NOTIFY is pending for that listener, or the PID
27 * of the originating backend when a cross-backend NOTIFY is pending.
28 * (We skip writing to pg_listener when doing a self-NOTIFY, so the
29 * notification field should never be equal to the listenerPID field.)
31 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32 * relname to a list of outstanding NOTIFY requests. Actual processing
33 * happens if and only if we reach transaction commit. At that time (in
34 * routine AtCommit_Notify) we scan pg_listener for matching relnames.
35 * If the listenerPID in a matching tuple is ours, we just send a notify
36 * message to our own front end. If it is not ours, and "notification"
37 * is not already nonzero, we set notification to our own PID and send a
38 * SIGUSR2 signal to the receiving process (indicated by listenerPID).
39 * BTW: if the signal operation fails, we presume that the listener backend
40 * crashed without removing this tuple, and remove the tuple for it.
42 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43 * notify processing immediately if this backend is idle (ie, it is
44 * waiting for a frontend command and is not within a transaction block).
45 * Otherwise the handler may only set a flag, which will cause the
46 * processing to occur just before we next go idle.
48 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49 * matching our own listenerPID and having nonzero notification fields.
50 * For each such tuple, we send a message to our frontend and clear the
51 * notification field. BTW: this routine has to start/commit its own
52 * transaction, since by assumption it is only called from outside any transaction.
55 * Although we grab ExclusiveLock on pg_listener for any operation,
56 * the lock is never held very long, so it shouldn't cause too much of
57 * a performance problem. (Previously we used AccessExclusiveLock, but
58 * there's no real reason to forbid concurrent reads.)
60 * An application that listens on the same relname it notifies will get
61 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
62 * by comparing be_pid in the NOTIFY message to the application's own backend's
63 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
64 * frontend during startup.) The above design guarantees that notifies from
65 * other backends will never be missed by ignoring self-notifies. Note,
66 * however, that we do *not* guarantee that a separate frontend message will
67 * be sent for every outside NOTIFY. Since there is only room for one
68 * originating PID in pg_listener, outside notifies occurring at about the
69 * same time may be collapsed into a single message bearing the PID of the
70 * first outside backend to perform the NOTIFY.
71 *-------------------------------------------------------------------------
79 #include <netinet/in.h>
81 #include "access/heapam.h"
82 #include "catalog/catname.h"
83 #include "catalog/pg_listener.h"
84 #include "commands/async.h"
85 #include "libpq/libpq.h"
86 #include "libpq/pqformat.h"
87 #include "miscadmin.h"
88 #include "storage/ipc.h"
89 #include "storage/sinval.h"
90 #include "tcop/tcopprot.h"
91 #include "utils/fmgroids.h"
92 #include "utils/ps_status.h"
93 #include "utils/syscache.h"
97 * State for outbound notifies consists of a list of all relnames NOTIFYed
98 * in the current transaction. We do not actually perform a NOTIFY until
99 * and unless the transaction commits. pendingNotifies is NIL if no
100 * NOTIFYs have been done in the current transaction.
102 * The list is kept in CurTransactionContext. In subtransactions, each
103 * subtransaction has its own list in its own CurTransactionContext, but
104 * successful subtransactions attach their lists to their parent's list.
105 * Failed subtransactions simply discard their lists.
/* Names NOTIFYed so far in the current (sub)transaction; NIL when there are none. */
107 static List *pendingNotifies = NIL;
/* Stack of saved pendingNotifies lists, one entry per enclosing transaction level (pushed in AtSubStart_Notify). */
109 static List *upperPendingNotifies = NIL; /* list of upper-xact
113 * State for inbound notifies consists of two flags: one saying whether
114 * the signal handler is currently allowed to call ProcessIncomingNotify
115 * directly, and one saying whether the signal has occurred but the handler
116 * was not allowed to call ProcessIncomingNotify at the time.
118 * NB: the "volatile" on these declarations is critical! If your compiler
119 * does not grok "volatile", you'd be best advised to compile this file
120 * with all optimization turned off.
/* Nonzero while the SIGUSR2 handler is allowed to call ProcessIncomingNotify directly. */
122 static volatile int notifyInterruptEnabled = 0;
/* Nonzero when a notify signal has arrived and has not been serviced yet. */
123 static volatile int notifyInterruptOccurred = 0;
125 /* True if we've registered an on_shmem_exit cleanup */
126 static bool unlistenExitRegistered = false;
/* NOTE(review): presumably gates the DEBUG1 trace elog() calls in this file; the guarding tests are not visible in this listing, so confirm. */
128 bool Trace_notify = false;
/* Prototypes for the file-local (static) routines defined further down in this file. */
131 static void Async_UnlistenAll(void);
132 static void Async_UnlistenOnExit(int code, Datum arg);
133 static void ProcessIncomingNotify(void);
134 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
135 static bool AsyncExistsPendingNotify(const char *relname);
136 static void ClearPendingNotifies(void);
140 *--------------------------------------------------------------
143 * This is executed by the SQL notify command.
145 * Adds the relation to the list of pending notifies.
146 * Actual notification happens during transaction commit.
147 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
152 *--------------------------------------------------------------
/*
 * Async_Notify: queue relname as a pending outbound NOTIFY for the current
 * transaction.  Nothing is delivered here; the name is only added to
 * pendingNotifies, and only when AsyncExistsPendingNotify() reports that it
 * is not already in the list.
 *
 * relname: the notify name to queue (it is copied, so the caller keeps
 * ownership of its own string).
 */
155 Async_Notify(char *relname)
158 elog(DEBUG1, "Async_Notify(%s)", relname);
160 /* no point in making duplicate entries in the list ... */
161 if (!AsyncExistsPendingNotify(relname))
164 * The name list needs to live until end of transaction, so store
165 * it in the transaction context.
167 MemoryContext oldcontext;
169 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
/* Prepend a copy of the name, made while CurTransactionContext is current. */
171 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
/* Go back to whatever context the caller was using. */
173 MemoryContextSwitchTo(oldcontext);
178 *--------------------------------------------------------------
181 * This is executed by the SQL listen command.
183 * Register a backend (identified by its Unix PID) as listening
184 * on the specified relation.
190 * pg_listener is updated.
192 *--------------------------------------------------------------
/*
 * Async_Listen: record in pg_listener that backend pid listens on relname.
 *
 * The table is opened with ExclusiveLock and scanned for a row with the same
 * listener PID and the same name (compared over at most NAMEDATALEN bytes).
 * If none exists, a new row is built with relname, pid and a zero
 * notification field, and inserted.  The first successful call in a backend
 * also registers Async_UnlistenOnExit with on_shmem_exit.
 *
 * NOTE(review): the branch taken when alreadyListener is true is only
 * partly visible in this listing (just its heap_close call); presumably it
 * returns without inserting -- confirm against the full file.
 */
195 Async_Listen(char *relname, int pid)
200 Datum values[Natts_pg_listener];
201 char nulls[Natts_pg_listener];
203 bool alreadyListener = false;
206 elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
208 lRel = heap_openr(ListenerRelationName, ExclusiveLock);
210 /* Detect whether we are already listening on this relname */
211 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
212 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
214 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
216 if (listener->listenerpid == pid &&
217 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
219 alreadyListener = true;
220 /* No need to scan the rest of the table */
228 heap_close(lRel, ExclusiveLock);
233 * OK to insert a new tuple
/* Start from an all-NULL-pointer values array before filling in the real columns. */
236 for (i = 0; i < Natts_pg_listener; i++)
239 values[i] = PointerGetDatum(NULL);
/* Fill the attributes in order: name, listener PID, notification (zero). */
243 values[i++] = (Datum) relname;
244 values[i++] = (Datum) pid;
245 values[i++] = (Datum) 0; /* no notifies pending */
247 tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
248 simple_heap_insert(lRel, tuple);
250 #ifdef NOT_USED /* currently there are no indexes */
251 CatalogUpdateIndexes(lRel, tuple);
254 heap_freetuple(tuple);
256 heap_close(lRel, ExclusiveLock);
259 * now that we are listening, make sure we will unlisten before dying.
/* The flag makes sure the exit callback is registered only once per backend. */
261 if (!unlistenExitRegistered)
263 on_shmem_exit(Async_UnlistenOnExit, 0);
264 unlistenExitRegistered = true;
269 *--------------------------------------------------------------
272 * This is executed by the SQL unlisten command.
274 * Remove the backend from the list of listening backends
275 * for the specified relation.
281 * pg_listener is updated.
283 *--------------------------------------------------------------
/*
 * Async_Unlisten: delete the pg_listener row for the given relname and
 * listener PID, if there is one.
 *
 * A NULL pointer, an empty string, or the single-character name * is
 * treated as the unlisten-everything form.
 * NOTE(review): the statements executed for that special case are not
 * visible in this listing; presumably they hand off to Async_UnlistenAll --
 * confirm.
 *
 * Otherwise pg_listener is opened with ExclusiveLock and scanned; the row
 * whose listenerpid equals pid and whose name matches relname (over at most
 * NAMEDATALEN bytes) is removed with simple_heap_delete.
 */
286 Async_Unlisten(char *relname, int pid)
292 /* Handle specially the `unlisten "*"' command */
293 if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
300 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
302 lRel = heap_openr(ListenerRelationName, ExclusiveLock);
304 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
305 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
307 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
309 if (listener->listenerpid == pid &&
310 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
312 /* Found the matching tuple, delete it */
313 simple_heap_delete(lRel, &tuple->t_self);
316 * We assume there can be only one match, so no need to scan
317 * the rest of the table
324 heap_close(lRel, ExclusiveLock);
327 * We do not complain about unlistening something that is not being listened on; in that case we simply do nothing.
333 *--------------------------------------------------------------
336 * Unlisten all relations for this backend.
338 * This is invoked by UNLISTEN "*" command, and also at backend exit.
344 * pg_listener is updated.
346 *--------------------------------------------------------------
/*
 * Async_UnlistenAll: delete every pg_listener row that belongs to this
 * backend.
 *
 * The table is opened with ExclusiveLock and scanned with one scan key that
 * selects rows whose listener PID column equals MyProcPid; each row returned
 * is removed with simple_heap_delete.
 */
349 Async_UnlistenAll(void)
358 elog(DEBUG1, "Async_UnlistenAll");
360 lRel = heap_openr(ListenerRelationName, ExclusiveLock);
361 tdesc = RelationGetDescr(lRel);
363 /* Find and delete all entries with my listenerPID */
/* The next three lines are the trailing arguments of the scan-key setup call; its first line is missing from this listing. */
365 Anum_pg_listener_pid,
366 BTEqualStrategyNumber, F_INT4EQ,
367 Int32GetDatum(MyProcPid));
368 scan = heap_beginscan(lRel, SnapshotNow, 1, key);
370 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
371 simple_heap_delete(lRel, &lTuple->t_self);
374 heap_close(lRel, ExclusiveLock);
378 *--------------------------------------------------------------
379 * Async_UnlistenOnExit
381 * Clean up the pg_listener table at backend exit.
383 * This is executed if we have done any LISTENs in this backend.
384 * It might not be necessary anymore, if the user UNLISTENed everything,
385 * but we don't try to detect that case.
391 * pg_listener is updated if necessary.
393 *--------------------------------------------------------------
/*
 * Async_UnlistenOnExit: exit-time callback registered by Async_Listen via
 * on_shmem_exit.
 *
 * It first aborts whatever transaction may be open, then brackets the
 * cleanup in a transaction of its own (StartTransactionCommand ...
 * CommitTransactionCommand).  Neither argument is used in the visible code.
 *
 * NOTE(review): the statement between the start and the commit is missing
 * from this listing; presumably it is the call that removes this backend
 * from pg_listener -- confirm.
 */
396 Async_UnlistenOnExit(int code, Datum arg)
399 * We need to start/commit a transaction for the unlisten, but if
400 * there is already an active transaction we had better abort that one
401 * first. Otherwise we'd end up committing changes that probably
402 * ought to be discarded.
404 AbortOutOfAnyTransaction();
405 /* Now we can do the unlisten */
406 StartTransactionCommand();
408 CommitTransactionCommand();
412 *--------------------------------------------------------------
415 * This is called at transaction commit.
417 * If there are outbound notify requests in the pendingNotifies list,
418 * scan pg_listener for matching tuples, and either signal the other
419 * backend or send a message to our own frontend.
421 * NOTE: we are still inside the current transaction, therefore can
422 * piggyback on its committing of changes.
428 * Tuples in pg_listener that have matching relnames and other backends'
429 * listenerPIDs are updated with a nonzero notification field.
431 *--------------------------------------------------------------
/*
 * AtCommit_Notify: deliver the NOTIFYs that were queued in pendingNotifies.
 *
 * Outline of the visible logic:
 *   - nothing to do when pendingNotifies is NIL;
 *   - outside normal processing mode the pending list is just cleared;
 *   - otherwise pg_listener is opened with ExclusiveLock and scanned in
 *     full.  Rows whose name is not in the pending list are passed over.
 *     A row carrying our own PID produces a message to our own frontend
 *     (NotifyMyFrontEnd) and no table change.  For a row of another
 *     backend, SIGUSR2 is sent with kill(); if that fails the row is
 *     deleted, and if it succeeds and the notification field is still zero
 *     the row is rewritten with MyProcPid in that field, using heap_update
 *     in no-wait mode;
 *   - the relation is closed with NoLock, so the lock stays held until the
 *     transaction ends, and the pending list is cleared.
 *
 * NOTE(review): several control-flow lines (return, continue, else, the
 * switch header) are missing from this listing, so the exact branch
 * structure should be checked against the full file.
 */
434 AtCommit_Notify(void)
441 Datum value[Natts_pg_listener];
442 char repl[Natts_pg_listener],
443 nulls[Natts_pg_listener];
445 if (pendingNotifies == NIL)
446 return; /* no NOTIFY statements in this
450 * NOTIFY is disabled if not normal processing mode. This test used to
451 * be in xact.c, but it seems cleaner to do it here.
453 if (!IsNormalProcessingMode())
455 ClearPendingNotifies();
460 elog(DEBUG1, "AtCommit_Notify");
462 /* preset data to update notify column to MyProcPid */
/* Only the notification column is flagged for replacement; the other two keep their old values. */
463 nulls[0] = nulls[1] = nulls[2] = ' ';
464 repl[0] = repl[1] = repl[2] = ' ';
465 repl[Anum_pg_listener_notify - 1] = 'r';
466 value[0] = value[1] = value[2] = (Datum) 0;
467 value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
/* Unkeyed scan: every listener row is examined and matched by name below. */
469 lRel = heap_openr(ListenerRelationName, ExclusiveLock);
470 tdesc = RelationGetDescr(lRel);
471 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
473 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
475 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
476 char *relname = NameStr(listener->relname);
477 int32 listenerPID = listener->listenerpid;
479 if (!AsyncExistsPendingNotify(relname))
482 if (listenerPID == MyProcPid)
485 * Self-notify: no need to bother with table update. Indeed,
486 * we *must not* clear the notification field in this path, or
487 * we could lose an outside notify, which'd be bad for
488 * applications that ignore self-notify messages.
492 elog(DEBUG1, "AtCommit_Notify: notifying self");
494 NotifyMyFrontEnd(relname, listenerPID);
499 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
503 * If someone has already notified this listener, we don't
504 * bother modifying the table, but we do still send a SIGUSR2
505 * signal, just in case that backend missed the earlier signal
506 * for some reason. It's OK to send the signal first, because
507 * the other guy can't read pg_listener until we unlock it.
509 if (kill(listenerPID, SIGUSR2) < 0)
512 * Get rid of pg_listener entry if it refers to a PID that
513 * no longer exists. Presumably, that backend crashed
514 * without deleting its pg_listener entries. This code
515 * used to only delete the entry if errno==ESRCH, but as
516 * far as I can see we should just do it for any failure
517 * (certainly at least for EPERM too...)
519 simple_heap_delete(lRel, &lTuple->t_self);
521 else if (listener->notification == 0)
523 ItemPointerData ctid;
526 rTuple = heap_modifytuple(lTuple, lRel,
530 * We cannot use simple_heap_update here because the tuple
531 * could have been modified by an uncommitted transaction;
532 * specifically, since UNLISTEN releases exclusive lock on
533 * the table before commit, the other guy could already
534 * have tried to unlisten. There are no other cases where
535 * we should be able to see an uncommitted update or
536 * delete. Therefore, our response to a
537 * HeapTupleBeingUpdated result is just to ignore it. We
538 * do *not* wait for the other guy to commit --- that
539 * would risk deadlock, and we don't want to block while
540 * holding the table lock anyway for performance reasons.
541 * We also ignore HeapTupleUpdated, which could occur if
542 * the other guy commits between our heap_getnext and
545 result = heap_update(lRel, &lTuple->t_self, rTuple,
547 GetCurrentCommandId(), InvalidSnapshot,
548 false /* no wait for commit */ );
551 case HeapTupleSelfUpdated:
552 /* Tuple was already updated in current command? */
553 elog(ERROR, "tuple already updated by self");
556 case HeapTupleMayBeUpdated:
557 /* done successfully */
559 #ifdef NOT_USED /* currently there are no indexes */
560 CatalogUpdateIndexes(lRel, rTuple);
564 case HeapTupleBeingUpdated:
565 /* ignore uncommitted tuples */
568 case HeapTupleUpdated:
569 /* ignore just-committed tuples */
573 elog(ERROR, "unrecognized heap_update status: %u",
584 * We do NOT release the lock on pg_listener here; we need to hold it
585 * until end of transaction (which is about to happen, anyway) to
586 * ensure that notified backends see our tuple updates when they look.
587 * Else they might disregard the signal, which would make the
588 * application programmer very unhappy.
590 heap_close(lRel, NoLock);
592 ClearPendingNotifies();
595 elog(DEBUG1, "AtCommit_Notify: done");
599 *--------------------------------------------------------------
602 * This is called at transaction abort.
604 * Gets rid of pending outbound notifies that we would have executed
605 * if the transaction got committed.
610 *--------------------------------------------------------------
/*
 * Transaction-abort path: the only visible statement discards the pending
 * outbound notifies by resetting the list.
 * NOTE(review): the signature line of this routine is missing from this
 * listing; the name used for this block is an assumption to be confirmed.
 */
615 ClearPendingNotifies();
619 * AtSubStart_Notify() --- Take care of subtransaction start.
621 * Push empty state for the new subtransaction.
/*
 * AtSubStart_Notify: save the current pendingNotifies list on the
 * upperPendingNotifies stack and give the new subtransaction an empty list.
 *
 * The stack cell is allocated while TopTransactionContext is current.  The
 * assertion checks that the stack depth equals the transaction nest level
 * minus one.
 */
624 AtSubStart_Notify(void)
626 MemoryContext old_cxt;
628 /* Keep the list-of-lists in TopTransactionContext for simplicity */
629 old_cxt = MemoryContextSwitchTo(TopTransactionContext);
/* Push the enclosing level's list; it is taken back off in AtSubCommit_Notify or AtSubAbort_Notify. */
631 upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
633 Assert(list_length(upperPendingNotifies) ==
634 GetCurrentTransactionNestLevel() - 1);
/* The subtransaction starts with no notifies of its own. */
636 pendingNotifies = NIL;
638 MemoryContextSwitchTo(old_cxt);
642 * AtSubCommit_Notify() --- Take care of subtransaction commit.
644 * Reassign all items in the pending notifies list to the parent transaction.
/*
 * AtSubCommit_Notify: merge the committing subtransaction's notifies into
 * its parent.
 *
 * The parent's saved list is popped off upperPendingNotifies and the
 * subtransaction's list is appended to it with list_concat; the combined
 * list becomes pendingNotifies.  Duplicate names are not removed here.
 */
647 AtSubCommit_Notify(void)
649 List *parentPendingNotifies;
/* Pop the list that AtSubStart_Notify saved for the parent level. */
651 parentPendingNotifies = (List *) linitial(upperPendingNotifies);
652 upperPendingNotifies = list_delete_first(upperPendingNotifies);
654 Assert(list_length(upperPendingNotifies) ==
655 GetCurrentTransactionNestLevel() - 2);
658 * We could try to eliminate duplicates here, but it seems not
661 pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
665 * AtSubAbort_Notify() --- Take care of subtransaction abort.
/*
 * AtSubAbort_Notify: throw away the aborted subtransaction's notifies and
 * restore the list saved for the enclosing level.
 *
 * Entries are popped from upperPendingNotifies until its length is no more
 * than the current nest level minus two; each popped list becomes
 * pendingNotifies, so the last one popped is the one that stays in effect.
 * Driving the loop by nest level keeps a repeated call at the same level
 * from popping too much.
 */
668 AtSubAbort_Notify(void)
670 int my_level = GetCurrentTransactionNestLevel();
673 * All we have to do is pop the stack --- the notifies made in this
674 * subxact are no longer interesting, and the space will be freed when
675 * CurTransactionContext is recycled.
677 * This routine could be called more than once at a given nesting level
678 * if there is trouble during subxact abort. Avoid dumping core by
679 * using GetCurrentTransactionNestLevel as the indicator of how far
680 * we need to prune the list.
682 while (list_length(upperPendingNotifies) > my_level - 2)
684 pendingNotifies = (List *) linitial(upperPendingNotifies);
685 upperPendingNotifies = list_delete_first(upperPendingNotifies);
690 *--------------------------------------------------------------
691 * NotifyInterruptHandler
693 * This is the signal handler for SIGUSR2.
695 * If we are idle (notifyInterruptEnabled is set), we can safely invoke
696 * ProcessIncomingNotify directly. Otherwise, just set a flag
704 *--------------------------------------------------------------
/*
 * NotifyInterruptHandler: signal handler for incoming notify signals.
 *
 * errno is captured on entry.  When notifyInterruptEnabled is set, the
 * handler saves and clears ImmediateInterruptOK, disables itself against
 * recursive entry, marks an interrupt as having occurred, and then runs the
 * enable/occurred handshake around ProcessIncomingNotify; afterwards it
 * puts ImmediateInterruptOK back and runs CHECK_FOR_INTERRUPTS if that flag
 * had been set.  When notifyInterruptEnabled is clear, it only records the
 * event in notifyInterruptOccurred.
 *
 * NOTE(review): the loop header, the break/return statements (including
 * what happens when proc_exit_inprogress is set) and the restoring of errno
 * are missing from this listing; check them against the full file.
 */
707 NotifyInterruptHandler(SIGNAL_ARGS)
709 int save_errno = errno;
712 * Note: this is a SIGNAL HANDLER. You must be very wary what you do
713 * here. Some helpful soul had this routine sprinkled with TPRINTFs,
714 * which would likely lead to corruption of stdio buffers if they were
718 /* Don't joggle the elbow of proc_exit */
719 if (proc_exit_inprogress)
722 if (notifyInterruptEnabled)
724 bool save_ImmediateInterruptOK = ImmediateInterruptOK;
727 * We may be called while ImmediateInterruptOK is true; turn it
728 * off while messing with the NOTIFY state. (We would have to
729 * save and restore it anyway, because PGSemaphore operations
730 * inside ProcessIncomingNotify() might reset it.)
732 ImmediateInterruptOK = false;
735 * I'm not sure whether some flavors of Unix might allow another
736 * SIGUSR2 occurrence to recursively interrupt this routine. To
737 * cope with the possibility, we do the same sort of dance that
738 * EnableNotifyInterrupt must do --- see that routine for
741 notifyInterruptEnabled = 0; /* disable any recursive signal */
742 notifyInterruptOccurred = 1; /* do at least one iteration */
745 notifyInterruptEnabled = 1;
746 if (!notifyInterruptOccurred)
748 notifyInterruptEnabled = 0;
749 if (notifyInterruptOccurred)
751 /* Here, it is finally safe to do stuff. */
753 elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
755 ProcessIncomingNotify();
758 elog(DEBUG1, "NotifyInterruptHandler: done");
763 * Restore ImmediateInterruptOK, and check for interrupts if
766 ImmediateInterruptOK = save_ImmediateInterruptOK;
767 if (save_ImmediateInterruptOK)
768 CHECK_FOR_INTERRUPTS();
773 * In this path it is NOT SAFE to do much of anything, except
776 notifyInterruptOccurred = 1;
783 * --------------------------------------------------------------
784 * EnableNotifyInterrupt
786 * This is called by the PostgresMain main loop just before waiting
787 * for a frontend command. If we are truly idle (ie, *not* inside
788 * a transaction block), then process any pending inbound notifies,
789 * and enable the signal handler to process future notifies directly.
791 * NOTE: the signal handler starts out disabled, and stays so until
792 * PostgresMain calls this the first time.
793 * --------------------------------------------------------------
/*
 * EnableNotifyInterrupt: service any notify that is already pending and
 * leave the signal handler enabled for direct processing.
 *
 * Returns immediately, changing nothing, when the backend is inside a
 * transaction or transaction block.  Otherwise it sets the enable flag and
 * tests the occurred flag; when work is pending it clears the enable flag
 * again, re-tests, and calls ProcessIncomingNotify.
 *
 * NOTE(review): the loop header and the break statement of the handshake
 * are missing from this listing; the long comment in the body describes the
 * intended sequence.
 */
796 EnableNotifyInterrupt(void)
798 if (IsTransactionOrTransactionBlock())
799 return; /* not really idle */
802 * This code is tricky because we are communicating with a signal
803 * handler that could interrupt us at any point. If we just checked
804 * notifyInterruptOccurred and then set notifyInterruptEnabled, we
805 * could fail to respond promptly to a signal that happens in between
806 * those two steps. (A very small time window, perhaps, but Murphy's
807 * Law says you can hit it...) Instead, we first set the enable flag,
808 * then test the occurred flag. If we see an unserviced interrupt has
809 * occurred, we re-clear the enable flag before going off to do the
810 * service work. (That prevents re-entrant invocation of
811 * ProcessIncomingNotify() if another interrupt occurs.) If an
812 * interrupt comes in between the setting and clearing of
813 * notifyInterruptEnabled, then it will have done the service work and
814 * left notifyInterruptOccurred zero, so we have to check again after
815 * clearing enable. The whole thing has to be in a loop in case
816 * another interrupt occurs while we're servicing the first. Once we
817 * get out of the loop, enable is set and we know there is no
818 * unserviced interrupt.
820 * NB: an overenthusiastic optimizing compiler could easily break this
821 * code. Hopefully, they all understand what "volatile" means these
826 notifyInterruptEnabled = 1;
827 if (!notifyInterruptOccurred)
829 notifyInterruptEnabled = 0;
830 if (notifyInterruptOccurred)
833 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
835 ProcessIncomingNotify();
838 elog(DEBUG1, "EnableNotifyInterrupt: done");
844 * --------------------------------------------------------------
845 * DisableNotifyInterrupt
847 * This is called by the PostgresMain main loop just after receiving
848 * a frontend command. Signal handler execution of inbound notifies
849 * is disabled until the next EnableNotifyInterrupt call.
851 * The SIGUSR1 signal handler also needs to call this, so as to
852 * prevent conflicts if one signal interrupts the other. So we
853 * must return the previous state of the flag.
854 * --------------------------------------------------------------
/*
 * DisableNotifyInterrupt: stop the signal handler from processing notifies
 * directly.
 *
 * The previous state of notifyInterruptEnabled is captured in result before
 * the flag is cleared.
 * NOTE(review): the return statement is missing from this listing;
 * presumably result is what gets returned -- confirm.
 */
857 DisableNotifyInterrupt(void)
859 bool result = (notifyInterruptEnabled != 0);
861 notifyInterruptEnabled = 0;
867 * --------------------------------------------------------------
868 * ProcessIncomingNotify
870 * Deal with arriving NOTIFYs from other backends.
871 * This is called either directly from the SIGUSR2 signal handler,
872 * or the next time control reaches the outer idle loop.
873 * Scan pg_listener for arriving notifies, report them to my front end,
874 * and clear the notification field in pg_listener until next time.
876 * NOTE: since we are outside any transaction, we must create our own.
877 * --------------------------------------------------------------
/*
 * ProcessIncomingNotify: report notifies sent by other backends to our
 * frontend and reset them in pg_listener.
 *
 * Catchup-interrupt processing is disabled first (its prior state is kept
 * in catchup_enabled), the process title is set to "notify interrupt", and
 * notifyInterruptOccurred is cleared.  Inside a transaction of its own the
 * routine scans pg_listener, under ExclusiveLock, for rows whose listener
 * PID equals MyProcPid.  For a row it forwards the name and the recorded
 * source PID with NotifyMyFrontEnd and rewrites the row with zero in the
 * notification column.  The relation is closed with NoLock so the lock
 * lasts until the commit; then the process title goes back to "idle".
 *
 * NOTE(review): the test that selects only rows with a nonzero
 * notification field, the flush of the output, and the condition guarding
 * the final EnableCatchupInterrupt call are missing from this listing;
 * check them against the full file.
 */
880 ProcessIncomingNotify(void)
888 Datum value[Natts_pg_listener];
889 char repl[Natts_pg_listener],
890 nulls[Natts_pg_listener];
891 bool catchup_enabled;
893 /* Must prevent SIGUSR1 interrupt while I am running */
894 catchup_enabled = DisableCatchupInterrupt();
897 elog(DEBUG1, "ProcessIncomingNotify");
899 set_ps_display("notify interrupt");
/* Clear the flag before scanning, so a signal that arrives from here on is recorded as new. */
901 notifyInterruptOccurred = 0;
903 StartTransactionCommand();
905 lRel = heap_openr(ListenerRelationName, ExclusiveLock);
906 tdesc = RelationGetDescr(lRel);
908 /* Scan only entries with my listenerPID */
910 Anum_pg_listener_pid,
911 BTEqualStrategyNumber, F_INT4EQ,
912 Int32GetDatum(MyProcPid));
913 scan = heap_beginscan(lRel, SnapshotNow, 1, key);
915 /* Prepare data for rewriting 0 into notification field */
916 nulls[0] = nulls[1] = nulls[2] = ' ';
917 repl[0] = repl[1] = repl[2] = ' ';
918 repl[Anum_pg_listener_notify - 1] = 'r';
919 value[0] = value[1] = value[2] = (Datum) 0;
920 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
922 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
924 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
925 char *relname = NameStr(listener->relname);
926 int32 sourcePID = listener->notification;
930 /* Notify the frontend */
933 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
934 relname, (int) sourcePID);
936 NotifyMyFrontEnd(relname, sourcePID);
939 * Rewrite the tuple with 0 in notification column.
941 * simple_heap_update is safe here because no one else would have
942 * tried to UNLISTEN us, so there can be no uncommitted
945 rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
946 simple_heap_update(lRel, &lTuple->t_self, rTuple);
948 #ifdef NOT_USED /* currently there are no indexes */
949 CatalogUpdateIndexes(lRel, rTuple);
956 * We do NOT release the lock on pg_listener here; we need to hold it
957 * until end of transaction (which is about to happen, anyway) to
958 * ensure that other backends see our tuple updates when they look.
959 * Otherwise, a transaction started after this one might mistakenly
960 * think it doesn't need to send this backend a new NOTIFY.
962 heap_close(lRel, NoLock);
964 CommitTransactionCommand();
967 * Must flush the notify messages to ensure frontend gets them
972 set_ps_display("idle");
975 elog(DEBUG1, "ProcessIncomingNotify: done");
978 EnableCatchupInterrupt();
982 * Send NOTIFY message to my front end.
/*
 * NotifyMyFrontEnd: emit one notification for relname to this backend's
 * client.
 *
 * With a remote client (whereToSendOutput == Remote) a message of type A is
 * built: the PID passed in listenerPID as a 4-byte integer, then the name
 * as a string, and for protocol major version 3 or later an additional
 * empty string.  No flush is done here.  The other visible path reports the
 * event with elog at INFO level.
 *
 * NOTE(review): the else keyword and the call that finishes the message are
 * missing from this listing; confirm against the full file.
 *
 * relname:     notify name to report.
 * listenerPID: PID value placed in the message (callers in this file pass
 *              either their own PID or the PID of the notifying backend).
 */
985 NotifyMyFrontEnd(char *relname, int32 listenerPID)
987 if (whereToSendOutput == Remote)
991 pq_beginmessage(&buf, 'A');
992 pq_sendint(&buf, listenerPID, sizeof(int32));
993 pq_sendstring(&buf, relname);
994 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
996 /* XXX Add parameter string here later */
997 pq_sendstring(&buf, "");
1002 * NOTE: we do not do pq_flush() here. For a self-notify, it will
1003 * happen at the end of the transaction, and for incoming notifies
1004 * ProcessIncomingNotify will do it after finding all the
1009 elog(INFO, "NOTIFY for %s", relname);
1012 /* Does pendingNotifies include the given relname? */
/*
 * AsyncExistsPendingNotify: walk pendingNotifies looking for an entry equal
 * to relname, comparing at most NAMEDATALEN bytes of each name.
 * NOTE(review): the return statements are missing from this listing; per
 * the prototype the result is a bool, presumably true on a match.
 */
1014 AsyncExistsPendingNotify(const char *relname)
1018 foreach(p, pendingNotifies)
1020 /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
1021 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
1028 /* Clear the pendingNotifies list. */
/*
 * ClearPendingNotifies: forget all queued outbound notifies by resetting
 * the list head to NIL.  Nothing is freed explicitly here.
 */
1030 ClearPendingNotifies(void)
1033 * We used to have to explicitly deallocate the list members and
1034 * nodes, because they were malloc'd. Now, since we know they are
1035 * palloc'd in CurTransactionContext, we need not do that --- they'll
1036 * go away automatically at transaction exit. We need only reset the
1037 * list head pointer.
1039 pendingNotifies = NIL;