]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Add GUC update_process_title to control whether 'ps' display is updated
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.132 2006/06/27 22:16:43 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab ExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.  (Previously we used AccessExclusiveLock, but
58  * there's no real reason to forbid concurrent reads.)
59  *
60  * An application that listens on the same relname it notifies will get
61  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
62  * by comparing be_pid in the NOTIFY message to the application's own backend's
63  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
64  * frontend during startup.)  The above design guarantees that notifies from
65  * other backends will never be missed by ignoring self-notifies.  Note,
66  * however, that we do *not* guarantee that a separate frontend message will
67  * be sent for every outside NOTIFY.  Since there is only room for one
68  * originating PID in pg_listener, outside notifies occurring at about the
69  * same time may be collapsed into a single message bearing the PID of the
70  * first outside backend to perform the NOTIFY.
71  *-------------------------------------------------------------------------
72  */
73
74 #include "postgres.h"
75
76 #include <unistd.h>
77 #include <signal.h>
78 #include <netinet/in.h>
79
80 #include "access/heapam.h"
81 #include "access/twophase_rmgr.h"
82 #include "catalog/pg_listener.h"
83 #include "commands/async.h"
84 #include "libpq/libpq.h"
85 #include "libpq/pqformat.h"
86 #include "miscadmin.h"
87 #include "storage/ipc.h"
88 #include "storage/sinval.h"
89 #include "tcop/tcopprot.h"
90 #include "utils/fmgroids.h"
91 #include "utils/memutils.h"
92 #include "utils/ps_status.h"
93 #include "utils/syscache.h"
94
95
96 /*
97  * State for outbound notifies consists of a list of all relnames NOTIFYed
98  * in the current transaction.  We do not actually perform a NOTIFY until
99  * and unless the transaction commits.  pendingNotifies is NIL if no
100  * NOTIFYs have been done in the current transaction.
101  *
102  * The list is kept in CurTransactionContext.  In subtransactions, each
103  * subtransaction has its own list in its own CurTransactionContext, but
104  * successful subtransactions attach their lists to their parent's list.
105  * Failed subtransactions simply discard their lists.
106  */
107 static List *pendingNotifies = NIL;
108
109 static List *upperPendingNotifies = NIL;                /* list of upper-xact lists */
110
111 /*
112  * State for inbound notifies consists of two flags: one saying whether
113  * the signal handler is currently allowed to call ProcessIncomingNotify
114  * directly, and one saying whether the signal has occurred but the handler
115  * was not allowed to call ProcessIncomingNotify at the time.
116  *
117  * NB: the "volatile" on these declarations is critical!  If your compiler
118  * does not grok "volatile", you'd be best advised to compile this file
119  * with all optimization turned off.
120  */
121 static volatile int notifyInterruptEnabled = 0;
122 static volatile int notifyInterruptOccurred = 0;
123
124 /* True if we've registered an on_shmem_exit cleanup */
125 static bool unlistenExitRegistered = false;
126
127 bool            Trace_notify = false;
128
129
130 static void Async_UnlistenAll(void);
131 static void Async_UnlistenOnExit(int code, Datum arg);
132 static void ProcessIncomingNotify(void);
133 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
134 static bool AsyncExistsPendingNotify(const char *relname);
135 static void ClearPendingNotifies(void);
136
137
138 /*
139  *--------------------------------------------------------------
140  * Async_Notify
141  *
142  *              This is executed by the SQL notify command.
143  *
144  *              Adds the relation to the list of pending notifies.
145  *              Actual notification happens during transaction commit.
146  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
147  *
148  *--------------------------------------------------------------
149  */
150 void
151 Async_Notify(const char *relname)
152 {
153         if (Trace_notify)
154                 elog(DEBUG1, "Async_Notify(%s)", relname);
155
156         /* no point in making duplicate entries in the list ... */
157         if (!AsyncExistsPendingNotify(relname))
158         {
159                 /*
160                  * The name list needs to live until end of transaction, so store it
161                  * in the transaction context.
162                  */
163                 MemoryContext oldcontext;
164
165                 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
166
167                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
168
169                 MemoryContextSwitchTo(oldcontext);
170         }
171 }
172
173 /*
174  *--------------------------------------------------------------
175  * Async_Listen
176  *
177  *              This is executed by the SQL listen command.
178  *
179  *              Register the current backend as listening on the specified
180  *              relation.
181  *
182  * Side effects:
183  *              pg_listener is updated.
184  *
185  *--------------------------------------------------------------
186  */
187 void
188 Async_Listen(const char *relname)
189 {
190         Relation        lRel;
191         HeapScanDesc scan;
192         HeapTuple       tuple;
193         Datum           values[Natts_pg_listener];
194         char            nulls[Natts_pg_listener];
195         int                     i;
196         bool            alreadyListener = false;
197
198         if (Trace_notify)
199                 elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
200
201         lRel = heap_open(ListenerRelationId, ExclusiveLock);
202
203         /* Detect whether we are already listening on this relname */
204         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
205         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
206         {
207                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
208
209                 if (listener->listenerpid == MyProcPid &&
210                         strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
211                 {
212                         alreadyListener = true;
213                         /* No need to scan the rest of the table */
214                         break;
215                 }
216         }
217         heap_endscan(scan);
218
219         if (alreadyListener)
220         {
221                 heap_close(lRel, ExclusiveLock);
222                 return;
223         }
224
225         /*
226          * OK to insert a new tuple
227          */
228
229         for (i = 0; i < Natts_pg_listener; i++)
230         {
231                 nulls[i] = ' ';
232                 values[i] = PointerGetDatum(NULL);
233         }
234
235         i = 0;
236         values[i++] = (Datum) relname;
237         values[i++] = (Datum) MyProcPid;
238         values[i++] = (Datum) 0;        /* no notifies pending */
239
240         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
241         simple_heap_insert(lRel, tuple);
242
243 #ifdef NOT_USED                                 /* currently there are no indexes */
244         CatalogUpdateIndexes(lRel, tuple);
245 #endif
246
247         heap_freetuple(tuple);
248
249         heap_close(lRel, ExclusiveLock);
250
251         /*
252          * now that we are listening, make sure we will unlisten before dying.
253          */
254         if (!unlistenExitRegistered)
255         {
256                 on_shmem_exit(Async_UnlistenOnExit, 0);
257                 unlistenExitRegistered = true;
258         }
259 }
260
261 /*
262  *--------------------------------------------------------------
263  * Async_Unlisten
264  *
265  *              This is executed by the SQL unlisten command.
266  *
267  *              Remove the current backend from the list of listening backends
268  *              for the specified relation.
269  *
270  * Side effects:
271  *              pg_listener is updated.
272  *
273  *--------------------------------------------------------------
274  */
275 void
276 Async_Unlisten(const char *relname)
277 {
278         Relation        lRel;
279         HeapScanDesc scan;
280         HeapTuple       tuple;
281
282         /* Handle specially the `unlisten "*"' command */
283         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
284         {
285                 Async_UnlistenAll();
286                 return;
287         }
288
289         if (Trace_notify)
290                 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
291
292         lRel = heap_open(ListenerRelationId, ExclusiveLock);
293
294         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
295         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
296         {
297                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
298
299                 if (listener->listenerpid == MyProcPid &&
300                         strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
301                 {
302                         /* Found the matching tuple, delete it */
303                         simple_heap_delete(lRel, &tuple->t_self);
304
305                         /*
306                          * We assume there can be only one match, so no need to scan the
307                          * rest of the table
308                          */
309                         break;
310                 }
311         }
312         heap_endscan(scan);
313
314         heap_close(lRel, ExclusiveLock);
315
316         /*
317          * We do not complain about unlistening something not being listened;
318          * should we?
319          */
320 }
321
322 /*
323  *--------------------------------------------------------------
324  * Async_UnlistenAll
325  *
326  *              Unlisten all relations for this backend.
327  *
328  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
329  *
330  * Results:
331  *              XXX
332  *
333  * Side effects:
334  *              pg_listener is updated.
335  *
336  *--------------------------------------------------------------
337  */
338 static void
339 Async_UnlistenAll(void)
340 {
341         Relation        lRel;
342         TupleDesc       tdesc;
343         HeapScanDesc scan;
344         HeapTuple       lTuple;
345         ScanKeyData key[1];
346
347         if (Trace_notify)
348                 elog(DEBUG1, "Async_UnlistenAll");
349
350         lRel = heap_open(ListenerRelationId, ExclusiveLock);
351         tdesc = RelationGetDescr(lRel);
352
353         /* Find and delete all entries with my listenerPID */
354         ScanKeyInit(&key[0],
355                                 Anum_pg_listener_pid,
356                                 BTEqualStrategyNumber, F_INT4EQ,
357                                 Int32GetDatum(MyProcPid));
358         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
359
360         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
361                 simple_heap_delete(lRel, &lTuple->t_self);
362
363         heap_endscan(scan);
364         heap_close(lRel, ExclusiveLock);
365 }
366
367 /*
368  *--------------------------------------------------------------
369  * Async_UnlistenOnExit
370  *
371  *              Clean up the pg_listener table at backend exit.
372  *
373  *              This is executed if we have done any LISTENs in this backend.
374  *              It might not be necessary anymore, if the user UNLISTENed everything,
375  *              but we don't try to detect that case.
376  *
377  * Results:
378  *              XXX
379  *
380  * Side effects:
381  *              pg_listener is updated if necessary.
382  *
383  *--------------------------------------------------------------
384  */
385 static void
386 Async_UnlistenOnExit(int code, Datum arg)
387 {
388         /*
389          * We need to start/commit a transaction for the unlisten, but if there is
390          * already an active transaction we had better abort that one first.
391          * Otherwise we'd end up committing changes that probably ought to be
392          * discarded.
393          */
394         AbortOutOfAnyTransaction();
395         /* Now we can do the unlisten */
396         StartTransactionCommand();
397         Async_UnlistenAll();
398         CommitTransactionCommand();
399 }
400
401
402 /*
403  *--------------------------------------------------------------
404  * AtPrepare_Notify
405  *
406  *              This is called at the prepare phase of a two-phase
407  *              transaction.  Save the state for possible commit later.
408  *--------------------------------------------------------------
409  */
410 void
411 AtPrepare_Notify(void)
412 {
413         ListCell   *p;
414
415         foreach(p, pendingNotifies)
416         {
417                 const char *relname = (const char *) lfirst(p);
418
419                 RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
420                                                            relname, strlen(relname) + 1);
421         }
422
423         /*
424          * We can clear the state immediately, rather than needing a separate
425          * PostPrepare call, because if the transaction fails we'd just discard
426          * the state anyway.
427          */
428         ClearPendingNotifies();
429 }
430
431 /*
432  *--------------------------------------------------------------
433  * AtCommit_Notify
434  *
435  *              This is called at transaction commit.
436  *
437  *              If there are outbound notify requests in the pendingNotifies list,
438  *              scan pg_listener for matching tuples, and either signal the other
439  *              backend or send a message to our own frontend.
440  *
441  *              NOTE: we are still inside the current transaction, therefore can
442  *              piggyback on its committing of changes.
443  *
444  * Results:
445  *              XXX
446  *
447  * Side effects:
448  *              Tuples in pg_listener that have matching relnames and other peoples'
449  *              listenerPIDs are updated with a nonzero notification field.
450  *
451  *--------------------------------------------------------------
452  */
453 void
454 AtCommit_Notify(void)
455 {
456         Relation        lRel;
457         TupleDesc       tdesc;
458         HeapScanDesc scan;
459         HeapTuple       lTuple,
460                                 rTuple;
461         Datum           value[Natts_pg_listener];
462         char            repl[Natts_pg_listener],
463                                 nulls[Natts_pg_listener];
464
465         if (pendingNotifies == NIL)
466                 return;                                 /* no NOTIFY statements in this transaction */
467
468         /*
469          * NOTIFY is disabled if not normal processing mode. This test used to be
470          * in xact.c, but it seems cleaner to do it here.
471          */
472         if (!IsNormalProcessingMode())
473         {
474                 ClearPendingNotifies();
475                 return;
476         }
477
478         if (Trace_notify)
479                 elog(DEBUG1, "AtCommit_Notify");
480
481         /* preset data to update notify column to MyProcPid */
482         nulls[0] = nulls[1] = nulls[2] = ' ';
483         repl[0] = repl[1] = repl[2] = ' ';
484         repl[Anum_pg_listener_notify - 1] = 'r';
485         value[0] = value[1] = value[2] = (Datum) 0;
486         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
487
488         lRel = heap_open(ListenerRelationId, ExclusiveLock);
489         tdesc = RelationGetDescr(lRel);
490         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
491
492         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
493         {
494                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
495                 char       *relname = NameStr(listener->relname);
496                 int32           listenerPID = listener->listenerpid;
497
498                 if (!AsyncExistsPendingNotify(relname))
499                         continue;
500
501                 if (listenerPID == MyProcPid)
502                 {
503                         /*
504                          * Self-notify: no need to bother with table update. Indeed, we
505                          * *must not* clear the notification field in this path, or we
506                          * could lose an outside notify, which'd be bad for applications
507                          * that ignore self-notify messages.
508                          */
509
510                         if (Trace_notify)
511                                 elog(DEBUG1, "AtCommit_Notify: notifying self");
512
513                         NotifyMyFrontEnd(relname, listenerPID);
514                 }
515                 else
516                 {
517                         if (Trace_notify)
518                                 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
519                                          listenerPID);
520
521                         /*
522                          * If someone has already notified this listener, we don't bother
523                          * modifying the table, but we do still send a SIGUSR2 signal,
524                          * just in case that backend missed the earlier signal for some
525                          * reason.      It's OK to send the signal first, because the other
526                          * guy can't read pg_listener until we unlock it.
527                          */
528                         if (kill(listenerPID, SIGUSR2) < 0)
529                         {
530                                 /*
531                                  * Get rid of pg_listener entry if it refers to a PID that no
532                                  * longer exists.  Presumably, that backend crashed without
533                                  * deleting its pg_listener entries. This code used to only
534                                  * delete the entry if errno==ESRCH, but as far as I can see
535                                  * we should just do it for any failure (certainly at least
536                                  * for EPERM too...)
537                                  */
538                                 simple_heap_delete(lRel, &lTuple->t_self);
539                         }
540                         else if (listener->notification == 0)
541                         {
542                                 HTSU_Result result;
543                                 ItemPointerData update_ctid;
544                                 TransactionId update_xmax;
545
546                                 rTuple = heap_modifytuple(lTuple, tdesc,
547                                                                                   value, nulls, repl);
548
549                                 /*
550                                  * We cannot use simple_heap_update here because the tuple
551                                  * could have been modified by an uncommitted transaction;
552                                  * specifically, since UNLISTEN releases exclusive lock on the
553                                  * table before commit, the other guy could already have tried
554                                  * to unlisten.  There are no other cases where we should be
555                                  * able to see an uncommitted update or delete. Therefore, our
556                                  * response to a HeapTupleBeingUpdated result is just to
557                                  * ignore it.  We do *not* wait for the other guy to commit
558                                  * --- that would risk deadlock, and we don't want to block
559                                  * while holding the table lock anyway for performance
560                                  * reasons. We also ignore HeapTupleUpdated, which could occur
561                                  * if the other guy commits between our heap_getnext and
562                                  * heap_update calls.
563                                  */
564                                 result = heap_update(lRel, &lTuple->t_self, rTuple,
565                                                                          &update_ctid, &update_xmax,
566                                                                          GetCurrentCommandId(), InvalidSnapshot,
567                                                                          false /* no wait for commit */ );
568                                 switch (result)
569                                 {
570                                         case HeapTupleSelfUpdated:
571                                                 /* Tuple was already updated in current command? */
572                                                 elog(ERROR, "tuple already updated by self");
573                                                 break;
574
575                                         case HeapTupleMayBeUpdated:
576                                                 /* done successfully */
577 #ifdef NOT_USED                                 /* currently there are no indexes */
578                                                 CatalogUpdateIndexes(lRel, rTuple);
579 #endif
580                                                 break;
581
582                                         case HeapTupleBeingUpdated:
583                                                 /* ignore uncommitted tuples */
584                                                 break;
585
586                                         case HeapTupleUpdated:
587                                                 /* ignore just-committed tuples */
588                                                 break;
589
590                                         default:
591                                                 elog(ERROR, "unrecognized heap_update status: %u",
592                                                          result);
593                                                 break;
594                                 }
595                         }
596                 }
597         }
598
599         heap_endscan(scan);
600
601         /*
602          * We do NOT release the lock on pg_listener here; we need to hold it
603          * until end of transaction (which is about to happen, anyway) to ensure
604          * that notified backends see our tuple updates when they look. Else they
605          * might disregard the signal, which would make the application programmer
606          * very unhappy.
607          */
608         heap_close(lRel, NoLock);
609
610         ClearPendingNotifies();
611
612         if (Trace_notify)
613                 elog(DEBUG1, "AtCommit_Notify: done");
614 }
615
616 /*
617  *--------------------------------------------------------------
618  * AtAbort_Notify
619  *
620  *              This is called at transaction abort.
621  *
622  *              Gets rid of pending outbound notifies that we would have executed
623  *              if the transaction got committed.
624  *
625  * Results:
626  *              XXX
627  *
628  *--------------------------------------------------------------
629  */
630 void
631 AtAbort_Notify(void)
632 {
633         ClearPendingNotifies();
634 }
635
636 /*
637  * AtSubStart_Notify() --- Take care of subtransaction start.
638  *
639  * Push empty state for the new subtransaction.
640  */
641 void
642 AtSubStart_Notify(void)
643 {
644         MemoryContext old_cxt;
645
646         /* Keep the list-of-lists in TopTransactionContext for simplicity */
647         old_cxt = MemoryContextSwitchTo(TopTransactionContext);
648
649         upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
650
651         Assert(list_length(upperPendingNotifies) ==
652                    GetCurrentTransactionNestLevel() - 1);
653
654         pendingNotifies = NIL;
655
656         MemoryContextSwitchTo(old_cxt);
657 }
658
659 /*
660  * AtSubCommit_Notify() --- Take care of subtransaction commit.
661  *
662  * Reassign all items in the pending notifies list to the parent transaction.
663  */
664 void
665 AtSubCommit_Notify(void)
666 {
667         List       *parentPendingNotifies;
668
669         parentPendingNotifies = (List *) linitial(upperPendingNotifies);
670         upperPendingNotifies = list_delete_first(upperPendingNotifies);
671
672         Assert(list_length(upperPendingNotifies) ==
673                    GetCurrentTransactionNestLevel() - 2);
674
675         /*
676          * We could try to eliminate duplicates here, but it seems not worthwhile.
677          */
678         pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
679 }
680
681 /*
682  * AtSubAbort_Notify() --- Take care of subtransaction abort.
683  */
684 void
685 AtSubAbort_Notify(void)
686 {
687         int                     my_level = GetCurrentTransactionNestLevel();
688
689         /*
690          * All we have to do is pop the stack --- the notifies made in this
691          * subxact are no longer interesting, and the space will be freed when
692          * CurTransactionContext is recycled.
693          *
694          * This routine could be called more than once at a given nesting level if
695          * there is trouble during subxact abort.  Avoid dumping core by using
696          * GetCurrentTransactionNestLevel as the indicator of how far we need to
697          * prune the list.
698          */
699         while (list_length(upperPendingNotifies) > my_level - 2)
700         {
701                 pendingNotifies = (List *) linitial(upperPendingNotifies);
702                 upperPendingNotifies = list_delete_first(upperPendingNotifies);
703         }
704 }
705
706 /*
707  *--------------------------------------------------------------
708  * NotifyInterruptHandler
709  *
710  *              This is the signal handler for SIGUSR2.
711  *
712  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
713  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
714  *              to do it later.
715  *
716  * Results:
717  *              none
718  *
719  * Side effects:
720  *              per above
721  *--------------------------------------------------------------
722  */
723 void
724 NotifyInterruptHandler(SIGNAL_ARGS)
725 {
726         int                     save_errno = errno;
727
728         /*
729          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
730          * here. Some helpful soul had this routine sprinkled with TPRINTFs, which
731          * would likely lead to corruption of stdio buffers if they were ever
732          * turned on.
733          */
734
735         /* Don't joggle the elbow of proc_exit */
736         if (proc_exit_inprogress)
737                 return;
738
739         if (notifyInterruptEnabled)
740         {
741                 bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
742
743                 /*
744                  * We may be called while ImmediateInterruptOK is true; turn it off
745                  * while messing with the NOTIFY state.  (We would have to save and
746                  * restore it anyway, because PGSemaphore operations inside
747                  * ProcessIncomingNotify() might reset it.)
748                  */
749                 ImmediateInterruptOK = false;
750
751                 /*
752                  * I'm not sure whether some flavors of Unix might allow another
753                  * SIGUSR2 occurrence to recursively interrupt this routine. To cope
754                  * with the possibility, we do the same sort of dance that
755                  * EnableNotifyInterrupt must do --- see that routine for comments.
756                  */
757                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
758                 notifyInterruptOccurred = 1;    /* do at least one iteration */
759                 for (;;)
760                 {
761                         notifyInterruptEnabled = 1;
762                         if (!notifyInterruptOccurred)
763                                 break;
764                         notifyInterruptEnabled = 0;
765                         if (notifyInterruptOccurred)
766                         {
767                                 /* Here, it is finally safe to do stuff. */
768                                 if (Trace_notify)
769                                         elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
770
771                                 ProcessIncomingNotify();
772
773                                 if (Trace_notify)
774                                         elog(DEBUG1, "NotifyInterruptHandler: done");
775                         }
776                 }
777
778                 /*
779                  * Restore ImmediateInterruptOK, and check for interrupts if needed.
780                  */
781                 ImmediateInterruptOK = save_ImmediateInterruptOK;
782                 if (save_ImmediateInterruptOK)
783                         CHECK_FOR_INTERRUPTS();
784         }
785         else
786         {
787                 /*
788                  * In this path it is NOT SAFE to do much of anything, except this:
789                  */
790                 notifyInterruptOccurred = 1;
791         }
792
793         errno = save_errno;
794 }
795
796 /*
797  * --------------------------------------------------------------
798  * EnableNotifyInterrupt
799  *
800  *              This is called by the PostgresMain main loop just before waiting
801  *              for a frontend command.  If we are truly idle (ie, *not* inside
802  *              a transaction block), then process any pending inbound notifies,
803  *              and enable the signal handler to process future notifies directly.
804  *
805  *              NOTE: the signal handler starts out disabled, and stays so until
806  *              PostgresMain calls this the first time.
807  * --------------------------------------------------------------
808  */
809 void
810 EnableNotifyInterrupt(void)
811 {
812         if (IsTransactionOrTransactionBlock())
813                 return;                                 /* not really idle */
814
815         /*
816          * This code is tricky because we are communicating with a signal handler
817          * that could interrupt us at any point.  If we just checked
818          * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
819          * fail to respond promptly to a signal that happens in between those two
820          * steps.  (A very small time window, perhaps, but Murphy's Law says you
821          * can hit it...)  Instead, we first set the enable flag, then test the
822          * occurred flag.  If we see an unserviced interrupt has occurred, we
823          * re-clear the enable flag before going off to do the service work. (That
824          * prevents re-entrant invocation of ProcessIncomingNotify() if another
825          * interrupt occurs.) If an interrupt comes in between the setting and
826          * clearing of notifyInterruptEnabled, then it will have done the service
827          * work and left notifyInterruptOccurred zero, so we have to check again
828          * after clearing enable.  The whole thing has to be in a loop in case
829          * another interrupt occurs while we're servicing the first. Once we get
830          * out of the loop, enable is set and we know there is no unserviced
831          * interrupt.
832          *
833          * NB: an overenthusiastic optimizing compiler could easily break this
834          * code. Hopefully, they all understand what "volatile" means these days.
835          */
836         for (;;)
837         {
838                 notifyInterruptEnabled = 1;
839                 if (!notifyInterruptOccurred)
840                         break;
841                 notifyInterruptEnabled = 0;
842                 if (notifyInterruptOccurred)
843                 {
844                         if (Trace_notify)
845                                 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
846
847                         ProcessIncomingNotify();
848
849                         if (Trace_notify)
850                                 elog(DEBUG1, "EnableNotifyInterrupt: done");
851                 }
852         }
853 }
854
855 /*
856  * --------------------------------------------------------------
857  * DisableNotifyInterrupt
858  *
859  *              This is called by the PostgresMain main loop just after receiving
860  *              a frontend command.  Signal handler execution of inbound notifies
861  *              is disabled until the next EnableNotifyInterrupt call.
862  *
863  *              The SIGUSR1 signal handler also needs to call this, so as to
864  *              prevent conflicts if one signal interrupts the other.  So we
865  *              must return the previous state of the flag.
866  * --------------------------------------------------------------
867  */
868 bool
869 DisableNotifyInterrupt(void)
870 {
871         bool            result = (notifyInterruptEnabled != 0);
872
873         notifyInterruptEnabled = 0;
874
875         return result;
876 }
877
878 /*
879  * --------------------------------------------------------------
880  * ProcessIncomingNotify
881  *
882  *              Deal with arriving NOTIFYs from other backends.
883  *              This is called either directly from the SIGUSR2 signal handler,
884  *              or the next time control reaches the outer idle loop.
885  *              Scan pg_listener for arriving notifies, report them to my front end,
886  *              and clear the notification field in pg_listener until next time.
887  *
888  *              NOTE: since we are outside any transaction, we must create our own.
889  * --------------------------------------------------------------
890  */
891 static void
892 ProcessIncomingNotify(void)
893 {
894         Relation        lRel;
895         TupleDesc       tdesc;
896         ScanKeyData key[1];
897         HeapScanDesc scan;
898         HeapTuple       lTuple,
899                                 rTuple;
900         Datum           value[Natts_pg_listener];
901         char            repl[Natts_pg_listener],
902                                 nulls[Natts_pg_listener];
903         bool            catchup_enabled;
904
905         /* Must prevent SIGUSR1 interrupt while I am running */
906         catchup_enabled = DisableCatchupInterrupt();
907
908         if (Trace_notify)
909                 elog(DEBUG1, "ProcessIncomingNotify");
910
911         set_ps_display("notify interrupt", false);
912
913         notifyInterruptOccurred = 0;
914
915         StartTransactionCommand();
916
917         lRel = heap_open(ListenerRelationId, ExclusiveLock);
918         tdesc = RelationGetDescr(lRel);
919
920         /* Scan only entries with my listenerPID */
921         ScanKeyInit(&key[0],
922                                 Anum_pg_listener_pid,
923                                 BTEqualStrategyNumber, F_INT4EQ,
924                                 Int32GetDatum(MyProcPid));
925         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
926
927         /* Prepare data for rewriting 0 into notification field */
928         nulls[0] = nulls[1] = nulls[2] = ' ';
929         repl[0] = repl[1] = repl[2] = ' ';
930         repl[Anum_pg_listener_notify - 1] = 'r';
931         value[0] = value[1] = value[2] = (Datum) 0;
932         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
933
934         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
935         {
936                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
937                 char       *relname = NameStr(listener->relname);
938                 int32           sourcePID = listener->notification;
939
940                 if (sourcePID != 0)
941                 {
942                         /* Notify the frontend */
943
944                         if (Trace_notify)
945                                 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
946                                          relname, (int) sourcePID);
947
948                         NotifyMyFrontEnd(relname, sourcePID);
949
950                         /*
951                          * Rewrite the tuple with 0 in notification column.
952                          *
953                          * simple_heap_update is safe here because no one else would have
954                          * tried to UNLISTEN us, so there can be no uncommitted changes.
955                          */
956                         rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl);
957                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
958
959 #ifdef NOT_USED                                 /* currently there are no indexes */
960                         CatalogUpdateIndexes(lRel, rTuple);
961 #endif
962                 }
963         }
964         heap_endscan(scan);
965
966         /*
967          * We do NOT release the lock on pg_listener here; we need to hold it
968          * until end of transaction (which is about to happen, anyway) to ensure
969          * that other backends see our tuple updates when they look. Otherwise, a
970          * transaction started after this one might mistakenly think it doesn't
971          * need to send this backend a new NOTIFY.
972          */
973         heap_close(lRel, NoLock);
974
975         CommitTransactionCommand();
976
977         /*
978          * Must flush the notify messages to ensure frontend gets them promptly.
979          */
980         pq_flush();
981
982         set_ps_display("idle", false);
983
984         if (Trace_notify)
985                 elog(DEBUG1, "ProcessIncomingNotify: done");
986
987         if (catchup_enabled)
988                 EnableCatchupInterrupt();
989 }
990
991 /*
992  * Send NOTIFY message to my front end.
993  */
994 static void
995 NotifyMyFrontEnd(char *relname, int32 listenerPID)
996 {
997         if (whereToSendOutput == DestRemote)
998         {
999                 StringInfoData buf;
1000
1001                 pq_beginmessage(&buf, 'A');
1002                 pq_sendint(&buf, listenerPID, sizeof(int32));
1003                 pq_sendstring(&buf, relname);
1004                 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1005                 {
1006                         /* XXX Add parameter string here later */
1007                         pq_sendstring(&buf, "");
1008                 }
1009                 pq_endmessage(&buf);
1010
1011                 /*
1012                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
1013                  * happen at the end of the transaction, and for incoming notifies
1014                  * ProcessIncomingNotify will do it after finding all the notifies.
1015                  */
1016         }
1017         else
1018                 elog(INFO, "NOTIFY for %s", relname);
1019 }
1020
1021 /* Does pendingNotifies include the given relname? */
1022 static bool
1023 AsyncExistsPendingNotify(const char *relname)
1024 {
1025         ListCell   *p;
1026
1027         foreach(p, pendingNotifies)
1028         {
1029                 const char *prelname = (const char *) lfirst(p);
1030
1031                 if (strcmp(prelname, relname) == 0)
1032                         return true;
1033         }
1034
1035         return false;
1036 }
1037
1038 /* Clear the pendingNotifies list. */
1039 static void
1040 ClearPendingNotifies(void)
1041 {
1042         /*
1043          * We used to have to explicitly deallocate the list members and nodes,
1044          * because they were malloc'd.  Now, since we know they are palloc'd in
1045          * CurTransactionContext, we need not do that --- they'll go away
1046          * automatically at transaction exit.  We need only reset the list head
1047          * pointer.
1048          */
1049         pendingNotifies = NIL;
1050 }
1051
1052 /*
1053  * 2PC processing routine for COMMIT PREPARED case.
1054  *
1055  * (We don't have to do anything for ROLLBACK PREPARED.)
1056  */
1057 void
1058 notify_twophase_postcommit(TransactionId xid, uint16 info,
1059                                                    void *recdata, uint32 len)
1060 {
1061         /*
1062          * Set up to issue the NOTIFY at the end of my own current transaction.
1063          * (XXX this has some issues if my own transaction later rolls back, or if
1064          * there is any significant delay before I commit.      OK for now because we
1065          * disallow COMMIT PREPARED inside a transaction block.)
1066          */
1067         Async_Notify((char *) recdata);
1068 }