]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Here's the latest win32 signals code, this time in the form of a patch
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.108 2004/01/27 00:45:26 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab AccessExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.
58  *
59  * An application that listens on the same relname it notifies will get
60  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
61  * by comparing be_pid in the NOTIFY message to the application's own backend's
62  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
63  * frontend during startup.)  The above design guarantees that notifies from
64  * other backends will never be missed by ignoring self-notifies.  Note,
65  * however, that we do *not* guarantee that a separate frontend message will
66  * be sent for every outside NOTIFY.  Since there is only room for one
67  * originating PID in pg_listener, outside notifies occurring at about the
68  * same time may be collapsed into a single message bearing the PID of the
69  * first outside backend to perform the NOTIFY.
70  *-------------------------------------------------------------------------
71  */
72
73 #include "postgres.h"
74
75 #include <unistd.h>
76 #include <signal.h>
77 #include <errno.h>
78 #include <netinet/in.h>
79
80 #include "access/heapam.h"
81 #include "catalog/catname.h"
82 #include "catalog/pg_listener.h"
83 #include "commands/async.h"
84 #include "libpq/libpq.h"
85 #include "libpq/pqformat.h"
86 #include "libpq/pqsignal.h"
87 #include "miscadmin.h"
88 #include "storage/ipc.h"
89 #include "tcop/tcopprot.h"
90 #include "utils/fmgroids.h"
91 #include "utils/ps_status.h"
92 #include "utils/syscache.h"
93
94
95 /*
96  * State for outbound notifies consists of a list of all relnames NOTIFYed
97  * in the current transaction.  We do not actually perform a NOTIFY until
98  * and unless the transaction commits.  pendingNotifies is NIL if no
99  * NOTIFYs have been done in the current transaction.  The List nodes and
100  * referenced strings are all palloc'd in TopTransactionContext.
101  */
102 static List *pendingNotifies = NIL;
103
104 /*
105  * State for inbound notifies consists of two flags: one saying whether
106  * the signal handler is currently allowed to call ProcessIncomingNotify
107  * directly, and one saying whether the signal has occurred but the handler
108  * was not allowed to call ProcessIncomingNotify at the time.
109  *
110  * NB: the "volatile" on these declarations is critical!  If your compiler
111  * does not grok "volatile", you'd be best advised to compile this file
112  * with all optimization turned off.
113  */
114 static volatile int notifyInterruptEnabled = 0;
115 static volatile int notifyInterruptOccurred = 0;
116
117 /* True if we've registered an on_shmem_exit cleanup */
118 static bool unlistenExitRegistered = false;
119
120 bool            Trace_notify = false;
121
122
123 static void Async_UnlistenAll(void);
124 static void Async_UnlistenOnExit(int code, Datum arg);
125 static void ProcessIncomingNotify(void);
126 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
127 static bool AsyncExistsPendingNotify(const char *relname);
128 static void ClearPendingNotifies(void);
129
130
131 /*
132  *--------------------------------------------------------------
133  * Async_Notify
134  *
135  *              This is executed by the SQL notify command.
136  *
137  *              Adds the relation to the list of pending notifies.
138  *              Actual notification happens during transaction commit.
139  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
140  *
141  * Results:
142  *              XXX
143  *
144  *--------------------------------------------------------------
145  */
146 void
147 Async_Notify(char *relname)
148 {
149         if (Trace_notify)
150                 elog(DEBUG1, "Async_Notify(%s)", relname);
151
152         /* no point in making duplicate entries in the list ... */
153         if (!AsyncExistsPendingNotify(relname))
154         {
155                 /*
156                  * The name list needs to live until end of transaction, so store
157                  * it in the top transaction context.
158                  */
159                 MemoryContext oldcontext;
160
161                 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
162
163                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
164
165                 MemoryContextSwitchTo(oldcontext);
166         }
167 }
168
169 /*
170  *--------------------------------------------------------------
171  * Async_Listen
172  *
173  *              This is executed by the SQL listen command.
174  *
175  *              Register a backend (identified by its Unix PID) as listening
176  *              on the specified relation.
177  *
178  * Results:
179  *              XXX
180  *
181  * Side effects:
182  *              pg_listener is updated.
183  *
184  *--------------------------------------------------------------
185  */
186 void
187 Async_Listen(char *relname, int pid)
188 {
189         Relation        lRel;
190         HeapScanDesc scan;
191         HeapTuple       tuple;
192         Datum           values[Natts_pg_listener];
193         char            nulls[Natts_pg_listener];
194         int                     i;
195         bool            alreadyListener = false;
196
197         if (Trace_notify)
198                 elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
199
200         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
201
202         /* Detect whether we are already listening on this relname */
203         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
204         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
205         {
206                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
207
208                 if (listener->listenerpid == pid &&
209                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
210                 {
211                         alreadyListener = true;
212                         /* No need to scan the rest of the table */
213                         break;
214                 }
215         }
216         heap_endscan(scan);
217
218         if (alreadyListener)
219         {
220                 heap_close(lRel, AccessExclusiveLock);
221                 return;
222         }
223
224         /*
225          * OK to insert a new tuple
226          */
227
228         for (i = 0; i < Natts_pg_listener; i++)
229         {
230                 nulls[i] = ' ';
231                 values[i] = PointerGetDatum(NULL);
232         }
233
234         i = 0;
235         values[i++] = (Datum) relname;
236         values[i++] = (Datum) pid;
237         values[i++] = (Datum) 0;        /* no notifies pending */
238
239         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
240         simple_heap_insert(lRel, tuple);
241
242 #ifdef NOT_USED                                 /* currently there are no indexes */
243         CatalogUpdateIndexes(lRel, tuple);
244 #endif
245
246         heap_freetuple(tuple);
247
248         heap_close(lRel, AccessExclusiveLock);
249
250         /*
251          * now that we are listening, make sure we will unlisten before dying.
252          */
253         if (!unlistenExitRegistered)
254         {
255                 on_shmem_exit(Async_UnlistenOnExit, 0);
256                 unlistenExitRegistered = true;
257         }
258 }
259
260 /*
261  *--------------------------------------------------------------
262  * Async_Unlisten
263  *
264  *              This is executed by the SQL unlisten command.
265  *
266  *              Remove the backend from the list of listening backends
267  *              for the specified relation.
268  *
269  * Results:
270  *              XXX
271  *
272  * Side effects:
273  *              pg_listener is updated.
274  *
275  *--------------------------------------------------------------
276  */
277 void
278 Async_Unlisten(char *relname, int pid)
279 {
280         Relation        lRel;
281         HeapScanDesc scan;
282         HeapTuple       tuple;
283
284         /* Handle specially the `unlisten "*"' command */
285         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
286         {
287                 Async_UnlistenAll();
288                 return;
289         }
290
291         if (Trace_notify)
292                 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
293
294         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
295
296         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
297         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
298         {
299                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
300
301                 if (listener->listenerpid == pid &&
302                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
303                 {
304                         /* Found the matching tuple, delete it */
305                         simple_heap_delete(lRel, &tuple->t_self);
306
307                         /*
308                          * We assume there can be only one match, so no need to scan
309                          * the rest of the table
310                          */
311                         break;
312                 }
313         }
314         heap_endscan(scan);
315
316         heap_close(lRel, AccessExclusiveLock);
317
318         /*
319          * We do not complain about unlistening something not being listened;
320          * should we?
321          */
322 }
323
324 /*
325  *--------------------------------------------------------------
326  * Async_UnlistenAll
327  *
328  *              Unlisten all relations for this backend.
329  *
330  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
331  *
332  * Results:
333  *              XXX
334  *
335  * Side effects:
336  *              pg_listener is updated.
337  *
338  *--------------------------------------------------------------
339  */
340 static void
341 Async_UnlistenAll(void)
342 {
343         Relation        lRel;
344         TupleDesc       tdesc;
345         HeapScanDesc scan;
346         HeapTuple       lTuple;
347         ScanKeyData key[1];
348
349         if (Trace_notify)
350                 elog(DEBUG1, "Async_UnlistenAll");
351
352         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
353         tdesc = RelationGetDescr(lRel);
354
355         /* Find and delete all entries with my listenerPID */
356         ScanKeyInit(&key[0],
357                                 Anum_pg_listener_pid,
358                                 BTEqualStrategyNumber, F_INT4EQ,
359                                 Int32GetDatum(MyProcPid));
360         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
361
362         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
363                 simple_heap_delete(lRel, &lTuple->t_self);
364
365         heap_endscan(scan);
366         heap_close(lRel, AccessExclusiveLock);
367 }
368
369 /*
370  *--------------------------------------------------------------
371  * Async_UnlistenOnExit
372  *
373  *              Clean up the pg_listener table at backend exit.
374  *
375  *              This is executed if we have done any LISTENs in this backend.
376  *              It might not be necessary anymore, if the user UNLISTENed everything,
377  *              but we don't try to detect that case.
378  *
379  * Results:
380  *              XXX
381  *
382  * Side effects:
383  *              pg_listener is updated if necessary.
384  *
385  *--------------------------------------------------------------
386  */
387 static void
388 Async_UnlistenOnExit(int code, Datum arg)
389 {
390         /*
391          * We need to start/commit a transaction for the unlisten, but if
392          * there is already an active transaction we had better abort that one
393          * first.  Otherwise we'd end up committing changes that probably
394          * ought to be discarded.
395          */
396         AbortOutOfAnyTransaction();
397         /* Now we can do the unlisten */
398         StartTransactionCommand();
399         Async_UnlistenAll();
400         CommitTransactionCommand();
401 }
402
403 /*
404  *--------------------------------------------------------------
405  * AtCommit_Notify
406  *
407  *              This is called at transaction commit.
408  *
409  *              If there are outbound notify requests in the pendingNotifies list,
410  *              scan pg_listener for matching tuples, and either signal the other
411  *              backend or send a message to our own frontend.
412  *
413  *              NOTE: we are still inside the current transaction, therefore can
414  *              piggyback on its committing of changes.
415  *
416  * Results:
417  *              XXX
418  *
419  * Side effects:
420  *              Tuples in pg_listener that have matching relnames and other peoples'
421  *              listenerPIDs are updated with a nonzero notification field.
422  *
423  *--------------------------------------------------------------
424  */
425 void
426 AtCommit_Notify(void)
427 {
428         Relation        lRel;
429         TupleDesc       tdesc;
430         HeapScanDesc scan;
431         HeapTuple       lTuple,
432                                 rTuple;
433         Datum           value[Natts_pg_listener];
434         char            repl[Natts_pg_listener],
435                                 nulls[Natts_pg_listener];
436
437         if (pendingNotifies == NIL)
438                 return;                                 /* no NOTIFY statements in this
439                                                                  * transaction */
440
441         /*
442          * NOTIFY is disabled if not normal processing mode. This test used to
443          * be in xact.c, but it seems cleaner to do it here.
444          */
445         if (!IsNormalProcessingMode())
446         {
447                 ClearPendingNotifies();
448                 return;
449         }
450
451         if (Trace_notify)
452                 elog(DEBUG1, "AtCommit_Notify");
453
454         /* preset data to update notify column to MyProcPid */
455         nulls[0] = nulls[1] = nulls[2] = ' ';
456         repl[0] = repl[1] = repl[2] = ' ';
457         repl[Anum_pg_listener_notify - 1] = 'r';
458         value[0] = value[1] = value[2] = (Datum) 0;
459         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
460
461         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
462         tdesc = RelationGetDescr(lRel);
463         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
464
465         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
466         {
467                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
468                 char       *relname = NameStr(listener->relname);
469                 int32           listenerPID = listener->listenerpid;
470
471                 if (!AsyncExistsPendingNotify(relname))
472                         continue;
473
474                 if (listenerPID == MyProcPid)
475                 {
476                         /*
477                          * Self-notify: no need to bother with table update. Indeed,
478                          * we *must not* clear the notification field in this path, or
479                          * we could lose an outside notify, which'd be bad for
480                          * applications that ignore self-notify messages.
481                          */
482
483                         if (Trace_notify)
484                                 elog(DEBUG1, "AtCommit_Notify: notifying self");
485
486                         NotifyMyFrontEnd(relname, listenerPID);
487                 }
488                 else
489                 {
490                         if (Trace_notify)
491                                 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
492                                          listenerPID);
493
494                         /*
495                          * If someone has already notified this listener, we don't
496                          * bother modifying the table, but we do still send a SIGUSR2
497                          * signal, just in case that backend missed the earlier signal
498                          * for some reason.  It's OK to send the signal first, because
499                          * the other guy can't read pg_listener until we unlock it.
500                          */
501                         if (pqkill(listenerPID, SIGUSR2) < 0)
502                         {
503                                 /*
504                                  * Get rid of pg_listener entry if it refers to a PID that
505                                  * no longer exists.  Presumably, that backend crashed
506                                  * without deleting its pg_listener entries. This code
507                                  * used to only delete the entry if errno==ESRCH, but as
508                                  * far as I can see we should just do it for any failure
509                                  * (certainly at least for EPERM too...)
510                                  */
511                                 simple_heap_delete(lRel, &lTuple->t_self);
512                         }
513                         else if (listener->notification == 0)
514                         {
515                                 ItemPointerData ctid;
516                                 int                     result;
517
518                                 rTuple = heap_modifytuple(lTuple, lRel,
519                                                                                   value, nulls, repl);
520                                 /*
521                                  * We cannot use simple_heap_update here because the tuple
522                                  * could have been modified by an uncommitted transaction;
523                                  * specifically, since UNLISTEN releases exclusive lock on
524                                  * the table before commit, the other guy could already have
525                                  * tried to unlisten.  There are no other cases where we
526                                  * should be able to see an uncommitted update or delete.
527                                  * Therefore, our response to a HeapTupleBeingUpdated result
528                                  * is just to ignore it.  We do *not* wait for the other
529                                  * guy to commit --- that would risk deadlock, and we don't
530                                  * want to block while holding the table lock anyway for
531                                  * performance reasons.  We also ignore HeapTupleUpdated,
532                                  * which could occur if the other guy commits between our
533                                  * heap_getnext and heap_update calls.
534                                  */
535                                 result = heap_update(lRel, &lTuple->t_self, rTuple,
536                                                                          &ctid,
537                                                                          GetCurrentCommandId(), SnapshotAny,
538                                                                          false /* no wait for commit */);
539                                 switch (result)
540                                 {
541                                         case HeapTupleSelfUpdated:
542                                                 /* Tuple was already updated in current command? */
543                                                 elog(ERROR, "tuple already updated by self");
544                                                 break;
545
546                                         case HeapTupleMayBeUpdated:
547                                                 /* done successfully */
548
549 #ifdef NOT_USED                                 /* currently there are no indexes */
550                                                 CatalogUpdateIndexes(lRel, rTuple);
551 #endif
552                                                 break;
553
554                                         case HeapTupleBeingUpdated:
555                                                 /* ignore uncommitted tuples */
556                                                 break;
557
558                                         case HeapTupleUpdated:
559                                                 /* ignore just-committed tuples */
560                                                 break;
561
562                                         default:
563                                                 elog(ERROR, "unrecognized heap_update status: %u",
564                                                          result);
565                                                 break;
566                                 }
567                         }
568                 }
569         }
570
571         heap_endscan(scan);
572
573         /*
574          * We do NOT release the lock on pg_listener here; we need to hold it
575          * until end of transaction (which is about to happen, anyway) to
576          * ensure that notified backends see our tuple updates when they look.
577          * Else they might disregard the signal, which would make the
578          * application programmer very unhappy.
579          */
580         heap_close(lRel, NoLock);
581
582         ClearPendingNotifies();
583
584         if (Trace_notify)
585                 elog(DEBUG1, "AtCommit_Notify: done");
586 }
587
588 /*
589  *--------------------------------------------------------------
590  * AtAbort_Notify
591  *
592  *              This is called at transaction abort.
593  *
594  *              Gets rid of pending outbound notifies that we would have executed
595  *              if the transaction got committed.
596  *
597  * Results:
598  *              XXX
599  *
600  *--------------------------------------------------------------
601  */
602 void
603 AtAbort_Notify(void)
604 {
605         ClearPendingNotifies();
606 }
607
608 /*
609  *--------------------------------------------------------------
610  * Async_NotifyHandler
611  *
612  *              This is the signal handler for SIGUSR2.
613  *
614  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
615  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
616  *              to do it later.
617  *
618  * Results:
619  *              none
620  *
621  * Side effects:
622  *              per above
623  *--------------------------------------------------------------
624  */
625 void
626 Async_NotifyHandler(SIGNAL_ARGS)
627 {
628         int                     save_errno = errno;
629
630         /*
631          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
632          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
633          * which would likely lead to corruption of stdio buffers if they were
634          * ever turned on.
635          */
636
637         /* Don't joggle the elbow of proc_exit */
638         if (proc_exit_inprogress)
639                 return;
640
641         if (notifyInterruptEnabled)
642         {
643                 bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
644
645                 /*
646                  * We may be called while ImmediateInterruptOK is true; turn it
647                  * off while messing with the NOTIFY state.  (We would have to
648                  * save and restore it anyway, because PGSemaphore operations
649                  * inside ProcessIncomingNotify() might reset it.)
650                  */
651                 ImmediateInterruptOK = false;
652
653                 /*
654                  * I'm not sure whether some flavors of Unix might allow another
655                  * SIGUSR2 occurrence to recursively interrupt this routine. To
656                  * cope with the possibility, we do the same sort of dance that
657                  * EnableNotifyInterrupt must do --- see that routine for
658                  * comments.
659                  */
660                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
661                 notifyInterruptOccurred = 1;    /* do at least one iteration */
662                 for (;;)
663                 {
664                         notifyInterruptEnabled = 1;
665                         if (!notifyInterruptOccurred)
666                                 break;
667                         notifyInterruptEnabled = 0;
668                         if (notifyInterruptOccurred)
669                         {
670                                 /* Here, it is finally safe to do stuff. */
671                                 if (Trace_notify)
672                                         elog(DEBUG1, "Async_NotifyHandler: perform async notify");
673
674                                 ProcessIncomingNotify();
675
676                                 if (Trace_notify)
677                                         elog(DEBUG1, "Async_NotifyHandler: done");
678                         }
679                 }
680
681                 /*
682                  * Restore ImmediateInterruptOK, and check for interrupts if
683                  * needed.
684                  */
685                 ImmediateInterruptOK = save_ImmediateInterruptOK;
686                 if (save_ImmediateInterruptOK)
687                         CHECK_FOR_INTERRUPTS();
688         }
689         else
690         {
691                 /*
692                  * In this path it is NOT SAFE to do much of anything, except
693                  * this:
694                  */
695                 notifyInterruptOccurred = 1;
696         }
697
698         errno = save_errno;
699 }
700
701 /*
702  * --------------------------------------------------------------
703  * EnableNotifyInterrupt
704  *
705  *              This is called by the PostgresMain main loop just before waiting
706  *              for a frontend command.  If we are truly idle (ie, *not* inside
707  *              a transaction block), then process any pending inbound notifies,
708  *              and enable the signal handler to process future notifies directly.
709  *
710  *              NOTE: the signal handler starts out disabled, and stays so until
711  *              PostgresMain calls this the first time.
712  * --------------------------------------------------------------
713  */
714 void
715 EnableNotifyInterrupt(void)
716 {
717         if (IsTransactionOrTransactionBlock())
718                 return;                                 /* not really idle */
719
720         /*
721          * This code is tricky because we are communicating with a signal
722          * handler that could interrupt us at any point.  If we just checked
723          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
724          * could fail to respond promptly to a signal that happens in between
725          * those two steps.  (A very small time window, perhaps, but Murphy's
726          * Law says you can hit it...)  Instead, we first set the enable flag,
727          * then test the occurred flag.  If we see an unserviced interrupt has
728          * occurred, we re-clear the enable flag before going off to do the
729          * service work.  (That prevents re-entrant invocation of
730          * ProcessIncomingNotify() if another interrupt occurs.) If an
731          * interrupt comes in between the setting and clearing of
732          * notifyInterruptEnabled, then it will have done the service work and
733          * left notifyInterruptOccurred zero, so we have to check again after
734          * clearing enable.  The whole thing has to be in a loop in case
735          * another interrupt occurs while we're servicing the first. Once we
736          * get out of the loop, enable is set and we know there is no
737          * unserviced interrupt.
738          *
739          * NB: an overenthusiastic optimizing compiler could easily break this
740          * code.  Hopefully, they all understand what "volatile" means these
741          * days.
742          */
743         for (;;)
744         {
745                 notifyInterruptEnabled = 1;
746                 if (!notifyInterruptOccurred)
747                         break;
748                 notifyInterruptEnabled = 0;
749                 if (notifyInterruptOccurred)
750                 {
751                         if (Trace_notify)
752                                 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
753
754                         ProcessIncomingNotify();
755
756                         if (Trace_notify)
757                                 elog(DEBUG1, "EnableNotifyInterrupt: done");
758                 }
759         }
760 }
761
762 /*
763  * --------------------------------------------------------------
764  * DisableNotifyInterrupt
765  *
766  *              This is called by the PostgresMain main loop just after receiving
767  *              a frontend command.  Signal handler execution of inbound notifies
768  *              is disabled until the next EnableNotifyInterrupt call.
769  * --------------------------------------------------------------
770  */
771 void
772 DisableNotifyInterrupt(void)
773 {
774         notifyInterruptEnabled = 0;
775 }
776
777 /*
778  * --------------------------------------------------------------
779  * ProcessIncomingNotify
780  *
781  *              Deal with arriving NOTIFYs from other backends.
782  *              This is called either directly from the SIGUSR2 signal handler,
783  *              or the next time control reaches the outer idle loop.
784  *              Scan pg_listener for arriving notifies, report them to my front end,
785  *              and clear the notification field in pg_listener until next time.
786  *
787  *              NOTE: since we are outside any transaction, we must create our own.
788  *
789  * Results:
790  *              XXX
791  *
792  * --------------------------------------------------------------
793  */
794 static void
795 ProcessIncomingNotify(void)
796 {
797         Relation        lRel;
798         TupleDesc       tdesc;
799         ScanKeyData key[1];
800         HeapScanDesc scan;
801         HeapTuple       lTuple,
802                                 rTuple;
803         Datum           value[Natts_pg_listener];
804         char            repl[Natts_pg_listener],
805                                 nulls[Natts_pg_listener];
806
807         if (Trace_notify)
808                 elog(DEBUG1, "ProcessIncomingNotify");
809
810         set_ps_display("async_notify");
811
812         notifyInterruptOccurred = 0;
813
814         StartTransactionCommand();
815
816         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
817         tdesc = RelationGetDescr(lRel);
818
819         /* Scan only entries with my listenerPID */
820         ScanKeyInit(&key[0],
821                                 Anum_pg_listener_pid,
822                                 BTEqualStrategyNumber, F_INT4EQ,
823                                 Int32GetDatum(MyProcPid));
824         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
825
826         /* Prepare data for rewriting 0 into notification field */
827         nulls[0] = nulls[1] = nulls[2] = ' ';
828         repl[0] = repl[1] = repl[2] = ' ';
829         repl[Anum_pg_listener_notify - 1] = 'r';
830         value[0] = value[1] = value[2] = (Datum) 0;
831         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
832
833         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
834         {
835                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
836                 char       *relname = NameStr(listener->relname);
837                 int32           sourcePID = listener->notification;
838
839                 if (sourcePID != 0)
840                 {
841                         /* Notify the frontend */
842
843                         if (Trace_notify)
844                                 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
845                                          relname, (int) sourcePID);
846
847                         NotifyMyFrontEnd(relname, sourcePID);
848                         /*
849                          * Rewrite the tuple with 0 in notification column.
850                          *
851                          * simple_heap_update is safe here because no one else would
852                          * have tried to UNLISTEN us, so there can be no uncommitted
853                          * changes.
854                          */
855                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
856                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
857
858 #ifdef NOT_USED                                 /* currently there are no indexes */
859                         CatalogUpdateIndexes(lRel, rTuple);
860 #endif
861                 }
862         }
863         heap_endscan(scan);
864
865         /*
866          * We do NOT release the lock on pg_listener here; we need to hold it
867          * until end of transaction (which is about to happen, anyway) to
868          * ensure that other backends see our tuple updates when they look.
869          * Otherwise, a transaction started after this one might mistakenly
870          * think it doesn't need to send this backend a new NOTIFY.
871          */
872         heap_close(lRel, NoLock);
873
874         CommitTransactionCommand();
875
876         /*
877          * Must flush the notify messages to ensure frontend gets them
878          * promptly.
879          */
880         pq_flush();
881
882         set_ps_display("idle");
883
884         if (Trace_notify)
885                 elog(DEBUG1, "ProcessIncomingNotify: done");
886 }
887
888 /*
889  * Send NOTIFY message to my front end.
890  */
891 static void
892 NotifyMyFrontEnd(char *relname, int32 listenerPID)
893 {
894         if (whereToSendOutput == Remote)
895         {
896                 StringInfoData buf;
897
898                 pq_beginmessage(&buf, 'A');
899                 pq_sendint(&buf, listenerPID, sizeof(int32));
900                 pq_sendstring(&buf, relname);
901                 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
902                 {
903                         /* XXX Add parameter string here later */
904                         pq_sendstring(&buf, "");
905                 }
906                 pq_endmessage(&buf);
907
908                 /*
909                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
910                  * happen at the end of the transaction, and for incoming notifies
911                  * ProcessIncomingNotify will do it after finding all the
912                  * notifies.
913                  */
914         }
915         else
916                 elog(INFO, "NOTIFY for %s", relname);
917 }
918
919 /* Does pendingNotifies include the given relname? */
920 static bool
921 AsyncExistsPendingNotify(const char *relname)
922 {
923         List       *p;
924
925         foreach(p, pendingNotifies)
926         {
927                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
928                 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
929                         return true;
930         }
931
932         return false;
933 }
934
935 /* Clear the pendingNotifies list. */
936 static void
937 ClearPendingNotifies(void)
938 {
939         /*
940          * We used to have to explicitly deallocate the list members and
941          * nodes, because they were malloc'd.  Now, since we know they are
942          * palloc'd in TopTransactionContext, we need not do that --- they'll
943          * go away automatically at transaction exit.  We need only reset the
944          * list head pointer.
945          */
946         pendingNotifies = NIL;
947 }