]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Create an internal semaphore API that is not tied to SysV semaphores.
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.84 2002/05/05 00:03:28 tgl Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab AccessExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.
58  *
59  * An application that listens on the same relname it notifies will get
60  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
61  * by comparing be_pid in the NOTIFY message to the application's own backend's
62  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
63  * frontend during startup.)  The above design guarantees that notifies from
64  * other backends will never be missed by ignoring self-notifies.  Note,
65  * however, that we do *not* guarantee that a separate frontend message will
66  * be sent for every outside NOTIFY.  Since there is only room for one
67  * originating PID in pg_listener, outside notifies occurring at about the
68  * same time may be collapsed into a single message bearing the PID of the
69  * first outside backend to perform the NOTIFY.
70  *-------------------------------------------------------------------------
71  */
72
73 #include "postgres.h"
74
75 #include <unistd.h>
76 #include <signal.h>
77 #include <errno.h>
78 #include <sys/types.h>
79 #include <netinet/in.h>
80
81 #include "access/heapam.h"
82 #include "catalog/catname.h"
83 #include "catalog/pg_listener.h"
84 #include "commands/async.h"
85 #include "libpq/libpq.h"
86 #include "libpq/pqformat.h"
87 #include "miscadmin.h"
88 #include "storage/ipc.h"
89 #include "tcop/tcopprot.h"
90 #include "utils/fmgroids.h"
91 #include "utils/ps_status.h"
92 #include "utils/syscache.h"
93
94
95 /* stuff that we really ought not be touching directly :-( */
96 extern TransactionState CurrentTransactionState;
97
98
99 /*
100  * State for outbound notifies consists of a list of all relnames NOTIFYed
101  * in the current transaction.  We do not actually perform a NOTIFY until
102  * and unless the transaction commits.  pendingNotifies is NIL if no
103  * NOTIFYs have been done in the current transaction.  The List nodes and
104  * referenced strings are all palloc'd in TopTransactionContext.
105  */
106 static List *pendingNotifies = NIL;
107
108 /*
109  * State for inbound notifies consists of two flags: one saying whether
110  * the signal handler is currently allowed to call ProcessIncomingNotify
111  * directly, and one saying whether the signal has occurred but the handler
112  * was not allowed to call ProcessIncomingNotify at the time.
113  *
114  * NB: the "volatile" on these declarations is critical!  If your compiler
115  * does not grok "volatile", you'd be best advised to compile this file
116  * with all optimization turned off.
117  */
118 static volatile int notifyInterruptEnabled = 0;
119 static volatile int notifyInterruptOccurred = 0;
120
121 /* True if we've registered an on_shmem_exit cleanup */
122 static bool unlistenExitRegistered = false;
123
124 bool            Trace_notify = false;
125
126
127 static void Async_UnlistenAll(void);
128 static void Async_UnlistenOnExit(void);
129 static void ProcessIncomingNotify(void);
130 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
131 static bool AsyncExistsPendingNotify(const char *relname);
132 static void ClearPendingNotifies(void);
133
134
135 /*
136  *--------------------------------------------------------------
137  * Async_Notify
138  *
139  *              This is executed by the SQL notify command.
140  *
141  *              Adds the relation to the list of pending notifies.
142  *              Actual notification happens during transaction commit.
143  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
144  *
145  * Results:
146  *              XXX
147  *
148  *--------------------------------------------------------------
149  */
150 void
151 Async_Notify(char *relname)
152 {
153         if (Trace_notify)
154                 elog(LOG, "Async_Notify: %s", relname);
155
156         /* no point in making duplicate entries in the list ... */
157         if (!AsyncExistsPendingNotify(relname))
158         {
159                 /*
160                  * The name list needs to live until end of transaction, so store
161                  * it in the top transaction context.
162                  */
163                 MemoryContext oldcontext;
164
165                 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
166
167                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
168
169                 MemoryContextSwitchTo(oldcontext);
170         }
171 }
172
173 /*
174  *--------------------------------------------------------------
175  * Async_Listen
176  *
177  *              This is executed by the SQL listen command.
178  *
179  *              Register a backend (identified by its Unix PID) as listening
180  *              on the specified relation.
181  *
182  * Results:
183  *              XXX
184  *
185  * Side effects:
186  *              pg_listener is updated.
187  *
188  *--------------------------------------------------------------
189  */
190 void
191 Async_Listen(char *relname, int pid)
192 {
193         Relation        lRel;
194         HeapScanDesc scan;
195         HeapTuple       tuple;
196         Datum           values[Natts_pg_listener];
197         char            nulls[Natts_pg_listener];
198         int                     i;
199         bool            alreadyListener = false;
200
201         if (Trace_notify)
202                 elog(LOG, "Async_Listen: %s", relname);
203
204         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
205
206         /* Detect whether we are already listening on this relname */
207         scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
208         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
209         {
210                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
211
212                 if (listener->listenerpid == pid &&
213                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
214                 {
215                         alreadyListener = true;
216                         /* No need to scan the rest of the table */
217                         break;
218                 }
219         }
220         heap_endscan(scan);
221
222         if (alreadyListener)
223         {
224                 heap_close(lRel, AccessExclusiveLock);
225                 elog(WARNING, "Async_Listen: We are already listening on %s", relname);
226                 return;
227         }
228
229         /*
230          * OK to insert a new tuple
231          */
232
233         for (i = 0; i < Natts_pg_listener; i++)
234         {
235                 nulls[i] = ' ';
236                 values[i] = PointerGetDatum(NULL);
237         }
238
239         i = 0;
240         values[i++] = (Datum) relname;
241         values[i++] = (Datum) pid;
242         values[i++] = (Datum) 0;        /* no notifies pending */
243
244         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
245         heap_insert(lRel, tuple);
246
247 #ifdef NOT_USED                                 /* currently there are no indexes */
248         if (RelationGetForm(lRel)->relhasindex)
249         {
250                 Relation        idescs[Num_pg_listener_indices];
251
252                 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
253                 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, tuple);
254                 CatalogCloseIndices(Num_pg_listener_indices, idescs);
255         }
256 #endif
257
258         heap_freetuple(tuple);
259
260         heap_close(lRel, AccessExclusiveLock);
261
262         /*
263          * now that we are listening, make sure we will unlisten before dying.
264          */
265         if (!unlistenExitRegistered)
266         {
267                 on_shmem_exit(Async_UnlistenOnExit, 0);
268                 unlistenExitRegistered = true;
269         }
270 }
271
272 /*
273  *--------------------------------------------------------------
274  * Async_Unlisten
275  *
276  *              This is executed by the SQL unlisten command.
277  *
278  *              Remove the backend from the list of listening backends
279  *              for the specified relation.
280  *
281  * Results:
282  *              XXX
283  *
284  * Side effects:
285  *              pg_listener is updated.
286  *
287  *--------------------------------------------------------------
288  */
289 void
290 Async_Unlisten(char *relname, int pid)
291 {
292         Relation        lRel;
293         HeapScanDesc scan;
294         HeapTuple       tuple;
295
296         /* Handle specially the `unlisten "*"' command */
297         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
298         {
299                 Async_UnlistenAll();
300                 return;
301         }
302
303         if (Trace_notify)
304                 elog(LOG, "Async_Unlisten %s", relname);
305
306         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
307
308         scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
309         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
310         {
311                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
312
313                 if (listener->listenerpid == pid &&
314                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
315                 {
316                         /* Found the matching tuple, delete it */
317                         simple_heap_delete(lRel, &tuple->t_self);
318
319                         /*
320                          * We assume there can be only one match, so no need to scan
321                          * the rest of the table
322                          */
323                         break;
324                 }
325         }
326         heap_endscan(scan);
327
328         heap_close(lRel, AccessExclusiveLock);
329
330         /*
331          * We do not complain about unlistening something not being listened;
332          * should we?
333          */
334 }
335
336 /*
337  *--------------------------------------------------------------
338  * Async_UnlistenAll
339  *
340  *              Unlisten all relations for this backend.
341  *
342  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
343  *
344  * Results:
345  *              XXX
346  *
347  * Side effects:
348  *              pg_listener is updated.
349  *
350  *--------------------------------------------------------------
351  */
352 static void
353 Async_UnlistenAll(void)
354 {
355         Relation        lRel;
356         TupleDesc       tdesc;
357         HeapScanDesc scan;
358         HeapTuple       lTuple;
359         ScanKeyData key[1];
360
361         if (Trace_notify)
362                 elog(LOG, "Async_UnlistenAll");
363
364         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
365         tdesc = RelationGetDescr(lRel);
366
367         /* Find and delete all entries with my listenerPID */
368         ScanKeyEntryInitialize(&key[0], 0,
369                                                    Anum_pg_listener_pid,
370                                                    F_INT4EQ,
371                                                    Int32GetDatum(MyProcPid));
372         scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
373
374         while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
375                 simple_heap_delete(lRel, &lTuple->t_self);
376
377         heap_endscan(scan);
378         heap_close(lRel, AccessExclusiveLock);
379 }
380
381 /*
382  *--------------------------------------------------------------
383  * Async_UnlistenOnExit
384  *
385  *              Clean up the pg_listener table at backend exit.
386  *
387  *              This is executed if we have done any LISTENs in this backend.
388  *              It might not be necessary anymore, if the user UNLISTENed everything,
389  *              but we don't try to detect that case.
390  *
391  * Results:
392  *              XXX
393  *
394  * Side effects:
395  *              pg_listener is updated if necessary.
396  *
397  *--------------------------------------------------------------
398  */
399 static void
400 Async_UnlistenOnExit(void)
401 {
402         /*
403          * We need to start/commit a transaction for the unlisten, but if
404          * there is already an active transaction we had better abort that one
405          * first.  Otherwise we'd end up committing changes that probably
406          * ought to be discarded.
407          */
408         AbortOutOfAnyTransaction();
409         /* Now we can do the unlisten */
410         StartTransactionCommand();
411         Async_UnlistenAll();
412         CommitTransactionCommand();
413 }
414
415 /*
416  *--------------------------------------------------------------
417  * AtCommit_Notify
418  *
419  *              This is called at transaction commit.
420  *
421  *              If there are outbound notify requests in the pendingNotifies list,
422  *              scan pg_listener for matching tuples, and either signal the other
423  *              backend or send a message to our own frontend.
424  *
425  *              NOTE: we are still inside the current transaction, therefore can
426  *              piggyback on its committing of changes.
427  *
428  * Results:
429  *              XXX
430  *
431  * Side effects:
432  *              Tuples in pg_listener that have matching relnames and other peoples'
433  *              listenerPIDs are updated with a nonzero notification field.
434  *
435  *--------------------------------------------------------------
436  */
437 void
438 AtCommit_Notify(void)
439 {
440         Relation        lRel;
441         TupleDesc       tdesc;
442         HeapScanDesc scan;
443         HeapTuple       lTuple,
444                                 rTuple;
445         Datum           value[Natts_pg_listener];
446         char            repl[Natts_pg_listener],
447                                 nulls[Natts_pg_listener];
448
449         if (pendingNotifies == NIL)
450                 return;                                 /* no NOTIFY statements in this
451                                                                  * transaction */
452
453         /*
454          * NOTIFY is disabled if not normal processing mode. This test used to
455          * be in xact.c, but it seems cleaner to do it here.
456          */
457         if (!IsNormalProcessingMode())
458         {
459                 ClearPendingNotifies();
460                 return;
461         }
462
463         if (Trace_notify)
464                 elog(LOG, "AtCommit_Notify");
465
466         /* preset data to update notify column to MyProcPid */
467         nulls[0] = nulls[1] = nulls[2] = ' ';
468         repl[0] = repl[1] = repl[2] = ' ';
469         repl[Anum_pg_listener_notify - 1] = 'r';
470         value[0] = value[1] = value[2] = (Datum) 0;
471         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
472
473         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
474         tdesc = RelationGetDescr(lRel);
475         scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
476
477         while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
478         {
479                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
480                 char       *relname = NameStr(listener->relname);
481                 int32           listenerPID = listener->listenerpid;
482
483                 if (!AsyncExistsPendingNotify(relname))
484                         continue;
485
486                 if (listenerPID == MyProcPid)
487                 {
488                         /*
489                          * Self-notify: no need to bother with table update. Indeed,
490                          * we *must not* clear the notification field in this path, or
491                          * we could lose an outside notify, which'd be bad for
492                          * applications that ignore self-notify messages.
493                          */
494
495                         if (Trace_notify)
496                                 elog(LOG, "AtCommit_Notify: notifying self");
497
498                         NotifyMyFrontEnd(relname, listenerPID);
499                 }
500                 else
501                 {
502                         if (Trace_notify)
503                                 elog(LOG, "AtCommit_Notify: notifying pid %d",
504                                          listenerPID);
505
506                         /*
507                          * If someone has already notified this listener, we don't
508                          * bother modifying the table, but we do still send a SIGUSR2
509                          * signal, just in case that backend missed the earlier signal
510                          * for some reason.  It's OK to send the signal first, because
511                          * the other guy can't read pg_listener until we unlock it.
512                          */
513                         if (kill(listenerPID, SIGUSR2) < 0)
514                         {
515                                 /*
516                                  * Get rid of pg_listener entry if it refers to a PID that
517                                  * no longer exists.  Presumably, that backend crashed
518                                  * without deleting its pg_listener entries. This code
519                                  * used to only delete the entry if errno==ESRCH, but as
520                                  * far as I can see we should just do it for any failure
521                                  * (certainly at least for EPERM too...)
522                                  */
523                                 simple_heap_delete(lRel, &lTuple->t_self);
524                         }
525                         else if (listener->notification == 0)
526                         {
527                                 rTuple = heap_modifytuple(lTuple, lRel,
528                                                                                   value, nulls, repl);
529                                 simple_heap_update(lRel, &lTuple->t_self, rTuple);
530
531 #ifdef NOT_USED                                 /* currently there are no indexes */
532                                 if (RelationGetForm(lRel)->relhasindex)
533                                 {
534                                         Relation        idescs[Num_pg_listener_indices];
535
536                                         CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
537                                         CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
538                                         CatalogCloseIndices(Num_pg_listener_indices, idescs);
539                                 }
540 #endif
541                         }
542                 }
543         }
544
545         heap_endscan(scan);
546
547         /*
548          * We do NOT release the lock on pg_listener here; we need to hold it
549          * until end of transaction (which is about to happen, anyway) to
550          * ensure that notified backends see our tuple updates when they look.
551          * Else they might disregard the signal, which would make the
552          * application programmer very unhappy.
553          */
554         heap_close(lRel, NoLock);
555
556         ClearPendingNotifies();
557
558         if (Trace_notify)
559                 elog(LOG, "AtCommit_Notify: done");
560 }
561
562 /*
563  *--------------------------------------------------------------
564  * AtAbort_Notify
565  *
566  *              This is called at transaction abort.
567  *
568  *              Gets rid of pending outbound notifies that we would have executed
569  *              if the transaction got committed.
570  *
571  * Results:
572  *              XXX
573  *
574  *--------------------------------------------------------------
575  */
576 void
577 AtAbort_Notify(void)
578 {
579         ClearPendingNotifies();
580 }
581
582 /*
583  *--------------------------------------------------------------
584  * Async_NotifyHandler
585  *
586  *              This is the signal handler for SIGUSR2.
587  *
588  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
589  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
590  *              to do it later.
591  *
592  * Results:
593  *              none
594  *
595  * Side effects:
596  *              per above
597  *--------------------------------------------------------------
598  */
599 void
600 Async_NotifyHandler(SIGNAL_ARGS)
601 {
602         int                     save_errno = errno;
603
604         /*
605          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
606          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
607          * which would likely lead to corruption of stdio buffers if they were
608          * ever turned on.
609          */
610
611         if (notifyInterruptEnabled)
612         {
613                 /*
614                  * I'm not sure whether some flavors of Unix might allow another
615                  * SIGUSR2 occurrence to recursively interrupt this routine. To
616                  * cope with the possibility, we do the same sort of dance that
617                  * EnableNotifyInterrupt must do --- see that routine for
618                  * comments.
619                  */
620                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
621                 notifyInterruptOccurred = 1;    /* do at least one iteration */
622                 for (;;)
623                 {
624                         notifyInterruptEnabled = 1;
625                         if (!notifyInterruptOccurred)
626                                 break;
627                         notifyInterruptEnabled = 0;
628                         if (notifyInterruptOccurred)
629                         {
630                                 /* Here, it is finally safe to do stuff. */
631                                 if (Trace_notify)
632                                         elog(LOG, "Async_NotifyHandler: perform async notify");
633
634                                 ProcessIncomingNotify();
635
636                                 if (Trace_notify)
637                                         elog(LOG, "Async_NotifyHandler: done");
638                         }
639                 }
640         }
641         else
642         {
643                 /*
644                  * In this path it is NOT SAFE to do much of anything, except
645                  * this:
646                  */
647                 notifyInterruptOccurred = 1;
648         }
649
650         errno = save_errno;
651 }
652
653 /*
654  * --------------------------------------------------------------
655  * EnableNotifyInterrupt
656  *
657  *              This is called by the PostgresMain main loop just before waiting
658  *              for a frontend command.  If we are truly idle (ie, *not* inside
659  *              a transaction block), then process any pending inbound notifies,
660  *              and enable the signal handler to process future notifies directly.
661  *
662  *              NOTE: the signal handler starts out disabled, and stays so until
663  *              PostgresMain calls this the first time.
664  * --------------------------------------------------------------
665  */
666 void
667 EnableNotifyInterrupt(void)
668 {
669         if (CurrentTransactionState->blockState != TRANS_DEFAULT)
670                 return;                                 /* not really idle */
671
672         /*
673          * This code is tricky because we are communicating with a signal
674          * handler that could interrupt us at any point.  If we just checked
675          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
676          * could fail to respond promptly to a signal that happens in between
677          * those two steps.  (A very small time window, perhaps, but Murphy's
678          * Law says you can hit it...)  Instead, we first set the enable flag,
679          * then test the occurred flag.  If we see an unserviced interrupt has
680          * occurred, we re-clear the enable flag before going off to do the
681          * service work.  (That prevents re-entrant invocation of
682          * ProcessIncomingNotify() if another interrupt occurs.) If an
683          * interrupt comes in between the setting and clearing of
684          * notifyInterruptEnabled, then it will have done the service work and
685          * left notifyInterruptOccurred zero, so we have to check again after
686          * clearing enable.  The whole thing has to be in a loop in case
687          * another interrupt occurs while we're servicing the first. Once we
688          * get out of the loop, enable is set and we know there is no
689          * unserviced interrupt.
690          *
691          * NB: an overenthusiastic optimizing compiler could easily break this
692          * code.  Hopefully, they all understand what "volatile" means these
693          * days.
694          */
695         for (;;)
696         {
697                 notifyInterruptEnabled = 1;
698                 if (!notifyInterruptOccurred)
699                         break;
700                 notifyInterruptEnabled = 0;
701                 if (notifyInterruptOccurred)
702                 {
703                         if (Trace_notify)
704                                 elog(LOG, "EnableNotifyInterrupt: perform async notify");
705
706                         ProcessIncomingNotify();
707
708                         if (Trace_notify)
709                                 elog(LOG, "EnableNotifyInterrupt: done");
710                 }
711         }
712 }
713
714 /*
715  * --------------------------------------------------------------
716  * DisableNotifyInterrupt
717  *
718  *              This is called by the PostgresMain main loop just after receiving
719  *              a frontend command.  Signal handler execution of inbound notifies
720  *              is disabled until the next EnableNotifyInterrupt call.
721  * --------------------------------------------------------------
722  */
723 void
724 DisableNotifyInterrupt(void)
725 {
726         notifyInterruptEnabled = 0;
727 }
728
729 /*
730  * --------------------------------------------------------------
731  * ProcessIncomingNotify
732  *
733  *              Deal with arriving NOTIFYs from other backends.
734  *              This is called either directly from the SIGUSR2 signal handler,
735  *              or the next time control reaches the outer idle loop.
736  *              Scan pg_listener for arriving notifies, report them to my front end,
737  *              and clear the notification field in pg_listener until next time.
738  *
739  *              NOTE: since we are outside any transaction, we must create our own.
740  *
741  * Results:
742  *              XXX
743  *
744  * --------------------------------------------------------------
745  */
746 static void
747 ProcessIncomingNotify(void)
748 {
749         Relation        lRel;
750         TupleDesc       tdesc;
751         ScanKeyData key[1];
752         HeapScanDesc scan;
753         HeapTuple       lTuple,
754                                 rTuple;
755         Datum           value[Natts_pg_listener];
756         char            repl[Natts_pg_listener],
757                                 nulls[Natts_pg_listener];
758
759         if (Trace_notify)
760                 elog(LOG, "ProcessIncomingNotify");
761
762         set_ps_display("async_notify");
763
764         notifyInterruptOccurred = 0;
765
766         StartTransactionCommand();
767
768         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
769         tdesc = RelationGetDescr(lRel);
770
771         /* Scan only entries with my listenerPID */
772         ScanKeyEntryInitialize(&key[0], 0,
773                                                    Anum_pg_listener_pid,
774                                                    F_INT4EQ,
775                                                    Int32GetDatum(MyProcPid));
776         scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
777
778         /* Prepare data for rewriting 0 into notification field */
779         nulls[0] = nulls[1] = nulls[2] = ' ';
780         repl[0] = repl[1] = repl[2] = ' ';
781         repl[Anum_pg_listener_notify - 1] = 'r';
782         value[0] = value[1] = value[2] = (Datum) 0;
783         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
784
785         while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
786         {
787                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
788                 char       *relname = NameStr(listener->relname);
789                 int32           sourcePID = listener->notification;
790
791                 if (sourcePID != 0)
792                 {
793                         /* Notify the frontend */
794
795                         if (Trace_notify)
796                                 elog(LOG, "ProcessIncomingNotify: received %s from %d",
797                                          relname, (int) sourcePID);
798
799                         NotifyMyFrontEnd(relname, sourcePID);
800                         /* Rewrite the tuple with 0 in notification column */
801                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
802                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
803
804 #ifdef NOT_USED                                 /* currently there are no indexes */
805                         if (RelationGetForm(lRel)->relhasindex)
806                         {
807                                 Relation        idescs[Num_pg_listener_indices];
808
809                                 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
810                                 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
811                                 CatalogCloseIndices(Num_pg_listener_indices, idescs);
812                         }
813 #endif
814                 }
815         }
816         heap_endscan(scan);
817
818         /*
819          * We do NOT release the lock on pg_listener here; we need to hold it
820          * until end of transaction (which is about to happen, anyway) to
821          * ensure that other backends see our tuple updates when they look.
822          * Otherwise, a transaction started after this one might mistakenly
823          * think it doesn't need to send this backend a new NOTIFY.
824          */
825         heap_close(lRel, NoLock);
826
827         CommitTransactionCommand();
828
829         /*
830          * Must flush the notify messages to ensure frontend gets them
831          * promptly.
832          */
833         pq_flush();
834
835         set_ps_display("idle");
836
837         if (Trace_notify)
838                 elog(LOG, "ProcessIncomingNotify: done");
839 }
840
841 /*
842  * Send NOTIFY message to my front end.
843  */
844 static void
845 NotifyMyFrontEnd(char *relname, int32 listenerPID)
846 {
847         if (whereToSendOutput == Remote)
848         {
849                 StringInfoData buf;
850
851                 pq_beginmessage(&buf);
852                 pq_sendbyte(&buf, 'A');
853                 pq_sendint(&buf, listenerPID, sizeof(int32));
854                 pq_sendstring(&buf, relname);
855                 pq_endmessage(&buf);
856
857                 /*
858                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
859                  * happen at the end of the transaction, and for incoming notifies
860                  * ProcessIncomingNotify will do it after finding all the
861                  * notifies.
862                  */
863         }
864         else
865                 elog(INFO, "NOTIFY for %s", relname);
866 }
867
868 /* Does pendingNotifies include the given relname? */
869 static bool
870 AsyncExistsPendingNotify(const char *relname)
871 {
872         List       *p;
873
874         foreach(p, pendingNotifies)
875         {
876                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
877                 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
878                         return true;
879         }
880
881         return false;
882 }
883
884 /* Clear the pendingNotifies list. */
885 static void
886 ClearPendingNotifies(void)
887 {
888         /*
889          * We used to have to explicitly deallocate the list members and
890          * nodes, because they were malloc'd.  Now, since we know they are
891          * palloc'd in TopTransactionContext, we need not do that --- they'll
892          * go away automatically at transaction exit.  We need only reset the
893          * list head pointer.
894          */
895         pendingNotifies = NIL;
896 }