]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Update misleading comment about the use of lanpltrusted ... it is
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.121 2005/04/14 20:03:23 tgl Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab ExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.  (Previously we used AccessExclusiveLock, but
58  * there's no real reason to forbid concurrent reads.)
59  *
60  * An application that listens on the same relname it notifies will get
61  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
62  * by comparing be_pid in the NOTIFY message to the application's own backend's
63  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
64  * frontend during startup.)  The above design guarantees that notifies from
65  * other backends will never be missed by ignoring self-notifies.  Note,
66  * however, that we do *not* guarantee that a separate frontend message will
67  * be sent for every outside NOTIFY.  Since there is only room for one
68  * originating PID in pg_listener, outside notifies occurring at about the
69  * same time may be collapsed into a single message bearing the PID of the
70  * first outside backend to perform the NOTIFY.
71  *-------------------------------------------------------------------------
72  */
73
74 #include "postgres.h"
75
76 #include <unistd.h>
77 #include <signal.h>
78 #include <errno.h>
79 #include <netinet/in.h>
80
81 #include "access/heapam.h"
82 #include "catalog/pg_listener.h"
83 #include "commands/async.h"
84 #include "libpq/libpq.h"
85 #include "libpq/pqformat.h"
86 #include "miscadmin.h"
87 #include "storage/ipc.h"
88 #include "storage/sinval.h"
89 #include "tcop/tcopprot.h"
90 #include "utils/fmgroids.h"
91 #include "utils/ps_status.h"
92 #include "utils/syscache.h"
93
94
95 /*
96  * State for outbound notifies consists of a list of all relnames NOTIFYed
97  * in the current transaction.  We do not actually perform a NOTIFY until
98  * and unless the transaction commits.  pendingNotifies is NIL if no
99  * NOTIFYs have been done in the current transaction.
100  *
101  * The list is kept in CurTransactionContext.  In subtransactions, each
102  * subtransaction has its own list in its own CurTransactionContext, but
103  * successful subtransactions attach their lists to their parent's list.
104  * Failed subtransactions simply discard their lists.
105  */
106 static List *pendingNotifies = NIL;
107
108 static List *upperPendingNotifies = NIL;                /* list of upper-xact
109                                                                                                  * lists */
110
111 /*
112  * State for inbound notifies consists of two flags: one saying whether
113  * the signal handler is currently allowed to call ProcessIncomingNotify
114  * directly, and one saying whether the signal has occurred but the handler
115  * was not allowed to call ProcessIncomingNotify at the time.
116  *
117  * NB: the "volatile" on these declarations is critical!  If your compiler
118  * does not grok "volatile", you'd be best advised to compile this file
119  * with all optimization turned off.
120  */
121 static volatile int notifyInterruptEnabled = 0;
122 static volatile int notifyInterruptOccurred = 0;
123
124 /* True if we've registered an on_shmem_exit cleanup */
125 static bool unlistenExitRegistered = false;
126
127 bool            Trace_notify = false;
128
129
130 static void Async_UnlistenAll(void);
131 static void Async_UnlistenOnExit(int code, Datum arg);
132 static void ProcessIncomingNotify(void);
133 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
134 static bool AsyncExistsPendingNotify(const char *relname);
135 static void ClearPendingNotifies(void);
136
137
138 /*
139  *--------------------------------------------------------------
140  * Async_Notify
141  *
142  *              This is executed by the SQL notify command.
143  *
144  *              Adds the relation to the list of pending notifies.
145  *              Actual notification happens during transaction commit.
146  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
147  *
148  * Results:
149  *              XXX
150  *
151  *--------------------------------------------------------------
152  */
153 void
154 Async_Notify(char *relname)
155 {
156         if (Trace_notify)
157                 elog(DEBUG1, "Async_Notify(%s)", relname);
158
159         /* no point in making duplicate entries in the list ... */
160         if (!AsyncExistsPendingNotify(relname))
161         {
162                 /*
163                  * The name list needs to live until end of transaction, so store
164                  * it in the transaction context.
165                  */
166                 MemoryContext oldcontext;
167
168                 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
169
170                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
171
172                 MemoryContextSwitchTo(oldcontext);
173         }
174 }
175
176 /*
177  *--------------------------------------------------------------
178  * Async_Listen
179  *
180  *              This is executed by the SQL listen command.
181  *
182  *              Register a backend (identified by its Unix PID) as listening
183  *              on the specified relation.
184  *
185  * Results:
186  *              XXX
187  *
188  * Side effects:
189  *              pg_listener is updated.
190  *
191  *--------------------------------------------------------------
192  */
193 void
194 Async_Listen(char *relname, int pid)
195 {
196         Relation        lRel;
197         HeapScanDesc scan;
198         HeapTuple       tuple;
199         Datum           values[Natts_pg_listener];
200         char            nulls[Natts_pg_listener];
201         int                     i;
202         bool            alreadyListener = false;
203
204         if (Trace_notify)
205                 elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
206
207         lRel = heap_open(ListenerRelationId, ExclusiveLock);
208
209         /* Detect whether we are already listening on this relname */
210         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
211         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
212         {
213                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
214
215                 if (listener->listenerpid == pid &&
216                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
217                 {
218                         alreadyListener = true;
219                         /* No need to scan the rest of the table */
220                         break;
221                 }
222         }
223         heap_endscan(scan);
224
225         if (alreadyListener)
226         {
227                 heap_close(lRel, ExclusiveLock);
228                 return;
229         }
230
231         /*
232          * OK to insert a new tuple
233          */
234
235         for (i = 0; i < Natts_pg_listener; i++)
236         {
237                 nulls[i] = ' ';
238                 values[i] = PointerGetDatum(NULL);
239         }
240
241         i = 0;
242         values[i++] = (Datum) relname;
243         values[i++] = (Datum) pid;
244         values[i++] = (Datum) 0;        /* no notifies pending */
245
246         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
247         simple_heap_insert(lRel, tuple);
248
249 #ifdef NOT_USED                                 /* currently there are no indexes */
250         CatalogUpdateIndexes(lRel, tuple);
251 #endif
252
253         heap_freetuple(tuple);
254
255         heap_close(lRel, ExclusiveLock);
256
257         /*
258          * now that we are listening, make sure we will unlisten before dying.
259          */
260         if (!unlistenExitRegistered)
261         {
262                 on_shmem_exit(Async_UnlistenOnExit, 0);
263                 unlistenExitRegistered = true;
264         }
265 }
266
267 /*
268  *--------------------------------------------------------------
269  * Async_Unlisten
270  *
271  *              This is executed by the SQL unlisten command.
272  *
273  *              Remove the backend from the list of listening backends
274  *              for the specified relation.
275  *
276  * Results:
277  *              XXX
278  *
279  * Side effects:
280  *              pg_listener is updated.
281  *
282  *--------------------------------------------------------------
283  */
284 void
285 Async_Unlisten(char *relname, int pid)
286 {
287         Relation        lRel;
288         HeapScanDesc scan;
289         HeapTuple       tuple;
290
291         /* Handle specially the `unlisten "*"' command */
292         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
293         {
294                 Async_UnlistenAll();
295                 return;
296         }
297
298         if (Trace_notify)
299                 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
300
301         lRel = heap_open(ListenerRelationId, ExclusiveLock);
302
303         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
304         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
305         {
306                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
307
308                 if (listener->listenerpid == pid &&
309                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
310                 {
311                         /* Found the matching tuple, delete it */
312                         simple_heap_delete(lRel, &tuple->t_self);
313
314                         /*
315                          * We assume there can be only one match, so no need to scan
316                          * the rest of the table
317                          */
318                         break;
319                 }
320         }
321         heap_endscan(scan);
322
323         heap_close(lRel, ExclusiveLock);
324
325         /*
326          * We do not complain about unlistening something not being listened;
327          * should we?
328          */
329 }
330
331 /*
332  *--------------------------------------------------------------
333  * Async_UnlistenAll
334  *
335  *              Unlisten all relations for this backend.
336  *
337  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
338  *
339  * Results:
340  *              XXX
341  *
342  * Side effects:
343  *              pg_listener is updated.
344  *
345  *--------------------------------------------------------------
346  */
347 static void
348 Async_UnlistenAll(void)
349 {
350         Relation        lRel;
351         TupleDesc       tdesc;
352         HeapScanDesc scan;
353         HeapTuple       lTuple;
354         ScanKeyData key[1];
355
356         if (Trace_notify)
357                 elog(DEBUG1, "Async_UnlistenAll");
358
359         lRel = heap_open(ListenerRelationId, ExclusiveLock);
360         tdesc = RelationGetDescr(lRel);
361
362         /* Find and delete all entries with my listenerPID */
363         ScanKeyInit(&key[0],
364                                 Anum_pg_listener_pid,
365                                 BTEqualStrategyNumber, F_INT4EQ,
366                                 Int32GetDatum(MyProcPid));
367         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
368
369         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
370                 simple_heap_delete(lRel, &lTuple->t_self);
371
372         heap_endscan(scan);
373         heap_close(lRel, ExclusiveLock);
374 }
375
376 /*
377  *--------------------------------------------------------------
378  * Async_UnlistenOnExit
379  *
380  *              Clean up the pg_listener table at backend exit.
381  *
382  *              This is executed if we have done any LISTENs in this backend.
383  *              It might not be necessary anymore, if the user UNLISTENed everything,
384  *              but we don't try to detect that case.
385  *
386  * Results:
387  *              XXX
388  *
389  * Side effects:
390  *              pg_listener is updated if necessary.
391  *
392  *--------------------------------------------------------------
393  */
394 static void
395 Async_UnlistenOnExit(int code, Datum arg)
396 {
397         /*
398          * We need to start/commit a transaction for the unlisten, but if
399          * there is already an active transaction we had better abort that one
400          * first.  Otherwise we'd end up committing changes that probably
401          * ought to be discarded.
402          */
403         AbortOutOfAnyTransaction();
404         /* Now we can do the unlisten */
405         StartTransactionCommand();
406         Async_UnlistenAll();
407         CommitTransactionCommand();
408 }
409
410 /*
411  *--------------------------------------------------------------
412  * AtCommit_Notify
413  *
414  *              This is called at transaction commit.
415  *
416  *              If there are outbound notify requests in the pendingNotifies list,
417  *              scan pg_listener for matching tuples, and either signal the other
418  *              backend or send a message to our own frontend.
419  *
420  *              NOTE: we are still inside the current transaction, therefore can
421  *              piggyback on its committing of changes.
422  *
423  * Results:
424  *              XXX
425  *
426  * Side effects:
427  *              Tuples in pg_listener that have matching relnames and other peoples'
428  *              listenerPIDs are updated with a nonzero notification field.
429  *
430  *--------------------------------------------------------------
431  */
432 void
433 AtCommit_Notify(void)
434 {
435         Relation        lRel;
436         TupleDesc       tdesc;
437         HeapScanDesc scan;
438         HeapTuple       lTuple,
439                                 rTuple;
440         Datum           value[Natts_pg_listener];
441         char            repl[Natts_pg_listener],
442                                 nulls[Natts_pg_listener];
443
444         if (pendingNotifies == NIL)
445                 return;                                 /* no NOTIFY statements in this
446                                                                  * transaction */
447
448         /*
449          * NOTIFY is disabled if not normal processing mode. This test used to
450          * be in xact.c, but it seems cleaner to do it here.
451          */
452         if (!IsNormalProcessingMode())
453         {
454                 ClearPendingNotifies();
455                 return;
456         }
457
458         if (Trace_notify)
459                 elog(DEBUG1, "AtCommit_Notify");
460
461         /* preset data to update notify column to MyProcPid */
462         nulls[0] = nulls[1] = nulls[2] = ' ';
463         repl[0] = repl[1] = repl[2] = ' ';
464         repl[Anum_pg_listener_notify - 1] = 'r';
465         value[0] = value[1] = value[2] = (Datum) 0;
466         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
467
468         lRel = heap_open(ListenerRelationId, ExclusiveLock);
469         tdesc = RelationGetDescr(lRel);
470         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
471
472         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
473         {
474                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
475                 char       *relname = NameStr(listener->relname);
476                 int32           listenerPID = listener->listenerpid;
477
478                 if (!AsyncExistsPendingNotify(relname))
479                         continue;
480
481                 if (listenerPID == MyProcPid)
482                 {
483                         /*
484                          * Self-notify: no need to bother with table update. Indeed,
485                          * we *must not* clear the notification field in this path, or
486                          * we could lose an outside notify, which'd be bad for
487                          * applications that ignore self-notify messages.
488                          */
489
490                         if (Trace_notify)
491                                 elog(DEBUG1, "AtCommit_Notify: notifying self");
492
493                         NotifyMyFrontEnd(relname, listenerPID);
494                 }
495                 else
496                 {
497                         if (Trace_notify)
498                                 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
499                                          listenerPID);
500
501                         /*
502                          * If someone has already notified this listener, we don't
503                          * bother modifying the table, but we do still send a SIGUSR2
504                          * signal, just in case that backend missed the earlier signal
505                          * for some reason.  It's OK to send the signal first, because
506                          * the other guy can't read pg_listener until we unlock it.
507                          */
508                         if (kill(listenerPID, SIGUSR2) < 0)
509                         {
510                                 /*
511                                  * Get rid of pg_listener entry if it refers to a PID that
512                                  * no longer exists.  Presumably, that backend crashed
513                                  * without deleting its pg_listener entries. This code
514                                  * used to only delete the entry if errno==ESRCH, but as
515                                  * far as I can see we should just do it for any failure
516                                  * (certainly at least for EPERM too...)
517                                  */
518                                 simple_heap_delete(lRel, &lTuple->t_self);
519                         }
520                         else if (listener->notification == 0)
521                         {
522                                 ItemPointerData ctid;
523                                 HTSU_Result             result;
524
525                                 rTuple = heap_modifytuple(lTuple, tdesc,
526                                                                                   value, nulls, repl);
527
528                                 /*
529                                  * We cannot use simple_heap_update here because the tuple
530                                  * could have been modified by an uncommitted transaction;
531                                  * specifically, since UNLISTEN releases exclusive lock on
532                                  * the table before commit, the other guy could already
533                                  * have tried to unlisten.      There are no other cases where
534                                  * we should be able to see an uncommitted update or
535                                  * delete. Therefore, our response to a
536                                  * HeapTupleBeingUpdated result is just to ignore it.  We
537                                  * do *not* wait for the other guy to commit --- that
538                                  * would risk deadlock, and we don't want to block while
539                                  * holding the table lock anyway for performance reasons.
540                                  * We also ignore HeapTupleUpdated, which could occur if
541                                  * the other guy commits between our heap_getnext and
542                                  * heap_update calls.
543                                  */
544                                 result = heap_update(lRel, &lTuple->t_self, rTuple,
545                                                                          &ctid,
546                                                                          GetCurrentCommandId(), InvalidSnapshot,
547                                                                          false /* no wait for commit */ );
548                                 switch (result)
549                                 {
550                                         case HeapTupleSelfUpdated:
551                                                 /* Tuple was already updated in current command? */
552                                                 elog(ERROR, "tuple already updated by self");
553                                                 break;
554
555                                         case HeapTupleMayBeUpdated:
556                                                 /* done successfully */
557
558 #ifdef NOT_USED                                 /* currently there are no indexes */
559                                                 CatalogUpdateIndexes(lRel, rTuple);
560 #endif
561                                                 break;
562
563                                         case HeapTupleBeingUpdated:
564                                                 /* ignore uncommitted tuples */
565                                                 break;
566
567                                         case HeapTupleUpdated:
568                                                 /* ignore just-committed tuples */
569                                                 break;
570
571                                         default:
572                                                 elog(ERROR, "unrecognized heap_update status: %u",
573                                                          result);
574                                                 break;
575                                 }
576                         }
577                 }
578         }
579
580         heap_endscan(scan);
581
582         /*
583          * We do NOT release the lock on pg_listener here; we need to hold it
584          * until end of transaction (which is about to happen, anyway) to
585          * ensure that notified backends see our tuple updates when they look.
586          * Else they might disregard the signal, which would make the
587          * application programmer very unhappy.
588          */
589         heap_close(lRel, NoLock);
590
591         ClearPendingNotifies();
592
593         if (Trace_notify)
594                 elog(DEBUG1, "AtCommit_Notify: done");
595 }
596
597 /*
598  *--------------------------------------------------------------
599  * AtAbort_Notify
600  *
601  *              This is called at transaction abort.
602  *
603  *              Gets rid of pending outbound notifies that we would have executed
604  *              if the transaction got committed.
605  *
606  * Results:
607  *              XXX
608  *
609  *--------------------------------------------------------------
610  */
611 void
612 AtAbort_Notify(void)
613 {
614         ClearPendingNotifies();
615 }
616
617 /*
618  * AtSubStart_Notify() --- Take care of subtransaction start.
619  *
620  * Push empty state for the new subtransaction.
621  */
622 void
623 AtSubStart_Notify(void)
624 {
625         MemoryContext old_cxt;
626
627         /* Keep the list-of-lists in TopTransactionContext for simplicity */
628         old_cxt = MemoryContextSwitchTo(TopTransactionContext);
629
630         upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
631
632         Assert(list_length(upperPendingNotifies) ==
633                    GetCurrentTransactionNestLevel() - 1);
634
635         pendingNotifies = NIL;
636
637         MemoryContextSwitchTo(old_cxt);
638 }
639
640 /*
641  * AtSubCommit_Notify() --- Take care of subtransaction commit.
642  *
643  * Reassign all items in the pending notifies list to the parent transaction.
644  */
645 void
646 AtSubCommit_Notify(void)
647 {
648         List       *parentPendingNotifies;
649
650         parentPendingNotifies = (List *) linitial(upperPendingNotifies);
651         upperPendingNotifies = list_delete_first(upperPendingNotifies);
652
653         Assert(list_length(upperPendingNotifies) ==
654                    GetCurrentTransactionNestLevel() - 2);
655
656         /*
657          * We could try to eliminate duplicates here, but it seems not
658          * worthwhile.
659          */
660         pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
661 }
662
663 /*
664  * AtSubAbort_Notify() --- Take care of subtransaction abort.
665  */
666 void
667 AtSubAbort_Notify(void)
668 {
669         int                     my_level = GetCurrentTransactionNestLevel();
670
671         /*
672          * All we have to do is pop the stack --- the notifies made in this
673          * subxact are no longer interesting, and the space will be freed when
674          * CurTransactionContext is recycled.
675          *
676          * This routine could be called more than once at a given nesting level
677          * if there is trouble during subxact abort.  Avoid dumping core by
678          * using GetCurrentTransactionNestLevel as the indicator of how far
679          * we need to prune the list.
680          */
681         while (list_length(upperPendingNotifies) > my_level - 2)
682         {
683                 pendingNotifies = (List *) linitial(upperPendingNotifies);
684                 upperPendingNotifies = list_delete_first(upperPendingNotifies);
685         }
686 }
687
688 /*
689  *--------------------------------------------------------------
690  * NotifyInterruptHandler
691  *
692  *              This is the signal handler for SIGUSR2.
693  *
694  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
695  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
696  *              to do it later.
697  *
698  * Results:
699  *              none
700  *
701  * Side effects:
702  *              per above
703  *--------------------------------------------------------------
704  */
705 void
706 NotifyInterruptHandler(SIGNAL_ARGS)
707 {
708         int                     save_errno = errno;
709
710         /*
711          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
712          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
713          * which would likely lead to corruption of stdio buffers if they were
714          * ever turned on.
715          */
716
717         /* Don't joggle the elbow of proc_exit */
718         if (proc_exit_inprogress)
719                 return;
720
721         if (notifyInterruptEnabled)
722         {
723                 bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
724
725                 /*
726                  * We may be called while ImmediateInterruptOK is true; turn it
727                  * off while messing with the NOTIFY state.  (We would have to
728                  * save and restore it anyway, because PGSemaphore operations
729                  * inside ProcessIncomingNotify() might reset it.)
730                  */
731                 ImmediateInterruptOK = false;
732
733                 /*
734                  * I'm not sure whether some flavors of Unix might allow another
735                  * SIGUSR2 occurrence to recursively interrupt this routine. To
736                  * cope with the possibility, we do the same sort of dance that
737                  * EnableNotifyInterrupt must do --- see that routine for
738                  * comments.
739                  */
740                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
741                 notifyInterruptOccurred = 1;    /* do at least one iteration */
742                 for (;;)
743                 {
744                         notifyInterruptEnabled = 1;
745                         if (!notifyInterruptOccurred)
746                                 break;
747                         notifyInterruptEnabled = 0;
748                         if (notifyInterruptOccurred)
749                         {
750                                 /* Here, it is finally safe to do stuff. */
751                                 if (Trace_notify)
752                                         elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
753
754                                 ProcessIncomingNotify();
755
756                                 if (Trace_notify)
757                                         elog(DEBUG1, "NotifyInterruptHandler: done");
758                         }
759                 }
760
761                 /*
762                  * Restore ImmediateInterruptOK, and check for interrupts if
763                  * needed.
764                  */
765                 ImmediateInterruptOK = save_ImmediateInterruptOK;
766                 if (save_ImmediateInterruptOK)
767                         CHECK_FOR_INTERRUPTS();
768         }
769         else
770         {
771                 /*
772                  * In this path it is NOT SAFE to do much of anything, except
773                  * this:
774                  */
775                 notifyInterruptOccurred = 1;
776         }
777
778         errno = save_errno;
779 }
780
781 /*
782  * --------------------------------------------------------------
783  * EnableNotifyInterrupt
784  *
785  *              This is called by the PostgresMain main loop just before waiting
786  *              for a frontend command.  If we are truly idle (ie, *not* inside
787  *              a transaction block), then process any pending inbound notifies,
788  *              and enable the signal handler to process future notifies directly.
789  *
790  *              NOTE: the signal handler starts out disabled, and stays so until
791  *              PostgresMain calls this the first time.
792  * --------------------------------------------------------------
793  */
794 void
795 EnableNotifyInterrupt(void)
796 {
797         if (IsTransactionOrTransactionBlock())
798                 return;                                 /* not really idle */
799
800         /*
801          * This code is tricky because we are communicating with a signal
802          * handler that could interrupt us at any point.  If we just checked
803          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
804          * could fail to respond promptly to a signal that happens in between
805          * those two steps.  (A very small time window, perhaps, but Murphy's
806          * Law says you can hit it...)  Instead, we first set the enable flag,
807          * then test the occurred flag.  If we see an unserviced interrupt has
808          * occurred, we re-clear the enable flag before going off to do the
809          * service work.  (That prevents re-entrant invocation of
810          * ProcessIncomingNotify() if another interrupt occurs.) If an
811          * interrupt comes in between the setting and clearing of
812          * notifyInterruptEnabled, then it will have done the service work and
813          * left notifyInterruptOccurred zero, so we have to check again after
814          * clearing enable.  The whole thing has to be in a loop in case
815          * another interrupt occurs while we're servicing the first. Once we
816          * get out of the loop, enable is set and we know there is no
817          * unserviced interrupt.
818          *
819          * NB: an overenthusiastic optimizing compiler could easily break this
820          * code.  Hopefully, they all understand what "volatile" means these
821          * days.
822          */
823         for (;;)
824         {
825                 notifyInterruptEnabled = 1;
826                 if (!notifyInterruptOccurred)
827                         break;
828                 notifyInterruptEnabled = 0;
829                 if (notifyInterruptOccurred)
830                 {
831                         if (Trace_notify)
832                                 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
833
834                         ProcessIncomingNotify();
835
836                         if (Trace_notify)
837                                 elog(DEBUG1, "EnableNotifyInterrupt: done");
838                 }
839         }
840 }
841
842 /*
843  * --------------------------------------------------------------
844  * DisableNotifyInterrupt
845  *
846  *              This is called by the PostgresMain main loop just after receiving
847  *              a frontend command.  Signal handler execution of inbound notifies
848  *              is disabled until the next EnableNotifyInterrupt call.
849  *
850  *              The SIGUSR1 signal handler also needs to call this, so as to
851  *              prevent conflicts if one signal interrupts the other.  So we
852  *              must return the previous state of the flag.
853  * --------------------------------------------------------------
854  */
855 bool
856 DisableNotifyInterrupt(void)
857 {
858         bool            result = (notifyInterruptEnabled != 0);
859
860         notifyInterruptEnabled = 0;
861
862         return result;
863 }
864
865 /*
866  * --------------------------------------------------------------
867  * ProcessIncomingNotify
868  *
869  *              Deal with arriving NOTIFYs from other backends.
870  *              This is called either directly from the SIGUSR2 signal handler,
871  *              or the next time control reaches the outer idle loop.
872  *              Scan pg_listener for arriving notifies, report them to my front end,
873  *              and clear the notification field in pg_listener until next time.
874  *
875  *              NOTE: since we are outside any transaction, we must create our own.
876  * --------------------------------------------------------------
877  */
878 static void
879 ProcessIncomingNotify(void)
880 {
881         Relation        lRel;
882         TupleDesc       tdesc;
883         ScanKeyData key[1];
884         HeapScanDesc scan;
885         HeapTuple       lTuple,
886                                 rTuple;
887         Datum           value[Natts_pg_listener];
888         char            repl[Natts_pg_listener],
889                                 nulls[Natts_pg_listener];
890         bool            catchup_enabled;
891
892         /* Must prevent SIGUSR1 interrupt while I am running */
893         catchup_enabled = DisableCatchupInterrupt();
894
895         if (Trace_notify)
896                 elog(DEBUG1, "ProcessIncomingNotify");
897
898         set_ps_display("notify interrupt");
899
900         notifyInterruptOccurred = 0;
901
902         StartTransactionCommand();
903
904         lRel = heap_open(ListenerRelationId, ExclusiveLock);
905         tdesc = RelationGetDescr(lRel);
906
907         /* Scan only entries with my listenerPID */
908         ScanKeyInit(&key[0],
909                                 Anum_pg_listener_pid,
910                                 BTEqualStrategyNumber, F_INT4EQ,
911                                 Int32GetDatum(MyProcPid));
912         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
913
914         /* Prepare data for rewriting 0 into notification field */
915         nulls[0] = nulls[1] = nulls[2] = ' ';
916         repl[0] = repl[1] = repl[2] = ' ';
917         repl[Anum_pg_listener_notify - 1] = 'r';
918         value[0] = value[1] = value[2] = (Datum) 0;
919         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
920
921         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
922         {
923                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
924                 char       *relname = NameStr(listener->relname);
925                 int32           sourcePID = listener->notification;
926
927                 if (sourcePID != 0)
928                 {
929                         /* Notify the frontend */
930
931                         if (Trace_notify)
932                                 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
933                                          relname, (int) sourcePID);
934
935                         NotifyMyFrontEnd(relname, sourcePID);
936
937                         /*
938                          * Rewrite the tuple with 0 in notification column.
939                          *
940                          * simple_heap_update is safe here because no one else would have
941                          * tried to UNLISTEN us, so there can be no uncommitted
942                          * changes.
943                          */
944                         rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl);
945                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
946
947 #ifdef NOT_USED                                 /* currently there are no indexes */
948                         CatalogUpdateIndexes(lRel, rTuple);
949 #endif
950                 }
951         }
952         heap_endscan(scan);
953
954         /*
955          * We do NOT release the lock on pg_listener here; we need to hold it
956          * until end of transaction (which is about to happen, anyway) to
957          * ensure that other backends see our tuple updates when they look.
958          * Otherwise, a transaction started after this one might mistakenly
959          * think it doesn't need to send this backend a new NOTIFY.
960          */
961         heap_close(lRel, NoLock);
962
963         CommitTransactionCommand();
964
965         /*
966          * Must flush the notify messages to ensure frontend gets them
967          * promptly.
968          */
969         pq_flush();
970
971         set_ps_display("idle");
972
973         if (Trace_notify)
974                 elog(DEBUG1, "ProcessIncomingNotify: done");
975
976         if (catchup_enabled)
977                 EnableCatchupInterrupt();
978 }
979
980 /*
981  * Send NOTIFY message to my front end.
982  */
983 static void
984 NotifyMyFrontEnd(char *relname, int32 listenerPID)
985 {
986         if (whereToSendOutput == Remote)
987         {
988                 StringInfoData buf;
989
990                 pq_beginmessage(&buf, 'A');
991                 pq_sendint(&buf, listenerPID, sizeof(int32));
992                 pq_sendstring(&buf, relname);
993                 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
994                 {
995                         /* XXX Add parameter string here later */
996                         pq_sendstring(&buf, "");
997                 }
998                 pq_endmessage(&buf);
999
1000                 /*
1001                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
1002                  * happen at the end of the transaction, and for incoming notifies
1003                  * ProcessIncomingNotify will do it after finding all the
1004                  * notifies.
1005                  */
1006         }
1007         else
1008                 elog(INFO, "NOTIFY for %s", relname);
1009 }
1010
1011 /* Does pendingNotifies include the given relname? */
1012 static bool
1013 AsyncExistsPendingNotify(const char *relname)
1014 {
1015         ListCell   *p;
1016
1017         foreach(p, pendingNotifies)
1018         {
1019                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
1020                 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
1021                         return true;
1022         }
1023
1024         return false;
1025 }
1026
1027 /* Clear the pendingNotifies list. */
1028 static void
1029 ClearPendingNotifies(void)
1030 {
1031         /*
1032          * We used to have to explicitly deallocate the list members and
1033          * nodes, because they were malloc'd.  Now, since we know they are
1034          * palloc'd in CurTransactionContext, we need not do that --- they'll
1035          * go away automatically at transaction exit.  We need only reset the
1036          * list head pointer.
1037          */
1038         pendingNotifies = NIL;
1039 }