]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Remove all uses of the deprecated functions heap_formtuple, heap_modifytuple,
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.142 2008/11/02 01:45:27 tgl Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
56  * of pending actions.  If we reach transaction commit, the changes are
57  * applied to pg_listener just before executing any pending NOTIFYs.  This
58  * method is necessary because to avoid race conditions, we must hold lock
59  * on pg_listener from when we insert a new listener tuple until we commit.
60  * To do that and not create undue hazard of deadlock, we don't want to
61  * touch pg_listener until we are otherwise done with the transaction;
62  * in particular it'd be uncool to still be taking user-commanded locks
63  * while holding the pg_listener lock.
64  *
65  * Although we grab ExclusiveLock on pg_listener for any operation,
66  * the lock is never held very long, so it shouldn't cause too much of
67  * a performance problem.  (Previously we used AccessExclusiveLock, but
68  * there's no real reason to forbid concurrent reads.)
69  *
70  * An application that listens on the same relname it notifies will get
71  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
72  * by comparing be_pid in the NOTIFY message to the application's own backend's
73  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
74  * frontend during startup.)  The above design guarantees that notifies from
75  * other backends will never be missed by ignoring self-notifies.  Note,
76  * however, that we do *not* guarantee that a separate frontend message will
77  * be sent for every outside NOTIFY.  Since there is only room for one
78  * originating PID in pg_listener, outside notifies occurring at about the
79  * same time may be collapsed into a single message bearing the PID of the
80  * first outside backend to perform the NOTIFY.
81  *-------------------------------------------------------------------------
82  */
83
84 #include "postgres.h"
85
86 #include <unistd.h>
87 #include <signal.h>
88
89 #include "access/heapam.h"
90 #include "access/twophase_rmgr.h"
91 #include "access/xact.h"
92 #include "catalog/pg_listener.h"
93 #include "commands/async.h"
94 #include "libpq/libpq.h"
95 #include "libpq/pqformat.h"
96 #include "miscadmin.h"
97 #include "storage/ipc.h"
98 #include "storage/sinval.h"
99 #include "tcop/tcopprot.h"
100 #include "utils/builtins.h"
101 #include "utils/fmgroids.h"
102 #include "utils/memutils.h"
103 #include "utils/ps_status.h"
104 #include "utils/tqual.h"
105
106
107 /*
108  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
109  * all actions requested in the current transaction.  As explained above,
110  * we don't actually modify pg_listener until we reach transaction commit.
111  *
112  * The list is kept in CurTransactionContext.  In subtransactions, each
113  * subtransaction has its own list in its own CurTransactionContext, but
114  * successful subtransactions attach their lists to their parent's list.
115  * Failed subtransactions simply discard their lists.
116  */
117 typedef enum
118 {
119         LISTEN_LISTEN,
120         LISTEN_UNLISTEN,
121         LISTEN_UNLISTEN_ALL
122 } ListenActionKind;
123
124 typedef struct
125 {
126         ListenActionKind action;
127         char            condname[1];                            /* actually, as long as needed */
128 } ListenAction;
129
130 static List *pendingActions = NIL;                      /* list of ListenAction */
131
132 static List *upperPendingActions = NIL;         /* list of upper-xact lists */
133
134 /*
135  * State for outbound notifies consists of a list of all relnames NOTIFYed
136  * in the current transaction.  We do not actually perform a NOTIFY until
137  * and unless the transaction commits.  pendingNotifies is NIL if no
138  * NOTIFYs have been done in the current transaction.
139  *
140  * The list is kept in CurTransactionContext.  In subtransactions, each
141  * subtransaction has its own list in its own CurTransactionContext, but
142  * successful subtransactions attach their lists to their parent's list.
143  * Failed subtransactions simply discard their lists.
144  *
145  * Note: the action and notify lists do not interact within a transaction.
146  * In particular, if a transaction does NOTIFY and then LISTEN on the same
147  * condition name, it will get a self-notify at commit.  This is a bit odd
148  * but is consistent with our historical behavior.
149  */
150 static List *pendingNotifies = NIL;                             /* list of C strings */
151
152 static List *upperPendingNotifies = NIL;                /* list of upper-xact lists */
153
154 /*
155  * State for inbound notifies consists of two flags: one saying whether
156  * the signal handler is currently allowed to call ProcessIncomingNotify
157  * directly, and one saying whether the signal has occurred but the handler
158  * was not allowed to call ProcessIncomingNotify at the time.
159  *
160  * NB: the "volatile" on these declarations is critical!  If your compiler
161  * does not grok "volatile", you'd be best advised to compile this file
162  * with all optimization turned off.
163  */
164 static volatile sig_atomic_t notifyInterruptEnabled = 0;
165 static volatile sig_atomic_t notifyInterruptOccurred = 0;
166
167 /* True if we've registered an on_shmem_exit cleanup */
168 static bool unlistenExitRegistered = false;
169
170 bool            Trace_notify = false;
171
172
173 static void queue_listen(ListenActionKind action, const char *condname);
174 static void Async_UnlistenOnExit(int code, Datum arg);
175 static void Exec_Listen(Relation lRel, const char *relname);
176 static void Exec_Unlisten(Relation lRel, const char *relname);
177 static void Exec_UnlistenAll(Relation lRel);
178 static void Send_Notify(Relation lRel);
179 static void ProcessIncomingNotify(void);
180 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
181 static bool AsyncExistsPendingNotify(const char *relname);
182 static void ClearPendingActionsAndNotifies(void);
183
184
185 /*
186  * Async_Notify
187  *
188  *              This is executed by the SQL notify command.
189  *
190  *              Adds the relation to the list of pending notifies.
191  *              Actual notification happens during transaction commit.
192  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
193  */
194 void
195 Async_Notify(const char *relname)
196 {
197         if (Trace_notify)
198                 elog(DEBUG1, "Async_Notify(%s)", relname);
199
200         /* no point in making duplicate entries in the list ... */
201         if (!AsyncExistsPendingNotify(relname))
202         {
203                 /*
204                  * The name list needs to live until end of transaction, so store it
205                  * in the transaction context.
206                  */
207                 MemoryContext oldcontext;
208
209                 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
210
211                 /*
212                  * Ordering of the list isn't important.  We choose to put new
213                  * entries on the front, as this might make duplicate-elimination
214                  * a tad faster when the same condition is signaled many times in
215                  * a row.
216                  */
217                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
218
219                 MemoryContextSwitchTo(oldcontext);
220         }
221 }
222
223 /*
224  * queue_listen
225  *              Common code for listen, unlisten, unlisten all commands.
226  *
227  *              Adds the request to the list of pending actions.
228  *              Actual update of pg_listener happens during transaction commit.
229  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
230  */
231 static void
232 queue_listen(ListenActionKind action, const char *condname)
233 {
234         MemoryContext oldcontext;
235         ListenAction *actrec;
236
237         /*
238          * Unlike Async_Notify, we don't try to collapse out duplicates.
239          * It would be too complicated to ensure we get the right interactions
240          * of conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that
241          * there would be any performance benefit anyway in sane applications.
242          */
243         oldcontext = MemoryContextSwitchTo(CurTransactionContext);
244
245         /* space for terminating null is included in sizeof(ListenAction) */
246         actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname));
247         actrec->action = action;
248         strcpy(actrec->condname, condname);
249
250         pendingActions = lappend(pendingActions, actrec);
251
252         MemoryContextSwitchTo(oldcontext);
253 }
254
255 /*
256  * Async_Listen
257  *
258  *              This is executed by the SQL listen command.
259  */
260 void
261 Async_Listen(const char *relname)
262 {
263         if (Trace_notify)
264                 elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
265
266         queue_listen(LISTEN_LISTEN, relname);
267 }
268
269 /*
270  * Async_Unlisten
271  *
272  *              This is executed by the SQL unlisten command.
273  */
274 void
275 Async_Unlisten(const char *relname)
276 {
277         if (Trace_notify)
278                 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
279
280         queue_listen(LISTEN_UNLISTEN, relname);
281 }
282
283 /*
284  * Async_UnlistenAll
285  *
286  *              This is invoked by UNLISTEN * command, and also at backend exit.
287  */
288 void
289 Async_UnlistenAll(void)
290 {
291         if (Trace_notify)
292                 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
293
294         queue_listen(LISTEN_UNLISTEN_ALL, "");
295 }
296
297 /*
298  * Async_UnlistenOnExit
299  *
300  *              Clean up the pg_listener table at backend exit.
301  *
302  *              This is executed if we have done any LISTENs in this backend.
303  *              It might not be necessary anymore, if the user UNLISTENed everything,
304  *              but we don't try to detect that case.
305  */
306 static void
307 Async_UnlistenOnExit(int code, Datum arg)
308 {
309         /*
310          * We need to start/commit a transaction for the unlisten, but if there is
311          * already an active transaction we had better abort that one first.
312          * Otherwise we'd end up committing changes that probably ought to be
313          * discarded.
314          */
315         AbortOutOfAnyTransaction();
316         /* Now we can do the unlisten */
317         StartTransactionCommand();
318         Async_UnlistenAll();
319         CommitTransactionCommand();
320 }
321
322 /*
323  * AtPrepare_Notify
324  *
325  *              This is called at the prepare phase of a two-phase
326  *              transaction.  Save the state for possible commit later.
327  */
328 void
329 AtPrepare_Notify(void)
330 {
331         ListCell   *p;
332
333         /* It's not sensible to have any pending LISTEN/UNLISTEN actions */
334         if (pendingActions)
335                 ereport(ERROR,
336                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
337                                  errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN")));
338
339         /* We can deal with pending NOTIFY though */
340         foreach(p, pendingNotifies)
341         {
342                 const char *relname = (const char *) lfirst(p);
343
344                 RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
345                                                            relname, strlen(relname) + 1);
346         }
347
348         /*
349          * We can clear the state immediately, rather than needing a separate
350          * PostPrepare call, because if the transaction fails we'd just discard
351          * the state anyway.
352          */
353         ClearPendingActionsAndNotifies();
354 }
355
356 /*
357  * AtCommit_Notify
358  *
359  *              This is called at transaction commit.
360  *
361  *              If there are pending LISTEN/UNLISTEN actions, insert or delete
362  *              tuples in pg_listener accordingly.
363  *
364  *              If there are outbound notify requests in the pendingNotifies list,
365  *              scan pg_listener for matching tuples, and either signal the other
366  *              backend or send a message to our own frontend.
367  *
368  *              NOTE: we are still inside the current transaction, therefore can
369  *              piggyback on its committing of changes.
370  */
371 void
372 AtCommit_Notify(void)
373 {
374         Relation        lRel;
375         ListCell   *p;
376
377         if (pendingActions == NIL && pendingNotifies == NIL)
378                 return;                                 /* no relevant statements in this xact */
379
380         /*
381          * NOTIFY is disabled if not normal processing mode. This test used to be
382          * in xact.c, but it seems cleaner to do it here.
383          */
384         if (!IsNormalProcessingMode())
385         {
386                 ClearPendingActionsAndNotifies();
387                 return;
388         }
389
390         if (Trace_notify)
391                 elog(DEBUG1, "AtCommit_Notify");
392
393         /* Acquire ExclusiveLock on pg_listener */
394         lRel = heap_open(ListenerRelationId, ExclusiveLock);
395
396         /* Perform any pending listen/unlisten actions */
397         foreach(p, pendingActions)
398         {
399                 ListenAction *actrec = (ListenAction *) lfirst(p);
400
401                 switch (actrec->action)
402                 {
403                         case LISTEN_LISTEN:
404                                 Exec_Listen(lRel, actrec->condname);
405                                 break;
406                         case LISTEN_UNLISTEN:
407                                 Exec_Unlisten(lRel, actrec->condname);
408                                 break;
409                         case LISTEN_UNLISTEN_ALL:
410                                 Exec_UnlistenAll(lRel);
411                                 break;
412                 }
413
414                 /* We must CCI after each action in case of conflicting actions */
415                 CommandCounterIncrement();
416         }
417
418         /* Perform any pending notifies */
419         if (pendingNotifies)
420                 Send_Notify(lRel);
421
422         /*
423          * We do NOT release the lock on pg_listener here; we need to hold it
424          * until end of transaction (which is about to happen, anyway) to ensure
425          * that notified backends see our tuple updates when they look. Else they
426          * might disregard the signal, which would make the application programmer
427          * very unhappy.  Also, this prevents race conditions when we have just
428          * inserted a listening tuple.
429          */
430         heap_close(lRel, NoLock);
431
432         ClearPendingActionsAndNotifies();
433
434         if (Trace_notify)
435                 elog(DEBUG1, "AtCommit_Notify: done");
436 }
437
438 /*
439  * Exec_Listen --- subroutine for AtCommit_Notify
440  *
441  *              Register the current backend as listening on the specified relation.
442  */
443 static void
444 Exec_Listen(Relation lRel, const char *relname)
445 {
446         HeapScanDesc scan;
447         HeapTuple       tuple;
448         Datum           values[Natts_pg_listener];
449         bool            nulls[Natts_pg_listener];
450         NameData        condname;
451         bool            alreadyListener = false;
452
453         if (Trace_notify)
454                 elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
455
456         /* Detect whether we are already listening on this relname */
457         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
458         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
459         {
460                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
461
462                 if (listener->listenerpid == MyProcPid &&
463                         strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
464                 {
465                         alreadyListener = true;
466                         /* No need to scan the rest of the table */
467                         break;
468                 }
469         }
470         heap_endscan(scan);
471
472         if (alreadyListener)
473                 return;
474
475         /*
476          * OK to insert a new tuple
477          */
478         memset(nulls, false, sizeof(nulls));
479
480         namestrcpy(&condname, relname);
481         values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
482         values[Anum_pg_listener_pid - 1] = Int32GetDatum(MyProcPid);
483         values[Anum_pg_listener_notify - 1] = Int32GetDatum(0); /* no notifies pending */
484
485         tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
486
487         simple_heap_insert(lRel, tuple);
488
489 #ifdef NOT_USED                                 /* currently there are no indexes */
490         CatalogUpdateIndexes(lRel, tuple);
491 #endif
492
493         heap_freetuple(tuple);
494
495         /*
496          * now that we are listening, make sure we will unlisten before dying.
497          */
498         if (!unlistenExitRegistered)
499         {
500                 on_shmem_exit(Async_UnlistenOnExit, 0);
501                 unlistenExitRegistered = true;
502         }
503 }
504
505 /*
506  * Exec_Unlisten --- subroutine for AtCommit_Notify
507  *
508  *              Remove the current backend from the list of listening backends
509  *              for the specified relation.
510  */
511 static void
512 Exec_Unlisten(Relation lRel, const char *relname)
513 {
514         HeapScanDesc scan;
515         HeapTuple       tuple;
516
517         if (Trace_notify)
518                 elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
519
520         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
521         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
522         {
523                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
524
525                 if (listener->listenerpid == MyProcPid &&
526                         strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
527                 {
528                         /* Found the matching tuple, delete it */
529                         simple_heap_delete(lRel, &tuple->t_self);
530
531                         /*
532                          * We assume there can be only one match, so no need to scan the
533                          * rest of the table
534                          */
535                         break;
536                 }
537         }
538         heap_endscan(scan);
539
540         /*
541          * We do not complain about unlistening something not being listened;
542          * should we?
543          */
544 }
545
546 /*
547  * Exec_UnlistenAll --- subroutine for AtCommit_Notify
548  *
549  *              Update pg_listener to unlisten all relations for this backend.
550  */
551 static void
552 Exec_UnlistenAll(Relation lRel)
553 {
554         HeapScanDesc scan;
555         HeapTuple       lTuple;
556         ScanKeyData key[1];
557
558         if (Trace_notify)
559                 elog(DEBUG1, "Exec_UnlistenAll");
560
561         /* Find and delete all entries with my listenerPID */
562         ScanKeyInit(&key[0],
563                                 Anum_pg_listener_pid,
564                                 BTEqualStrategyNumber, F_INT4EQ,
565                                 Int32GetDatum(MyProcPid));
566         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
567
568         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
569                 simple_heap_delete(lRel, &lTuple->t_self);
570
571         heap_endscan(scan);
572 }
573
574 /*
575  * Send_Notify --- subroutine for AtCommit_Notify
576  *
577  *              Scan pg_listener for tuples matching our pending notifies, and
578  *              either signal the other backend or send a message to our own frontend.
579  */
580 static void
581 Send_Notify(Relation lRel)
582 {
583         TupleDesc       tdesc = RelationGetDescr(lRel);
584         HeapScanDesc scan;
585         HeapTuple       lTuple,
586                                 rTuple;
587         Datum           value[Natts_pg_listener];
588         bool            repl[Natts_pg_listener],
589                                 nulls[Natts_pg_listener];
590
591         /* preset data to update notify column to MyProcPid */
592         memset(nulls, false, sizeof(nulls));
593         memset(repl, false, sizeof(repl));
594         repl[Anum_pg_listener_notify - 1] = true;
595         memset(value, 0, sizeof(value));
596         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
597
598         scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
599
600         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
601         {
602                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
603                 char       *relname = NameStr(listener->relname);
604                 int32           listenerPID = listener->listenerpid;
605
606                 if (!AsyncExistsPendingNotify(relname))
607                         continue;
608
609                 if (listenerPID == MyProcPid)
610                 {
611                         /*
612                          * Self-notify: no need to bother with table update. Indeed, we
613                          * *must not* clear the notification field in this path, or we
614                          * could lose an outside notify, which'd be bad for applications
615                          * that ignore self-notify messages.
616                          */
617                         if (Trace_notify)
618                                 elog(DEBUG1, "AtCommit_Notify: notifying self");
619
620                         NotifyMyFrontEnd(relname, listenerPID);
621                 }
622                 else
623                 {
624                         if (Trace_notify)
625                                 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
626                                          listenerPID);
627
628                         /*
629                          * If someone has already notified this listener, we don't bother
630                          * modifying the table, but we do still send a SIGUSR2 signal,
631                          * just in case that backend missed the earlier signal for some
632                          * reason.      It's OK to send the signal first, because the other
633                          * guy can't read pg_listener until we unlock it.
634                          */
635                         if (kill(listenerPID, SIGUSR2) < 0)
636                         {
637                                 /*
638                                  * Get rid of pg_listener entry if it refers to a PID that no
639                                  * longer exists.  Presumably, that backend crashed without
640                                  * deleting its pg_listener entries. This code used to only
641                                  * delete the entry if errno==ESRCH, but as far as I can see
642                                  * we should just do it for any failure (certainly at least
643                                  * for EPERM too...)
644                                  */
645                                 simple_heap_delete(lRel, &lTuple->t_self);
646                         }
647                         else if (listener->notification == 0)
648                         {
649                                 /* Rewrite the tuple with my PID in notification column */
650                                 rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
651                                 simple_heap_update(lRel, &lTuple->t_self, rTuple);
652
653 #ifdef NOT_USED                                 /* currently there are no indexes */
654                                 CatalogUpdateIndexes(lRel, rTuple);
655 #endif
656                         }
657                 }
658         }
659
660         heap_endscan(scan);
661 }
662
663 /*
664  * AtAbort_Notify
665  *
666  *              This is called at transaction abort.
667  *
668  *              Gets rid of pending actions and outbound notifies that we would have
669  *              executed if the transaction got committed.
670  */
671 void
672 AtAbort_Notify(void)
673 {
674         ClearPendingActionsAndNotifies();
675 }
676
677 /*
678  * AtSubStart_Notify() --- Take care of subtransaction start.
679  *
680  * Push empty state for the new subtransaction.
681  */
682 void
683 AtSubStart_Notify(void)
684 {
685         MemoryContext old_cxt;
686
687         /* Keep the list-of-lists in TopTransactionContext for simplicity */
688         old_cxt = MemoryContextSwitchTo(TopTransactionContext);
689
690         upperPendingActions = lcons(pendingActions, upperPendingActions);
691
692         Assert(list_length(upperPendingActions) ==
693                    GetCurrentTransactionNestLevel() - 1);
694
695         pendingActions = NIL;
696
697         upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
698
699         Assert(list_length(upperPendingNotifies) ==
700                    GetCurrentTransactionNestLevel() - 1);
701
702         pendingNotifies = NIL;
703
704         MemoryContextSwitchTo(old_cxt);
705 }
706
707 /*
708  * AtSubCommit_Notify() --- Take care of subtransaction commit.
709  *
710  * Reassign all items in the pending lists to the parent transaction.
711  */
712 void
713 AtSubCommit_Notify(void)
714 {
715         List       *parentPendingActions;
716         List       *parentPendingNotifies;
717
718         parentPendingActions = (List *) linitial(upperPendingActions);
719         upperPendingActions = list_delete_first(upperPendingActions);
720
721         Assert(list_length(upperPendingActions) ==
722                    GetCurrentTransactionNestLevel() - 2);
723
724         /*
725          * Mustn't try to eliminate duplicates here --- see queue_listen()
726          */
727         pendingActions = list_concat(parentPendingActions, pendingActions);
728
729         parentPendingNotifies = (List *) linitial(upperPendingNotifies);
730         upperPendingNotifies = list_delete_first(upperPendingNotifies);
731
732         Assert(list_length(upperPendingNotifies) ==
733                    GetCurrentTransactionNestLevel() - 2);
734
735         /*
736          * We could try to eliminate duplicates here, but it seems not worthwhile.
737          */
738         pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
739 }
740
741 /*
742  * AtSubAbort_Notify() --- Take care of subtransaction abort.
743  */
744 void
745 AtSubAbort_Notify(void)
746 {
747         int                     my_level = GetCurrentTransactionNestLevel();
748
749         /*
750          * All we have to do is pop the stack --- the actions/notifies made in this
751          * subxact are no longer interesting, and the space will be freed when
752          * CurTransactionContext is recycled.
753          *
754          * This routine could be called more than once at a given nesting level if
755          * there is trouble during subxact abort.  Avoid dumping core by using
756          * GetCurrentTransactionNestLevel as the indicator of how far we need to
757          * prune the list.
758          */
759         while (list_length(upperPendingActions) > my_level - 2)
760         {
761                 pendingActions = (List *) linitial(upperPendingActions);
762                 upperPendingActions = list_delete_first(upperPendingActions);
763         }
764
765         while (list_length(upperPendingNotifies) > my_level - 2)
766         {
767                 pendingNotifies = (List *) linitial(upperPendingNotifies);
768                 upperPendingNotifies = list_delete_first(upperPendingNotifies);
769         }
770 }
771
772 /*
773  * NotifyInterruptHandler
774  *
775  *              This is the signal handler for SIGUSR2.
776  *
777  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
778  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
779  *              to do it later.
780  */
781 void
782 NotifyInterruptHandler(SIGNAL_ARGS)
783 {
784         int                     save_errno = errno;
785
786         /*
787          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
788          * here. Some helpful soul had this routine sprinkled with TPRINTFs, which
789          * would likely lead to corruption of stdio buffers if they were ever
790          * turned on.
791          */
792
793         /* Don't joggle the elbow of proc_exit */
794         if (proc_exit_inprogress)
795                 return;
796
797         if (notifyInterruptEnabled)
798         {
799                 bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
800
801                 /*
802                  * We may be called while ImmediateInterruptOK is true; turn it off
803                  * while messing with the NOTIFY state.  (We would have to save and
804                  * restore it anyway, because PGSemaphore operations inside
805                  * ProcessIncomingNotify() might reset it.)
806                  */
807                 ImmediateInterruptOK = false;
808
809                 /*
810                  * I'm not sure whether some flavors of Unix might allow another
811                  * SIGUSR2 occurrence to recursively interrupt this routine. To cope
812                  * with the possibility, we do the same sort of dance that
813                  * EnableNotifyInterrupt must do --- see that routine for comments.
814                  */
815                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
816                 notifyInterruptOccurred = 1;    /* do at least one iteration */
817                 for (;;)
818                 {
819                         notifyInterruptEnabled = 1;
820                         if (!notifyInterruptOccurred)
821                                 break;
822                         notifyInterruptEnabled = 0;
823                         if (notifyInterruptOccurred)
824                         {
825                                 /* Here, it is finally safe to do stuff. */
826                                 if (Trace_notify)
827                                         elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
828
829                                 ProcessIncomingNotify();
830
831                                 if (Trace_notify)
832                                         elog(DEBUG1, "NotifyInterruptHandler: done");
833                         }
834                 }
835
836                 /*
837                  * Restore ImmediateInterruptOK, and check for interrupts if needed.
838                  */
839                 ImmediateInterruptOK = save_ImmediateInterruptOK;
840                 if (save_ImmediateInterruptOK)
841                         CHECK_FOR_INTERRUPTS();
842         }
843         else
844         {
845                 /*
846                  * In this path it is NOT SAFE to do much of anything, except this:
847                  */
848                 notifyInterruptOccurred = 1;
849         }
850
851         errno = save_errno;
852 }
853
854 /*
855  * EnableNotifyInterrupt
856  *
857  *              This is called by the PostgresMain main loop just before waiting
858  *              for a frontend command.  If we are truly idle (ie, *not* inside
859  *              a transaction block), then process any pending inbound notifies,
860  *              and enable the signal handler to process future notifies directly.
861  *
862  *              NOTE: the signal handler starts out disabled, and stays so until
863  *              PostgresMain calls this the first time.
864  */
865 void
866 EnableNotifyInterrupt(void)
867 {
868         if (IsTransactionOrTransactionBlock())
869                 return;                                 /* not really idle */
870
871         /*
872          * This code is tricky because we are communicating with a signal handler
873          * that could interrupt us at any point.  If we just checked
874          * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
875          * fail to respond promptly to a signal that happens in between those two
876          * steps.  (A very small time window, perhaps, but Murphy's Law says you
877          * can hit it...)  Instead, we first set the enable flag, then test the
878          * occurred flag.  If we see an unserviced interrupt has occurred, we
879          * re-clear the enable flag before going off to do the service work. (That
880          * prevents re-entrant invocation of ProcessIncomingNotify() if another
881          * interrupt occurs.) If an interrupt comes in between the setting and
882          * clearing of notifyInterruptEnabled, then it will have done the service
883          * work and left notifyInterruptOccurred zero, so we have to check again
884          * after clearing enable.  The whole thing has to be in a loop in case
885          * another interrupt occurs while we're servicing the first. Once we get
886          * out of the loop, enable is set and we know there is no unserviced
887          * interrupt.
888          *
889          * NB: an overenthusiastic optimizing compiler could easily break this
890          * code. Hopefully, they all understand what "volatile" means these days.
891          */
892         for (;;)
893         {
894                 notifyInterruptEnabled = 1;
895                 if (!notifyInterruptOccurred)
896                         break;
897                 notifyInterruptEnabled = 0;
898                 if (notifyInterruptOccurred)
899                 {
900                         if (Trace_notify)
901                                 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
902
903                         ProcessIncomingNotify();
904
905                         if (Trace_notify)
906                                 elog(DEBUG1, "EnableNotifyInterrupt: done");
907                 }
908         }
909 }
910
911 /*
912  * DisableNotifyInterrupt
913  *
914  *              This is called by the PostgresMain main loop just after receiving
915  *              a frontend command.  Signal handler execution of inbound notifies
916  *              is disabled until the next EnableNotifyInterrupt call.
917  *
918  *              The SIGUSR1 signal handler also needs to call this, so as to
919  *              prevent conflicts if one signal interrupts the other.  So we
920  *              must return the previous state of the flag.
921  */
922 bool
923 DisableNotifyInterrupt(void)
924 {
925         bool            result = (notifyInterruptEnabled != 0);
926
927         notifyInterruptEnabled = 0;
928
929         return result;
930 }
931
932 /*
933  * ProcessIncomingNotify
934  *
935  *              Deal with arriving NOTIFYs from other backends.
936  *              This is called either directly from the SIGUSR2 signal handler,
937  *              or the next time control reaches the outer idle loop.
938  *              Scan pg_listener for arriving notifies, report them to my front end,
939  *              and clear the notification field in pg_listener until next time.
940  *
941  *              NOTE: since we are outside any transaction, we must create our own.
942  */
943 static void
944 ProcessIncomingNotify(void)
945 {
946         Relation        lRel;
947         TupleDesc       tdesc;
948         ScanKeyData key[1];
949         HeapScanDesc scan;
950         HeapTuple       lTuple,
951                                 rTuple;
952         Datum           value[Natts_pg_listener];
953         bool            repl[Natts_pg_listener],
954                                 nulls[Natts_pg_listener];
955         bool            catchup_enabled;
956
957         /* Must prevent SIGUSR1 interrupt while I am running */
958         catchup_enabled = DisableCatchupInterrupt();
959
960         if (Trace_notify)
961                 elog(DEBUG1, "ProcessIncomingNotify");
962
963         set_ps_display("notify interrupt", false);
964
965         notifyInterruptOccurred = 0;
966
967         StartTransactionCommand();
968
969         lRel = heap_open(ListenerRelationId, ExclusiveLock);
970         tdesc = RelationGetDescr(lRel);
971
972         /* Scan only entries with my listenerPID */
973         ScanKeyInit(&key[0],
974                                 Anum_pg_listener_pid,
975                                 BTEqualStrategyNumber, F_INT4EQ,
976                                 Int32GetDatum(MyProcPid));
977         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
978
979         /* Prepare data for rewriting 0 into notification field */
980         memset(nulls, false, sizeof(nulls));
981         memset(repl, false, sizeof(repl));
982         repl[Anum_pg_listener_notify - 1] = true;
983         memset(value, 0, sizeof(value));
984         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
985
986         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
987         {
988                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
989                 char       *relname = NameStr(listener->relname);
990                 int32           sourcePID = listener->notification;
991
992                 if (sourcePID != 0)
993                 {
994                         /* Notify the frontend */
995
996                         if (Trace_notify)
997                                 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
998                                          relname, (int) sourcePID);
999
1000                         NotifyMyFrontEnd(relname, sourcePID);
1001
1002                         /*
1003                          * Rewrite the tuple with 0 in notification column.
1004                          */
1005                         rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
1006                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
1007
1008 #ifdef NOT_USED                                 /* currently there are no indexes */
1009                         CatalogUpdateIndexes(lRel, rTuple);
1010 #endif
1011                 }
1012         }
1013         heap_endscan(scan);
1014
1015         /*
1016          * We do NOT release the lock on pg_listener here; we need to hold it
1017          * until end of transaction (which is about to happen, anyway) to ensure
1018          * that other backends see our tuple updates when they look. Otherwise, a
1019          * transaction started after this one might mistakenly think it doesn't
1020          * need to send this backend a new NOTIFY.
1021          */
1022         heap_close(lRel, NoLock);
1023
1024         CommitTransactionCommand();
1025
1026         /*
1027          * Must flush the notify messages to ensure frontend gets them promptly.
1028          */
1029         pq_flush();
1030
1031         set_ps_display("idle", false);
1032
1033         if (Trace_notify)
1034                 elog(DEBUG1, "ProcessIncomingNotify: done");
1035
1036         if (catchup_enabled)
1037                 EnableCatchupInterrupt();
1038 }
1039
1040 /*
1041  * Send NOTIFY message to my front end.
1042  */
1043 static void
1044 NotifyMyFrontEnd(char *relname, int32 listenerPID)
1045 {
1046         if (whereToSendOutput == DestRemote)
1047         {
1048                 StringInfoData buf;
1049
1050                 pq_beginmessage(&buf, 'A');
1051                 pq_sendint(&buf, listenerPID, sizeof(int32));
1052                 pq_sendstring(&buf, relname);
1053                 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1054                 {
1055                         /* XXX Add parameter string here later */
1056                         pq_sendstring(&buf, "");
1057                 }
1058                 pq_endmessage(&buf);
1059
1060                 /*
1061                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
1062                  * happen at the end of the transaction, and for incoming notifies
1063                  * ProcessIncomingNotify will do it after finding all the notifies.
1064                  */
1065         }
1066         else
1067                 elog(INFO, "NOTIFY for %s", relname);
1068 }
1069
1070 /* Does pendingNotifies include the given relname? */
1071 static bool
1072 AsyncExistsPendingNotify(const char *relname)
1073 {
1074         ListCell   *p;
1075
1076         foreach(p, pendingNotifies)
1077         {
1078                 const char *prelname = (const char *) lfirst(p);
1079
1080                 if (strcmp(prelname, relname) == 0)
1081                         return true;
1082         }
1083
1084         return false;
1085 }
1086
1087 /* Clear the pendingActions and pendingNotifies lists. */
1088 static void
1089 ClearPendingActionsAndNotifies(void)
1090 {
1091         /*
1092          * We used to have to explicitly deallocate the list members and nodes,
1093          * because they were malloc'd.  Now, since we know they are palloc'd in
1094          * CurTransactionContext, we need not do that --- they'll go away
1095          * automatically at transaction exit.  We need only reset the list head
1096          * pointers.
1097          */
1098         pendingActions = NIL;
1099         pendingNotifies = NIL;
1100 }
1101
1102 /*
1103  * 2PC processing routine for COMMIT PREPARED case.
1104  *
1105  * (We don't have to do anything for ROLLBACK PREPARED.)
1106  */
1107 void
1108 notify_twophase_postcommit(TransactionId xid, uint16 info,
1109                                                    void *recdata, uint32 len)
1110 {
1111         /*
1112          * Set up to issue the NOTIFY at the end of my own current transaction.
1113          * (XXX this has some issues if my own transaction later rolls back, or if
1114          * there is any significant delay before I commit.      OK for now because we
1115          * disallow COMMIT PREPARED inside a transaction block.)
1116          */
1117         Async_Notify((char *) recdata);
1118 }