]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Use Snapshot in heap access methods.
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c--
4  *        Asynchronous notification
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.36 1998/07/27 19:37:50 vadim Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 /* New Async Notification Model:
15  * 1. Multiple backends on same machine.  Multiple backends listening on
16  *        one relation.
17  *
18  * 2. One of the backends does a 'notify <relname>'.  For all backends that
19  *        are listening to this relation (all notifications take place at the
20  *        end of commit),
21  *        2.a  If the process is the same as the backend process that issued
22  *                 notification (we are notifying something that we are listening),
23  *                 signal the corresponding frontend over the comm channel.
24  *        2.b  For all other listening processes, we send kill(SIGUSR2) to wake up
25  *                 the listening backend.
26  * 3. Upon receiving a kill(SIGUSR2) signal from another backend process
27  *        notifying that one of the relations that we are listening on is
28  *        being notified, we can be in either of the two following states:
29  *        3.a  We are sleeping, wake up and signal our frontend.
30  *        3.b  We are in middle of another transaction, wait until the end
31  *                 of the current transaction and signal our frontend.
32  * 4. Each frontend receives this notification and processes accordingly.
33  *
34  * -- jw, 12/28/93
35  *
36  */
37 /*
38  * The following is the old model which does not work.
39  */
40 /*
41  * Model is:
42  * 1. Multiple backends on same machine.
43  *
44  * 2. Query on one backend sends stuff over an asynchronous portal by
45  *        appending to a relation, and then doing an async. notification
46  *        (which takes place after commit) to all listeners on this relation.
47  *
48  * 3. Async. notification results in all backends listening on relation
49  *        to be woken up, by a process signal kill(SIGUSR2), with name of relation
50  *        passed in shared memory.
51  *
52  * 4. Each backend notifies its respective frontend over the comm
53  *        channel using the out-of-band channel.
54  *
55  * 5. Each frontend receives this notification and processes accordingly.
56  *
57  * #4,#5 are changing soon with pending rewrite of portal/protocol.
58  *
59  */
60 #include <unistd.h>
61 #include <signal.h>
62 #include <string.h>
63 #include <errno.h>
64 #include <sys/types.h>                  /* Needed by in.h on Ultrix */
65 #include <netinet/in.h>
66
67 #include "postgres.h"
68
69 #include "access/heapam.h"
70 #include "access/relscan.h"
71 #include "access/xact.h"
72 #include "catalog/catname.h"
73 #include "catalog/pg_listener.h"
74 #include "commands/async.h"
75 #include "fmgr.h"
76 #include "lib/dllist.h"
77 #include "libpq/libpq.h"
78 #include "miscadmin.h"
79 #include "nodes/memnodes.h"
80 #include "storage/bufmgr.h"
81 #include "storage/lmgr.h"
82 #include "tcop/dest.h"
83 #include "utils/mcxt.h"
84 #include "utils/syscache.h"
85
86 static int      notifyFrontEndPending = 0;
87 static int      notifyIssued = 0;
88 static Dllist *pendingNotifies = NULL;
89
90
91 static int      AsyncExistsPendingNotify(char *);
92 static void ClearPendingNotify(void);
93 static void Async_NotifyFrontEnd(void);
94 void            Async_Unlisten(char *relname, int pid);
95 static void Async_UnlistenOnExit(int code, char *relname);
96
97 /*
98  *--------------------------------------------------------------
99  * Async_NotifyHandler --
100  *
101  *              This is the signal handler for SIGUSR2.  When the backend
102  *              is signaled, the backend can be in two states.
103  *              1. If the backend is in the middle of another transaction,
104  *                 we set the flag, notifyFrontEndPending, and wait until
105  *                 the end of the transaction to notify the front end.
106  *              2. If the backend is not in the middle of another transaction,
107  *                 we notify the front end immediately.
108  *
109  *              -- jw, 12/28/93
110  * Results:
111  *              none
112  *
113  * Side effects:
114  *              none
115  */
116 void
117 Async_NotifyHandler(SIGNAL_ARGS)
118 {
119         extern TransactionState CurrentTransactionState;
120
121         if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
122                 (CurrentTransactionState->blockState == TRANS_DEFAULT))
123         {
124
125 #ifdef ASYNC_DEBUG
126                 elog(DEBUG, "Waking up sleeping backend process");
127 #endif
128                 Async_NotifyFrontEnd();
129
130         }
131         else
132         {
133 #ifdef ASYNC_DEBUG
134                 elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
135                          CurrentTransactionState->state,
136                          CurrentTransactionState->blockState);
137 #endif
138                 notifyFrontEndPending = 1;
139         }
140 }
141
142 /*
143  *--------------------------------------------------------------
144  * Async_Notify --
145  *
146  *              Adds the relation to the list of pending notifies.
147  *              All notification happens at end of commit.
148  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
149  *
150  *              All notification of backend processes happens here,
151  *              then each backend notifies its corresponding front end at
152  *              the end of commit.
153  *
154  *              This correspond to 'notify <relname>' command
155  *              -- jw, 12/28/93
156  *
157  * Results:
158  *              XXX
159  *
160  * Side effects:
161  *              All tuples for relname in pg_listener are updated.
162  *
163  *--------------------------------------------------------------
164  */
165 void
166 Async_Notify(char *relname)
167 {
168
169         HeapTuple       lTuple,
170                                 rTuple;
171         Relation        lRel;
172         HeapScanDesc sRel;
173         TupleDesc       tdesc;
174         ScanKeyData key;
175         Buffer          b;
176         Datum           d,
177                                 value[3];
178         bool            isnull;
179         char            repl[3],
180                                 nulls[3];
181
182         char       *notifyName;
183
184 #ifdef ASYNC_DEBUG
185         elog(DEBUG, "Async_Notify: %s", relname);
186 #endif
187
188         if (!pendingNotifies)
189                 pendingNotifies = DLNewList();
190
191         /*
192          * Allocate memory from the global malloc pool because it needs to be
193          * referenced also when the transaction is finished.  DZ - 26-08-1996
194          */
195         notifyName = strdup(relname);
196         DLAddHead(pendingNotifies, DLNewElem(notifyName));
197
198         ScanKeyEntryInitialize(&key, 0,
199                                                    Anum_pg_listener_relname,
200                                                    F_NAMEEQ,
201                                                    PointerGetDatum(notifyName));
202
203         lRel = heap_openr(ListenerRelationName);
204         tdesc = RelationGetTupleDescriptor(lRel);
205         RelationSetLockForWrite(lRel);
206         sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
207
208         nulls[0] = nulls[1] = nulls[2] = ' ';
209         repl[0] = repl[1] = repl[2] = ' ';
210         repl[Anum_pg_listener_notify - 1] = 'r';
211         value[0] = value[1] = value[2] = (Datum) 0;
212         value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
213
214         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
215         {
216                 d = heap_getattr(lTuple, Anum_pg_listener_notify,
217                                                  tdesc, &isnull);
218                 if (!DatumGetInt32(d))
219                 {
220                         rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
221                         heap_replace(lRel, &lTuple->t_ctid, rTuple);
222                 }
223                 ReleaseBuffer(b);
224         }
225         heap_endscan(sRel);
226         RelationUnsetLockForWrite(lRel);
227         heap_close(lRel);
228         notifyIssued = 1;
229 }
230
231 /*
232  *--------------------------------------------------------------
233  * Async_NotifyAtCommit --
234  *
235  *              Signal our corresponding frontend process on relations that
236  *              were notified.  Signal all other backend process that
237  *              are listening also.
238  *
239  *              -- jw, 12/28/93
240  *
241  * Results:
242  *              XXX
243  *
244  * Side effects:
245  *              Tuples in pg_listener that has our listenerpid are updated so
246  *              that the notification is 0.  We do not want to notify frontend
247  *              more than once.
248  *
249  *              -- jw, 12/28/93
250  *
251  *--------------------------------------------------------------
252  */
253 void
254 Async_NotifyAtCommit()
255 {
256         HeapTuple       lTuple;
257         Relation        lRel;
258         HeapScanDesc sRel;
259         TupleDesc       tdesc;
260         ScanKeyData key;
261         Datum           d;
262         bool            isnull;
263         Buffer          b;
264         extern TransactionState CurrentTransactionState;
265
266         if (!pendingNotifies)
267                 pendingNotifies = DLNewList();
268
269         if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
270                 (CurrentTransactionState->blockState == TRANS_DEFAULT))
271         {
272
273                 if (notifyIssued)
274                 {                                               /* 'notify <relname>' issued by us */
275                         notifyIssued = 0;
276                         StartTransactionCommand();
277 #ifdef ASYNC_DEBUG
278                         elog(DEBUG, "Async_NotifyAtCommit.");
279 #endif
280                         ScanKeyEntryInitialize(&key, 0,
281                                                                    Anum_pg_listener_notify,
282                                                                    F_INT4EQ,
283                                                                    Int32GetDatum(1));
284                         lRel = heap_openr(ListenerRelationName);
285                         RelationSetLockForWrite(lRel);
286                         sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
287                         tdesc = RelationGetTupleDescriptor(lRel);
288
289                         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
290                         {
291                                 d = heap_getattr(lTuple, Anum_pg_listener_relname,
292                                                                  tdesc, &isnull);
293
294                                 if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
295                                 {
296                                         d = heap_getattr(lTuple, Anum_pg_listener_pid,
297                                                                          tdesc, &isnull);
298
299                                         if (MyProcPid == DatumGetInt32(d))
300                                         {
301 #ifdef ASYNC_DEBUG
302                                                 elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
303 #endif
304                                                 notifyFrontEndPending = 1;
305                                         }
306                                         else
307                                         {
308 #ifdef ASYNC_DEBUG
309                                                 elog(DEBUG, "Notifying others");
310 #endif
311 #ifdef HAVE_KILL
312                                                 if (kill(DatumGetInt32(d), SIGUSR2) < 0)
313                                                 {
314                                                         if (errno == ESRCH)
315                                                                 heap_delete(lRel, &lTuple->t_ctid);
316                                                 }
317 #endif
318                                         }
319                                 }
320                                 ReleaseBuffer(b);
321                         }
322                         heap_endscan(sRel);
323                         RelationUnsetLockForWrite(lRel);
324                         heap_close(lRel);
325
326                         CommitTransactionCommand();
327                         ClearPendingNotify();
328                 }
329
330                 if (notifyFrontEndPending)
331                 {                                               /* we need to notify the frontend of all
332                                                                  * pending notifies. */
333                         notifyFrontEndPending = 1;
334                         Async_NotifyFrontEnd();
335                 }
336         }
337 }
338
339 /*
340  *--------------------------------------------------------------
341  * Async_NotifyAtAbort --
342  *
343  *              Gets rid of pending notifies.  List elements are automatically
344  *              freed through memory context.
345  *
346  *
347  * Results:
348  *              XXX
349  *
350  * Side effects:
351  *              XXX
352  *
353  *--------------------------------------------------------------
354  */
355 void
356 Async_NotifyAtAbort()
357 {
358         extern TransactionState CurrentTransactionState;
359
360         if (notifyIssued)
361                 ClearPendingNotify();
362         notifyIssued = 0;
363         if (pendingNotifies)
364                 DLFreeList(pendingNotifies);
365         pendingNotifies = DLNewList();
366
367         if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
368                 (CurrentTransactionState->blockState == TRANS_DEFAULT))
369         {
370                 if (notifyFrontEndPending)
371                 {                                               /* don't forget to notify front end */
372                         Async_NotifyFrontEnd();
373                 }
374         }
375 }
376
377 /*
378  *--------------------------------------------------------------
379  * Async_Listen --
380  *
381  *              Register a backend (identified by its Unix PID) as listening
382  *              on the specified relation.
383  *
384  *              This corresponds to the 'listen <relation>' command in SQL
385  *
386  *              One listener per relation, pg_listener relation is keyed
387  *              on (relname,pid) to provide multiple listeners in future.
388  *
389  * Results:
390  *              pg_listeners is updated.
391  *
392  * Side effects:
393  *              XXX
394  *
395  *--------------------------------------------------------------
396  */
397 void
398 Async_Listen(char *relname, int pid)
399 {
400         Datum           values[Natts_pg_listener];
401         char            nulls[Natts_pg_listener];
402         TupleDesc       tdesc;
403         HeapScanDesc s;
404         HeapTuple       htup,
405                                 tup;
406         Relation        lDesc;
407         Buffer          b;
408         Datum           d;
409         int                     i;
410         bool            isnull;
411         int                     alreadyListener = 0;
412         char       *relnamei;
413         TupleDesc       tupDesc;
414
415 #ifdef ASYNC_DEBUG
416         elog(DEBUG, "Async_Listen: %s", relname);
417 #endif
418         for (i = 0; i < Natts_pg_listener; i++)
419         {
420                 nulls[i] = ' ';
421                 values[i] = PointerGetDatum(NULL);
422         }
423
424         i = 0;
425         values[i++] = (Datum) relname;
426         values[i++] = (Datum) pid;
427         values[i++] = (Datum) 0;        /* no notifies pending */
428
429         lDesc = heap_openr(ListenerRelationName);
430         RelationSetLockForWrite(lDesc);
431
432         /* is someone already listening.  One listener per relation */
433         tdesc = RelationGetTupleDescriptor(lDesc);
434         s = heap_beginscan(lDesc, 0, SnapshotNow, 0, (ScanKey) NULL);
435         while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b)))
436         {
437                 d = heap_getattr(htup, Anum_pg_listener_relname, tdesc,
438                                                  &isnull);
439                 relnamei = DatumGetPointer(d);
440                 if (!strncmp(relnamei, relname, NAMEDATALEN))
441                 {
442                         d = heap_getattr(htup, Anum_pg_listener_pid, tdesc, &isnull);
443                         pid = DatumGetInt32(d);
444                         if (pid == MyProcPid)
445                                 alreadyListener = 1;
446                 }
447                 ReleaseBuffer(b);
448         }
449         heap_endscan(s);
450
451         if (alreadyListener)
452         {
453                 elog(NOTICE, "Async_Listen: We are already listening on %s",
454                          relname);
455                 return;
456         }
457
458         tupDesc = lDesc->rd_att;
459         tup = heap_formtuple(tupDesc,
460                                                  values,
461                                                  nulls);
462         heap_insert(lDesc, tup);
463
464         pfree(tup);
465
466         /*
467          * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
468          * listener on %s (possibly dead)",relname); }
469          */
470
471         RelationUnsetLockForWrite(lDesc);
472         heap_close(lDesc);
473
474         /*
475          * now that we are listening, we should make a note to ourselves to
476          * unlisten prior to dying.
477          */
478         relnamei = malloc(NAMEDATALEN);         /* persists to process exit */
479         StrNCpy(relnamei, relname, NAMEDATALEN);
480         on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
481 }
482
483 /*
484  *--------------------------------------------------------------
485  * Async_Unlisten --
486  *
487  *              Remove the backend from the list of listening backends
488  *              for the specified relation.
489  *
490  *              This would correspond to the 'unlisten <relation>'
491  *              command, but there isn't one yet.
492  *
493  * Results:
494  *              pg_listeners is updated.
495  *
496  * Side effects:
497  *              XXX
498  *
499  *--------------------------------------------------------------
500  */
501 void
502 Async_Unlisten(char *relname, int pid)
503 {
504         Relation        lDesc;
505         HeapTuple       lTuple;
506
507         lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
508                                                                  Int32GetDatum(pid),
509                                                                  0, 0);
510         lDesc = heap_openr(ListenerRelationName);
511         RelationSetLockForWrite(lDesc);
512
513         if (lTuple != NULL)
514                 heap_delete(lDesc, &lTuple->t_ctid);
515
516         RelationUnsetLockForWrite(lDesc);
517         heap_close(lDesc);
518 }
519
520 static void
521 Async_UnlistenOnExit(int code,  /* from exitpg */
522                                          char *relname)
523 {
524         Async_Unlisten((char *) relname, MyProcPid);
525 }
526
527 /*
528  * --------------------------------------------------------------
529  * Async_NotifyFrontEnd --
530  *
531  *              Perform an asynchronous notification to front end over
532  *              portal comm channel.  The name of the relation which contains the
533  *              data is sent to the front end.
534  *
535  *              We remove the notification flag from the pg_listener tuple
536  *              associated with our process.
537  *
538  * Results:
539  *              XXX
540  *
541  * --------------------------------------------------------------
542  */
543 GlobalMemory notifyContext = NULL;
544
545 static void
546 Async_NotifyFrontEnd()
547 {
548         extern CommandDest whereToSendOutput;
549         HeapTuple       lTuple,
550                                 rTuple;
551         Relation        lRel;
552         HeapScanDesc sRel;
553         TupleDesc       tdesc;
554         ScanKeyData key[2];
555         Datum           d,
556                                 value[3];
557         char            repl[3],
558                                 nulls[3];
559         Buffer          b;
560         bool            isnull;
561
562         notifyFrontEndPending = 0;
563
564 #ifdef ASYNC_DEBUG
565         elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
566 #endif
567
568         StartTransactionCommand();
569         ScanKeyEntryInitialize(&key[0], 0,
570                                                    Anum_pg_listener_notify,
571                                                    F_INT4EQ,
572                                                    Int32GetDatum(1));
573         ScanKeyEntryInitialize(&key[1], 0,
574                                                    Anum_pg_listener_pid,
575                                                    F_INT4EQ,
576                                                    Int32GetDatum(MyProcPid));
577         lRel = heap_openr(ListenerRelationName);
578         RelationSetLockForWrite(lRel);
579         tdesc = RelationGetTupleDescriptor(lRel);
580         sRel = heap_beginscan(lRel, 0, SnapshotNow, 2, key);
581
582         nulls[0] = nulls[1] = nulls[2] = ' ';
583         repl[0] = repl[1] = repl[2] = ' ';
584         repl[Anum_pg_listener_notify - 1] = 'r';
585         value[0] = value[1] = value[2] = (Datum) 0;
586         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
587
588         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
589         {
590                 d = heap_getattr(lTuple, Anum_pg_listener_relname,
591                                                  tdesc, &isnull);
592                 rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
593                 heap_replace(lRel, &lTuple->t_ctid, rTuple);
594
595                 /* notifying the front end */
596
597                 if (whereToSendOutput == Remote)
598                 {
599                         pq_putnchar("A", 1);
600                         pq_putint((int32) MyProcPid, sizeof(int32));
601                         pq_putstr(DatumGetName(d)->data);
602                         pq_flush();
603                 }
604                 else
605                         elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
606                 ReleaseBuffer(b);
607         }
608         CommitTransactionCommand();
609 }
610
611 static int
612 AsyncExistsPendingNotify(char *relname)
613 {
614         Dlelem     *p;
615
616         for (p = DLGetHead(pendingNotifies);
617                  p != NULL;
618                  p = DLGetSucc(p))
619         {
620                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
621                 if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
622                         return 1;
623         }
624
625         return 0;
626 }
627
628 static void
629 ClearPendingNotify()
630 {
631         Dlelem     *p;
632
633         while ((p = DLRemHead(pendingNotifies)) != NULL)
634                 free(DLE_VAL(p));
635 }