]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Nested transactions. There is still much left to do, especially on the
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.113 2004/07/01 00:50:10 tgl Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab ExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.  (Previously we used AccessExclusiveLock, but
58  * there's no real reason to forbid concurrent reads.)
59  *
60  * An application that listens on the same relname it notifies will get
61  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
62  * by comparing be_pid in the NOTIFY message to the application's own backend's
63  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
64  * frontend during startup.)  The above design guarantees that notifies from
65  * other backends will never be missed by ignoring self-notifies.  Note,
66  * however, that we do *not* guarantee that a separate frontend message will
67  * be sent for every outside NOTIFY.  Since there is only room for one
68  * originating PID in pg_listener, outside notifies occurring at about the
69  * same time may be collapsed into a single message bearing the PID of the
70  * first outside backend to perform the NOTIFY.
71  *-------------------------------------------------------------------------
72  */
73
74 #include "postgres.h"
75
76 #include <unistd.h>
77 #include <signal.h>
78 #include <errno.h>
79 #include <netinet/in.h>
80
81 #include "access/heapam.h"
82 #include "catalog/catname.h"
83 #include "catalog/pg_listener.h"
84 #include "commands/async.h"
85 #include "libpq/libpq.h"
86 #include "libpq/pqformat.h"
87 #include "miscadmin.h"
88 #include "storage/ipc.h"
89 #include "storage/sinval.h"
90 #include "tcop/tcopprot.h"
91 #include "utils/fmgroids.h"
92 #include "utils/ps_status.h"
93 #include "utils/syscache.h"
94
95
96 /*
97  * State for outbound notifies consists of a list of all relnames NOTIFYed
98  * in the current transaction.  We do not actually perform a NOTIFY until
99  * and unless the transaction commits.  pendingNotifies is NIL if no
100  * NOTIFYs have been done in the current transaction.
101  *
102  * The list is kept in CurTransactionContext.  In subtransactions, each
103  * subtransaction has its own list in its own CurTransactionContext, but
104  * successful subtransactions attach their lists to their parent's list.
105  * Failed subtransactions simply discard their lists.
106  */
107 static List *pendingNotifies = NIL;
108
109 static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
110
111 /*
112  * State for inbound notifies consists of two flags: one saying whether
113  * the signal handler is currently allowed to call ProcessIncomingNotify
114  * directly, and one saying whether the signal has occurred but the handler
115  * was not allowed to call ProcessIncomingNotify at the time.
116  *
117  * NB: the "volatile" on these declarations is critical!  If your compiler
118  * does not grok "volatile", you'd be best advised to compile this file
119  * with all optimization turned off.
120  */
121 static volatile int notifyInterruptEnabled = 0;
122 static volatile int notifyInterruptOccurred = 0;
123
124 /* True if we've registered an on_shmem_exit cleanup */
125 static bool unlistenExitRegistered = false;
126
127 bool            Trace_notify = false;
128
129
130 static void Async_UnlistenAll(void);
131 static void Async_UnlistenOnExit(int code, Datum arg);
132 static void ProcessIncomingNotify(void);
133 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
134 static bool AsyncExistsPendingNotify(const char *relname);
135 static void ClearPendingNotifies(void);
136
137
138 /*
139  *--------------------------------------------------------------
140  * Async_Notify
141  *
142  *              This is executed by the SQL notify command.
143  *
144  *              Adds the relation to the list of pending notifies.
145  *              Actual notification happens during transaction commit.
146  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
147  *
148  * Results:
149  *              XXX
150  *
151  *--------------------------------------------------------------
152  */
153 void
154 Async_Notify(char *relname)
155 {
156         if (Trace_notify)
157                 elog(DEBUG1, "Async_Notify(%s)", relname);
158
159         /* no point in making duplicate entries in the list ... */
160         if (!AsyncExistsPendingNotify(relname))
161         {
162                 /*
163                  * The name list needs to live until end of transaction, so store
164                  * it in the transaction context.
165                  */
166                 MemoryContext oldcontext;
167
168                 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
169
170                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
171
172                 MemoryContextSwitchTo(oldcontext);
173         }
174 }
175
176 /*
177  *--------------------------------------------------------------
178  * Async_Listen
179  *
180  *              This is executed by the SQL listen command.
181  *
182  *              Register a backend (identified by its Unix PID) as listening
183  *              on the specified relation.
184  *
185  * Results:
186  *              XXX
187  *
188  * Side effects:
189  *              pg_listener is updated.
190  *
191  *--------------------------------------------------------------
192  */
193 void
194 Async_Listen(char *relname, int pid)
195 {
196         Relation        lRel;
197         HeapScanDesc scan;
198         HeapTuple       tuple;
199         Datum           values[Natts_pg_listener];
200         char            nulls[Natts_pg_listener];
201         int                     i;
202         bool            alreadyListener = false;
203
204         if (Trace_notify)
205                 elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
206
207         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
208
209         /* Detect whether we are already listening on this relname */
210         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
211         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
212         {
213                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
214
215                 if (listener->listenerpid == pid &&
216                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
217                 {
218                         alreadyListener = true;
219                         /* No need to scan the rest of the table */
220                         break;
221                 }
222         }
223         heap_endscan(scan);
224
225         if (alreadyListener)
226         {
227                 heap_close(lRel, ExclusiveLock);
228                 return;
229         }
230
231         /*
232          * OK to insert a new tuple
233          */
234
235         for (i = 0; i < Natts_pg_listener; i++)
236         {
237                 nulls[i] = ' ';
238                 values[i] = PointerGetDatum(NULL);
239         }
240
241         i = 0;
242         values[i++] = (Datum) relname;
243         values[i++] = (Datum) pid;
244         values[i++] = (Datum) 0;        /* no notifies pending */
245
246         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
247         simple_heap_insert(lRel, tuple);
248
249 #ifdef NOT_USED                                 /* currently there are no indexes */
250         CatalogUpdateIndexes(lRel, tuple);
251 #endif
252
253         heap_freetuple(tuple);
254
255         heap_close(lRel, ExclusiveLock);
256
257         /*
258          * now that we are listening, make sure we will unlisten before dying.
259          */
260         if (!unlistenExitRegistered)
261         {
262                 on_shmem_exit(Async_UnlistenOnExit, 0);
263                 unlistenExitRegistered = true;
264         }
265 }
266
267 /*
268  *--------------------------------------------------------------
269  * Async_Unlisten
270  *
271  *              This is executed by the SQL unlisten command.
272  *
273  *              Remove the backend from the list of listening backends
274  *              for the specified relation.
275  *
276  * Results:
277  *              XXX
278  *
279  * Side effects:
280  *              pg_listener is updated.
281  *
282  *--------------------------------------------------------------
283  */
284 void
285 Async_Unlisten(char *relname, int pid)
286 {
287         Relation        lRel;
288         HeapScanDesc scan;
289         HeapTuple       tuple;
290
291         /* Handle specially the `unlisten "*"' command */
292         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
293         {
294                 Async_UnlistenAll();
295                 return;
296         }
297
298         if (Trace_notify)
299                 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
300
301         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
302
303         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
304         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
305         {
306                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
307
308                 if (listener->listenerpid == pid &&
309                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
310                 {
311                         /* Found the matching tuple, delete it */
312                         simple_heap_delete(lRel, &tuple->t_self);
313
314                         /*
315                          * We assume there can be only one match, so no need to scan
316                          * the rest of the table
317                          */
318                         break;
319                 }
320         }
321         heap_endscan(scan);
322
323         heap_close(lRel, ExclusiveLock);
324
325         /*
326          * We do not complain about unlistening something not being listened;
327          * should we?
328          */
329 }
330
331 /*
332  *--------------------------------------------------------------
333  * Async_UnlistenAll
334  *
335  *              Unlisten all relations for this backend.
336  *
337  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
338  *
339  * Results:
340  *              XXX
341  *
342  * Side effects:
343  *              pg_listener is updated.
344  *
345  *--------------------------------------------------------------
346  */
347 static void
348 Async_UnlistenAll(void)
349 {
350         Relation        lRel;
351         TupleDesc       tdesc;
352         HeapScanDesc scan;
353         HeapTuple       lTuple;
354         ScanKeyData key[1];
355
356         if (Trace_notify)
357                 elog(DEBUG1, "Async_UnlistenAll");
358
359         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
360         tdesc = RelationGetDescr(lRel);
361
362         /* Find and delete all entries with my listenerPID */
363         ScanKeyInit(&key[0],
364                                 Anum_pg_listener_pid,
365                                 BTEqualStrategyNumber, F_INT4EQ,
366                                 Int32GetDatum(MyProcPid));
367         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
368
369         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
370                 simple_heap_delete(lRel, &lTuple->t_self);
371
372         heap_endscan(scan);
373         heap_close(lRel, ExclusiveLock);
374 }
375
376 /*
377  *--------------------------------------------------------------
378  * Async_UnlistenOnExit
379  *
380  *              Clean up the pg_listener table at backend exit.
381  *
382  *              This is executed if we have done any LISTENs in this backend.
383  *              It might not be necessary anymore, if the user UNLISTENed everything,
384  *              but we don't try to detect that case.
385  *
386  * Results:
387  *              XXX
388  *
389  * Side effects:
390  *              pg_listener is updated if necessary.
391  *
392  *--------------------------------------------------------------
393  */
394 static void
395 Async_UnlistenOnExit(int code, Datum arg)
396 {
397         /*
398          * We need to start/commit a transaction for the unlisten, but if
399          * there is already an active transaction we had better abort that one
400          * first.  Otherwise we'd end up committing changes that probably
401          * ought to be discarded.
402          */
403         AbortOutOfAnyTransaction();
404         /* Now we can do the unlisten */
405         StartTransactionCommand();
406         Async_UnlistenAll();
407         CommitTransactionCommand();
408 }
409
410 /*
411  *--------------------------------------------------------------
412  * AtCommit_Notify
413  *
414  *              This is called at transaction commit.
415  *
416  *              If there are outbound notify requests in the pendingNotifies list,
417  *              scan pg_listener for matching tuples, and either signal the other
418  *              backend or send a message to our own frontend.
419  *
420  *              NOTE: we are still inside the current transaction, therefore can
421  *              piggyback on its committing of changes.
422  *
423  * Results:
424  *              XXX
425  *
426  * Side effects:
427  *              Tuples in pg_listener that have matching relnames and other peoples'
428  *              listenerPIDs are updated with a nonzero notification field.
429  *
430  *--------------------------------------------------------------
431  */
432 void
433 AtCommit_Notify(void)
434 {
435         Relation        lRel;
436         TupleDesc       tdesc;
437         HeapScanDesc scan;
438         HeapTuple       lTuple,
439                                 rTuple;
440         Datum           value[Natts_pg_listener];
441         char            repl[Natts_pg_listener],
442                                 nulls[Natts_pg_listener];
443
444         if (pendingNotifies == NIL)
445                 return;                                 /* no NOTIFY statements in this
446                                                                  * transaction */
447
448         /*
449          * NOTIFY is disabled if not normal processing mode. This test used to
450          * be in xact.c, but it seems cleaner to do it here.
451          */
452         if (!IsNormalProcessingMode())
453         {
454                 ClearPendingNotifies();
455                 return;
456         }
457
458         if (Trace_notify)
459                 elog(DEBUG1, "AtCommit_Notify");
460
461         /* preset data to update notify column to MyProcPid */
462         nulls[0] = nulls[1] = nulls[2] = ' ';
463         repl[0] = repl[1] = repl[2] = ' ';
464         repl[Anum_pg_listener_notify - 1] = 'r';
465         value[0] = value[1] = value[2] = (Datum) 0;
466         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
467
468         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
469         tdesc = RelationGetDescr(lRel);
470         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
471
472         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
473         {
474                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
475                 char       *relname = NameStr(listener->relname);
476                 int32           listenerPID = listener->listenerpid;
477
478                 if (!AsyncExistsPendingNotify(relname))
479                         continue;
480
481                 if (listenerPID == MyProcPid)
482                 {
483                         /*
484                          * Self-notify: no need to bother with table update. Indeed,
485                          * we *must not* clear the notification field in this path, or
486                          * we could lose an outside notify, which'd be bad for
487                          * applications that ignore self-notify messages.
488                          */
489
490                         if (Trace_notify)
491                                 elog(DEBUG1, "AtCommit_Notify: notifying self");
492
493                         NotifyMyFrontEnd(relname, listenerPID);
494                 }
495                 else
496                 {
497                         if (Trace_notify)
498                                 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
499                                          listenerPID);
500
501                         /*
502                          * If someone has already notified this listener, we don't
503                          * bother modifying the table, but we do still send a SIGUSR2
504                          * signal, just in case that backend missed the earlier signal
505                          * for some reason.  It's OK to send the signal first, because
506                          * the other guy can't read pg_listener until we unlock it.
507                          */
508                         if (kill(listenerPID, SIGUSR2) < 0)
509                         {
510                                 /*
511                                  * Get rid of pg_listener entry if it refers to a PID that
512                                  * no longer exists.  Presumably, that backend crashed
513                                  * without deleting its pg_listener entries. This code
514                                  * used to only delete the entry if errno==ESRCH, but as
515                                  * far as I can see we should just do it for any failure
516                                  * (certainly at least for EPERM too...)
517                                  */
518                                 simple_heap_delete(lRel, &lTuple->t_self);
519                         }
520                         else if (listener->notification == 0)
521                         {
522                                 ItemPointerData ctid;
523                                 int                     result;
524
525                                 rTuple = heap_modifytuple(lTuple, lRel,
526                                                                                   value, nulls, repl);
527                                 /*
528                                  * We cannot use simple_heap_update here because the tuple
529                                  * could have been modified by an uncommitted transaction;
530                                  * specifically, since UNLISTEN releases exclusive lock on
531                                  * the table before commit, the other guy could already have
532                                  * tried to unlisten.  There are no other cases where we
533                                  * should be able to see an uncommitted update or delete.
534                                  * Therefore, our response to a HeapTupleBeingUpdated result
535                                  * is just to ignore it.  We do *not* wait for the other
536                                  * guy to commit --- that would risk deadlock, and we don't
537                                  * want to block while holding the table lock anyway for
538                                  * performance reasons.  We also ignore HeapTupleUpdated,
539                                  * which could occur if the other guy commits between our
540                                  * heap_getnext and heap_update calls.
541                                  */
542                                 result = heap_update(lRel, &lTuple->t_self, rTuple,
543                                                                          &ctid,
544                                                                          GetCurrentCommandId(), SnapshotAny,
545                                                                          false /* no wait for commit */);
546                                 switch (result)
547                                 {
548                                         case HeapTupleSelfUpdated:
549                                                 /* Tuple was already updated in current command? */
550                                                 elog(ERROR, "tuple already updated by self");
551                                                 break;
552
553                                         case HeapTupleMayBeUpdated:
554                                                 /* done successfully */
555
556 #ifdef NOT_USED                                 /* currently there are no indexes */
557                                                 CatalogUpdateIndexes(lRel, rTuple);
558 #endif
559                                                 break;
560
561                                         case HeapTupleBeingUpdated:
562                                                 /* ignore uncommitted tuples */
563                                                 break;
564
565                                         case HeapTupleUpdated:
566                                                 /* ignore just-committed tuples */
567                                                 break;
568
569                                         default:
570                                                 elog(ERROR, "unrecognized heap_update status: %u",
571                                                          result);
572                                                 break;
573                                 }
574                         }
575                 }
576         }
577
578         heap_endscan(scan);
579
580         /*
581          * We do NOT release the lock on pg_listener here; we need to hold it
582          * until end of transaction (which is about to happen, anyway) to
583          * ensure that notified backends see our tuple updates when they look.
584          * Else they might disregard the signal, which would make the
585          * application programmer very unhappy.
586          */
587         heap_close(lRel, NoLock);
588
589         ClearPendingNotifies();
590
591         if (Trace_notify)
592                 elog(DEBUG1, "AtCommit_Notify: done");
593 }
594
595 /*
596  *--------------------------------------------------------------
597  * AtAbort_Notify
598  *
599  *              This is called at transaction abort.
600  *
601  *              Gets rid of pending outbound notifies that we would have executed
602  *              if the transaction got committed.
603  *
604  * Results:
605  *              XXX
606  *
607  *--------------------------------------------------------------
608  */
609 void
610 AtAbort_Notify(void)
611 {
612         ClearPendingNotifies();
613 }
614
615 /*
616  * AtSubStart_Notify() --- Take care of subtransaction start.
617  *
618  * Push empty state for the new subtransaction.
619  */
620 void
621 AtSubStart_Notify(void)
622 {
623         MemoryContext   old_cxt;
624
625         /* Keep the list-of-lists in TopTransactionContext for simplicity */
626         old_cxt = MemoryContextSwitchTo(TopTransactionContext);
627
628         upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
629
630         pendingNotifies = NIL;
631
632         MemoryContextSwitchTo(old_cxt);
633 }
634
635 /*
636  * AtSubCommit_Notify() --- Take care of subtransaction commit.
637  *
638  * Reassign all items in the pending notifies list to the parent transaction.
639  */
640 void
641 AtSubCommit_Notify(void)
642 {
643         List    *parentPendingNotifies;
644
645         parentPendingNotifies = (List *) linitial(upperPendingNotifies);
646         upperPendingNotifies = list_delete_first(upperPendingNotifies);
647
648         /*
649          * We could try to eliminate duplicates here, but it seems not worthwhile.
650          */
651         pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
652 }
653
654 /*
655  * AtSubAbort_Notify() --- Take care of subtransaction abort.
656  */
657 void
658 AtSubAbort_Notify(void)
659 {
660         /*
661          * All we have to do is pop the stack --- the notifies made in this
662          * subxact are no longer interesting, and the space will be freed when
663          * CurTransactionContext is recycled.
664          */
665         pendingNotifies = (List *) linitial(upperPendingNotifies);
666         upperPendingNotifies = list_delete_first(upperPendingNotifies);
667 }
668
669 /*
670  *--------------------------------------------------------------
671  * NotifyInterruptHandler
672  *
673  *              This is the signal handler for SIGUSR2.
674  *
675  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
676  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
677  *              to do it later.
678  *
679  * Results:
680  *              none
681  *
682  * Side effects:
683  *              per above
684  *--------------------------------------------------------------
685  */
686 void
687 NotifyInterruptHandler(SIGNAL_ARGS)
688 {
689         int                     save_errno = errno;
690
691         /*
692          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
693          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
694          * which would likely lead to corruption of stdio buffers if they were
695          * ever turned on.
696          */
697
698         /* Don't joggle the elbow of proc_exit */
699         if (proc_exit_inprogress)
700                 return;
701
702         if (notifyInterruptEnabled)
703         {
704                 bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
705
706                 /*
707                  * We may be called while ImmediateInterruptOK is true; turn it
708                  * off while messing with the NOTIFY state.  (We would have to
709                  * save and restore it anyway, because PGSemaphore operations
710                  * inside ProcessIncomingNotify() might reset it.)
711                  */
712                 ImmediateInterruptOK = false;
713
714                 /*
715                  * I'm not sure whether some flavors of Unix might allow another
716                  * SIGUSR2 occurrence to recursively interrupt this routine. To
717                  * cope with the possibility, we do the same sort of dance that
718                  * EnableNotifyInterrupt must do --- see that routine for
719                  * comments.
720                  */
721                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
722                 notifyInterruptOccurred = 1;    /* do at least one iteration */
723                 for (;;)
724                 {
725                         notifyInterruptEnabled = 1;
726                         if (!notifyInterruptOccurred)
727                                 break;
728                         notifyInterruptEnabled = 0;
729                         if (notifyInterruptOccurred)
730                         {
731                                 /* Here, it is finally safe to do stuff. */
732                                 if (Trace_notify)
733                                         elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
734
735                                 ProcessIncomingNotify();
736
737                                 if (Trace_notify)
738                                         elog(DEBUG1, "NotifyInterruptHandler: done");
739                         }
740                 }
741
742                 /*
743                  * Restore ImmediateInterruptOK, and check for interrupts if
744                  * needed.
745                  */
746                 ImmediateInterruptOK = save_ImmediateInterruptOK;
747                 if (save_ImmediateInterruptOK)
748                         CHECK_FOR_INTERRUPTS();
749         }
750         else
751         {
752                 /*
753                  * In this path it is NOT SAFE to do much of anything, except
754                  * this:
755                  */
756                 notifyInterruptOccurred = 1;
757         }
758
759         errno = save_errno;
760 }
761
762 /*
763  * --------------------------------------------------------------
764  * EnableNotifyInterrupt
765  *
766  *              This is called by the PostgresMain main loop just before waiting
767  *              for a frontend command.  If we are truly idle (ie, *not* inside
768  *              a transaction block), then process any pending inbound notifies,
769  *              and enable the signal handler to process future notifies directly.
770  *
771  *              NOTE: the signal handler starts out disabled, and stays so until
772  *              PostgresMain calls this the first time.
773  * --------------------------------------------------------------
774  */
775 void
776 EnableNotifyInterrupt(void)
777 {
778         if (IsTransactionOrTransactionBlock())
779                 return;                                 /* not really idle */
780
781         /*
782          * This code is tricky because we are communicating with a signal
783          * handler that could interrupt us at any point.  If we just checked
784          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
785          * could fail to respond promptly to a signal that happens in between
786          * those two steps.  (A very small time window, perhaps, but Murphy's
787          * Law says you can hit it...)  Instead, we first set the enable flag,
788          * then test the occurred flag.  If we see an unserviced interrupt has
789          * occurred, we re-clear the enable flag before going off to do the
790          * service work.  (That prevents re-entrant invocation of
791          * ProcessIncomingNotify() if another interrupt occurs.) If an
792          * interrupt comes in between the setting and clearing of
793          * notifyInterruptEnabled, then it will have done the service work and
794          * left notifyInterruptOccurred zero, so we have to check again after
795          * clearing enable.  The whole thing has to be in a loop in case
796          * another interrupt occurs while we're servicing the first. Once we
797          * get out of the loop, enable is set and we know there is no
798          * unserviced interrupt.
799          *
800          * NB: an overenthusiastic optimizing compiler could easily break this
801          * code.  Hopefully, they all understand what "volatile" means these
802          * days.
803          */
804         for (;;)
805         {
806                 notifyInterruptEnabled = 1;
807                 if (!notifyInterruptOccurred)
808                         break;
809                 notifyInterruptEnabled = 0;
810                 if (notifyInterruptOccurred)
811                 {
812                         if (Trace_notify)
813                                 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
814
815                         ProcessIncomingNotify();
816
817                         if (Trace_notify)
818                                 elog(DEBUG1, "EnableNotifyInterrupt: done");
819                 }
820         }
821 }
822
823 /*
824  * --------------------------------------------------------------
825  * DisableNotifyInterrupt
826  *
827  *              This is called by the PostgresMain main loop just after receiving
828  *              a frontend command.  Signal handler execution of inbound notifies
829  *              is disabled until the next EnableNotifyInterrupt call.
830  *
831  *              The SIGUSR1 signal handler also needs to call this, so as to
832  *              prevent conflicts if one signal interrupts the other.  So we
833  *              must return the previous state of the flag.
834  * --------------------------------------------------------------
835  */
836 bool
837 DisableNotifyInterrupt(void)
838 {
839         bool    result = (notifyInterruptEnabled != 0);
840
841         notifyInterruptEnabled = 0;
842
843         return result;
844 }
845
846 /*
847  * --------------------------------------------------------------
848  * ProcessIncomingNotify
849  *
850  *              Deal with arriving NOTIFYs from other backends.
851  *              This is called either directly from the SIGUSR2 signal handler,
852  *              or the next time control reaches the outer idle loop.
853  *              Scan pg_listener for arriving notifies, report them to my front end,
854  *              and clear the notification field in pg_listener until next time.
855  *
856  *              NOTE: since we are outside any transaction, we must create our own.
857  * --------------------------------------------------------------
858  */
859 static void
860 ProcessIncomingNotify(void)
861 {
862         Relation        lRel;
863         TupleDesc       tdesc;
864         ScanKeyData key[1];
865         HeapScanDesc scan;
866         HeapTuple       lTuple,
867                                 rTuple;
868         Datum           value[Natts_pg_listener];
869         char            repl[Natts_pg_listener],
870                                 nulls[Natts_pg_listener];
871         bool            catchup_enabled;
872
873         /* Must prevent SIGUSR1 interrupt while I am running */
874         catchup_enabled = DisableCatchupInterrupt();
875
876         if (Trace_notify)
877                 elog(DEBUG1, "ProcessIncomingNotify");
878
879         set_ps_display("notify interrupt");
880
881         notifyInterruptOccurred = 0;
882
883         StartTransactionCommand();
884
885         lRel = heap_openr(ListenerRelationName, ExclusiveLock);
886         tdesc = RelationGetDescr(lRel);
887
888         /* Scan only entries with my listenerPID */
889         ScanKeyInit(&key[0],
890                                 Anum_pg_listener_pid,
891                                 BTEqualStrategyNumber, F_INT4EQ,
892                                 Int32GetDatum(MyProcPid));
893         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
894
895         /* Prepare data for rewriting 0 into notification field */
896         nulls[0] = nulls[1] = nulls[2] = ' ';
897         repl[0] = repl[1] = repl[2] = ' ';
898         repl[Anum_pg_listener_notify - 1] = 'r';
899         value[0] = value[1] = value[2] = (Datum) 0;
900         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
901
902         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
903         {
904                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
905                 char       *relname = NameStr(listener->relname);
906                 int32           sourcePID = listener->notification;
907
908                 if (sourcePID != 0)
909                 {
910                         /* Notify the frontend */
911
912                         if (Trace_notify)
913                                 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
914                                          relname, (int) sourcePID);
915
916                         NotifyMyFrontEnd(relname, sourcePID);
917                         /*
918                          * Rewrite the tuple with 0 in notification column.
919                          *
920                          * simple_heap_update is safe here because no one else would
921                          * have tried to UNLISTEN us, so there can be no uncommitted
922                          * changes.
923                          */
924                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
925                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
926
927 #ifdef NOT_USED                                 /* currently there are no indexes */
928                         CatalogUpdateIndexes(lRel, rTuple);
929 #endif
930                 }
931         }
932         heap_endscan(scan);
933
934         /*
935          * We do NOT release the lock on pg_listener here; we need to hold it
936          * until end of transaction (which is about to happen, anyway) to
937          * ensure that other backends see our tuple updates when they look.
938          * Otherwise, a transaction started after this one might mistakenly
939          * think it doesn't need to send this backend a new NOTIFY.
940          */
941         heap_close(lRel, NoLock);
942
943         CommitTransactionCommand();
944
945         /*
946          * Must flush the notify messages to ensure frontend gets them
947          * promptly.
948          */
949         pq_flush();
950
951         set_ps_display("idle");
952
953         if (Trace_notify)
954                 elog(DEBUG1, "ProcessIncomingNotify: done");
955
956         if (catchup_enabled)
957                 EnableCatchupInterrupt();
958 }
959
960 /*
961  * Send NOTIFY message to my front end.
962  */
963 static void
964 NotifyMyFrontEnd(char *relname, int32 listenerPID)
965 {
966         if (whereToSendOutput == Remote)
967         {
968                 StringInfoData buf;
969
970                 pq_beginmessage(&buf, 'A');
971                 pq_sendint(&buf, listenerPID, sizeof(int32));
972                 pq_sendstring(&buf, relname);
973                 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
974                 {
975                         /* XXX Add parameter string here later */
976                         pq_sendstring(&buf, "");
977                 }
978                 pq_endmessage(&buf);
979
980                 /*
981                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
982                  * happen at the end of the transaction, and for incoming notifies
983                  * ProcessIncomingNotify will do it after finding all the
984                  * notifies.
985                  */
986         }
987         else
988                 elog(INFO, "NOTIFY for %s", relname);
989 }
990
991 /* Does pendingNotifies include the given relname? */
992 static bool
993 AsyncExistsPendingNotify(const char *relname)
994 {
995         ListCell   *p;
996
997         foreach(p, pendingNotifies)
998         {
999                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
1000                 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
1001                         return true;
1002         }
1003
1004         return false;
1005 }
1006
1007 /* Clear the pendingNotifies list. */
1008 static void
1009 ClearPendingNotifies(void)
1010 {
1011         /*
1012          * We used to have to explicitly deallocate the list members and
1013          * nodes, because they were malloc'd.  Now, since we know they are
1014          * palloc'd in CurTransactionContext, we need not do that --- they'll
1015          * go away automatically at transaction exit.  We need only reset the
1016          * list head pointer.
1017          */
1018         pendingNotifies = NIL;
1019 }