]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
Postgres95 1.01 Distribution - Virgin Sources
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c--
4  *    Asynchronous notification
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.1.1.1 1996/07/09 06:21:19 scrappy Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 /* New Async Notification Model:
15  * 1. Multiple backends on same machine.  Multiple backends listening on
16  *    one relation.
17  *
18  * 2. One of the backend does a 'notify <relname>'.  For all backends that
19  *    are listening to this relation (all notifications take place at the
20  *    end of commit),
21  *    2.a  If the process is the same as the backend process that issued
22  *         notification (we are notifying something that we are listening),
23  *         signal the corresponding frontend over the comm channel using the
24  *         out-of-band channel.
25  *    2.b  For all other listening processes, we send kill(2) to wake up
26  *         the listening backend.
27  * 3. Upon receiving a kill(2) signal from another backend process notifying
28  *    that one of the relation that we are listening is being notified,
29  *    we can be in either of two following states:
30  *    3.a  We are sleeping, wake up and signal our frontend.
31  *    3.b  We are in middle of another transaction, wait until the end of
32  *         of the current transaction and signal our frontend.
33  * 4. Each frontend receives this notification and prcesses accordingly.
34  *
35  * -- jw, 12/28/93
36  *
37  */
38 /*
39  * The following is the old model which does not work.
40  */
41 /*
42  * Model is:
43  * 1. Multiple backends on same machine.
44  *
45  * 2. Query on one backend sends stuff over an asynchronous portal by 
46  *    appending to a relation, and then doing an async. notification
47  *    (which takes place after commit) to all listeners on this relation.
48  *
49  * 3. Async. notification results in all backends listening on relation 
50  *    to be woken up, by a process signal kill(2), with name of relation
51  *    passed in shared memory.
52  *
53  * 4. Each backend notifies its respective frontend over the comm
54  *    channel using the out-of-band channel.
55  *
56  * 5. Each frontend receives this notification and processes accordingly.
57  *
58  * #4,#5 are changing soon with pending rewrite of portal/protocol.
59  *
60  */
61
62 #include <string.h>
63 #include <signal.h>
64 #include <errno.h>
65 #include "postgres.h"
66
67 #include "access/attnum.h"
68 #include "access/heapam.h"
69 #include "access/htup.h"
70 #include "access/relscan.h"
71 #include "access/skey.h"
72 #include "utils/builtins.h"
73 #include "utils/tqual.h"
74 #include "access/xact.h"
75
76 #include "commands/async.h"
77 #include "commands/copy.h"
78 #include "storage/buf.h"
79 #include "storage/itemptr.h"
80 #include "miscadmin.h"
81 #include "utils/portal.h"
82 #include "utils/excid.h"
83 #include "utils/elog.h"
84 #include "utils/mcxt.h"
85 #include "utils/palloc.h"
86 #include "utils/rel.h"
87
88 #include "nodes/pg_list.h"
89 #include "tcop/dest.h"
90 #include "commands/command.h"
91
92 #include "catalog/catname.h"
93 #include "utils/syscache.h"
94 #include "catalog/pg_attribute.h"
95 #include "catalog/pg_proc.h"
96 #include "catalog/pg_class.h"
97 #include "catalog/pg_type.h"
98 #include "catalog/pg_listener.h"
99
100 #include "executor/execdefs.h"
101 /* #include "executor/execdesc.h"*/
102
103 #include "storage/bufmgr.h"
104 #include "lib/dllist.h"
105 #include "libpq/libpq.h"
106
107
108 static int notifyFrontEndPending = 0;
109 static int notifyIssued = 0;
110 static Dllist *pendingNotifies = NULL;
111
112
113 static int AsyncExistsPendingNotify(char *);
114 static void ClearPendingNotify(void);
115      
116 /*
117  *--------------------------------------------------------------
118  * Async_NotifyHandler --
119  *
120  *      This is the signal handler for SIGUSR2.  When the backend
121  *      is signaled, the backend can be in two states.
122  *      1. If the backend is in the middle of another transaction,
123  *         we set the flag, notifyFrontEndPending, and wait until
124  *         the end of the transaction to notify the front end.
125  *      2. If the backend is not in the middle of another transaction,
126  *         we notify the front end immediately.
127  *
128  *      -- jw, 12/28/93
129  * Results:
130  *      none
131  *
132  * Side effects:
133  *      none
134  */
135 void
136 #if defined(PORTNAME_linux)
137 Async_NotifyHandler(int i)
138 #else
139 Async_NotifyHandler()
140 #endif
141 {
142     extern TransactionState CurrentTransactionState;
143     
144     if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
145         (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
146
147         elog(DEBUG, "Waking up sleeping backend process");
148         Async_NotifyFrontEnd();
149
150     }else {
151         elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
152              CurrentTransactionState->state,
153              CurrentTransactionState->blockState);
154         notifyFrontEndPending = 1;
155     }
156 }
157
158 /*
159  *--------------------------------------------------------------
160  * Async_Notify --
161  *
162  *      Adds the relation to the list of pending notifies.
163  *      All notification happens at end of commit.
164  *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
165  *
166  *      All notification of backend processes happens here,
167  *      then each backend notifies its corresponding front end at
168  *      the end of commit.
169  *
170  *      This correspond to 'notify <relname>' command
171  *      -- jw, 12/28/93
172  *
173  * Results:
174  *      XXX
175  *
176  * Side effects:
177  *      All tuples for relname in pg_listener are updated.
178  *
179  *--------------------------------------------------------------
180  */
181 void
182 Async_Notify(char *relname)
183 {
184     
185     HeapTuple lTuple, rTuple;
186     Relation lRel;
187     HeapScanDesc sRel;
188     TupleDesc tdesc;
189     ScanKeyData key;
190     Buffer b;
191     Datum d, value[3];
192     bool isnull;
193     char repl[3], nulls[3];
194     
195     char *notifyName;
196     
197     elog(DEBUG,"Async_Notify: %s",relname);
198     
199     if (!pendingNotifies) 
200       pendingNotifies = DLNewList();
201
202     notifyName = pstrdup(relname);
203     DLAddHead(pendingNotifies, DLNewElem(notifyName));
204     
205     ScanKeyEntryInitialize(&key, 0,
206                            Anum_pg_listener_relname,
207                            NameEqualRegProcedure,
208                            PointerGetDatum(notifyName));
209     
210     lRel = heap_openr(ListenerRelationName);
211     tdesc = RelationGetTupleDescriptor(lRel);
212     sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
213     
214     nulls[0] = nulls[1] = nulls[2] = ' ';
215     repl[0] = repl[1] = repl[2] = ' ';
216     repl[Anum_pg_listener_notify - 1] = 'r';
217     value[0] = value[1] = value[2] = (Datum) 0;
218     value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
219     
220     while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) {
221         d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_notify,
222                                  tdesc, &isnull);
223         if (!DatumGetInt32(d)) {
224             rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
225             (void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
226         }
227         ReleaseBuffer(b);
228     }
229     heap_endscan(sRel);
230     heap_close(lRel);
231     notifyIssued = 1;
232 }
233
234 /*
235  *--------------------------------------------------------------
236  * Async_NotifyAtCommit --
237  *
238  *      Signal our corresponding frontend process on relations that
239  *      were notified.  Signal all other backend process that
240  *      are listening also.
241  *
242  *      -- jw, 12/28/93
243  *
244  * Results:
245  *      XXX
246  *
247  * Side effects:
248  *      Tuples in pg_listener that has our listenerpid are updated so
249  *      that the notification is 0.  We do not want to notify frontend
250  *      more than once.
251  *
252  *      -- jw, 12/28/93
253  *
254  *--------------------------------------------------------------
255  */
256 void
257 Async_NotifyAtCommit()
258 {
259     HeapTuple lTuple;
260     Relation lRel;
261     HeapScanDesc sRel;
262     TupleDesc tdesc;
263     ScanKeyData key;
264     Datum d;
265     int ourpid;
266     bool isnull;
267     Buffer b;
268     extern TransactionState CurrentTransactionState;
269     
270     if (!pendingNotifies)
271       pendingNotifies = DLNewList();
272     
273     if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
274         (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
275         
276         if (notifyIssued) {             /* 'notify <relname>' issued by us */
277             notifyIssued = 0;
278             StartTransactionCommand();
279             elog(DEBUG, "Async_NotifyAtCommit.");
280             ScanKeyEntryInitialize(&key, 0,
281                                    Anum_pg_listener_notify,
282                                    Integer32EqualRegProcedure,
283                                    Int32GetDatum(1));
284             lRel = heap_openr(ListenerRelationName);
285             sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
286             tdesc = RelationGetTupleDescriptor(lRel);
287             ourpid = getpid();
288             
289             while (HeapTupleIsValid(lTuple = heap_getnext(sRel,0, &b))) {
290                 d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
291                                          tdesc, &isnull);
292                 
293                 if (AsyncExistsPendingNotify((char *) DatumGetPointer(d))) {
294                     d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_pid,
295                                              tdesc, &isnull);
296                     
297                     if (ourpid == DatumGetInt32(d)) {
298                         elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
299                         notifyFrontEndPending = 1;
300                     } else {
301                         elog(DEBUG, "Notifying others");
302 #ifndef WIN32
303                         if (kill(DatumGetInt32(d), SIGUSR2) < 0) {
304                             if (errno == ESRCH) {
305                                 heap_delete(lRel, &lTuple->t_ctid);
306                             }
307                         }
308 #endif /* WIN32 */
309                     }
310                 }
311                 ReleaseBuffer(b);
312             }
313             CommitTransactionCommand();
314             ClearPendingNotify();
315         }
316         
317         if (notifyFrontEndPending) {    /* we need to notify the frontend of
318                                            all pending notifies. */
319             notifyFrontEndPending = 1;
320             Async_NotifyFrontEnd();
321         }
322     }
323 }
324
325 /*
326  *--------------------------------------------------------------
327  * Async_NotifyAtAbort --
328  *
329  *      Gets rid of pending notifies.  List elements are automatically
330  *      freed through memory context.
331  *      
332  *
333  * Results:
334  *      XXX
335  *
336  * Side effects:
337  *      XXX
338  *
339  *--------------------------------------------------------------
340  */
341 void
342 Async_NotifyAtAbort()
343 {
344     extern TransactionState CurrentTransactionState;
345     
346     if (notifyIssued) {
347         ClearPendingNotify();
348     }
349     notifyIssued = 0;
350     if (pendingNotifies)
351         DLFreeList(pendingNotifies);
352     pendingNotifies = DLNewList();
353
354     if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
355         (CurrentTransactionState->blockState == TRANS_DEFAULT)) {
356         if (notifyFrontEndPending) { /* don't forget to notify front end */
357             Async_NotifyFrontEnd();
358         }
359     }
360 }
361
362 /*
363  *--------------------------------------------------------------
364  * Async_Listen --
365  *
366  *      Register a backend (identified by its Unix PID) as listening
367  *      on the specified relation.  
368  *
369  *      This corresponds to the 'listen <relation>' command in SQL
370  *
371  *      One listener per relation, pg_listener relation is keyed
372  *      on (relname,pid) to provide multiple listeners in future.
373  *
374  * Results:
375  *      pg_listeners is updated.
376  *
377  * Side effects:
378  *      XXX
379  *
380  *--------------------------------------------------------------
381  */
382 void
383 Async_Listen(char *relname, int pid)
384 {
385     Datum values[Natts_pg_listener];
386     char nulls[Natts_pg_listener];
387     TupleDesc tdesc;
388     HeapScanDesc s;
389     HeapTuple htup,tup;
390     Relation lDesc;
391     Buffer b;
392     Datum d;
393     int i;
394     bool isnull;
395     int alreadyListener = 0;
396     int ourPid = getpid();
397     char *relnamei;
398     TupleDesc tupDesc;
399     
400     elog(DEBUG,"Async_Listen: %s",relname);
401     for (i = 0 ; i < Natts_pg_listener; i++) {
402         nulls[i] = ' ';
403         values[i] = PointerGetDatum(NULL);
404     }
405     
406     i = 0;
407     values[i++] = (Datum) relname;
408     values[i++] = (Datum) pid;
409     values[i++] = (Datum) 0;    /* no notifies pending */
410     
411     lDesc = heap_openr(ListenerRelationName);
412     
413     /* is someone already listening.  One listener per relation */
414     tdesc = RelationGetTupleDescriptor(lDesc);
415     s = heap_beginscan(lDesc,0,NowTimeQual,0,(ScanKey)NULL);
416     while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
417         d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,
418                                  &isnull);
419         relnamei = DatumGetPointer(d);
420         if (!strncmp(relnamei,relname, NAMEDATALEN)) {
421             d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
422             pid = DatumGetInt32(d);
423             if (pid == ourPid) {
424                 alreadyListener = 1;
425             }
426         }
427         ReleaseBuffer(b);
428     }
429     heap_endscan(s);
430     
431     if (alreadyListener) {
432         elog(NOTICE, "Async_Listen: We are already listening on %s",
433              relname);
434         return;
435     }
436     
437     tupDesc = lDesc->rd_att;
438     tup = heap_formtuple(tupDesc,
439                          values,
440                          nulls);
441     heap_insert(lDesc, tup);
442     
443     pfree(tup);
444     /*    if (alreadyListener) {
445           elog(NOTICE,"Async_Listen: already one listener on %s (possibly dead)",relname);
446           }*/
447     heap_close(lDesc);
448     
449     /*
450      * now that we are listening, we should make a note to ourselves 
451      * to unlisten prior to dying.
452      */
453     relnamei = malloc(NAMEDATALEN); /* persists to process exit */
454     memset (relnamei, 0, NAMEDATALEN);
455     strncpy(relnamei, relname, NAMEDATALEN);
456     on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei);
457 }
458
459 /*
460  *--------------------------------------------------------------
461  * Async_Unlisten --
462  *
463  *      Remove the backend from the list of listening backends
464  *      for the specified relation.
465  *    
466  *      This would correspond to the 'unlisten <relation>' 
467  *      command, but there isn't one yet.
468  *
469  * Results:
470  *      pg_listeners is updated.
471  *
472  * Side effects:
473  *      XXX
474  *
475  *--------------------------------------------------------------
476  */
477 void
478 Async_Unlisten(char *relname, int pid)
479 {
480     Relation lDesc;
481     HeapTuple lTuple;
482     
483     lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
484                                  Int32GetDatum(pid),
485                                  0,0);
486     lDesc = heap_openr(ListenerRelationName);
487     if (lTuple != NULL) {
488         heap_delete(lDesc,&lTuple->t_ctid);
489     }
490     heap_close(lDesc);
491 }
492
493 void
494 Async_UnlistenOnExit(int code,  /* from exitpg */
495                      char *relname)
496 {
497     Async_Unlisten((char *) relname, getpid());
498 }
499
500 /*
501  * --------------------------------------------------------------
502  * Async_NotifyFrontEnd --
503  *
504  *      Perform an asynchronous notification to front end over
505  *      portal comm channel.  The name of the relation which contains the
506  *      data is sent to the front end.
507  *
508  *      We remove the notification flag from the pg_listener tuple
509  *      associated with our process.
510  *
511  * Results:
512  *      XXX
513  *
514  * Side effects:
515  *
516  *      We make use of the out-of-band channel to transmit the
517  *      notification to the front end.  The actual data transfer takes
518  *      place at the front end's request.
519  *
520  * --------------------------------------------------------------
521  */
522 GlobalMemory notifyContext = NULL;
523
524 void
525 Async_NotifyFrontEnd()
526 {
527     extern CommandDest whereToSendOutput;
528     HeapTuple lTuple, rTuple;
529     Relation lRel;
530     HeapScanDesc sRel;
531     TupleDesc tdesc;
532     ScanKeyData key[2];
533     Datum d, value[3];
534     char repl[3], nulls[3];
535     Buffer b;
536     int ourpid;
537     bool isnull;
538     
539     notifyFrontEndPending = 0;
540     
541     elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
542     
543     StartTransactionCommand();
544     ourpid = getpid();
545     ScanKeyEntryInitialize(&key[0], 0,
546                            Anum_pg_listener_notify,
547                            Integer32EqualRegProcedure,
548                            Int32GetDatum(1));
549     ScanKeyEntryInitialize(&key[1], 0,
550                            Anum_pg_listener_pid,
551                            Integer32EqualRegProcedure,
552                            Int32GetDatum(ourpid));
553     lRel = heap_openr(ListenerRelationName);
554     tdesc = RelationGetTupleDescriptor(lRel);
555     sRel = heap_beginscan(lRel, 0, NowTimeQual, 2, key);
556     
557     nulls[0] = nulls[1] = nulls[2] = ' ';
558     repl[0] = repl[1] = repl[2] = ' ';
559     repl[Anum_pg_listener_notify - 1] = 'r';
560     value[0] = value[1] = value[2] = (Datum) 0;
561     value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
562     
563     while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0,&b))) {
564         d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
565                                  tdesc, &isnull);
566         rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
567         (void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
568         
569         /* notifying the front end */
570         
571         if (whereToSendOutput == Remote) {
572             pq_putnchar("A", 1);
573             pq_putint(ourpid, 4);
574             pq_putstr(DatumGetName(d)->data);
575             pq_flush();
576         } else {
577             elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
578         }
579         ReleaseBuffer(b);
580     }
581     CommitTransactionCommand();
582 }
583
584 static int
585 AsyncExistsPendingNotify(char *relname)
586 {
587     Dlelem* p;
588     for (p = DLGetHead(pendingNotifies); 
589          p != NULL;
590          p = DLGetSucc(p)) {
591       if (!strcmp(DLE_VAL(p), relname))
592         return 1;
593     }
594
595     return 0;
596 }
597
598 static void
599 ClearPendingNotify()
600 {
601     Dlelem* p;
602     while ( (p = DLRemHead(pendingNotifies)) != NULL)
603         free(DLE_VAL(p));
604 }
605