]> granicus.if.org Git - postgresql/blob - src/backend/commands/async.c
getpid/pid cleanup
[postgresql] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c--
4  *        Asynchronous notification
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.27 1998/01/25 05:12:54 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 /* New Async Notification Model:
15  * 1. Multiple backends on same machine.  Multiple backends listening on
16  *        one relation.
17  *
18  * 2. One of the backend does a 'notify <relname>'.  For all backends that
19  *        are listening to this relation (all notifications take place at the
20  *        end of commit),
21  *        2.a  If the process is the same as the backend process that issued
22  *                 notification (we are notifying something that we are listening),
23  *                 signal the corresponding frontend over the comm channel using the
24  *                 out-of-band channel.
25  *        2.b  For all other listening processes, we send kill(2) to wake up
26  *                 the listening backend.
27  * 3. Upon receiving a kill(2) signal from another backend process notifying
28  *        that one of the relation that we are listening is being notified,
29  *        we can be in either of two following states:
30  *        3.a  We are sleeping, wake up and signal our frontend.
31  *        3.b  We are in middle of another transaction, wait until the end of
32  *                 of the current transaction and signal our frontend.
33  * 4. Each frontend receives this notification and prcesses accordingly.
34  *
35  * -- jw, 12/28/93
36  *
37  */
38 /*
39  * The following is the old model which does not work.
40  */
41 /*
42  * Model is:
43  * 1. Multiple backends on same machine.
44  *
45  * 2. Query on one backend sends stuff over an asynchronous portal by
46  *        appending to a relation, and then doing an async. notification
47  *        (which takes place after commit) to all listeners on this relation.
48  *
49  * 3. Async. notification results in all backends listening on relation
50  *        to be woken up, by a process signal kill(2), with name of relation
51  *        passed in shared memory.
52  *
53  * 4. Each backend notifies its respective frontend over the comm
54  *        channel using the out-of-band channel.
55  *
56  * 5. Each frontend receives this notification and processes accordingly.
57  *
58  * #4,#5 are changing soon with pending rewrite of portal/protocol.
59  *
60  */
61 #include <unistd.h>
62 #include <signal.h>
63 #include <string.h>
64 #include <errno.h>
65 #include <sys/types.h>                  /* Needed by in.h on Ultrix */
66 #include <netinet/in.h>
67
68 #include <postgres.h>
69
70 #include <miscadmin.h>
71 #include <utils/syscache.h>
72 #include <access/relscan.h>
73 #include <access/xact.h>
74 #include <lib/dllist.h>
75 #include <tcop/dest.h>
76 #include <catalog/pg_proc.h>
77 #include <catalog/catname.h>
78 #include <catalog/pg_listener.h>
79 #include <access/heapam.h>
80 #include <storage/bufmgr.h>
81 #include <nodes/memnodes.h>
82 #include <utils/mcxt.h>
83 #include <commands/async.h>
84 #include <libpq/libpq.h>
85
86 #ifndef HAVE_STRDUP
87 #  include <port-protos.h>              /* for strdup() */
88 #endif
89
90 #include <storage/lmgr.h>
91
92 static int      notifyFrontEndPending = 0;
93 static int      notifyIssued = 0;
94 static Dllist *pendingNotifies = NULL;
95
96
97 static int      AsyncExistsPendingNotify(char *);
98 static void ClearPendingNotify(void);
99 static void Async_NotifyFrontEnd(void);
100 void        Async_Unlisten(char *relname, int pid);
101 static void Async_UnlistenOnExit(int code, char *relname);
102
103 /*
104  *--------------------------------------------------------------
105  * Async_NotifyHandler --
106  *
107  *              This is the signal handler for SIGUSR2.  When the backend
108  *              is signaled, the backend can be in two states.
109  *              1. If the backend is in the middle of another transaction,
110  *                 we set the flag, notifyFrontEndPending, and wait until
111  *                 the end of the transaction to notify the front end.
112  *              2. If the backend is not in the middle of another transaction,
113  *                 we notify the front end immediately.
114  *
115  *              -- jw, 12/28/93
116  * Results:
117  *              none
118  *
119  * Side effects:
120  *              none
121  */
122 void
123 Async_NotifyHandler(SIGNAL_ARGS)
124 {
125         extern TransactionState CurrentTransactionState;
126
127         if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
128                 (CurrentTransactionState->blockState == TRANS_DEFAULT))
129         {
130
131 #ifdef ASYNC_DEBUG
132                 elog(DEBUG, "Waking up sleeping backend process");
133 #endif
134                 Async_NotifyFrontEnd();
135
136         }
137         else
138         {
139 #ifdef ASYNC_DEBUG
140                 elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
141                          CurrentTransactionState->state,
142                          CurrentTransactionState->blockState);
143 #endif
144                 notifyFrontEndPending = 1;
145         }
146 }
147
148 /*
149  *--------------------------------------------------------------
150  * Async_Notify --
151  *
152  *              Adds the relation to the list of pending notifies.
153  *              All notification happens at end of commit.
154  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
155  *
156  *              All notification of backend processes happens here,
157  *              then each backend notifies its corresponding front end at
158  *              the end of commit.
159  *
160  *              This correspond to 'notify <relname>' command
161  *              -- jw, 12/28/93
162  *
163  * Results:
164  *              XXX
165  *
166  * Side effects:
167  *              All tuples for relname in pg_listener are updated.
168  *
169  *--------------------------------------------------------------
170  */
171 void
172 Async_Notify(char *relname)
173 {
174
175         HeapTuple       lTuple,
176                                 rTuple;
177         Relation        lRel;
178         HeapScanDesc sRel;
179         TupleDesc       tdesc;
180         ScanKeyData key;
181         Buffer          b;
182         Datum           d,
183                                 value[3];
184         bool            isnull;
185         char            repl[3],
186                                 nulls[3];
187
188         char       *notifyName;
189
190 #ifdef ASYNC_DEBUG
191         elog(DEBUG, "Async_Notify: %s", relname);
192 #endif
193
194         if (!pendingNotifies)
195                 pendingNotifies = DLNewList();
196
197         /*
198          * Allocate memory from the global malloc pool because it needs to be
199          * referenced also when the transaction is finished.  DZ - 26-08-1996
200          */
201         notifyName = strdup(relname);
202         DLAddHead(pendingNotifies, DLNewElem(notifyName));
203
204         ScanKeyEntryInitialize(&key, 0,
205                                                    Anum_pg_listener_relname,
206                                                    NameEqualRegProcedure,
207                                                    PointerGetDatum(notifyName));
208
209         lRel = heap_openr(ListenerRelationName);
210         tdesc = RelationGetTupleDescriptor(lRel);
211         RelationSetLockForWrite(lRel);
212         sRel = heap_beginscan(lRel, 0, false, 1, &key);
213
214         nulls[0] = nulls[1] = nulls[2] = ' ';
215         repl[0] = repl[1] = repl[2] = ' ';
216         repl[Anum_pg_listener_notify - 1] = 'r';
217         value[0] = value[1] = value[2] = (Datum) 0;
218         value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
219
220         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
221         {
222                 d = heap_getattr(lTuple, b, Anum_pg_listener_notify,
223                                                  tdesc, &isnull);
224                 if (!DatumGetInt32(d))
225                 {
226                         rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
227                         heap_replace(lRel, &lTuple->t_ctid, rTuple);
228                 }
229                 ReleaseBuffer(b);
230         }
231         heap_endscan(sRel);
232         RelationUnsetLockForWrite(lRel);
233         heap_close(lRel);
234         notifyIssued = 1;
235 }
236
237 /*
238  *--------------------------------------------------------------
239  * Async_NotifyAtCommit --
240  *
241  *              Signal our corresponding frontend process on relations that
242  *              were notified.  Signal all other backend process that
243  *              are listening also.
244  *
245  *              -- jw, 12/28/93
246  *
247  * Results:
248  *              XXX
249  *
250  * Side effects:
251  *              Tuples in pg_listener that has our listenerpid are updated so
252  *              that the notification is 0.  We do not want to notify frontend
253  *              more than once.
254  *
255  *              -- jw, 12/28/93
256  *
257  *--------------------------------------------------------------
258  */
259 void
260 Async_NotifyAtCommit()
261 {
262         HeapTuple       lTuple;
263         Relation        lRel;
264         HeapScanDesc sRel;
265         TupleDesc       tdesc;
266         ScanKeyData key;
267         Datum           d;
268         bool            isnull;
269         Buffer          b;
270         extern TransactionState CurrentTransactionState;
271
272         if (!pendingNotifies)
273                 pendingNotifies = DLNewList();
274
275         if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
276                 (CurrentTransactionState->blockState == TRANS_DEFAULT))
277         {
278
279                 if (notifyIssued)
280                 {                                               /* 'notify <relname>' issued by us */
281                         notifyIssued = 0;
282                         StartTransactionCommand();
283 #ifdef ASYNC_DEBUG
284                         elog(DEBUG, "Async_NotifyAtCommit.");
285 #endif
286                         ScanKeyEntryInitialize(&key, 0,
287                                                                    Anum_pg_listener_notify,
288                                                                    Integer32EqualRegProcedure,
289                                                                    Int32GetDatum(1));
290                         lRel = heap_openr(ListenerRelationName);
291                         RelationSetLockForWrite(lRel);
292                         sRel = heap_beginscan(lRel, 0, false, 1, &key);
293                         tdesc = RelationGetTupleDescriptor(lRel);
294
295                         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
296                         {
297                                 d = heap_getattr(lTuple, b, Anum_pg_listener_relname,
298                                                                  tdesc, &isnull);
299
300                                 if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
301                                 {
302                                         d = heap_getattr(lTuple, b, Anum_pg_listener_pid,
303                                                                          tdesc, &isnull);
304
305                                         if (MyProcPid == DatumGetInt32(d))
306                                         {
307 #ifdef ASYNC_DEBUG
308                                                 elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
309 #endif
310                                                 notifyFrontEndPending = 1;
311                                         }
312                                         else
313                                         {
314 #ifdef ASYNC_DEBUG
315                                                 elog(DEBUG, "Notifying others");
316 #endif
317 #ifdef HAVE_KILL
318                                                 if (kill(DatumGetInt32(d), SIGUSR2) < 0)
319                                                 {
320                                                         if (errno == ESRCH)
321                                                         {
322                                                                 heap_delete(lRel, &lTuple->t_ctid);
323                                                         }
324                                                 }
325 #endif
326                                         }
327                                 }
328                                 ReleaseBuffer(b);
329                         }
330                         heap_endscan(sRel);
331                         RelationUnsetLockForWrite(lRel);
332                         heap_close(lRel);
333
334                         CommitTransactionCommand();
335                         ClearPendingNotify();
336                 }
337
338                 if (notifyFrontEndPending)
339                 {                                               /* we need to notify the frontend of all
340                                                                  * pending notifies. */
341                         notifyFrontEndPending = 1;
342                         Async_NotifyFrontEnd();
343                 }
344         }
345 }
346
347 /*
348  *--------------------------------------------------------------
349  * Async_NotifyAtAbort --
350  *
351  *              Gets rid of pending notifies.  List elements are automatically
352  *              freed through memory context.
353  *
354  *
355  * Results:
356  *              XXX
357  *
358  * Side effects:
359  *              XXX
360  *
361  *--------------------------------------------------------------
362  */
363 void
364 Async_NotifyAtAbort()
365 {
366         extern TransactionState CurrentTransactionState;
367
368         if (notifyIssued)
369         {
370                 ClearPendingNotify();
371         }
372         notifyIssued = 0;
373         if (pendingNotifies)
374                 DLFreeList(pendingNotifies);
375         pendingNotifies = DLNewList();
376
377         if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
378                 (CurrentTransactionState->blockState == TRANS_DEFAULT))
379         {
380                 if (notifyFrontEndPending)
381                 {                                               /* don't forget to notify front end */
382                         Async_NotifyFrontEnd();
383                 }
384         }
385 }
386
387 /*
388  *--------------------------------------------------------------
389  * Async_Listen --
390  *
391  *              Register a backend (identified by its Unix PID) as listening
392  *              on the specified relation.
393  *
394  *              This corresponds to the 'listen <relation>' command in SQL
395  *
396  *              One listener per relation, pg_listener relation is keyed
397  *              on (relname,pid) to provide multiple listeners in future.
398  *
399  * Results:
400  *              pg_listeners is updated.
401  *
402  * Side effects:
403  *              XXX
404  *
405  *--------------------------------------------------------------
406  */
407 void
408 Async_Listen(char *relname, int pid)
409 {
410         Datum           values[Natts_pg_listener];
411         char            nulls[Natts_pg_listener];
412         TupleDesc       tdesc;
413         HeapScanDesc s;
414         HeapTuple       htup,
415                                 tup;
416         Relation        lDesc;
417         Buffer          b;
418         Datum           d;
419         int                     i;
420         bool            isnull;
421         int                     alreadyListener = 0;
422         char       *relnamei;
423         TupleDesc       tupDesc;
424
425 #ifdef ASYNC_DEBUG
426         elog(DEBUG, "Async_Listen: %s", relname);
427 #endif
428         for (i = 0; i < Natts_pg_listener; i++)
429         {
430                 nulls[i] = ' ';
431                 values[i] = PointerGetDatum(NULL);
432         }
433
434         i = 0;
435         values[i++] = (Datum) relname;
436         values[i++] = (Datum) pid;
437         values[i++] = (Datum) 0;        /* no notifies pending */
438
439         lDesc = heap_openr(ListenerRelationName);
440         RelationSetLockForWrite(lDesc);
441
442         /* is someone already listening.  One listener per relation */
443         tdesc = RelationGetTupleDescriptor(lDesc);
444         s = heap_beginscan(lDesc, 0, false, 0, (ScanKey) NULL);
445         while (HeapTupleIsValid(htup = heap_getnext(s, 0, &b)))
446         {
447                 d = heap_getattr(htup, b, Anum_pg_listener_relname, tdesc,
448                                                  &isnull);
449                 relnamei = DatumGetPointer(d);
450                 if (!strncmp(relnamei, relname, NAMEDATALEN))
451                 {
452                         d = heap_getattr(htup, b, Anum_pg_listener_pid, tdesc, &isnull);
453                         pid = DatumGetInt32(d);
454                         if (pid == MyProcPid)
455                         {
456                                 alreadyListener = 1;
457                         }
458                 }
459                 ReleaseBuffer(b);
460         }
461         heap_endscan(s);
462
463         if (alreadyListener)
464         {
465                 elog(NOTICE, "Async_Listen: We are already listening on %s",
466                          relname);
467                 return;
468         }
469
470         tupDesc = lDesc->rd_att;
471         tup = heap_formtuple(tupDesc,
472                                                  values,
473                                                  nulls);
474         heap_insert(lDesc, tup);
475
476         pfree(tup);
477
478         /*
479          * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
480          * listener on %s (possibly dead)",relname); }
481          */
482
483         RelationUnsetLockForWrite(lDesc);
484         heap_close(lDesc);
485
486         /*
487          * now that we are listening, we should make a note to ourselves to
488          * unlisten prior to dying.
489          */
490         relnamei = malloc(NAMEDATALEN);         /* persists to process exit */
491         StrNCpy(relnamei, relname, NAMEDATALEN);
492         on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei);
493 }
494
495 /*
496  *--------------------------------------------------------------
497  * Async_Unlisten --
498  *
499  *              Remove the backend from the list of listening backends
500  *              for the specified relation.
501  *
502  *              This would correspond to the 'unlisten <relation>'
503  *              command, but there isn't one yet.
504  *
505  * Results:
506  *              pg_listeners is updated.
507  *
508  * Side effects:
509  *              XXX
510  *
511  *--------------------------------------------------------------
512  */
513 void
514 Async_Unlisten(char *relname, int pid)
515 {
516         Relation        lDesc;
517         HeapTuple       lTuple;
518
519         lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
520                                                                  Int32GetDatum(pid),
521                                                                  0, 0);
522         lDesc = heap_openr(ListenerRelationName);
523         RelationSetLockForWrite(lDesc);
524
525         if (lTuple != NULL)
526         {
527                 heap_delete(lDesc, &lTuple->t_ctid);
528         }
529
530         RelationUnsetLockForWrite(lDesc);
531         heap_close(lDesc);
532 }
533
534 static void
535 Async_UnlistenOnExit(int code,  /* from exitpg */
536                                          char *relname)
537 {
538         Async_Unlisten((char *) relname, MyProcPid);
539 }
540
541 /*
542  * --------------------------------------------------------------
543  * Async_NotifyFrontEnd --
544  *
545  *              Perform an asynchronous notification to front end over
546  *              portal comm channel.  The name of the relation which contains the
547  *              data is sent to the front end.
548  *
549  *              We remove the notification flag from the pg_listener tuple
550  *              associated with our process.
551  *
552  * Results:
553  *              XXX
554  *
555  * Side effects:
556  *
557  *              We make use of the out-of-band channel to transmit the
558  *              notification to the front end.  The actual data transfer takes
559  *              place at the front end's request.
560  *
561  * --------------------------------------------------------------
562  */
563 GlobalMemory notifyContext = NULL;
564
565 static void
566 Async_NotifyFrontEnd()
567 {
568         extern CommandDest whereToSendOutput;
569         HeapTuple       lTuple,
570                                 rTuple;
571         Relation        lRel;
572         HeapScanDesc sRel;
573         TupleDesc       tdesc;
574         ScanKeyData key[2];
575         Datum           d,
576                                 value[3];
577         char            repl[3],
578                                 nulls[3];
579         Buffer          b;
580         bool            isnull;
581
582         notifyFrontEndPending = 0;
583
584 #ifdef ASYNC_DEBUG
585         elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
586 #endif
587
588         StartTransactionCommand();
589         ScanKeyEntryInitialize(&key[0], 0,
590                                                    Anum_pg_listener_notify,
591                                                    Integer32EqualRegProcedure,
592                                                    Int32GetDatum(1));
593         ScanKeyEntryInitialize(&key[1], 0,
594                                                    Anum_pg_listener_pid,
595                                                    Integer32EqualRegProcedure,
596                                                    Int32GetDatum(MyProcPid));
597         lRel = heap_openr(ListenerRelationName);
598         RelationSetLockForWrite(lRel);
599         tdesc = RelationGetTupleDescriptor(lRel);
600         sRel = heap_beginscan(lRel, 0, false, 2, key);
601
602         nulls[0] = nulls[1] = nulls[2] = ' ';
603         repl[0] = repl[1] = repl[2] = ' ';
604         repl[Anum_pg_listener_notify - 1] = 'r';
605         value[0] = value[1] = value[2] = (Datum) 0;
606         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
607
608         while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b)))
609         {
610                 d = heap_getattr(lTuple, b, Anum_pg_listener_relname,
611                                                  tdesc, &isnull);
612                 rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
613                 heap_replace(lRel, &lTuple->t_ctid, rTuple);
614
615                 /* notifying the front end */
616
617                 if (whereToSendOutput == Remote)
618                 {
619                         pq_putnchar("A", 1);
620                         pq_putint((int32)MyProcPid, sizeof(int32));
621                         pq_putstr(DatumGetName(d)->data);
622                         pq_flush();
623                 }
624                 else
625                 {
626                         elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
627                 }
628                 ReleaseBuffer(b);
629         }
630         CommitTransactionCommand();
631 }
632
633 static int
634 AsyncExistsPendingNotify(char *relname)
635 {
636         Dlelem     *p;
637
638         for (p = DLGetHead(pendingNotifies);
639                  p != NULL;
640                  p = DLGetSucc(p))
641         {
642                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
643                 if (!strncmp((const char *) DLE_VAL(p), relname, NAMEDATALEN))
644                         return 1;
645         }
646
647         return 0;
648 }
649
650 static void
651 ClearPendingNotify()
652 {
653         Dlelem     *p;
654
655         while ((p = DLRemHead(pendingNotifies)) != NULL)
656                 free(DLE_VAL(p));
657 }