1 /*-------------------------------------------------------------------------
4 * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
6 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.132 2006/06/27 22:16:43 momjian Exp $
12 *-------------------------------------------------------------------------
15 /*-------------------------------------------------------------------------
16 * New Async Notification Model:
17 * 1. Multiple backends on same machine. Multiple backends listening on
18 * one relation. (Note: "listening on a relation" is not really the
19 * right way to think about it, since the notify names need not have
20 * anything to do with the names of relations actually in the database.
21 * But this terminology is all over the code and docs, and I don't feel
22 * like trying to replace it.)
24 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25 * ie, each relname/listenerPID pair. The "notification" field of the
26 * tuple is zero when no NOTIFY is pending for that listener, or the PID
27 * of the originating backend when a cross-backend NOTIFY is pending.
28 * (We skip writing to pg_listener when doing a self-NOTIFY, so the
29 * notification field should never be equal to the listenerPID field.)
31 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32 * relname to a list of outstanding NOTIFY requests. Actual processing
33 * happens if and only if we reach transaction commit. At that time (in
34 * routine AtCommit_Notify) we scan pg_listener for matching relnames.
35 * If the listenerPID in a matching tuple is ours, we just send a notify
36 * message to our own front end. If it is not ours, and "notification"
37 * is not already nonzero, we set notification to our own PID and send a
38 * SIGUSR2 signal to the receiving process (indicated by listenerPID).
39 * BTW: if the signal operation fails, we presume that the listener backend
40 * crashed without removing this tuple, and remove the tuple for it.
42 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43 * notify processing immediately if this backend is idle (ie, it is
44 * waiting for a frontend command and is not within a transaction block).
45 * Otherwise the handler may only set a flag, which will cause the
46 * processing to occur just before we next go idle.
48 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49 * matching our own listenerPID and having nonzero notification fields.
50 * For each such tuple, we send a message to our frontend and clear the
51 * notification field. BTW: this routine has to start/commit its own
52 * transaction, since by assumption it is only called from outside any
55 * Although we grab ExclusiveLock on pg_listener for any operation,
56 * the lock is never held very long, so it shouldn't cause too much of
57 * a performance problem. (Previously we used AccessExclusiveLock, but
58 * there's no real reason to forbid concurrent reads.)
60 * An application that listens on the same relname it notifies will get
61 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
62 * by comparing be_pid in the NOTIFY message to the application's own backend's
63 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
64 * frontend during startup.) The above design guarantees that notifies from
65 * other backends will never be missed by ignoring self-notifies. Note,
66 * however, that we do *not* guarantee that a separate frontend message will
67 * be sent for every outside NOTIFY. Since there is only room for one
68 * originating PID in pg_listener, outside notifies occurring at about the
69 * same time may be collapsed into a single message bearing the PID of the
70 * first outside backend to perform the NOTIFY.
71 *-------------------------------------------------------------------------
78 #include <netinet/in.h>
80 #include "access/heapam.h"
81 #include "access/twophase_rmgr.h"
82 #include "catalog/pg_listener.h"
83 #include "commands/async.h"
84 #include "libpq/libpq.h"
85 #include "libpq/pqformat.h"
86 #include "miscadmin.h"
87 #include "storage/ipc.h"
88 #include "storage/sinval.h"
89 #include "tcop/tcopprot.h"
90 #include "utils/fmgroids.h"
91 #include "utils/memutils.h"
92 #include "utils/ps_status.h"
93 #include "utils/syscache.h"
97 * State for outbound notifies consists of a list of all relnames NOTIFYed
98 * in the current transaction. We do not actually perform a NOTIFY until
99 * and unless the transaction commits. pendingNotifies is NIL if no
100 * NOTIFYs have been done in the current transaction.
102 * The list is kept in CurTransactionContext. In subtransactions, each
103 * subtransaction has its own list in its own CurTransactionContext, but
104 * successful subtransactions attach their lists to their parent's list.
105 * Failed subtransactions simply discard their lists.
107 static List *pendingNotifies = NIL;
109 static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
112 * State for inbound notifies consists of two flags: one saying whether
113 * the signal handler is currently allowed to call ProcessIncomingNotify
114 * directly, and one saying whether the signal has occurred but the handler
115 * was not allowed to call ProcessIncomingNotify at the time.
117 * NB: the "volatile" on these declarations is critical! If your compiler
118 * does not grok "volatile", you'd be best advised to compile this file
119 * with all optimization turned off.
121 static volatile int notifyInterruptEnabled = 0;
122 static volatile int notifyInterruptOccurred = 0;
124 /* True if we've registered an on_shmem_exit cleanup */
125 static bool unlistenExitRegistered = false;
127 bool Trace_notify = false;
130 static void Async_UnlistenAll(void);
131 static void Async_UnlistenOnExit(int code, Datum arg);
132 static void ProcessIncomingNotify(void);
133 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
134 static bool AsyncExistsPendingNotify(const char *relname);
135 static void ClearPendingNotifies(void);
139 *--------------------------------------------------------------
142 * This is executed by the SQL notify command.
144 * Adds the relation to the list of pending notifies.
145 * Actual notification happens during transaction commit.
146 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
148 *--------------------------------------------------------------
151 Async_Notify(const char *relname)
154 elog(DEBUG1, "Async_Notify(%s)", relname);
156 /* no point in making duplicate entries in the list ... */
157 if (!AsyncExistsPendingNotify(relname))
160 * The name list needs to live until end of transaction, so store it
161 * in the transaction context.
163 MemoryContext oldcontext;
165 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
167 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
169 MemoryContextSwitchTo(oldcontext);
174 *--------------------------------------------------------------
177 * This is executed by the SQL listen command.
179 * Register the current backend as listening on the specified
183 * pg_listener is updated.
185 *--------------------------------------------------------------
188 Async_Listen(const char *relname)
193 Datum values[Natts_pg_listener];
194 char nulls[Natts_pg_listener];
196 bool alreadyListener = false;
199 elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
201 lRel = heap_open(ListenerRelationId, ExclusiveLock);
203 /* Detect whether we are already listening on this relname */
204 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
205 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
207 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
209 if (listener->listenerpid == MyProcPid &&
210 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
212 alreadyListener = true;
213 /* No need to scan the rest of the table */
221 heap_close(lRel, ExclusiveLock);
226 * OK to insert a new tuple
229 for (i = 0; i < Natts_pg_listener; i++)
232 values[i] = PointerGetDatum(NULL);
236 values[i++] = (Datum) relname;
237 values[i++] = (Datum) MyProcPid;
238 values[i++] = (Datum) 0; /* no notifies pending */
240 tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
241 simple_heap_insert(lRel, tuple);
243 #ifdef NOT_USED /* currently there are no indexes */
244 CatalogUpdateIndexes(lRel, tuple);
247 heap_freetuple(tuple);
249 heap_close(lRel, ExclusiveLock);
252 * now that we are listening, make sure we will unlisten before dying.
254 if (!unlistenExitRegistered)
256 on_shmem_exit(Async_UnlistenOnExit, 0);
257 unlistenExitRegistered = true;
262 *--------------------------------------------------------------
265 * This is executed by the SQL unlisten command.
267 * Remove the current backend from the list of listening backends
268 * for the specified relation.
271 * pg_listener is updated.
273 *--------------------------------------------------------------
276 Async_Unlisten(const char *relname)
282 /* Handle specially the `unlisten "*"' command */
283 if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
290 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
292 lRel = heap_open(ListenerRelationId, ExclusiveLock);
294 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
295 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
297 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
299 if (listener->listenerpid == MyProcPid &&
300 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
302 /* Found the matching tuple, delete it */
303 simple_heap_delete(lRel, &tuple->t_self);
306 * We assume there can be only one match, so no need to scan the
314 heap_close(lRel, ExclusiveLock);
317 * We do not complain about unlistening something not being listened;
323 *--------------------------------------------------------------
326 * Unlisten all relations for this backend.
328 * This is invoked by UNLISTEN "*" command, and also at backend exit.
334 * pg_listener is updated.
336 *--------------------------------------------------------------
339 Async_UnlistenAll(void)
348 elog(DEBUG1, "Async_UnlistenAll");
350 lRel = heap_open(ListenerRelationId, ExclusiveLock);
351 tdesc = RelationGetDescr(lRel);
353 /* Find and delete all entries with my listenerPID */
355 Anum_pg_listener_pid,
356 BTEqualStrategyNumber, F_INT4EQ,
357 Int32GetDatum(MyProcPid));
358 scan = heap_beginscan(lRel, SnapshotNow, 1, key);
360 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
361 simple_heap_delete(lRel, &lTuple->t_self);
364 heap_close(lRel, ExclusiveLock);
368 *--------------------------------------------------------------
369 * Async_UnlistenOnExit
371 * Clean up the pg_listener table at backend exit.
373 * This is executed if we have done any LISTENs in this backend.
374 * It might not be necessary anymore, if the user UNLISTENed everything,
375 * but we don't try to detect that case.
381 * pg_listener is updated if necessary.
383 *--------------------------------------------------------------
386 Async_UnlistenOnExit(int code, Datum arg)
389 * We need to start/commit a transaction for the unlisten, but if there is
390 * already an active transaction we had better abort that one first.
391 * Otherwise we'd end up committing changes that probably ought to be
394 AbortOutOfAnyTransaction();
395 /* Now we can do the unlisten */
396 StartTransactionCommand();
398 CommitTransactionCommand();
403 *--------------------------------------------------------------
406 * This is called at the prepare phase of a two-phase
407 * transaction. Save the state for possible commit later.
408 *--------------------------------------------------------------
411 AtPrepare_Notify(void)
415 foreach(p, pendingNotifies)
417 const char *relname = (const char *) lfirst(p);
419 RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
420 relname, strlen(relname) + 1);
424 * We can clear the state immediately, rather than needing a separate
425 * PostPrepare call, because if the transaction fails we'd just discard
428 ClearPendingNotifies();
432 *--------------------------------------------------------------
435 * This is called at transaction commit.
437 * If there are outbound notify requests in the pendingNotifies list,
438 * scan pg_listener for matching tuples, and either signal the other
439 * backend or send a message to our own frontend.
441 * NOTE: we are still inside the current transaction, therefore can
442 * piggyback on its committing of changes.
448 * Tuples in pg_listener that have matching relnames and other peoples'
449 * listenerPIDs are updated with a nonzero notification field.
451 *--------------------------------------------------------------
454 AtCommit_Notify(void)
461 Datum value[Natts_pg_listener];
462 char repl[Natts_pg_listener],
463 nulls[Natts_pg_listener];
465 if (pendingNotifies == NIL)
466 return; /* no NOTIFY statements in this transaction */
469 * NOTIFY is disabled if not normal processing mode. This test used to be
470 * in xact.c, but it seems cleaner to do it here.
472 if (!IsNormalProcessingMode())
474 ClearPendingNotifies();
479 elog(DEBUG1, "AtCommit_Notify");
481 /* preset data to update notify column to MyProcPid */
482 nulls[0] = nulls[1] = nulls[2] = ' ';
483 repl[0] = repl[1] = repl[2] = ' ';
484 repl[Anum_pg_listener_notify - 1] = 'r';
485 value[0] = value[1] = value[2] = (Datum) 0;
486 value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
488 lRel = heap_open(ListenerRelationId, ExclusiveLock);
489 tdesc = RelationGetDescr(lRel);
490 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
492 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
494 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
495 char *relname = NameStr(listener->relname);
496 int32 listenerPID = listener->listenerpid;
498 if (!AsyncExistsPendingNotify(relname))
501 if (listenerPID == MyProcPid)
504 * Self-notify: no need to bother with table update. Indeed, we
505 * *must not* clear the notification field in this path, or we
506 * could lose an outside notify, which'd be bad for applications
507 * that ignore self-notify messages.
511 elog(DEBUG1, "AtCommit_Notify: notifying self");
513 NotifyMyFrontEnd(relname, listenerPID);
518 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
522 * If someone has already notified this listener, we don't bother
523 * modifying the table, but we do still send a SIGUSR2 signal,
524 * just in case that backend missed the earlier signal for some
525 * reason. It's OK to send the signal first, because the other
526 * guy can't read pg_listener until we unlock it.
528 if (kill(listenerPID, SIGUSR2) < 0)
531 * Get rid of pg_listener entry if it refers to a PID that no
532 * longer exists. Presumably, that backend crashed without
533 * deleting its pg_listener entries. This code used to only
534 * delete the entry if errno==ESRCH, but as far as I can see
535 * we should just do it for any failure (certainly at least
538 simple_heap_delete(lRel, &lTuple->t_self);
540 else if (listener->notification == 0)
543 ItemPointerData update_ctid;
544 TransactionId update_xmax;
546 rTuple = heap_modifytuple(lTuple, tdesc,
550 * We cannot use simple_heap_update here because the tuple
551 * could have been modified by an uncommitted transaction;
552 * specifically, since UNLISTEN releases exclusive lock on the
553 * table before commit, the other guy could already have tried
554 * to unlisten. There are no other cases where we should be
555 * able to see an uncommitted update or delete. Therefore, our
556 * response to a HeapTupleBeingUpdated result is just to
557 * ignore it. We do *not* wait for the other guy to commit
558 * --- that would risk deadlock, and we don't want to block
559 * while holding the table lock anyway for performance
560 * reasons. We also ignore HeapTupleUpdated, which could occur
561 * if the other guy commits between our heap_getnext and
564 result = heap_update(lRel, &lTuple->t_self, rTuple,
565 &update_ctid, &update_xmax,
566 GetCurrentCommandId(), InvalidSnapshot,
567 false /* no wait for commit */ );
570 case HeapTupleSelfUpdated:
571 /* Tuple was already updated in current command? */
572 elog(ERROR, "tuple already updated by self");
575 case HeapTupleMayBeUpdated:
576 /* done successfully */
577 #ifdef NOT_USED /* currently there are no indexes */
578 CatalogUpdateIndexes(lRel, rTuple);
582 case HeapTupleBeingUpdated:
583 /* ignore uncommitted tuples */
586 case HeapTupleUpdated:
587 /* ignore just-committed tuples */
591 elog(ERROR, "unrecognized heap_update status: %u",
602 * We do NOT release the lock on pg_listener here; we need to hold it
603 * until end of transaction (which is about to happen, anyway) to ensure
604 * that notified backends see our tuple updates when they look. Else they
605 * might disregard the signal, which would make the application programmer
608 heap_close(lRel, NoLock);
610 ClearPendingNotifies();
613 elog(DEBUG1, "AtCommit_Notify: done");
617 *--------------------------------------------------------------
620 * This is called at transaction abort.
622 * Gets rid of pending outbound notifies that we would have executed
623 * if the transaction got committed.
628 *--------------------------------------------------------------
633 ClearPendingNotifies();
637 * AtSubStart_Notify() --- Take care of subtransaction start.
639 * Push empty state for the new subtransaction.
642 AtSubStart_Notify(void)
644 MemoryContext old_cxt;
646 /* Keep the list-of-lists in TopTransactionContext for simplicity */
647 old_cxt = MemoryContextSwitchTo(TopTransactionContext);
649 upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
651 Assert(list_length(upperPendingNotifies) ==
652 GetCurrentTransactionNestLevel() - 1);
654 pendingNotifies = NIL;
656 MemoryContextSwitchTo(old_cxt);
660 * AtSubCommit_Notify() --- Take care of subtransaction commit.
662 * Reassign all items in the pending notifies list to the parent transaction.
665 AtSubCommit_Notify(void)
667 List *parentPendingNotifies;
669 parentPendingNotifies = (List *) linitial(upperPendingNotifies);
670 upperPendingNotifies = list_delete_first(upperPendingNotifies);
672 Assert(list_length(upperPendingNotifies) ==
673 GetCurrentTransactionNestLevel() - 2);
676 * We could try to eliminate duplicates here, but it seems not worthwhile.
678 pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
682 * AtSubAbort_Notify() --- Take care of subtransaction abort.
685 AtSubAbort_Notify(void)
687 int my_level = GetCurrentTransactionNestLevel();
690 * All we have to do is pop the stack --- the notifies made in this
691 * subxact are no longer interesting, and the space will be freed when
692 * CurTransactionContext is recycled.
694 * This routine could be called more than once at a given nesting level if
695 * there is trouble during subxact abort. Avoid dumping core by using
696 * GetCurrentTransactionNestLevel as the indicator of how far we need to
699 while (list_length(upperPendingNotifies) > my_level - 2)
701 pendingNotifies = (List *) linitial(upperPendingNotifies);
702 upperPendingNotifies = list_delete_first(upperPendingNotifies);
707 *--------------------------------------------------------------
708 * NotifyInterruptHandler
710 * This is the signal handler for SIGUSR2.
712 * If we are idle (notifyInterruptEnabled is set), we can safely invoke
713 * ProcessIncomingNotify directly. Otherwise, just set a flag
721 *--------------------------------------------------------------
724 NotifyInterruptHandler(SIGNAL_ARGS)
726 int save_errno = errno;
729 * Note: this is a SIGNAL HANDLER. You must be very wary what you do
730 * here. Some helpful soul had this routine sprinkled with TPRINTFs, which
731 * would likely lead to corruption of stdio buffers if they were ever
735 /* Don't joggle the elbow of proc_exit */
736 if (proc_exit_inprogress)
739 if (notifyInterruptEnabled)
741 bool save_ImmediateInterruptOK = ImmediateInterruptOK;
744 * We may be called while ImmediateInterruptOK is true; turn it off
745 * while messing with the NOTIFY state. (We would have to save and
746 * restore it anyway, because PGSemaphore operations inside
747 * ProcessIncomingNotify() might reset it.)
749 ImmediateInterruptOK = false;
752 * I'm not sure whether some flavors of Unix might allow another
753 * SIGUSR2 occurrence to recursively interrupt this routine. To cope
754 * with the possibility, we do the same sort of dance that
755 * EnableNotifyInterrupt must do --- see that routine for comments.
757 notifyInterruptEnabled = 0; /* disable any recursive signal */
758 notifyInterruptOccurred = 1; /* do at least one iteration */
761 notifyInterruptEnabled = 1;
762 if (!notifyInterruptOccurred)
764 notifyInterruptEnabled = 0;
765 if (notifyInterruptOccurred)
767 /* Here, it is finally safe to do stuff. */
769 elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
771 ProcessIncomingNotify();
774 elog(DEBUG1, "NotifyInterruptHandler: done");
779 * Restore ImmediateInterruptOK, and check for interrupts if needed.
781 ImmediateInterruptOK = save_ImmediateInterruptOK;
782 if (save_ImmediateInterruptOK)
783 CHECK_FOR_INTERRUPTS();
788 * In this path it is NOT SAFE to do much of anything, except this:
790 notifyInterruptOccurred = 1;
797 * --------------------------------------------------------------
798 * EnableNotifyInterrupt
800 * This is called by the PostgresMain main loop just before waiting
801 * for a frontend command. If we are truly idle (ie, *not* inside
802 * a transaction block), then process any pending inbound notifies,
803 * and enable the signal handler to process future notifies directly.
805 * NOTE: the signal handler starts out disabled, and stays so until
806 * PostgresMain calls this the first time.
807 * --------------------------------------------------------------
810 EnableNotifyInterrupt(void)
812 if (IsTransactionOrTransactionBlock())
813 return; /* not really idle */
816 * This code is tricky because we are communicating with a signal handler
817 * that could interrupt us at any point. If we just checked
818 * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
819 * fail to respond promptly to a signal that happens in between those two
820 * steps. (A very small time window, perhaps, but Murphy's Law says you
821 * can hit it...) Instead, we first set the enable flag, then test the
822 * occurred flag. If we see an unserviced interrupt has occurred, we
823 * re-clear the enable flag before going off to do the service work. (That
824 * prevents re-entrant invocation of ProcessIncomingNotify() if another
825 * interrupt occurs.) If an interrupt comes in between the setting and
826 * clearing of notifyInterruptEnabled, then it will have done the service
827 * work and left notifyInterruptOccurred zero, so we have to check again
828 * after clearing enable. The whole thing has to be in a loop in case
829 * another interrupt occurs while we're servicing the first. Once we get
830 * out of the loop, enable is set and we know there is no unserviced
833 * NB: an overenthusiastic optimizing compiler could easily break this
834 * code. Hopefully, they all understand what "volatile" means these days.
838 notifyInterruptEnabled = 1;
839 if (!notifyInterruptOccurred)
841 notifyInterruptEnabled = 0;
842 if (notifyInterruptOccurred)
845 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
847 ProcessIncomingNotify();
850 elog(DEBUG1, "EnableNotifyInterrupt: done");
856 * --------------------------------------------------------------
857 * DisableNotifyInterrupt
859 * This is called by the PostgresMain main loop just after receiving
860 * a frontend command. Signal handler execution of inbound notifies
861 * is disabled until the next EnableNotifyInterrupt call.
863 * The SIGUSR1 signal handler also needs to call this, so as to
864 * prevent conflicts if one signal interrupts the other. So we
865 * must return the previous state of the flag.
866 * --------------------------------------------------------------
869 DisableNotifyInterrupt(void)
871 bool result = (notifyInterruptEnabled != 0);
873 notifyInterruptEnabled = 0;
879 * --------------------------------------------------------------
880 * ProcessIncomingNotify
882 * Deal with arriving NOTIFYs from other backends.
883 * This is called either directly from the SIGUSR2 signal handler,
884 * or the next time control reaches the outer idle loop.
885 * Scan pg_listener for arriving notifies, report them to my front end,
886 * and clear the notification field in pg_listener until next time.
888 * NOTE: since we are outside any transaction, we must create our own.
889 * --------------------------------------------------------------
892 ProcessIncomingNotify(void)
900 Datum value[Natts_pg_listener];
901 char repl[Natts_pg_listener],
902 nulls[Natts_pg_listener];
903 bool catchup_enabled;
905 /* Must prevent SIGUSR1 interrupt while I am running */
906 catchup_enabled = DisableCatchupInterrupt();
909 elog(DEBUG1, "ProcessIncomingNotify");
911 set_ps_display("notify interrupt", false);
913 notifyInterruptOccurred = 0;
915 StartTransactionCommand();
917 lRel = heap_open(ListenerRelationId, ExclusiveLock);
918 tdesc = RelationGetDescr(lRel);
920 /* Scan only entries with my listenerPID */
922 Anum_pg_listener_pid,
923 BTEqualStrategyNumber, F_INT4EQ,
924 Int32GetDatum(MyProcPid));
925 scan = heap_beginscan(lRel, SnapshotNow, 1, key);
927 /* Prepare data for rewriting 0 into notification field */
928 nulls[0] = nulls[1] = nulls[2] = ' ';
929 repl[0] = repl[1] = repl[2] = ' ';
930 repl[Anum_pg_listener_notify - 1] = 'r';
931 value[0] = value[1] = value[2] = (Datum) 0;
932 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
934 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
936 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
937 char *relname = NameStr(listener->relname);
938 int32 sourcePID = listener->notification;
942 /* Notify the frontend */
945 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
946 relname, (int) sourcePID);
948 NotifyMyFrontEnd(relname, sourcePID);
951 * Rewrite the tuple with 0 in notification column.
953 * simple_heap_update is safe here because no one else would have
954 * tried to UNLISTEN us, so there can be no uncommitted changes.
956 rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl);
957 simple_heap_update(lRel, &lTuple->t_self, rTuple);
959 #ifdef NOT_USED /* currently there are no indexes */
960 CatalogUpdateIndexes(lRel, rTuple);
967 * We do NOT release the lock on pg_listener here; we need to hold it
968 * until end of transaction (which is about to happen, anyway) to ensure
969 * that other backends see our tuple updates when they look. Otherwise, a
970 * transaction started after this one might mistakenly think it doesn't
971 * need to send this backend a new NOTIFY.
973 heap_close(lRel, NoLock);
975 CommitTransactionCommand();
978 * Must flush the notify messages to ensure frontend gets them promptly.
982 set_ps_display("idle", false);
985 elog(DEBUG1, "ProcessIncomingNotify: done");
988 EnableCatchupInterrupt();
992 * Send NOTIFY message to my front end.
995 NotifyMyFrontEnd(char *relname, int32 listenerPID)
997 if (whereToSendOutput == DestRemote)
1001 pq_beginmessage(&buf, 'A');
1002 pq_sendint(&buf, listenerPID, sizeof(int32));
1003 pq_sendstring(&buf, relname);
1004 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1006 /* XXX Add parameter string here later */
1007 pq_sendstring(&buf, "");
1009 pq_endmessage(&buf);
1012 * NOTE: we do not do pq_flush() here. For a self-notify, it will
1013 * happen at the end of the transaction, and for incoming notifies
1014 * ProcessIncomingNotify will do it after finding all the notifies.
1018 elog(INFO, "NOTIFY for %s", relname);
1021 /* Does pendingNotifies include the given relname? */
1023 AsyncExistsPendingNotify(const char *relname)
1027 foreach(p, pendingNotifies)
1029 const char *prelname = (const char *) lfirst(p);
1031 if (strcmp(prelname, relname) == 0)
1038 /* Clear the pendingNotifies list. */
1040 ClearPendingNotifies(void)
1043 * We used to have to explicitly deallocate the list members and nodes,
1044 * because they were malloc'd. Now, since we know they are palloc'd in
1045 * CurTransactionContext, we need not do that --- they'll go away
1046 * automatically at transaction exit. We need only reset the list head
1049 pendingNotifies = NIL;
1053 * 2PC processing routine for COMMIT PREPARED case.
1055 * (We don't have to do anything for ROLLBACK PREPARED.)
1058 notify_twophase_postcommit(TransactionId xid, uint16 info,
1059 void *recdata, uint32 len)
1062 * Set up to issue the NOTIFY at the end of my own current transaction.
1063 * (XXX this has some issues if my own transaction later rolls back, or if
1064 * there is any significant delay before I commit. OK for now because we
1065 * disallow COMMIT PREPARED inside a transaction block.)
1067 Async_Notify((char *) recdata);