]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Banish caddr_t (mostly), use Datum where appropriate.
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.69 2000/10/02 19:42:45 petere Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab AccessExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.
58  *
59  * An application that listens on the same relname it notifies will get
60  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
61  * by comparing be_pid in the NOTIFY message to the application's own backend's
62  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
63  * frontend during startup.)  The above design guarantees that notifies from
64  * other backends will never be missed by ignoring self-notifies.  Note,
65  * however, that we do *not* guarantee that a separate frontend message will
66  * be sent for every outside NOTIFY.  Since there is only room for one
67  * originating PID in pg_listener, outside notifies occurring at about the
68  * same time may be collapsed into a single message bearing the PID of the
69  * first outside backend to perform the NOTIFY.
70  *-------------------------------------------------------------------------
71  */
72
73 #include "postgres.h"
74
75 #include <unistd.h>
76 #include <signal.h>
77 #include <errno.h>
78 #include <sys/types.h>
79 #include <netinet/in.h>
80
81 #include "access/heapam.h"
82 #include "catalog/catname.h"
83 #include "catalog/indexing.h"
84 #include "catalog/pg_listener.h"
85 #include "commands/async.h"
86 #include "lib/dllist.h"
87 #include "libpq/libpq.h"
88 #include "libpq/pqformat.h"
89 #include "miscadmin.h"
90 #include "tcop/dest.h"
91 #include "utils/fmgroids.h"
92 #include "utils/ps_status.h"
93 #include "utils/syscache.h"
94
95
96 /* stuff that we really ought not be touching directly :-( */
97 extern TransactionState CurrentTransactionState;
98 extern CommandDest whereToSendOutput;
99
100 /*
101  * State for outbound notifies consists of a list of all relnames NOTIFYed
102  * in the current transaction.  We do not actually perform a NOTIFY until
103  * and unless the transaction commits.  pendingNotifies is NULL if no
104  * NOTIFYs have been done in the current transaction.
105  */
106 static Dllist *pendingNotifies = NULL;
107
108 /*
109  * State for inbound notifies consists of two flags: one saying whether
110  * the signal handler is currently allowed to call ProcessIncomingNotify
111  * directly, and one saying whether the signal has occurred but the handler
112  * was not allowed to call ProcessIncomingNotify at the time.
113  *
114  * NB: the "volatile" on these declarations is critical!  If your compiler
115  * does not grok "volatile", you'd be best advised to compile this file
116  * with all optimization turned off.
117  */
118 static volatile int notifyInterruptEnabled = 0;
119 static volatile int notifyInterruptOccurred = 0;
120
121 /* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
122 static int      unlistenExitRegistered = 0;
123
124
125 static void Async_UnlistenAll(void);
126 static void Async_UnlistenOnExit(void);
127 static void ProcessIncomingNotify(void);
128 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
129 static int      AsyncExistsPendingNotify(char *relname);
130 static void ClearPendingNotifies(void);
131
132 bool Trace_notify = false;
133
134
135 /*
136  *--------------------------------------------------------------
137  * Async_Notify
138  *
139  *              This is executed by the SQL notify command.
140  *
141  *              Adds the relation to the list of pending notifies.
142  *              Actual notification happens during transaction commit.
143  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
144  *
145  * Results:
146  *              XXX
147  *
148  *--------------------------------------------------------------
149  */
150 void
151 Async_Notify(char *relname)
152 {
153         char       *notifyName;
154
155         if (Trace_notify)
156                 elog(DEBUG, "Async_Notify: %s", relname);
157
158         if (!pendingNotifies)
159                 pendingNotifies = DLNewList();
160         /* no point in making duplicate entries in the list ... */
161         if (!AsyncExistsPendingNotify(relname))
162         {
163                 /*
164                  * We allocate list memory from the global malloc pool to ensure
165                  * that it will live until we want to use it.  This is probably
166                  * not necessary any longer, since we will use it before the end
167                  * of the transaction. DLList only knows how to use malloc()
168                  * anyway, but we could probably palloc() the strings...
169                  */
170                 notifyName = strdup(relname);
171                 DLAddHead(pendingNotifies, DLNewElem(notifyName));
172         }
173 }
174
175 /*
176  *--------------------------------------------------------------
177  * Async_Listen
178  *
179  *              This is executed by the SQL listen command.
180  *
181  *              Register a backend (identified by its Unix PID) as listening
182  *              on the specified relation.
183  *
184  * Results:
185  *              XXX
186  *
187  * Side effects:
188  *              pg_listener is updated.
189  *
190  *--------------------------------------------------------------
191  */
192 void
193 Async_Listen(char *relname, int pid)
194 {
195         Relation        lRel;
196         TupleDesc       tdesc;
197         HeapTuple       tuple,
198                                 newtup;
199         Datum           values[Natts_pg_listener];
200         char            nulls[Natts_pg_listener];
201         int                     i;
202         TupleDesc       tupDesc;
203
204         if (Trace_notify)
205                 elog(DEBUG, "Async_Listen: %s", relname);
206
207         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
208         tdesc = RelationGetDescr(lRel);
209
210         /* Detect whether we are already listening on this relname */
211         tuple = SearchSysCacheTuple(LISTENREL, Int32GetDatum(pid),
212                                                                 PointerGetDatum(relname),
213                                                                 0, 0);
214         if (tuple != NULL)
215         {
216                 /* No need to scan the rest of the table */
217                 heap_close(lRel, AccessExclusiveLock);
218                 elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
219                 return;
220         }
221
222         /*
223          * OK to insert a new tuple
224          */
225
226         for (i = 0; i < Natts_pg_listener; i++)
227         {
228                 nulls[i] = ' ';
229                 values[i] = PointerGetDatum(NULL);
230         }
231
232         i = 0;
233         values[i++] = (Datum) relname;
234         values[i++] = (Datum) pid;
235         values[i++] = (Datum) 0;        /* no notifies pending */
236
237         tupDesc = lRel->rd_att;
238         newtup = heap_formtuple(tupDesc, values, nulls);
239         heap_insert(lRel, newtup);
240         if (RelationGetForm(lRel)->relhasindex)
241         {
242                 Relation        idescs[Num_pg_listener_indices];
243
244                 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
245                 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, newtup);
246                 CatalogCloseIndices(Num_pg_listener_indices, idescs);
247         }
248
249         heap_freetuple(newtup);
250
251         heap_close(lRel, AccessExclusiveLock);
252
253         /*
254          * now that we are listening, make sure we will unlisten before dying.
255          */
256         if (!unlistenExitRegistered)
257         {
258                 if (on_shmem_exit(Async_UnlistenOnExit, 0) < 0)
259                         elog(NOTICE, "Async_Listen: out of shmem_exit slots");
260                 unlistenExitRegistered = 1;
261         }
262 }
263
264 /*
265  *--------------------------------------------------------------
266  * Async_Unlisten
267  *
268  *              This is executed by the SQL unlisten command.
269  *
270  *              Remove the backend from the list of listening backends
271  *              for the specified relation.
272  *
273  * Results:
274  *              XXX
275  *
276  * Side effects:
277  *              pg_listener is updated.
278  *
279  *--------------------------------------------------------------
280  */
281 void
282 Async_Unlisten(char *relname, int pid)
283 {
284         Relation        lRel;
285         HeapTuple       lTuple;
286
287         /* Handle specially the `unlisten "*"' command */
288         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
289         {
290                 Async_UnlistenAll();
291                 return;
292         }
293
294         if (Trace_notify)
295                 elog(DEBUG, "Async_Unlisten %s", relname);
296
297         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
298         /* Note we assume there can be only one matching tuple. */
299         lTuple = SearchSysCacheTuple(LISTENREL, Int32GetDatum(pid),
300                                                                 PointerGetDatum(relname),
301                                                                 0, 0);
302         if (lTuple != NULL)
303                 heap_delete(lRel, &lTuple->t_self, NULL);
304         heap_close(lRel, AccessExclusiveLock);
305
306         /*
307          * We do not complain about unlistening something not being listened;
308          * should we?
309          */
310 }
311
312 /*
313  *--------------------------------------------------------------
314  * Async_UnlistenAll
315  *
316  *              Unlisten all relations for this backend.
317  *
318  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
319  *
320  * Results:
321  *              XXX
322  *
323  * Side effects:
324  *              pg_listener is updated.
325  *
326  *--------------------------------------------------------------
327  */
328 static void
329 Async_UnlistenAll()
330 {
331         Relation        lRel;
332         TupleDesc       tdesc;
333         HeapScanDesc sRel;
334         HeapTuple       lTuple;
335         ScanKeyData key[1];
336
337         if (Trace_notify)
338                 elog(DEBUG, "Async_UnlistenAll");
339
340         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
341         tdesc = RelationGetDescr(lRel);
342
343         /* Find and delete all entries with my listenerPID */
344         ScanKeyEntryInitialize(&key[0], 0,
345                                                    Anum_pg_listener_pid,
346                                                    F_INT4EQ,
347                                                    Int32GetDatum(MyProcPid));
348         sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
349
350         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
351                 heap_delete(lRel, &lTuple->t_self, NULL);
352
353         heap_endscan(sRel);
354         heap_close(lRel, AccessExclusiveLock);
355 }
356
357 /*
358  *--------------------------------------------------------------
359  * Async_UnlistenOnExit
360  *
361  *              Clean up the pg_listener table at backend exit.
362  *
363  *              This is executed if we have done any LISTENs in this backend.
364  *              It might not be necessary anymore, if the user UNLISTENed everything,
365  *              but we don't try to detect that case.
366  *
367  * Results:
368  *              XXX
369  *
370  * Side effects:
371  *              pg_listener is updated if necessary.
372  *
373  *--------------------------------------------------------------
374  */
375 static void
376 Async_UnlistenOnExit(void)
377 {
378
379         /*
380          * We need to start/commit a transaction for the unlisten, but if
381          * there is already an active transaction we had better abort that one
382          * first.  Otherwise we'd end up committing changes that probably
383          * ought to be discarded.
384          */
385         AbortOutOfAnyTransaction();
386         /* Now we can do the unlisten */
387         StartTransactionCommand();
388         Async_UnlistenAll();
389         CommitTransactionCommand();
390 }
391
392 /*
393  *--------------------------------------------------------------
394  * AtCommit_Notify
395  *
396  *              This is called at transaction commit.
397  *
398  *              If there are outbound notify requests in the pendingNotifies list,
399  *              scan pg_listener for matching tuples, and either signal the other
400  *              backend or send a message to our own frontend.
401  *
402  *              NOTE: we are still inside the current transaction, therefore can
403  *              piggyback on its committing of changes.
404  *
405  * Results:
406  *              XXX
407  *
408  * Side effects:
409  *              Tuples in pg_listener that have matching relnames and other peoples'
410  *              listenerPIDs are updated with a nonzero notification field.
411  *
412  *--------------------------------------------------------------
413  */
414 void
415 AtCommit_Notify()
416 {
417         Relation        lRel;
418         TupleDesc       tdesc;
419         HeapScanDesc sRel;
420         HeapTuple       lTuple,
421                                 rTuple;
422         Datum           d,
423                                 value[Natts_pg_listener];
424         char            repl[Natts_pg_listener],
425                                 nulls[Natts_pg_listener];
426         bool            isnull;
427         char       *relname;
428         int32           listenerPID;
429
430         if (!pendingNotifies)
431                 return;                                 /* no NOTIFY statements in this
432                                                                  * transaction */
433
434         /*
435          * NOTIFY is disabled if not normal processing mode. This test used to
436          * be in xact.c, but it seems cleaner to do it here.
437          */
438         if (!IsNormalProcessingMode())
439         {
440                 ClearPendingNotifies();
441                 return;
442         }
443
444         if (Trace_notify)
445                 elog(DEBUG, "AtCommit_Notify");
446
447         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
448         tdesc = RelationGetDescr(lRel);
449         sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
450
451         /* preset data to update notify column to MyProcPid */
452         nulls[0] = nulls[1] = nulls[2] = ' ';
453         repl[0] = repl[1] = repl[2] = ' ';
454         repl[Anum_pg_listener_notify - 1] = 'r';
455         value[0] = value[1] = value[2] = (Datum) 0;
456         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
457
458         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
459         {
460                 d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
461                 relname = (char *) DatumGetPointer(d);
462
463                 if (AsyncExistsPendingNotify(relname))
464                 {
465                         d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
466                         listenerPID = DatumGetInt32(d);
467
468                         if (listenerPID == MyProcPid)
469                         {
470
471                                 /*
472                                  * Self-notify: no need to bother with table update.
473                                  * Indeed, we *must not* clear the notification field in
474                                  * this path, or we could lose an outside notify, which'd
475                                  * be bad for applications that ignore self-notify
476                                  * messages.
477                                  */
478
479                                 if (Trace_notify)
480                                         elog(DEBUG, "AtCommit_Notify: notifying self");
481
482                                 NotifyMyFrontEnd(relname, listenerPID);
483                         }
484                         else
485                         {
486                                 if (Trace_notify)
487                                         elog(DEBUG, "AtCommit_Notify: notifying pid %d", listenerPID);
488
489                                 /*
490                                  * If someone has already notified this listener, we don't
491                                  * bother modifying the table, but we do still send a
492                                  * SIGUSR2 signal, just in case that backend missed the
493                                  * earlier signal for some reason.      It's OK to send the
494                                  * signal first, because the other guy can't read
495                                  * pg_listener until we unlock it.
496                                  */
497                                 if (kill(listenerPID, SIGUSR2) < 0)
498                                 {
499
500                                         /*
501                                          * Get rid of pg_listener entry if it refers to a PID
502                                          * that no longer exists.  Presumably, that backend
503                                          * crashed without deleting its pg_listener entries.
504                                          * This code used to only delete the entry if
505                                          * errno==ESRCH, but as far as I can see we should
506                                          * just do it for any failure (certainly at least for
507                                          * EPERM too...)
508                                          */
509                                         heap_delete(lRel, &lTuple->t_self, NULL);
510                                 }
511                                 else
512                                 {
513                                         d = heap_getattr(lTuple, Anum_pg_listener_notify,
514                                                                          tdesc, &isnull);
515                                         if (DatumGetInt32(d) == 0)
516                                         {
517                                                 rTuple = heap_modifytuple(lTuple, lRel,
518                                                                                                   value, nulls, repl);
519                                                 heap_update(lRel, &lTuple->t_self, rTuple, NULL);
520                                                 if (RelationGetForm(lRel)->relhasindex)
521                                                 {
522                                                         Relation        idescs[Num_pg_listener_indices];
523
524                                                         CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
525                                                         CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
526                                                         CatalogCloseIndices(Num_pg_listener_indices, idescs);
527                                                 }
528                                         }
529                                 }
530                         }
531                 }
532         }
533
534         heap_endscan(sRel);
535
536         /*
537          * We do NOT release the lock on pg_listener here; we need to hold it
538          * until end of transaction (which is about to happen, anyway) to
539          * ensure that notified backends see our tuple updates when they look.
540          * Else they might disregard the signal, which would make the
541          * application programmer very unhappy.
542          */
543         heap_close(lRel, NoLock);
544
545         ClearPendingNotifies();
546
547         if (Trace_notify)
548                 elog(DEBUG, "AtCommit_Notify: done");
549 }
550
551 /*
552  *--------------------------------------------------------------
553  * AtAbort_Notify
554  *
555  *              This is called at transaction abort.
556  *
557  *              Gets rid of pending outbound notifies that we would have executed
558  *              if the transaction got committed.
559  *
560  * Results:
561  *              XXX
562  *
563  *--------------------------------------------------------------
564  */
565 void
566 AtAbort_Notify()
567 {
568         ClearPendingNotifies();
569 }
570
571 /*
572  *--------------------------------------------------------------
573  * Async_NotifyHandler
574  *
575  *              This is the signal handler for SIGUSR2.
576  *
577  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
578  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
579  *              to do it later.
580  *
581  * Results:
582  *              none
583  *
584  * Side effects:
585  *              per above
586  *--------------------------------------------------------------
587  */
588
589 void
590 Async_NotifyHandler(SIGNAL_ARGS)
591 {
592
593         /*
594          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
595          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
596          * which would likely lead to corruption of stdio buffers if they were
597          * ever turned on.
598          */
599
600         if (notifyInterruptEnabled)
601         {
602
603                 /*
604                  * I'm not sure whether some flavors of Unix might allow another
605                  * SIGUSR2 occurrence to recursively interrupt this routine. To
606                  * cope with the possibility, we do the same sort of dance that
607                  * EnableNotifyInterrupt must do --- see that routine for
608                  * comments.
609                  */
610                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
611                 notifyInterruptOccurred = 1;    /* do at least one iteration */
612                 for (;;)
613                 {
614                         notifyInterruptEnabled = 1;
615                         if (!notifyInterruptOccurred)
616                                 break;
617                         notifyInterruptEnabled = 0;
618                         if (notifyInterruptOccurred)
619                         {
620                                 /* Here, it is finally safe to do stuff. */
621                                 if (Trace_notify)
622                                         elog(DEBUG, "Async_NotifyHandler: perform async notify");
623
624                                 ProcessIncomingNotify();
625
626                                 if (Trace_notify)
627                                         elog(DEBUG, "Async_NotifyHandler: done");
628                         }
629                 }
630         }
631         else
632         {
633
634                 /*
635                  * In this path it is NOT SAFE to do much of anything, except
636                  * this:
637                  */
638                 notifyInterruptOccurred = 1;
639         }
640 }
641
642 /*
643  * --------------------------------------------------------------
644  * EnableNotifyInterrupt
645  *
646  *              This is called by the PostgresMain main loop just before waiting
647  *              for a frontend command.  If we are truly idle (ie, *not* inside
648  *              a transaction block), then process any pending inbound notifies,
649  *              and enable the signal handler to process future notifies directly.
650  *
651  *              NOTE: the signal handler starts out disabled, and stays so until
652  *              PostgresMain calls this the first time.
653  * --------------------------------------------------------------
654  */
655
656 void
657 EnableNotifyInterrupt(void)
658 {
659         if (CurrentTransactionState->blockState != TRANS_DEFAULT)
660                 return;                                 /* not really idle */
661
662         /*
663          * This code is tricky because we are communicating with a signal
664          * handler that could interrupt us at any point.  If we just checked
665          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
666          * could fail to respond promptly to a signal that happens in between
667          * those two steps.  (A very small time window, perhaps, but Murphy's
668          * Law says you can hit it...)  Instead, we first set the enable flag,
669          * then test the occurred flag.  If we see an unserviced interrupt has
670          * occurred, we re-clear the enable flag before going off to do the
671          * service work.  (That prevents re-entrant invocation of
672          * ProcessIncomingNotify() if another interrupt occurs.) If an
673          * interrupt comes in between the setting and clearing of
674          * notifyInterruptEnabled, then it will have done the service work and
675          * left notifyInterruptOccurred zero, so we have to check again after
676          * clearing enable.  The whole thing has to be in a loop in case
677          * another interrupt occurs while we're servicing the first. Once we
678          * get out of the loop, enable is set and we know there is no
679          * unserviced interrupt.
680          *
681          * NB: an overenthusiastic optimizing compiler could easily break this
682          * code.  Hopefully, they all understand what "volatile" means these
683          * days.
684          */
685         for (;;)
686         {
687                 notifyInterruptEnabled = 1;
688                 if (!notifyInterruptOccurred)
689                         break;
690                 notifyInterruptEnabled = 0;
691                 if (notifyInterruptOccurred)
692                 {
693                         if (Trace_notify)
694                                 elog(DEBUG, "EnableNotifyInterrupt: perform async notify");
695
696                         ProcessIncomingNotify();
697
698                         if (Trace_notify)
699                                 elog(DEBUG, "EnableNotifyInterrupt: done");
700                 }
701         }
702 }
703
704 /*
705  * --------------------------------------------------------------
706  * DisableNotifyInterrupt
707  *
708  *              This is called by the PostgresMain main loop just after receiving
709  *              a frontend command.  Signal handler execution of inbound notifies
710  *              is disabled until the next EnableNotifyInterrupt call.
711  * --------------------------------------------------------------
712  */
713
714 void
715 DisableNotifyInterrupt(void)
716 {
717         notifyInterruptEnabled = 0;
718 }
719
720 /*
721  * --------------------------------------------------------------
722  * ProcessIncomingNotify
723  *
724  *              Deal with arriving NOTIFYs from other backends.
725  *              This is called either directly from the SIGUSR2 signal handler,
726  *              or the next time control reaches the outer idle loop.
727  *              Scan pg_listener for arriving notifies, report them to my front end,
728  *              and clear the notification field in pg_listener until next time.
729  *
730  *              NOTE: since we are outside any transaction, we must create our own.
731  *
732  * Results:
733  *              XXX
734  *
735  * --------------------------------------------------------------
736  */
737 static void
738 ProcessIncomingNotify(void)
739 {
740         Relation        lRel;
741         TupleDesc       tdesc;
742         ScanKeyData key[1];
743         HeapScanDesc sRel;
744         HeapTuple       lTuple,
745                                 rTuple;
746         Datum           d,
747                                 value[Natts_pg_listener];
748         char            repl[Natts_pg_listener],
749                                 nulls[Natts_pg_listener];
750         bool            isnull;
751         char       *relname;
752         int32           sourcePID;
753
754         if (Trace_notify)
755                 elog(DEBUG, "ProcessIncomingNotify");
756
757         set_ps_display("async_notify");
758
759         notifyInterruptOccurred = 0;
760
761         StartTransactionCommand();
762
763         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
764         tdesc = RelationGetDescr(lRel);
765
766         /* Scan only entries with my listenerPID */
767         ScanKeyEntryInitialize(&key[0], 0,
768                                                    Anum_pg_listener_pid,
769                                                    F_INT4EQ,
770                                                    Int32GetDatum(MyProcPid));
771         sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
772
773         /* Prepare data for rewriting 0 into notification field */
774         nulls[0] = nulls[1] = nulls[2] = ' ';
775         repl[0] = repl[1] = repl[2] = ' ';
776         repl[Anum_pg_listener_notify - 1] = 'r';
777         value[0] = value[1] = value[2] = (Datum) 0;
778         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
779
780         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
781         {
782                 d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
783                 sourcePID = DatumGetInt32(d);
784                 if (sourcePID != 0)
785                 {
786                         d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
787                         relname = (char *) DatumGetPointer(d);
788                         /* Notify the frontend */
789
790                         if (Trace_notify)
791                                 elog(DEBUG, "ProcessIncomingNotify: received %s from %d",
792                                         relname, (int) sourcePID);
793
794                         NotifyMyFrontEnd(relname, sourcePID);
795                         /* Rewrite the tuple with 0 in notification column */
796                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
797                         heap_update(lRel, &lTuple->t_self, rTuple, NULL);
798                         if (RelationGetForm(lRel)->relhasindex)
799                         {
800                                 Relation        idescs[Num_pg_listener_indices];
801
802                                 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
803                                 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
804                                 CatalogCloseIndices(Num_pg_listener_indices, idescs);
805                         }
806                 }
807         }
808         heap_endscan(sRel);
809
810         /*
811          * We do NOT release the lock on pg_listener here; we need to hold it
812          * until end of transaction (which is about to happen, anyway) to
813          * ensure that other backends see our tuple updates when they look.
814          * Otherwise, a transaction started after this one might mistakenly
815          * think it doesn't need to send this backend a new NOTIFY.
816          */
817         heap_close(lRel, NoLock);
818
819         CommitTransactionCommand();
820
821         /*
822          * Must flush the notify messages to ensure frontend gets them
823          * promptly.
824          */
825         pq_flush();
826
827         set_ps_display("idle");
828
829         if (Trace_notify)
830                 elog(DEBUG, "ProcessIncomingNotify: done");
831 }
832
833 /* Send NOTIFY message to my front end. */
834
835 static void
836 NotifyMyFrontEnd(char *relname, int32 listenerPID)
837 {
838         if (whereToSendOutput == Remote)
839         {
840                 StringInfoData buf;
841
842                 pq_beginmessage(&buf);
843                 pq_sendbyte(&buf, 'A');
844                 pq_sendint(&buf, listenerPID, sizeof(int32));
845                 pq_sendstring(&buf, relname);
846                 pq_endmessage(&buf);
847
848                 /*
849                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
850                  * happen at the end of the transaction, and for incoming notifies
851                  * ProcessIncomingNotify will do it after finding all the
852                  * notifies.
853                  */
854         }
855         else
856                 elog(NOTICE, "NOTIFY for %s", relname);
857 }
858
859 /* Does pendingNotifies include the given relname?
860  *
861  * NB: not called unless pendingNotifies != NULL.
862  */
863
864 static int
865 AsyncExistsPendingNotify(char *relname)
866 {
867         Dlelem     *p;
868
869         for (p = DLGetHead(pendingNotifies);
870                  p != NULL;
871                  p = DLGetSucc(p))
872         {
873                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
874                 if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
875                         return 1;
876         }
877
878         return 0;
879 }
880
881 /* Clear the pendingNotifies list. */
882
883 static void
884 ClearPendingNotifies()
885 {
886         Dlelem     *p;
887
888         if (pendingNotifies)
889         {
890
891                 /*
892                  * Since the referenced strings are malloc'd, we have to scan the
893                  * list and delete them individually.  If we used palloc for the
894                  * strings then we could just do DLFreeList to get rid of both the
895                  * list nodes and the list base...
896                  */
897                 while ((p = DLRemHead(pendingNotifies)) != NULL)
898                 {
899                         free(DLE_VAL(p));
900                         DLFreeElem(p);
901                 }
902                 DLFreeList(pendingNotifies);
903                 pendingNotifies = NULL;
904         }
905 }