]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Rename heap_replace to heap_update.
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  * IDENTIFICATION
9  *        $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.56 1999/11/24 00:44:29 momjian Exp $
10  *
11  *-------------------------------------------------------------------------
12  */
13
14 /*-------------------------------------------------------------------------
15  * New Async Notification Model:
16  * 1. Multiple backends on same machine.  Multiple backends listening on
17  *        one relation.  (Note: "listening on a relation" is not really the
18  *        right way to think about it, since the notify names need not have
19  *        anything to do with the names of relations actually in the database.
20  *        But this terminology is all over the code and docs, and I don't feel
21  *        like trying to replace it.)
22  *
23  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
24  *        ie, each relname/listenerPID pair.  The "notification" field of the
25  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
26  *        of the originating backend when a cross-backend NOTIFY is pending.
27  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
28  *        notification field should never be equal to the listenerPID field.)
29  *
30  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
31  *        relname to a list of outstanding NOTIFY requests.  Actual processing
32  *        happens if and only if we reach transaction commit.  At that time (in
33  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
34  *        If the listenerPID in a matching tuple is ours, we just send a notify
35  *        message to our own front end.  If it is not ours, and "notification"
36  *        is not already nonzero, we set notification to our own PID and send a
37  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
38  *        BTW: if the signal operation fails, we presume that the listener backend
39  *        crashed without removing this tuple, and remove the tuple for it.
40  *
41  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
42  *        notify processing immediately if this backend is idle (ie, it is
43  *        waiting for a frontend command and is not within a transaction block).
44  *        Otherwise the handler may only set a flag, which will cause the
45  *        processing to occur just before we next go idle.
46  *
47  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
48  *        matching our own listenerPID and having nonzero notification fields.
49  *        For each such tuple, we send a message to our frontend and clear the
50  *        notification field.  BTW: this routine has to start/commit its own
51  *        transaction, since by assumption it is only called from outside any
52  *        transaction.
53  *
54  * Although we grab AccessExclusiveLock on pg_listener for any operation,
55  * the lock is never held very long, so it shouldn't cause too much of
56  * a performance problem.
57  *
58  * An application that listens on the same relname it notifies will get
59  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
60  * by comparing be_pid in the NOTIFY message to the application's own backend's
61  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
62  * frontend during startup.)  The above design guarantees that notifies from
63  * other backends will never be missed by ignoring self-notifies.  Note,
64  * however, that we do *not* guarantee that a separate frontend message will
65  * be sent for every outside NOTIFY.  Since there is only room for one
66  * originating PID in pg_listener, outside notifies occurring at about the
67  * same time may be collapsed into a single message bearing the PID of the
68  * first outside backend to perform the NOTIFY.
69  *-------------------------------------------------------------------------
70  */
71
72 #include <unistd.h>
73 #include <signal.h>
74 #include <errno.h>
75 #include <sys/types.h>
76 #include <netinet/in.h>
77
78 #include "postgres.h"
79
80 #include "access/heapam.h"
81 #include "catalog/catname.h"
82 #include "catalog/indexing.h"
83 #include "catalog/pg_listener.h"
84 #include "commands/async.h"
85 #include "lib/dllist.h"
86 #include "libpq/libpq.h"
87 #include "libpq/pqformat.h"
88 #include "miscadmin.h"
89 #include "utils/ps_status.h"
90 #include "utils/syscache.h"
91 #include "utils/trace.h"
92
93 /* stuff that we really ought not be touching directly :-( */
94 extern TransactionState CurrentTransactionState;
95 extern CommandDest whereToSendOutput;
96
97 /*
98  * State for outbound notifies consists of a list of all relnames NOTIFYed
99  * in the current transaction.  We do not actually perform a NOTIFY until
100  * and unless the transaction commits.  pendingNotifies is NULL if no
101  * NOTIFYs have been done in the current transaction.
102  */
103 static Dllist *pendingNotifies = NULL;
104
105 /*
106  * State for inbound notifies consists of two flags: one saying whether
107  * the signal handler is currently allowed to call ProcessIncomingNotify
108  * directly, and one saying whether the signal has occurred but the handler
109  * was not allowed to call ProcessIncomingNotify at the time.
110  *
111  * NB: the "volatile" on these declarations is critical!  If your compiler
112  * does not grok "volatile", you'd be best advised to compile this file
113  * with all optimization turned off.
114  */
115 static volatile int notifyInterruptEnabled = 0;
116 static volatile int notifyInterruptOccurred = 0;
117
118 /* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
119 static int      unlistenExitRegistered = 0;
120
121
122 static void Async_UnlistenAll(void);
123 static void Async_UnlistenOnExit(void);
124 static void ProcessIncomingNotify(void);
125 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
126 static int      AsyncExistsPendingNotify(char *relname);
127 static void ClearPendingNotifies(void);
128
129
130 /*
131  *--------------------------------------------------------------
132  * Async_Notify
133  *
134  *              This is executed by the SQL notify command.
135  *
136  *              Adds the relation to the list of pending notifies.
137  *              Actual notification happens during transaction commit.
138  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
139  *
140  * Results:
141  *              XXX
142  *
143  *--------------------------------------------------------------
144  */
145 void
146 Async_Notify(char *relname)
147 {
148         char       *notifyName;
149
150         TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
151
152         if (!pendingNotifies)
153                 pendingNotifies = DLNewList();
154         /* no point in making duplicate entries in the list ... */
155         if (!AsyncExistsPendingNotify(relname))
156         {
157                 /*
158                  * We allocate list memory from the global malloc pool to ensure
159                  * that it will live until we want to use it.  This is probably not
160                  * necessary any longer, since we will use it before the end of the
161                  * transaction. DLList only knows how to use malloc() anyway, but we
162                  * could probably palloc() the strings...
163                  */
164                 notifyName = strdup(relname);
165                 DLAddHead(pendingNotifies, DLNewElem(notifyName));
166         }
167 }
168
169 /*
170  *--------------------------------------------------------------
171  * Async_Listen
172  *
173  *              This is executed by the SQL listen command.
174  *
175  *              Register a backend (identified by its Unix PID) as listening
176  *              on the specified relation.
177  *
178  * Results:
179  *              XXX
180  *
181  * Side effects:
182  *              pg_listener is updated.
183  *
184  *--------------------------------------------------------------
185  */
186 void
187 Async_Listen(char *relname, int pid)
188 {
189         Relation        lRel;
190         TupleDesc       tdesc;
191         HeapScanDesc scan;
192         HeapTuple       tuple,
193                                 newtup;
194         Datum           values[Natts_pg_listener];
195         char            nulls[Natts_pg_listener];
196         Datum           d;
197         int                     i;
198         bool            isnull;
199         int                     alreadyListener = 0;
200         TupleDesc       tupDesc;
201
202         TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
203
204         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
205         tdesc = RelationGetDescr(lRel);
206
207         /* Detect whether we are already listening on this relname */
208         scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
209         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
210         {
211                 d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
212                 if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
213                 {
214                         d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
215                         if (DatumGetInt32(d) == pid)
216                         {
217                                 alreadyListener = 1;
218                                 /* No need to scan the rest of the table */
219                                 break;
220                         }
221                 }
222         }
223         heap_endscan(scan);
224
225         if (alreadyListener)
226         {
227                 heap_close(lRel, AccessExclusiveLock);
228                 elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
229                 return;
230         }
231
232         /*
233          * OK to insert a new tuple
234          */
235
236         for (i = 0; i < Natts_pg_listener; i++)
237         {
238                 nulls[i] = ' ';
239                 values[i] = PointerGetDatum(NULL);
240         }
241
242         i = 0;
243         values[i++] = (Datum) relname;
244         values[i++] = (Datum) pid;
245         values[i++] = (Datum) 0;        /* no notifies pending */
246
247         tupDesc = lRel->rd_att;
248         newtup = heap_formtuple(tupDesc, values, nulls);
249         heap_insert(lRel, newtup);
250         pfree(newtup);
251
252         heap_close(lRel, AccessExclusiveLock);
253
254         /*
255          * now that we are listening, make sure we will unlisten before dying.
256          */
257         if (!unlistenExitRegistered)
258         {
259                 if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
260                         elog(NOTICE, "Async_Listen: out of shmem_exit slots");
261                 unlistenExitRegistered = 1;
262         }
263 }
264
265 /*
266  *--------------------------------------------------------------
267  * Async_Unlisten
268  *
269  *              This is executed by the SQL unlisten command.
270  *
271  *              Remove the backend from the list of listening backends
272  *              for the specified relation.
273  *
274  * Results:
275  *              XXX
276  *
277  * Side effects:
278  *              pg_listener is updated.
279  *
280  *--------------------------------------------------------------
281  */
282 void
283 Async_Unlisten(char *relname, int pid)
284 {
285         Relation        lRel;
286         HeapTuple       lTuple;
287
288         /* Handle specially the `unlisten "*"' command */
289         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
290         {
291                 Async_UnlistenAll();
292                 return;
293         }
294
295         TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
296
297         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
298         /* Note we assume there can be only one matching tuple. */
299         lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
300                                                                  Int32GetDatum(pid),
301                                                                  0, 0);
302         if (lTuple != NULL)
303                 heap_delete(lRel, &lTuple->t_self, NULL);
304         heap_close(lRel, AccessExclusiveLock);
305
306         /*
307          * We do not complain about unlistening something not being listened;
308          * should we?
309          */
310 }
311
312 /*
313  *--------------------------------------------------------------
314  * Async_UnlistenAll
315  *
316  *              Unlisten all relations for this backend.
317  *
318  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
319  *
320  * Results:
321  *              XXX
322  *
323  * Side effects:
324  *              pg_listener is updated.
325  *
326  *--------------------------------------------------------------
327  */
328 static void
329 Async_UnlistenAll()
330 {
331         Relation        lRel;
332         TupleDesc       tdesc;
333         HeapScanDesc sRel;
334         HeapTuple       lTuple;
335         ScanKeyData key[1];
336
337         TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
338
339         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
340         tdesc = RelationGetDescr(lRel);
341
342         /* Find and delete all entries with my listenerPID */
343         ScanKeyEntryInitialize(&key[0], 0,
344                                                    Anum_pg_listener_pid,
345                                                    F_INT4EQ,
346                                                    Int32GetDatum(MyProcPid));
347         sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
348
349         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
350                 heap_delete(lRel, &lTuple->t_self, NULL);
351
352         heap_endscan(sRel);
353         heap_close(lRel, AccessExclusiveLock);
354 }
355
356 /*
357  *--------------------------------------------------------------
358  * Async_UnlistenOnExit
359  *
360  *              Clean up the pg_listener table at backend exit.
361  *
362  *              This is executed if we have done any LISTENs in this backend.
363  *              It might not be necessary anymore, if the user UNLISTENed everything,
364  *              but we don't try to detect that case.
365  *
366  * Results:
367  *              XXX
368  *
369  * Side effects:
370  *              pg_listener is updated if necessary.
371  *
372  *--------------------------------------------------------------
373  */
374 static void
375 Async_UnlistenOnExit()
376 {
377
378         /*
379          * We need to start/commit a transaction for the unlisten, but if
380          * there is already an active transaction we had better abort that one
381          * first.  Otherwise we'd end up committing changes that probably
382          * ought to be discarded.
383          */
384         AbortOutOfAnyTransaction();
385         /* Now we can do the unlisten */
386         StartTransactionCommand();
387         Async_UnlistenAll();
388         CommitTransactionCommand();
389 }
390
391 /*
392  *--------------------------------------------------------------
393  * AtCommit_Notify
394  *
395  *              This is called at transaction commit.
396  *
397  *              If there are outbound notify requests in the pendingNotifies list,
398  *              scan pg_listener for matching tuples, and either signal the other
399  *              backend or send a message to our own frontend.
400  *
401  *              NOTE: we are still inside the current transaction, therefore can
402  *              piggyback on its committing of changes.
403  *
404  * Results:
405  *              XXX
406  *
407  * Side effects:
408  *              Tuples in pg_listener that have matching relnames and other peoples'
409  *              listenerPIDs are updated with a nonzero notification field.
410  *
411  *--------------------------------------------------------------
412  */
413 void
414 AtCommit_Notify()
415 {
416         Relation        lRel;
417         TupleDesc       tdesc;
418         HeapScanDesc sRel;
419         HeapTuple       lTuple,
420                                 rTuple;
421         Datum           d,
422                                 value[Natts_pg_listener];
423         char            repl[Natts_pg_listener],
424                                 nulls[Natts_pg_listener];
425         bool            isnull;
426         char       *relname;
427         int32           listenerPID;
428
429         if (!pendingNotifies)
430                 return;                                 /* no NOTIFY statements in this
431                                                                  * transaction */
432
433         /*
434          * NOTIFY is disabled if not normal processing mode. This test used to
435          * be in xact.c, but it seems cleaner to do it here.
436          */
437         if (!IsNormalProcessingMode())
438         {
439                 ClearPendingNotifies();
440                 return;
441         }
442
443         TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
444
445         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
446         tdesc = RelationGetDescr(lRel);
447         sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
448
449         /* preset data to update notify column to MyProcPid */
450         nulls[0] = nulls[1] = nulls[2] = ' ';
451         repl[0] = repl[1] = repl[2] = ' ';
452         repl[Anum_pg_listener_notify - 1] = 'r';
453         value[0] = value[1] = value[2] = (Datum) 0;
454         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
455
456         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
457         {
458                 d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
459                 relname = (char *) DatumGetPointer(d);
460
461                 if (AsyncExistsPendingNotify(relname))
462                 {
463                         d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
464                         listenerPID = DatumGetInt32(d);
465
466                         if (listenerPID == MyProcPid)
467                         {
468                                 /*
469                                  * Self-notify: no need to bother with table update.
470                                  * Indeed, we *must not* clear the notification field in
471                                  * this path, or we could lose an outside notify, which'd
472                                  * be bad for applications that ignore self-notify
473                                  * messages.
474                                  */
475                                 TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
476                                 NotifyMyFrontEnd(relname, listenerPID);
477                         }
478                         else
479                         {
480                                 TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
481                                                 listenerPID);
482
483                                 /*
484                                  * If someone has already notified this listener, we don't
485                                  * bother modifying the table, but we do still send a
486                                  * SIGUSR2 signal, just in case that backend missed the
487                                  * earlier signal for some reason.      It's OK to send the
488                                  * signal first, because the other guy can't read
489                                  * pg_listener until we unlock it.
490                                  */
491                                 if (kill(listenerPID, SIGUSR2) < 0)
492                                 {
493                                         /*
494                                          * Get rid of pg_listener entry if it refers to a PID
495                                          * that no longer exists.  Presumably, that backend
496                                          * crashed without deleting its pg_listener entries.
497                                          * This code used to only delete the entry if
498                                          * errno==ESRCH, but as far as I can see we should
499                                          * just do it for any failure (certainly at least for
500                                          * EPERM too...)
501                                          */
502                                         heap_delete(lRel, &lTuple->t_self, NULL);
503                                 }
504                                 else
505                                 {
506                                         d = heap_getattr(lTuple, Anum_pg_listener_notify,
507                                                                          tdesc, &isnull);
508                                         if (DatumGetInt32(d) == 0)
509                                         {
510                                                 rTuple = heap_modifytuple(lTuple, lRel,
511                                                                                                   value, nulls, repl);
512                                                 heap_update(lRel, &lTuple->t_self, rTuple, NULL);
513                                                 if (RelationGetForm(lRel)->relhasindex)
514                                                 {
515                                                         Relation        idescs[Num_pg_listener_indices];
516                                         
517                                                         CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
518                                                         CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
519                                                         CatalogCloseIndices(Num_pg_listener_indices, idescs);
520                                                 }
521                                         }
522                                 }
523                         }
524                 }
525         }
526
527         heap_endscan(sRel);
528
529         /*
530          * We do NOT release the lock on pg_listener here; we need to hold it
531          * until end of transaction (which is about to happen, anyway) to
532          * ensure that notified backends see our tuple updates when they look.
533          * Else they might disregard the signal, which would make the
534          * application programmer very unhappy.
535          */
536         heap_close(lRel, NoLock);
537
538         ClearPendingNotifies();
539
540         TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
541 }
542
543 /*
544  *--------------------------------------------------------------
545  * AtAbort_Notify
546  *
547  *              This is called at transaction abort.
548  *
549  *              Gets rid of pending outbound notifies that we would have executed
550  *              if the transaction got committed.
551  *
552  * Results:
553  *              XXX
554  *
555  *--------------------------------------------------------------
556  */
557 void
558 AtAbort_Notify()
559 {
560         ClearPendingNotifies();
561 }
562
563 /*
564  *--------------------------------------------------------------
565  * Async_NotifyHandler
566  *
567  *              This is the signal handler for SIGUSR2.
568  *
569  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
570  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
571  *              to do it later.
572  *
573  * Results:
574  *              none
575  *
576  * Side effects:
577  *              per above
578  *--------------------------------------------------------------
579  */
580
581 void
582 Async_NotifyHandler(SIGNAL_ARGS)
583 {
584
585         /*
586          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
587          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
588          * which would likely lead to corruption of stdio buffers if they were
589          * ever turned on.
590          */
591
592         if (notifyInterruptEnabled)
593         {
594
595                 /*
596                  * I'm not sure whether some flavors of Unix might allow another
597                  * SIGUSR2 occurrence to recursively interrupt this routine. To
598                  * cope with the possibility, we do the same sort of dance that
599                  * EnableNotifyInterrupt must do --- see that routine for
600                  * comments.
601                  */
602                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
603                 notifyInterruptOccurred = 1;    /* do at least one iteration */
604                 for (;;)
605                 {
606                         notifyInterruptEnabled = 1;
607                         if (!notifyInterruptOccurred)
608                                 break;
609                         notifyInterruptEnabled = 0;
610                         if (notifyInterruptOccurred)
611                         {
612                                 /* Here, it is finally safe to do stuff. */
613                                 TPRINTF(TRACE_NOTIFY,
614                                                 "Async_NotifyHandler: perform async notify");
615                                 ProcessIncomingNotify();
616                                 TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
617                         }
618                 }
619         }
620         else
621         {
622
623                 /*
624                  * In this path it is NOT SAFE to do much of anything, except
625                  * this:
626                  */
627                 notifyInterruptOccurred = 1;
628         }
629 }
630
631 /*
632  * --------------------------------------------------------------
633  * EnableNotifyInterrupt
634  *
635  *              This is called by the PostgresMain main loop just before waiting
636  *              for a frontend command.  If we are truly idle (ie, *not* inside
637  *              a transaction block), then process any pending inbound notifies,
638  *              and enable the signal handler to process future notifies directly.
639  *
640  *              NOTE: the signal handler starts out disabled, and stays so until
641  *              PostgresMain calls this the first time.
642  * --------------------------------------------------------------
643  */
644
645 void
646 EnableNotifyInterrupt(void)
647 {
648         if (CurrentTransactionState->blockState != TRANS_DEFAULT)
649                 return;                                 /* not really idle */
650
651         /*
652          * This code is tricky because we are communicating with a signal
653          * handler that could interrupt us at any point.  If we just checked
654          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
655          * could fail to respond promptly to a signal that happens in between
656          * those two steps.  (A very small time window, perhaps, but Murphy's
657          * Law says you can hit it...)  Instead, we first set the enable flag,
658          * then test the occurred flag.  If we see an unserviced interrupt has
659          * occurred, we re-clear the enable flag before going off to do the
660          * service work.  (That prevents re-entrant invocation of
661          * ProcessIncomingNotify() if another interrupt occurs.) If an
662          * interrupt comes in between the setting and clearing of
663          * notifyInterruptEnabled, then it will have done the service work and
664          * left notifyInterruptOccurred zero, so we have to check again after
665          * clearing enable.  The whole thing has to be in a loop in case
666          * another interrupt occurs while we're servicing the first. Once we
667          * get out of the loop, enable is set and we know there is no
668          * unserviced interrupt.
669          *
670          * NB: an overenthusiastic optimizing compiler could easily break this
671          * code.  Hopefully, they all understand what "volatile" means these
672          * days.
673          */
674         for (;;)
675         {
676                 notifyInterruptEnabled = 1;
677                 if (!notifyInterruptOccurred)
678                         break;
679                 notifyInterruptEnabled = 0;
680                 if (notifyInterruptOccurred)
681                 {
682                         TPRINTF(TRACE_NOTIFY,
683                                         "EnableNotifyInterrupt: perform async notify");
684                         ProcessIncomingNotify();
685                         TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
686                 }
687         }
688 }
689
690 /*
691  * --------------------------------------------------------------
692  * DisableNotifyInterrupt
693  *
694  *              This is called by the PostgresMain main loop just after receiving
695  *              a frontend command.  Signal handler execution of inbound notifies
696  *              is disabled until the next EnableNotifyInterrupt call.
697  * --------------------------------------------------------------
698  */
699
700 void
701 DisableNotifyInterrupt(void)
702 {
703         notifyInterruptEnabled = 0;
704 }
705
706 /*
707  * --------------------------------------------------------------
708  * ProcessIncomingNotify
709  *
710  *              Deal with arriving NOTIFYs from other backends.
711  *              This is called either directly from the SIGUSR2 signal handler,
712  *              or the next time control reaches the outer idle loop.
713  *              Scan pg_listener for arriving notifies, report them to my front end,
714  *              and clear the notification field in pg_listener until next time.
715  *
716  *              NOTE: since we are outside any transaction, we must create our own.
717  *
718  * Results:
719  *              XXX
720  *
721  * --------------------------------------------------------------
722  */
723 static void
724 ProcessIncomingNotify(void)
725 {
726         Relation        lRel;
727         TupleDesc       tdesc;
728         ScanKeyData key[1];
729         HeapScanDesc sRel;
730         HeapTuple       lTuple,
731                                 rTuple;
732         Datum           d,
733                                 value[Natts_pg_listener];
734         char            repl[Natts_pg_listener],
735                                 nulls[Natts_pg_listener];
736         bool            isnull;
737         char       *relname;
738         int32           sourcePID;
739
740         TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
741         PS_SET_STATUS("async_notify");
742
743         notifyInterruptOccurred = 0;
744
745         StartTransactionCommand();
746
747         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
748         tdesc = RelationGetDescr(lRel);
749
750         /* Scan only entries with my listenerPID */
751         ScanKeyEntryInitialize(&key[0], 0,
752                                                    Anum_pg_listener_pid,
753                                                    F_INT4EQ,
754                                                    Int32GetDatum(MyProcPid));
755         sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
756
757         /* Prepare data for rewriting 0 into notification field */
758         nulls[0] = nulls[1] = nulls[2] = ' ';
759         repl[0] = repl[1] = repl[2] = ' ';
760         repl[Anum_pg_listener_notify - 1] = 'r';
761         value[0] = value[1] = value[2] = (Datum) 0;
762         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
763
764         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
765         {
766                 d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
767                 sourcePID = DatumGetInt32(d);
768                 if (sourcePID != 0)
769                 {
770                         d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
771                         relname = (char *) DatumGetPointer(d);
772                         /* Notify the frontend */
773                         TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
774                                         relname, (int) sourcePID);
775                         NotifyMyFrontEnd(relname, sourcePID);
776                         /* Rewrite the tuple with 0 in notification column */
777                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
778                         heap_update(lRel, &lTuple->t_self, rTuple, NULL);
779                         if (RelationGetForm(lRel)->relhasindex)
780                         {
781                                 Relation        idescs[Num_pg_listener_indices];
782                 
783                                 CatalogOpenIndices(Num_pg_listener_indices, Name_pg_listener_indices, idescs);
784                                 CatalogIndexInsert(idescs, Num_pg_listener_indices, lRel, rTuple);
785                                 CatalogCloseIndices(Num_pg_listener_indices, idescs);
786                         }
787                 }
788         }
789         heap_endscan(sRel);
790
791         /*
792          * We do NOT release the lock on pg_listener here; we need to hold it
793          * until end of transaction (which is about to happen, anyway) to
794          * ensure that other backends see our tuple updates when they look.
795          * Otherwise, a transaction started after this one might mistakenly
796          * think it doesn't need to send this backend a new NOTIFY.
797          */
798         heap_close(lRel, NoLock);
799
800         CommitTransactionCommand();
801
802         /*
803          * Must flush the notify messages to ensure frontend gets them
804          * promptly.
805          */
806         pq_flush();
807
808         PS_SET_STATUS("idle");
809         TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
810 }
811
812 /* Send NOTIFY message to my front end. */
813
814 static void
815 NotifyMyFrontEnd(char *relname, int32 listenerPID)
816 {
817         if (whereToSendOutput == Remote)
818         {
819                 StringInfoData buf;
820
821                 pq_beginmessage(&buf);
822                 pq_sendbyte(&buf, 'A');
823                 pq_sendint(&buf, listenerPID, sizeof(int32));
824                 pq_sendstring(&buf, relname);
825                 pq_endmessage(&buf);
826
827                 /*
828                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
829                  * happen at the end of the transaction, and for incoming notifies
830                  * ProcessIncomingNotify will do it after finding all the
831                  * notifies.
832                  */
833         }
834         else
835                 elog(NOTICE, "NOTIFY for %s", relname);
836 }
837
838 /* Does pendingNotifies include the given relname?
839  *
840  * NB: not called unless pendingNotifies != NULL.
841  */
842
843 static int
844 AsyncExistsPendingNotify(char *relname)
845 {
846         Dlelem     *p;
847
848         for (p = DLGetHead(pendingNotifies);
849                  p != NULL;
850                  p = DLGetSucc(p))
851         {
852                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
853                 if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
854                         return 1;
855         }
856
857         return 0;
858 }
859
860 /* Clear the pendingNotifies list. */
861
862 static void
863 ClearPendingNotifies()
864 {
865         Dlelem     *p;
866
867         if (pendingNotifies)
868         {
869
870                 /*
871                  * Since the referenced strings are malloc'd, we have to scan the
872                  * list and delete them individually.  If we used palloc for the
873                  * strings then we could just do DLFreeList to get rid of both the
874                  * list nodes and the list base...
875                  */
876                 while ((p = DLRemHead(pendingNotifies)) != NULL)
877                 {
878                         free(DLE_VAL(p));
879                         DLFreeElem(p);
880                 }
881                 DLFreeList(pendingNotifies);
882                 pendingNotifies = NULL;
883         }
884 }