]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Commit to match discussed elog() changes. Only update is that LOG is
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.82 2002/03/02 21:39:22 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab AccessExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.
58  *
59  * An application that listens on the same relname it notifies will get
60  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
61  * by comparing be_pid in the NOTIFY message to the application's own backend's
62  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
63  * frontend during startup.)  The above design guarantees that notifies from
64  * other backends will never be missed by ignoring self-notifies.  Note,
65  * however, that we do *not* guarantee that a separate frontend message will
66  * be sent for every outside NOTIFY.  Since there is only room for one
67  * originating PID in pg_listener, outside notifies occurring at about the
68  * same time may be collapsed into a single message bearing the PID of the
69  * first outside backend to perform the NOTIFY.
70  *-------------------------------------------------------------------------
71  */
72
73 #include "postgres.h"
74
75 #include <unistd.h>
76 #include <signal.h>
77 #include <errno.h>
78 #include <sys/types.h>
79 #include <netinet/in.h>
80
81 #include "access/heapam.h"
82 #include "catalog/catname.h"
83 #include "catalog/pg_listener.h"
84 #include "commands/async.h"
85 #include "libpq/libpq.h"
86 #include "libpq/pqformat.h"
87 #include "miscadmin.h"
88 #include "tcop/tcopprot.h"
89 #include "utils/fmgroids.h"
90 #include "utils/ps_status.h"
91 #include "utils/syscache.h"
92
93
94 /* stuff that we really ought not be touching directly :-( */
95 extern TransactionState CurrentTransactionState;
96
97
98 /*
99  * State for outbound notifies consists of a list of all relnames NOTIFYed
100  * in the current transaction.  We do not actually perform a NOTIFY until
101  * and unless the transaction commits.  pendingNotifies is NIL if no
102  * NOTIFYs have been done in the current transaction.  The List nodes and
103  * referenced strings are all palloc'd in TopTransactionContext.
104  */
105 static List *pendingNotifies = NIL;
106
107 /*
108  * State for inbound notifies consists of two flags: one saying whether
109  * the signal handler is currently allowed to call ProcessIncomingNotify
110  * directly, and one saying whether the signal has occurred but the handler
111  * was not allowed to call ProcessIncomingNotify at the time.
112  *
113  * NB: the "volatile" on these declarations is critical!  If your compiler
114  * does not grok "volatile", you'd be best advised to compile this file
115  * with all optimization turned off.
116  */
117 static volatile int notifyInterruptEnabled = 0;
118 static volatile int notifyInterruptOccurred = 0;
119
120 /* True if we've registered an on_shmem_exit cleanup */
121 static bool unlistenExitRegistered = false;
122
123 bool            Trace_notify = false;
124
125
126 static void Async_UnlistenAll(void);
127 static void Async_UnlistenOnExit(void);
128 static void ProcessIncomingNotify(void);
129 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
130 static bool AsyncExistsPendingNotify(const char *relname);
131 static void ClearPendingNotifies(void);
132
133
134 /*
135  *--------------------------------------------------------------
136  * Async_Notify
137  *
138  *              This is executed by the SQL notify command.
139  *
140  *              Adds the relation to the list of pending notifies.
141  *              Actual notification happens during transaction commit.
142  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
143  *
144  * Results:
145  *              XXX
146  *
147  *--------------------------------------------------------------
148  */
149 void
150 Async_Notify(char *relname)
151 {
152         if (Trace_notify)
153                 elog(LOG, "Async_Notify: %s", relname);
154
155         /* no point in making duplicate entries in the list ... */
156         if (!AsyncExistsPendingNotify(relname))
157         {
158                 /*
159                  * The name list needs to live until end of transaction, so store
160                  * it in the top transaction context.
161                  */
162                 MemoryContext oldcontext;
163
164                 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
165
166                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
167
168                 MemoryContextSwitchTo(oldcontext);
169         }
170 }
171
172 /*
173  *--------------------------------------------------------------
174  * Async_Listen
175  *
176  *              This is executed by the SQL listen command.
177  *
178  *              Register a backend (identified by its Unix PID) as listening
179  *              on the specified relation.
180  *
181  * Results:
182  *              XXX
183  *
184  * Side effects:
185  *              pg_listener is updated.
186  *
187  *--------------------------------------------------------------
188  */
189 void
190 Async_Listen(char *relname, int pid)
191 {
192         Relation        lRel;
193         HeapScanDesc scan;
194         HeapTuple       tuple;
195         Datum           values[Natts_pg_listener];
196         char            nulls[Natts_pg_listener];
197         int                     i;
198         bool            alreadyListener = false;
199
200         if (Trace_notify)
201                 elog(LOG, "Async_Listen: %s", relname);
202
203         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
204
205         /* Detect whether we are already listening on this relname */
206         scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
207         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
208         {
209                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
210
211                 if (listener->listenerpid == pid &&
212                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
213                 {
214                         alreadyListener = true;
215                         /* No need to scan the rest of the table */
216                         break;
217                 }
218         }
219         heap_endscan(scan);
220
221         if (alreadyListener)
222         {
223                 heap_close(lRel, AccessExclusiveLock);
224                 elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
225                 return;
226         }
227
228         /*
229          * OK to insert a new tuple
230          */
231
232         for (i = 0; i < Natts_pg_listener; i++)
233         {
234                 nulls[i] = ' ';
235                 values[i] = PointerGetDatum(NULL);
236         }
237
238         i = 0;
239         values[i++] = (Datum) relname;
240         values[i++] = (Datum) pid;
241         values[i++] = (Datum) 0;        /* no notifies pending */
242
243         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
244         heap_insert(lRel, tuple);
245
246 #ifdef NOT_USED                                 /* currently there are no indexes */
247         if (RelationGetForm(lRel)->relhasindex)
248         {
249                 Relation        idescs[Num_pg_listener_indices];
250
251                 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
252                 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, tuple);
253                 CatalogCloseIndices(Num_pg_listener_indices, idescs);
254         }
255 #endif
256
257         heap_freetuple(tuple);
258
259         heap_close(lRel, AccessExclusiveLock);
260
261         /*
262          * now that we are listening, make sure we will unlisten before dying.
263          */
264         if (!unlistenExitRegistered)
265         {
266                 on_shmem_exit(Async_UnlistenOnExit, 0);
267                 unlistenExitRegistered = true;
268         }
269 }
270
271 /*
272  *--------------------------------------------------------------
273  * Async_Unlisten
274  *
275  *              This is executed by the SQL unlisten command.
276  *
277  *              Remove the backend from the list of listening backends
278  *              for the specified relation.
279  *
280  * Results:
281  *              XXX
282  *
283  * Side effects:
284  *              pg_listener is updated.
285  *
286  *--------------------------------------------------------------
287  */
288 void
289 Async_Unlisten(char *relname, int pid)
290 {
291         Relation        lRel;
292         HeapScanDesc scan;
293         HeapTuple       tuple;
294
295         /* Handle specially the `unlisten "*"' command */
296         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
297         {
298                 Async_UnlistenAll();
299                 return;
300         }
301
302         if (Trace_notify)
303                 elog(LOG, "Async_Unlisten %s", relname);
304
305         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
306
307         scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
308         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
309         {
310                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
311
312                 if (listener->listenerpid == pid &&
313                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
314                 {
315                         /* Found the matching tuple, delete it */
316                         simple_heap_delete(lRel, &tuple->t_self);
317
318                         /*
319                          * We assume there can be only one match, so no need to scan
320                          * the rest of the table
321                          */
322                         break;
323                 }
324         }
325         heap_endscan(scan);
326
327         heap_close(lRel, AccessExclusiveLock);
328
329         /*
330          * We do not complain about unlistening something not being listened;
331          * should we?
332          */
333 }
334
335 /*
336  *--------------------------------------------------------------
337  * Async_UnlistenAll
338  *
339  *              Unlisten all relations for this backend.
340  *
341  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
342  *
343  * Results:
344  *              XXX
345  *
346  * Side effects:
347  *              pg_listener is updated.
348  *
349  *--------------------------------------------------------------
350  */
351 static void
352 Async_UnlistenAll(void)
353 {
354         Relation        lRel;
355         TupleDesc       tdesc;
356         HeapScanDesc scan;
357         HeapTuple       lTuple;
358         ScanKeyData key[1];
359
360         if (Trace_notify)
361                 elog(LOG, "Async_UnlistenAll");
362
363         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
364         tdesc = RelationGetDescr(lRel);
365
366         /* Find and delete all entries with my listenerPID */
367         ScanKeyEntryInitialize(&key[0], 0,
368                                                    Anum_pg_listener_pid,
369                                                    F_INT4EQ,
370                                                    Int32GetDatum(MyProcPid));
371         scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
372
373         while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
374                 simple_heap_delete(lRel, &lTuple->t_self);
375
376         heap_endscan(scan);
377         heap_close(lRel, AccessExclusiveLock);
378 }
379
380 /*
381  *--------------------------------------------------------------
382  * Async_UnlistenOnExit
383  *
384  *              Clean up the pg_listener table at backend exit.
385  *
386  *              This is executed if we have done any LISTENs in this backend.
387  *              It might not be necessary anymore, if the user UNLISTENed everything,
388  *              but we don't try to detect that case.
389  *
390  * Results:
391  *              XXX
392  *
393  * Side effects:
394  *              pg_listener is updated if necessary.
395  *
396  *--------------------------------------------------------------
397  */
398 static void
399 Async_UnlistenOnExit(void)
400 {
401         /*
402          * We need to start/commit a transaction for the unlisten, but if
403          * there is already an active transaction we had better abort that one
404          * first.  Otherwise we'd end up committing changes that probably
405          * ought to be discarded.
406          */
407         AbortOutOfAnyTransaction();
408         /* Now we can do the unlisten */
409         StartTransactionCommand();
410         Async_UnlistenAll();
411         CommitTransactionCommand();
412 }
413
414 /*
415  *--------------------------------------------------------------
416  * AtCommit_Notify
417  *
418  *              This is called at transaction commit.
419  *
420  *              If there are outbound notify requests in the pendingNotifies list,
421  *              scan pg_listener for matching tuples, and either signal the other
422  *              backend or send a message to our own frontend.
423  *
424  *              NOTE: we are still inside the current transaction, therefore can
425  *              piggyback on its committing of changes.
426  *
427  * Results:
428  *              XXX
429  *
430  * Side effects:
431  *              Tuples in pg_listener that have matching relnames and other peoples'
432  *              listenerPIDs are updated with a nonzero notification field.
433  *
434  *--------------------------------------------------------------
435  */
436 void
437 AtCommit_Notify(void)
438 {
439         Relation        lRel;
440         TupleDesc       tdesc;
441         HeapScanDesc scan;
442         HeapTuple       lTuple,
443                                 rTuple;
444         Datum           value[Natts_pg_listener];
445         char            repl[Natts_pg_listener],
446                                 nulls[Natts_pg_listener];
447
448         if (pendingNotifies == NIL)
449                 return;                                 /* no NOTIFY statements in this
450                                                                  * transaction */
451
452         /*
453          * NOTIFY is disabled if not normal processing mode. This test used to
454          * be in xact.c, but it seems cleaner to do it here.
455          */
456         if (!IsNormalProcessingMode())
457         {
458                 ClearPendingNotifies();
459                 return;
460         }
461
462         if (Trace_notify)
463                 elog(LOG, "AtCommit_Notify");
464
465         /* preset data to update notify column to MyProcPid */
466         nulls[0] = nulls[1] = nulls[2] = ' ';
467         repl[0] = repl[1] = repl[2] = ' ';
468         repl[Anum_pg_listener_notify - 1] = 'r';
469         value[0] = value[1] = value[2] = (Datum) 0;
470         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
471
472         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
473         tdesc = RelationGetDescr(lRel);
474         scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
475
476         while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
477         {
478                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
479                 char       *relname = NameStr(listener->relname);
480                 int32           listenerPID = listener->listenerpid;
481
482                 if (!AsyncExistsPendingNotify(relname))
483                         continue;
484
485                 if (listenerPID == MyProcPid)
486                 {
487                         /*
488                          * Self-notify: no need to bother with table update. Indeed,
489                          * we *must not* clear the notification field in this path, or
490                          * we could lose an outside notify, which'd be bad for
491                          * applications that ignore self-notify messages.
492                          */
493
494                         if (Trace_notify)
495                                 elog(LOG, "AtCommit_Notify: notifying self");
496
497                         NotifyMyFrontEnd(relname, listenerPID);
498                 }
499                 else
500                 {
501                         if (Trace_notify)
502                                 elog(LOG, "AtCommit_Notify: notifying pid %d",
503                                          listenerPID);
504
505                         /*
506                          * If someone has already notified this listener, we don't
507                          * bother modifying the table, but we do still send a SIGUSR2
508                          * signal, just in case that backend missed the earlier signal
509                          * for some reason.  It's OK to send the signal first, because
510                          * the other guy can't read pg_listener until we unlock it.
511                          */
512                         if (kill(listenerPID, SIGUSR2) < 0)
513                         {
514                                 /*
515                                  * Get rid of pg_listener entry if it refers to a PID that
516                                  * no longer exists.  Presumably, that backend crashed
517                                  * without deleting its pg_listener entries. This code
518                                  * used to only delete the entry if errno==ESRCH, but as
519                                  * far as I can see we should just do it for any failure
520                                  * (certainly at least for EPERM too...)
521                                  */
522                                 simple_heap_delete(lRel, &lTuple->t_self);
523                         }
524                         else if (listener->notification == 0)
525                         {
526                                 rTuple = heap_modifytuple(lTuple, lRel,
527                                                                                   value, nulls, repl);
528                                 simple_heap_update(lRel, &lTuple->t_self, rTuple);
529
530 #ifdef NOT_USED                                 /* currently there are no indexes */
531                                 if (RelationGetForm(lRel)->relhasindex)
532                                 {
533                                         Relation        idescs[Num_pg_listener_indices];
534
535                                         CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
536                                         CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
537                                         CatalogCloseIndices(Num_pg_listener_indices, idescs);
538                                 }
539 #endif
540                         }
541                 }
542         }
543
544         heap_endscan(scan);
545
546         /*
547          * We do NOT release the lock on pg_listener here; we need to hold it
548          * until end of transaction (which is about to happen, anyway) to
549          * ensure that notified backends see our tuple updates when they look.
550          * Else they might disregard the signal, which would make the
551          * application programmer very unhappy.
552          */
553         heap_close(lRel, NoLock);
554
555         ClearPendingNotifies();
556
557         if (Trace_notify)
558                 elog(LOG, "AtCommit_Notify: done");
559 }
560
561 /*
562  *--------------------------------------------------------------
563  * AtAbort_Notify
564  *
565  *              This is called at transaction abort.
566  *
567  *              Gets rid of pending outbound notifies that we would have executed
568  *              if the transaction got committed.
569  *
570  * Results:
571  *              XXX
572  *
573  *--------------------------------------------------------------
574  */
575 void
576 AtAbort_Notify(void)
577 {
578         ClearPendingNotifies();
579 }
580
581 /*
582  *--------------------------------------------------------------
583  * Async_NotifyHandler
584  *
585  *              This is the signal handler for SIGUSR2.
586  *
587  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
588  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
589  *              to do it later.
590  *
591  * Results:
592  *              none
593  *
594  * Side effects:
595  *              per above
596  *--------------------------------------------------------------
597  */
598 void
599 Async_NotifyHandler(SIGNAL_ARGS)
600 {
601         int                     save_errno = errno;
602
603         /*
604          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
605          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
606          * which would likely lead to corruption of stdio buffers if they were
607          * ever turned on.
608          */
609
610         if (notifyInterruptEnabled)
611         {
612                 /*
613                  * I'm not sure whether some flavors of Unix might allow another
614                  * SIGUSR2 occurrence to recursively interrupt this routine. To
615                  * cope with the possibility, we do the same sort of dance that
616                  * EnableNotifyInterrupt must do --- see that routine for
617                  * comments.
618                  */
619                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
620                 notifyInterruptOccurred = 1;    /* do at least one iteration */
621                 for (;;)
622                 {
623                         notifyInterruptEnabled = 1;
624                         if (!notifyInterruptOccurred)
625                                 break;
626                         notifyInterruptEnabled = 0;
627                         if (notifyInterruptOccurred)
628                         {
629                                 /* Here, it is finally safe to do stuff. */
630                                 if (Trace_notify)
631                                         elog(LOG, "Async_NotifyHandler: perform async notify");
632
633                                 ProcessIncomingNotify();
634
635                                 if (Trace_notify)
636                                         elog(LOG, "Async_NotifyHandler: done");
637                         }
638                 }
639         }
640         else
641         {
642                 /*
643                  * In this path it is NOT SAFE to do much of anything, except
644                  * this:
645                  */
646                 notifyInterruptOccurred = 1;
647         }
648
649         errno = save_errno;
650 }
651
652 /*
653  * --------------------------------------------------------------
654  * EnableNotifyInterrupt
655  *
656  *              This is called by the PostgresMain main loop just before waiting
657  *              for a frontend command.  If we are truly idle (ie, *not* inside
658  *              a transaction block), then process any pending inbound notifies,
659  *              and enable the signal handler to process future notifies directly.
660  *
661  *              NOTE: the signal handler starts out disabled, and stays so until
662  *              PostgresMain calls this the first time.
663  * --------------------------------------------------------------
664  */
665 void
666 EnableNotifyInterrupt(void)
667 {
668         if (CurrentTransactionState->blockState != TRANS_DEFAULT)
669                 return;                                 /* not really idle */
670
671         /*
672          * This code is tricky because we are communicating with a signal
673          * handler that could interrupt us at any point.  If we just checked
674          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
675          * could fail to respond promptly to a signal that happens in between
676          * those two steps.  (A very small time window, perhaps, but Murphy's
677          * Law says you can hit it...)  Instead, we first set the enable flag,
678          * then test the occurred flag.  If we see an unserviced interrupt has
679          * occurred, we re-clear the enable flag before going off to do the
680          * service work.  (That prevents re-entrant invocation of
681          * ProcessIncomingNotify() if another interrupt occurs.) If an
682          * interrupt comes in between the setting and clearing of
683          * notifyInterruptEnabled, then it will have done the service work and
684          * left notifyInterruptOccurred zero, so we have to check again after
685          * clearing enable.  The whole thing has to be in a loop in case
686          * another interrupt occurs while we're servicing the first. Once we
687          * get out of the loop, enable is set and we know there is no
688          * unserviced interrupt.
689          *
690          * NB: an overenthusiastic optimizing compiler could easily break this
691          * code.  Hopefully, they all understand what "volatile" means these
692          * days.
693          */
694         for (;;)
695         {
696                 notifyInterruptEnabled = 1;
697                 if (!notifyInterruptOccurred)
698                         break;
699                 notifyInterruptEnabled = 0;
700                 if (notifyInterruptOccurred)
701                 {
702                         if (Trace_notify)
703                                 elog(LOG, "EnableNotifyInterrupt: perform async notify");
704
705                         ProcessIncomingNotify();
706
707                         if (Trace_notify)
708                                 elog(LOG, "EnableNotifyInterrupt: done");
709                 }
710         }
711 }
712
713 /*
714  * --------------------------------------------------------------
715  * DisableNotifyInterrupt
716  *
717  *              This is called by the PostgresMain main loop just after receiving
718  *              a frontend command.  Signal handler execution of inbound notifies
719  *              is disabled until the next EnableNotifyInterrupt call.
720  * --------------------------------------------------------------
721  */
722 void
723 DisableNotifyInterrupt(void)
724 {
725         notifyInterruptEnabled = 0;
726 }
727
728 /*
729  * --------------------------------------------------------------
730  * ProcessIncomingNotify
731  *
732  *              Deal with arriving NOTIFYs from other backends.
733  *              This is called either directly from the SIGUSR2 signal handler,
734  *              or the next time control reaches the outer idle loop.
735  *              Scan pg_listener for arriving notifies, report them to my front end,
736  *              and clear the notification field in pg_listener until next time.
737  *
738  *              NOTE: since we are outside any transaction, we must create our own.
739  *
740  * Results:
741  *              XXX
742  *
743  * --------------------------------------------------------------
744  */
745 static void
746 ProcessIncomingNotify(void)
747 {
748         Relation        lRel;
749         TupleDesc       tdesc;
750         ScanKeyData key[1];
751         HeapScanDesc scan;
752         HeapTuple       lTuple,
753                                 rTuple;
754         Datum           value[Natts_pg_listener];
755         char            repl[Natts_pg_listener],
756                                 nulls[Natts_pg_listener];
757
758         if (Trace_notify)
759                 elog(LOG, "ProcessIncomingNotify");
760
761         set_ps_display("async_notify");
762
763         notifyInterruptOccurred = 0;
764
765         StartTransactionCommand();
766
767         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
768         tdesc = RelationGetDescr(lRel);
769
770         /* Scan only entries with my listenerPID */
771         ScanKeyEntryInitialize(&key[0], 0,
772                                                    Anum_pg_listener_pid,
773                                                    F_INT4EQ,
774                                                    Int32GetDatum(MyProcPid));
775         scan = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
776
777         /* Prepare data for rewriting 0 into notification field */
778         nulls[0] = nulls[1] = nulls[2] = ' ';
779         repl[0] = repl[1] = repl[2] = ' ';
780         repl[Anum_pg_listener_notify - 1] = 'r';
781         value[0] = value[1] = value[2] = (Datum) 0;
782         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
783
784         while (HeapTupleIsValid(lTuple = heap_getnext(scan, 0)))
785         {
786                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
787                 char       *relname = NameStr(listener->relname);
788                 int32           sourcePID = listener->notification;
789
790                 if (sourcePID != 0)
791                 {
792                         /* Notify the frontend */
793
794                         if (Trace_notify)
795                                 elog(LOG, "ProcessIncomingNotify: received %s from %d",
796                                          relname, (int) sourcePID);
797
798                         NotifyMyFrontEnd(relname, sourcePID);
799                         /* Rewrite the tuple with 0 in notification column */
800                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
801                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
802
803 #ifdef NOT_USED                                 /* currently there are no indexes */
804                         if (RelationGetForm(lRel)->relhasindex)
805                         {
806                                 Relation        idescs[Num_pg_listener_indices];
807
808                                 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
809                                 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
810                                 CatalogCloseIndices(Num_pg_listener_indices, idescs);
811                         }
812 #endif
813                 }
814         }
815         heap_endscan(scan);
816
817         /*
818          * We do NOT release the lock on pg_listener here; we need to hold it
819          * until end of transaction (which is about to happen, anyway) to
820          * ensure that other backends see our tuple updates when they look.
821          * Otherwise, a transaction started after this one might mistakenly
822          * think it doesn't need to send this backend a new NOTIFY.
823          */
824         heap_close(lRel, NoLock);
825
826         CommitTransactionCommand();
827
828         /*
829          * Must flush the notify messages to ensure frontend gets them
830          * promptly.
831          */
832         pq_flush();
833
834         set_ps_display("idle");
835
836         if (Trace_notify)
837                 elog(LOG, "ProcessIncomingNotify: done");
838 }
839
840 /*
841  * Send NOTIFY message to my front end.
842  */
843 static void
844 NotifyMyFrontEnd(char *relname, int32 listenerPID)
845 {
846         if (whereToSendOutput == Remote)
847         {
848                 StringInfoData buf;
849
850                 pq_beginmessage(&buf);
851                 pq_sendbyte(&buf, 'A');
852                 pq_sendint(&buf, listenerPID, sizeof(int32));
853                 pq_sendstring(&buf, relname);
854                 pq_endmessage(&buf);
855
856                 /*
857                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
858                  * happen at the end of the transaction, and for incoming notifies
859                  * ProcessIncomingNotify will do it after finding all the
860                  * notifies.
861                  */
862         }
863         else
864                 elog(INFO, "NOTIFY for %s", relname);
865 }
866
867 /* Does pendingNotifies include the given relname? */
868 static bool
869 AsyncExistsPendingNotify(const char *relname)
870 {
871         List       *p;
872
873         foreach(p, pendingNotifies)
874         {
875                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
876                 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
877                         return true;
878         }
879
880         return false;
881 }
882
883 /* Clear the pendingNotifies list. */
884 static void
885 ClearPendingNotifies(void)
886 {
887         /*
888          * We used to have to explicitly deallocate the list members and
889          * nodes, because they were malloc'd.  Now, since we know they are
890          * palloc'd in TopTransactionContext, we need not do that --- they'll
891          * go away automatically at transaction exit.  We need only reset the
892          * list head pointer.
893          */
894         pendingNotifies = NIL;
895 }