]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Renumber SnapshotNow and the other special snapshot codes so that
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.117 2004/09/11 18:28:33 tgl Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab ExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.  (Previously we used AccessExclusiveLock, but
58  * there's no real reason to forbid concurrent reads.)
59  *
60  * An application that listens on the same relname it notifies will get
61  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
62  * by comparing be_pid in the NOTIFY message to the application's own backend's
63  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
64  * frontend during startup.)  The above design guarantees that notifies from
65  * other backends will never be missed by ignoring self-notifies.  Note,
66  * however, that we do *not* guarantee that a separate frontend message will
67  * be sent for every outside NOTIFY.  Since there is only room for one
68  * originating PID in pg_listener, outside notifies occurring at about the
69  * same time may be collapsed into a single message bearing the PID of the
70  * first outside backend to perform the NOTIFY.
71  *-------------------------------------------------------------------------
72  */
73
74 #include "postgres.h"
75
76 #include <unistd.h>
77 #include <signal.h>
78 #include <errno.h>
79 #include <netinet/in.h>
80
81 #include "access/heapam.h"
82 #include "catalog/catname.h"
83 #include "catalog/pg_listener.h"
84 #include "commands/async.h"
85 #include "libpq/libpq.h"
86 #include "libpq/pqformat.h"
87 #include "miscadmin.h"
88 #include "storage/ipc.h"
89 #include "storage/sinval.h"
90 #include "tcop/tcopprot.h"
91 #include "utils/fmgroids.h"
92 #include "utils/ps_status.h"
93 #include "utils/syscache.h"
94
95
96 /*
97  * State for outbound notifies consists of a list of all relnames NOTIFYed
98  * in the current transaction.  We do not actually perform a NOTIFY until
99  * and unless the transaction commits.  pendingNotifies is NIL if no
100  * NOTIFYs have been done in the current transaction.
101  *
102  * The list is kept in CurTransactionContext.  In subtransactions, each
103  * subtransaction has its own list in its own CurTransactionContext, but
104  * successful subtransactions attach their lists to their parent's list.
105  * Failed subtransactions simply discard their lists.
106  */
107 static List *pendingNotifies = NIL;
108
109 static List *upperPendingNotifies = NIL;                /* list of upper-xact
110                                                                                                  * lists */
111
112 /*
113  * State for inbound notifies consists of two flags: one saying whether
114  * the signal handler is currently allowed to call ProcessIncomingNotify
115  * directly, and one saying whether the signal has occurred but the handler
116  * was not allowed to call ProcessIncomingNotify at the time.
117  *
118  * NB: the "volatile" on these declarations is critical!  If your compiler
119  * does not grok "volatile", you'd be best advised to compile this file
120  * with all optimization turned off.
121  */
122 static volatile int notifyInterruptEnabled = 0;
123 static volatile int notifyInterruptOccurred = 0;
124
125 /* True if we've registered an on_shmem_exit cleanup */
126 static bool unlistenExitRegistered = false;
127
128 bool            Trace_notify = false;
129
130
131 static void Async_UnlistenAll(void);
132 static void Async_UnlistenOnExit(int code, Datum arg);
133 static void ProcessIncomingNotify(void);
134 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
135 static bool AsyncExistsPendingNotify(const char *relname);
136 static void ClearPendingNotifies(void);
137
138
139 /*
140  *--------------------------------------------------------------
141  * Async_Notify
142  *
143  *              This is executed by the SQL notify command.
144  *
145  *              Adds the relation to the list of pending notifies.
146  *              Actual notification happens during transaction commit.
147  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
148  *
149  * Results:
150  *              XXX
151  *
152  *--------------------------------------------------------------
153  */
154 void
155 Async_Notify(char *relname)
156 {
157         if (Trace_notify)
158                 elog(DEBUG1, "Async_Notify(%s)", relname);
159
160         /* no point in making duplicate entries in the list ... */
161         if (!AsyncExistsPendingNotify(relname))
162         {
163                 /*
164                  * The name list needs to live until end of transaction, so store
165                  * it in the transaction context.
166                  */
167                 MemoryContext oldcontext;
168
169                 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
170
171                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
172
173                 MemoryContextSwitchTo(oldcontext);
174         }
175 }
176
177 /*
178  *--------------------------------------------------------------
179  * Async_Listen
180  *
181  *              This is executed by the SQL listen command.
182  *
183  *              Register a backend (identified by its Unix PID) as listening
184  *              on the specified relation.
185  *
186  * Results:
187  *              XXX
188  *
189  * Side effects:
190  *              pg_listener is updated.
191  *
192  *--------------------------------------------------------------
193  */
194 void
195 Async_Listen(char *relname, int pid)
196 {
197         Relation        lRel;
198         HeapScanDesc scan;
199         HeapTuple       tuple;
200         Datum           values[Natts_pg_listener];
201         char            nulls[Natts_pg_listener];
202         int                     i;
203         bool            alreadyListener = false;
204
205         if (Trace_notify)
206                 elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
207
208         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
209
210         /* Detect whether we are already listening on this relname */
211         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
212         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
213         {
214                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
215
216                 if (listener->listenerpid == pid &&
217                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
218                 {
219                         alreadyListener = true;
220                         /* No need to scan the rest of the table */
221                         break;
222                 }
223         }
224         heap_endscan(scan);
225
226         if (alreadyListener)
227         {
228                 heap_close(lRel, ExclusiveLock);
229                 return;
230         }
231
232         /*
233          * OK to insert a new tuple
234          */
235
236         for (i = 0; i < Natts_pg_listener; i++)
237         {
238                 nulls[i] = ' ';
239                 values[i] = PointerGetDatum(NULL);
240         }
241
242         i = 0;
243         values[i++] = (Datum) relname;
244         values[i++] = (Datum) pid;
245         values[i++] = (Datum) 0;        /* no notifies pending */
246
247         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
248         simple_heap_insert(lRel, tuple);
249
250 #ifdef NOT_USED                                 /* currently there are no indexes */
251         CatalogUpdateIndexes(lRel, tuple);
252 #endif
253
254         heap_freetuple(tuple);
255
256         heap_close(lRel, ExclusiveLock);
257
258         /*
259          * now that we are listening, make sure we will unlisten before dying.
260          */
261         if (!unlistenExitRegistered)
262         {
263                 on_shmem_exit(Async_UnlistenOnExit, 0);
264                 unlistenExitRegistered = true;
265         }
266 }
267
268 /*
269  *--------------------------------------------------------------
270  * Async_Unlisten
271  *
272  *              This is executed by the SQL unlisten command.
273  *
274  *              Remove the backend from the list of listening backends
275  *              for the specified relation.
276  *
277  * Results:
278  *              XXX
279  *
280  * Side effects:
281  *              pg_listener is updated.
282  *
283  *--------------------------------------------------------------
284  */
285 void
286 Async_Unlisten(char *relname, int pid)
287 {
288         Relation        lRel;
289         HeapScanDesc scan;
290         HeapTuple       tuple;
291
292         /* Handle specially the `unlisten "*"' command */
293         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
294         {
295                 Async_UnlistenAll();
296                 return;
297         }
298
299         if (Trace_notify)
300                 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
301
302         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
303
304         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
305         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
306         {
307                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
308
309                 if (listener->listenerpid == pid &&
310                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
311                 {
312                         /* Found the matching tuple, delete it */
313                         simple_heap_delete(lRel, &tuple->t_self);
314
315                         /*
316                          * We assume there can be only one match, so no need to scan
317                          * the rest of the table
318                          */
319                         break;
320                 }
321         }
322         heap_endscan(scan);
323
324         heap_close(lRel, ExclusiveLock);
325
326         /*
327          * We do not complain about unlistening something not being listened;
328          * should we?
329          */
330 }
331
332 /*
333  *--------------------------------------------------------------
334  * Async_UnlistenAll
335  *
336  *              Unlisten all relations for this backend.
337  *
338  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
339  *
340  * Results:
341  *              XXX
342  *
343  * Side effects:
344  *              pg_listener is updated.
345  *
346  *--------------------------------------------------------------
347  */
348 static void
349 Async_UnlistenAll(void)
350 {
351         Relation        lRel;
352         TupleDesc       tdesc;
353         HeapScanDesc scan;
354         HeapTuple       lTuple;
355         ScanKeyData key[1];
356
357         if (Trace_notify)
358                 elog(DEBUG1, "Async_UnlistenAll");
359
360         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
361         tdesc = RelationGetDescr(lRel);
362
363         /* Find and delete all entries with my listenerPID */
364         ScanKeyInit(&key[0],
365                                 Anum_pg_listener_pid,
366                                 BTEqualStrategyNumber, F_INT4EQ,
367                                 Int32GetDatum(MyProcPid));
368         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
369
370         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
371                 simple_heap_delete(lRel, &lTuple->t_self);
372
373         heap_endscan(scan);
374         heap_close(lRel, ExclusiveLock);
375 }
376
377 /*
378  *--------------------------------------------------------------
379  * Async_UnlistenOnExit
380  *
381  *              Clean up the pg_listener table at backend exit.
382  *
383  *              This is executed if we have done any LISTENs in this backend.
384  *              It might not be necessary anymore, if the user UNLISTENed everything,
385  *              but we don't try to detect that case.
386  *
387  * Results:
388  *              XXX
389  *
390  * Side effects:
391  *              pg_listener is updated if necessary.
392  *
393  *--------------------------------------------------------------
394  */
395 static void
396 Async_UnlistenOnExit(int code, Datum arg)
397 {
398         /*
399          * We need to start/commit a transaction for the unlisten, but if
400          * there is already an active transaction we had better abort that one
401          * first.  Otherwise we'd end up committing changes that probably
402          * ought to be discarded.
403          */
404         AbortOutOfAnyTransaction();
405         /* Now we can do the unlisten */
406         StartTransactionCommand();
407         Async_UnlistenAll();
408         CommitTransactionCommand();
409 }
410
411 /*
412  *--------------------------------------------------------------
413  * AtCommit_Notify
414  *
415  *              This is called at transaction commit.
416  *
417  *              If there are outbound notify requests in the pendingNotifies list,
418  *              scan pg_listener for matching tuples, and either signal the other
419  *              backend or send a message to our own frontend.
420  *
421  *              NOTE: we are still inside the current transaction, therefore can
422  *              piggyback on its committing of changes.
423  *
424  * Results:
425  *              XXX
426  *
427  * Side effects:
428  *              Tuples in pg_listener that have matching relnames and other peoples'
429  *              listenerPIDs are updated with a nonzero notification field.
430  *
431  *--------------------------------------------------------------
432  */
433 void
434 AtCommit_Notify(void)
435 {
436         Relation        lRel;
437         TupleDesc       tdesc;
438         HeapScanDesc scan;
439         HeapTuple       lTuple,
440                                 rTuple;
441         Datum           value[Natts_pg_listener];
442         char            repl[Natts_pg_listener],
443                                 nulls[Natts_pg_listener];
444
445         if (pendingNotifies == NIL)
446                 return;                                 /* no NOTIFY statements in this
447                                                                  * transaction */
448
449         /*
450          * NOTIFY is disabled if not normal processing mode. This test used to
451          * be in xact.c, but it seems cleaner to do it here.
452          */
453         if (!IsNormalProcessingMode())
454         {
455                 ClearPendingNotifies();
456                 return;
457         }
458
459         if (Trace_notify)
460                 elog(DEBUG1, "AtCommit_Notify");
461
462         /* preset data to update notify column to MyProcPid */
463         nulls[0] = nulls[1] = nulls[2] = ' ';
464         repl[0] = repl[1] = repl[2] = ' ';
465         repl[Anum_pg_listener_notify - 1] = 'r';
466         value[0] = value[1] = value[2] = (Datum) 0;
467         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
468
469         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
470         tdesc = RelationGetDescr(lRel);
471         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
472
473         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
474         {
475                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
476                 char       *relname = NameStr(listener->relname);
477                 int32           listenerPID = listener->listenerpid;
478
479                 if (!AsyncExistsPendingNotify(relname))
480                         continue;
481
482                 if (listenerPID == MyProcPid)
483                 {
484                         /*
485                          * Self-notify: no need to bother with table update. Indeed,
486                          * we *must not* clear the notification field in this path, or
487                          * we could lose an outside notify, which'd be bad for
488                          * applications that ignore self-notify messages.
489                          */
490
491                         if (Trace_notify)
492                                 elog(DEBUG1, "AtCommit_Notify: notifying self");
493
494                         NotifyMyFrontEnd(relname, listenerPID);
495                 }
496                 else
497                 {
498                         if (Trace_notify)
499                                 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
500                                          listenerPID);
501
502                         /*
503                          * If someone has already notified this listener, we don't
504                          * bother modifying the table, but we do still send a SIGUSR2
505                          * signal, just in case that backend missed the earlier signal
506                          * for some reason.  It's OK to send the signal first, because
507                          * the other guy can't read pg_listener until we unlock it.
508                          */
509                         if (kill(listenerPID, SIGUSR2) < 0)
510                         {
511                                 /*
512                                  * Get rid of pg_listener entry if it refers to a PID that
513                                  * no longer exists.  Presumably, that backend crashed
514                                  * without deleting its pg_listener entries. This code
515                                  * used to only delete the entry if errno==ESRCH, but as
516                                  * far as I can see we should just do it for any failure
517                                  * (certainly at least for EPERM too...)
518                                  */
519                                 simple_heap_delete(lRel, &lTuple->t_self);
520                         }
521                         else if (listener->notification == 0)
522                         {
523                                 ItemPointerData ctid;
524                                 int                     result;
525
526                                 rTuple = heap_modifytuple(lTuple, lRel,
527                                                                                   value, nulls, repl);
528
529                                 /*
530                                  * We cannot use simple_heap_update here because the tuple
531                                  * could have been modified by an uncommitted transaction;
532                                  * specifically, since UNLISTEN releases exclusive lock on
533                                  * the table before commit, the other guy could already
534                                  * have tried to unlisten.      There are no other cases where
535                                  * we should be able to see an uncommitted update or
536                                  * delete. Therefore, our response to a
537                                  * HeapTupleBeingUpdated result is just to ignore it.  We
538                                  * do *not* wait for the other guy to commit --- that
539                                  * would risk deadlock, and we don't want to block while
540                                  * holding the table lock anyway for performance reasons.
541                                  * We also ignore HeapTupleUpdated, which could occur if
542                                  * the other guy commits between our heap_getnext and
543                                  * heap_update calls.
544                                  */
545                                 result = heap_update(lRel, &lTuple->t_self, rTuple,
546                                                                          &ctid,
547                                                                          GetCurrentCommandId(), InvalidSnapshot,
548                                                                          false /* no wait for commit */ );
549                                 switch (result)
550                                 {
551                                         case HeapTupleSelfUpdated:
552                                                 /* Tuple was already updated in current command? */
553                                                 elog(ERROR, "tuple already updated by self");
554                                                 break;
555
556                                         case HeapTupleMayBeUpdated:
557                                                 /* done successfully */
558
559 #ifdef NOT_USED                                 /* currently there are no indexes */
560                                                 CatalogUpdateIndexes(lRel, rTuple);
561 #endif
562                                                 break;
563
564                                         case HeapTupleBeingUpdated:
565                                                 /* ignore uncommitted tuples */
566                                                 break;
567
568                                         case HeapTupleUpdated:
569                                                 /* ignore just-committed tuples */
570                                                 break;
571
572                                         default:
573                                                 elog(ERROR, "unrecognized heap_update status: %u",
574                                                          result);
575                                                 break;
576                                 }
577                         }
578                 }
579         }
580
581         heap_endscan(scan);
582
583         /*
584          * We do NOT release the lock on pg_listener here; we need to hold it
585          * until end of transaction (which is about to happen, anyway) to
586          * ensure that notified backends see our tuple updates when they look.
587          * Else they might disregard the signal, which would make the
588          * application programmer very unhappy.
589          */
590         heap_close(lRel, NoLock);
591
592         ClearPendingNotifies();
593
594         if (Trace_notify)
595                 elog(DEBUG1, "AtCommit_Notify: done");
596 }
597
598 /*
599  *--------------------------------------------------------------
600  * AtAbort_Notify
601  *
602  *              This is called at transaction abort.
603  *
604  *              Gets rid of pending outbound notifies that we would have executed
605  *              if the transaction got committed.
606  *
607  * Results:
608  *              XXX
609  *
610  *--------------------------------------------------------------
611  */
612 void
613 AtAbort_Notify(void)
614 {
615         ClearPendingNotifies();
616 }
617
618 /*
619  * AtSubStart_Notify() --- Take care of subtransaction start.
620  *
621  * Push empty state for the new subtransaction.
622  */
623 void
624 AtSubStart_Notify(void)
625 {
626         MemoryContext old_cxt;
627
628         /* Keep the list-of-lists in TopTransactionContext for simplicity */
629         old_cxt = MemoryContextSwitchTo(TopTransactionContext);
630
631         upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
632
633         Assert(list_length(upperPendingNotifies) ==
634                    GetCurrentTransactionNestLevel() - 1);
635
636         pendingNotifies = NIL;
637
638         MemoryContextSwitchTo(old_cxt);
639 }
640
641 /*
642  * AtSubCommit_Notify() --- Take care of subtransaction commit.
643  *
644  * Reassign all items in the pending notifies list to the parent transaction.
645  */
646 void
647 AtSubCommit_Notify(void)
648 {
649         List       *parentPendingNotifies;
650
651         parentPendingNotifies = (List *) linitial(upperPendingNotifies);
652         upperPendingNotifies = list_delete_first(upperPendingNotifies);
653
654         Assert(list_length(upperPendingNotifies) ==
655                    GetCurrentTransactionNestLevel() - 2);
656
657         /*
658          * We could try to eliminate duplicates here, but it seems not
659          * worthwhile.
660          */
661         pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
662 }
663
664 /*
665  * AtSubAbort_Notify() --- Take care of subtransaction abort.
666  */
667 void
668 AtSubAbort_Notify(void)
669 {
670         int                     my_level = GetCurrentTransactionNestLevel();
671
672         /*
673          * All we have to do is pop the stack --- the notifies made in this
674          * subxact are no longer interesting, and the space will be freed when
675          * CurTransactionContext is recycled.
676          *
677          * This routine could be called more than once at a given nesting level
678          * if there is trouble during subxact abort.  Avoid dumping core by
679          * using GetCurrentTransactionNestLevel as the indicator of how far
680          * we need to prune the list.
681          */
682         while (list_length(upperPendingNotifies) > my_level - 2)
683         {
684                 pendingNotifies = (List *) linitial(upperPendingNotifies);
685                 upperPendingNotifies = list_delete_first(upperPendingNotifies);
686         }
687 }
688
689 /*
690  *--------------------------------------------------------------
691  * NotifyInterruptHandler
692  *
693  *              This is the signal handler for SIGUSR2.
694  *
695  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
696  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
697  *              to do it later.
698  *
699  * Results:
700  *              none
701  *
702  * Side effects:
703  *              per above
704  *--------------------------------------------------------------
705  */
706 void
707 NotifyInterruptHandler(SIGNAL_ARGS)
708 {
709         int                     save_errno = errno;
710
711         /*
712          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
713          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
714          * which would likely lead to corruption of stdio buffers if they were
715          * ever turned on.
716          */
717
718         /* Don't joggle the elbow of proc_exit */
719         if (proc_exit_inprogress)
720                 return;
721
722         if (notifyInterruptEnabled)
723         {
724                 bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
725
726                 /*
727                  * We may be called while ImmediateInterruptOK is true; turn it
728                  * off while messing with the NOTIFY state.  (We would have to
729                  * save and restore it anyway, because PGSemaphore operations
730                  * inside ProcessIncomingNotify() might reset it.)
731                  */
732                 ImmediateInterruptOK = false;
733
734                 /*
735                  * I'm not sure whether some flavors of Unix might allow another
736                  * SIGUSR2 occurrence to recursively interrupt this routine. To
737                  * cope with the possibility, we do the same sort of dance that
738                  * EnableNotifyInterrupt must do --- see that routine for
739                  * comments.
740                  */
741                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
742                 notifyInterruptOccurred = 1;    /* do at least one iteration */
743                 for (;;)
744                 {
745                         notifyInterruptEnabled = 1;
746                         if (!notifyInterruptOccurred)
747                                 break;
748                         notifyInterruptEnabled = 0;
749                         if (notifyInterruptOccurred)
750                         {
751                                 /* Here, it is finally safe to do stuff. */
752                                 if (Trace_notify)
753                                         elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
754
755                                 ProcessIncomingNotify();
756
757                                 if (Trace_notify)
758                                         elog(DEBUG1, "NotifyInterruptHandler: done");
759                         }
760                 }
761
762                 /*
763                  * Restore ImmediateInterruptOK, and check for interrupts if
764                  * needed.
765                  */
766                 ImmediateInterruptOK = save_ImmediateInterruptOK;
767                 if (save_ImmediateInterruptOK)
768                         CHECK_FOR_INTERRUPTS();
769         }
770         else
771         {
772                 /*
773                  * In this path it is NOT SAFE to do much of anything, except
774                  * this:
775                  */
776                 notifyInterruptOccurred = 1;
777         }
778
779         errno = save_errno;
780 }
781
782 /*
783  * --------------------------------------------------------------
784  * EnableNotifyInterrupt
785  *
786  *              This is called by the PostgresMain main loop just before waiting
787  *              for a frontend command.  If we are truly idle (ie, *not* inside
788  *              a transaction block), then process any pending inbound notifies,
789  *              and enable the signal handler to process future notifies directly.
790  *
791  *              NOTE: the signal handler starts out disabled, and stays so until
792  *              PostgresMain calls this the first time.
793  * --------------------------------------------------------------
794  */
795 void
796 EnableNotifyInterrupt(void)
797 {
798         if (IsTransactionOrTransactionBlock())
799                 return;                                 /* not really idle */
800
801         /*
802          * This code is tricky because we are communicating with a signal
803          * handler that could interrupt us at any point.  If we just checked
804          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
805          * could fail to respond promptly to a signal that happens in between
806          * those two steps.  (A very small time window, perhaps, but Murphy's
807          * Law says you can hit it...)  Instead, we first set the enable flag,
808          * then test the occurred flag.  If we see an unserviced interrupt has
809          * occurred, we re-clear the enable flag before going off to do the
810          * service work.  (That prevents re-entrant invocation of
811          * ProcessIncomingNotify() if another interrupt occurs.) If an
812          * interrupt comes in between the setting and clearing of
813          * notifyInterruptEnabled, then it will have done the service work and
814          * left notifyInterruptOccurred zero, so we have to check again after
815          * clearing enable.  The whole thing has to be in a loop in case
816          * another interrupt occurs while we're servicing the first. Once we
817          * get out of the loop, enable is set and we know there is no
818          * unserviced interrupt.
819          *
820          * NB: an overenthusiastic optimizing compiler could easily break this
821          * code.  Hopefully, they all understand what "volatile" means these
822          * days.
823          */
824         for (;;)
825         {
826                 notifyInterruptEnabled = 1;
827                 if (!notifyInterruptOccurred)
828                         break;
829                 notifyInterruptEnabled = 0;
830                 if (notifyInterruptOccurred)
831                 {
832                         if (Trace_notify)
833                                 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
834
835                         ProcessIncomingNotify();
836
837                         if (Trace_notify)
838                                 elog(DEBUG1, "EnableNotifyInterrupt: done");
839                 }
840         }
841 }
842
843 /*
844  * --------------------------------------------------------------
845  * DisableNotifyInterrupt
846  *
847  *              This is called by the PostgresMain main loop just after receiving
848  *              a frontend command.  Signal handler execution of inbound notifies
849  *              is disabled until the next EnableNotifyInterrupt call.
850  *
851  *              The SIGUSR1 signal handler also needs to call this, so as to
852  *              prevent conflicts if one signal interrupts the other.  So we
853  *              must return the previous state of the flag.
854  * --------------------------------------------------------------
855  */
856 bool
857 DisableNotifyInterrupt(void)
858 {
859         bool            result = (notifyInterruptEnabled != 0);
860
861         notifyInterruptEnabled = 0;
862
863         return result;
864 }
865
866 /*
867  * --------------------------------------------------------------
868  * ProcessIncomingNotify
869  *
870  *              Deal with arriving NOTIFYs from other backends.
871  *              This is called either directly from the SIGUSR2 signal handler,
872  *              or the next time control reaches the outer idle loop.
873  *              Scan pg_listener for arriving notifies, report them to my front end,
874  *              and clear the notification field in pg_listener until next time.
875  *
876  *              NOTE: since we are outside any transaction, we must create our own.
877  * --------------------------------------------------------------
878  */
879 static void
880 ProcessIncomingNotify(void)
881 {
882         Relation        lRel;
883         TupleDesc       tdesc;
884         ScanKeyData key[1];
885         HeapScanDesc scan;
886         HeapTuple       lTuple,
887                                 rTuple;
888         Datum           value[Natts_pg_listener];
889         char            repl[Natts_pg_listener],
890                                 nulls[Natts_pg_listener];
891         bool            catchup_enabled;
892
893         /* Must prevent SIGUSR1 interrupt while I am running */
894         catchup_enabled = DisableCatchupInterrupt();
895
896         if (Trace_notify)
897                 elog(DEBUG1, "ProcessIncomingNotify");
898
899         set_ps_display("notify interrupt");
900
901         notifyInterruptOccurred = 0;
902
903         StartTransactionCommand();
904
905         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
906         tdesc = RelationGetDescr(lRel);
907
908         /* Scan only entries with my listenerPID */
909         ScanKeyInit(&key[0],
910                                 Anum_pg_listener_pid,
911                                 BTEqualStrategyNumber, F_INT4EQ,
912                                 Int32GetDatum(MyProcPid));
913         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
914
915         /* Prepare data for rewriting 0 into notification field */
916         nulls[0] = nulls[1] = nulls[2] = ' ';
917         repl[0] = repl[1] = repl[2] = ' ';
918         repl[Anum_pg_listener_notify - 1] = 'r';
919         value[0] = value[1] = value[2] = (Datum) 0;
920         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
921
922         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
923         {
924                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
925                 char       *relname = NameStr(listener->relname);
926                 int32           sourcePID = listener->notification;
927
928                 if (sourcePID != 0)
929                 {
930                         /* Notify the frontend */
931
932                         if (Trace_notify)
933                                 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
934                                          relname, (int) sourcePID);
935
936                         NotifyMyFrontEnd(relname, sourcePID);
937
938                         /*
939                          * Rewrite the tuple with 0 in notification column.
940                          *
941                          * simple_heap_update is safe here because no one else would have
942                          * tried to UNLISTEN us, so there can be no uncommitted
943                          * changes.
944                          */
945                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
946                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
947
948 #ifdef NOT_USED                                 /* currently there are no indexes */
949                         CatalogUpdateIndexes(lRel, rTuple);
950 #endif
951                 }
952         }
953         heap_endscan(scan);
954
955         /*
956          * We do NOT release the lock on pg_listener here; we need to hold it
957          * until end of transaction (which is about to happen, anyway) to
958          * ensure that other backends see our tuple updates when they look.
959          * Otherwise, a transaction started after this one might mistakenly
960          * think it doesn't need to send this backend a new NOTIFY.
961          */
962         heap_close(lRel, NoLock);
963
964         CommitTransactionCommand();
965
966         /*
967          * Must flush the notify messages to ensure frontend gets them
968          * promptly.
969          */
970         pq_flush();
971
972         set_ps_display("idle");
973
974         if (Trace_notify)
975                 elog(DEBUG1, "ProcessIncomingNotify: done");
976
977         if (catchup_enabled)
978                 EnableCatchupInterrupt();
979 }
980
981 /*
982  * Send NOTIFY message to my front end.
983  */
984 static void
985 NotifyMyFrontEnd(char *relname, int32 listenerPID)
986 {
987         if (whereToSendOutput == Remote)
988         {
989                 StringInfoData buf;
990
991                 pq_beginmessage(&buf, 'A');
992                 pq_sendint(&buf, listenerPID, sizeof(int32));
993                 pq_sendstring(&buf, relname);
994                 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
995                 {
996                         /* XXX Add parameter string here later */
997                         pq_sendstring(&buf, "");
998                 }
999                 pq_endmessage(&buf);
1000
1001                 /*
1002                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
1003                  * happen at the end of the transaction, and for incoming notifies
1004                  * ProcessIncomingNotify will do it after finding all the
1005                  * notifies.
1006                  */
1007         }
1008         else
1009                 elog(INFO, "NOTIFY for %s", relname);
1010 }
1011
1012 /* Does pendingNotifies include the given relname? */
1013 static bool
1014 AsyncExistsPendingNotify(const char *relname)
1015 {
1016         ListCell   *p;
1017
1018         foreach(p, pendingNotifies)
1019         {
1020                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
1021                 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
1022                         return true;
1023         }
1024
1025         return false;
1026 }
1027
1028 /* Clear the pendingNotifies list. */
1029 static void
1030 ClearPendingNotifies(void)
1031 {
1032         /*
1033          * We used to have to explicitly deallocate the list members and
1034          * nodes, because they were malloc'd.  Now, since we know they are
1035          * palloc'd in CurTransactionContext, we need not do that --- they'll
1036          * go away automatically at transaction exit.  We need only reset the
1037          * list head pointer.
1038          */
1039         pendingNotifies = NIL;
1040 }