]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
* Most changes are to fix warnings issued when compiling win32
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per-database basis.
13  *
14  *      Copyright (c) 2001-2003, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.67 2004/04/19 17:42:58 momjian Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netdb.h>
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30 #include <errno.h>
31 #include <signal.h>
32
33 #include "pgstat.h"
34
35 #include "access/xact.h"
36 #include "access/heapam.h"
37 #include "catalog/catname.h"
38 #include "catalog/pg_shadow.h"
39 #include "catalog/pg_database.h"
40 #include "libpq/pqsignal.h"
41 #include "libpq/libpq.h"
42 #include "mb/pg_wchar.h"
43 #include "miscadmin.h"
44 #include "utils/memutils.h"
45 #include "storage/backendid.h"
46 #include "storage/ipc.h"
47 #include "storage/pg_shmem.h"
48 #include "tcop/tcopprot.h"
49 #include "utils/rel.h"
50 #include "utils/hsearch.h"
51 #include "utils/ps_status.h"
52 #include "utils/syscache.h"
53
54 #ifdef EXEC_BACKEND
55 #include "utils/guc.h"
56 #endif
57
58 #ifdef WIN32
59 extern pid_t win32_forkexec(const char* path, char *argv[]);
60 #endif
61
/* ----------
 * GUC parameters
 *
 * pgstat_init() forces pgstat_collect_startcollector on whenever any of
 * the three "what to collect" switches is set, and turns all four off
 * again if the collector's socket cannot be set up.
 * ----------
 */
bool		pgstat_collect_startcollector = true;	/* start a collector at all */
bool		pgstat_collect_resetonpmstart = true;	/* remove stats file at postmaster start */
bool		pgstat_collect_querystring = false;	/* gates pgstat_report_activity() */
bool		pgstat_collect_tuplelevel = false;
bool		pgstat_collect_blocklevel = false;

/* ----------
 * Other global variables
 * ----------
 */

/* Set by pgstat_start() after a successful fork, cleared by pgstat_ispgstat() */
bool		pgstat_is_running = false;
77
78 /* ----------
79  * Local data
80  * ----------
81  */
82 NON_EXEC_STATIC int     pgStatSock = -1;
83 static int      pgStatPipe[2];
84 static struct sockaddr_storage pgStatAddr;
85 static int      pgStatPmPipe[2] = {-1, -1};
86 static int  pgStatCollectorPmPipe[2] = {-1, -1};
87
88 static int      pgStatPid;
89 static time_t last_pgstat_start_time;
90
91 static long pgStatNumMessages = 0;
92
93 static bool pgStatRunningInCollector = FALSE;
94
95 static int      pgStatTabstatAlloc = 0;
96 static int      pgStatTabstatUsed = 0;
97 static PgStat_MsgTabstat **pgStatTabstatMessages = NULL;
98
99 #define TABSTAT_QUANTUM         4       /* we alloc this many at a time */
100
101 static int      pgStatXactCommit = 0;
102 static int      pgStatXactRollback = 0;
103
104 static TransactionId pgStatDBHashXact = InvalidTransactionId;
105 static HTAB *pgStatDBHash = NULL;
106 static HTAB *pgStatBeDead = NULL;
107 static PgStat_StatBeEntry *pgStatBeTable = NULL;
108 static int      pgStatNumBackends = 0;
109
110 static char pgStat_tmpfname[MAXPGPATH];
111 static char pgStat_fname[MAXPGPATH];
112
113
114 /* ----------
115  * Local function forward declarations
116  * ----------
117  */
118 #ifdef EXEC_BACKEND
119 static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
120 static void pgstat_parseArgs(PGSTAT_FORK_ARGS);
121 #endif
122 NON_EXEC_STATIC void pgstat_main(PGSTAT_FORK_ARGS);
123 NON_EXEC_STATIC void pgstat_mainChild(PGSTAT_FORK_ARGS);
124 static void pgstat_mainInit(void);
125 static void pgstat_recvbuffer(void);
126 static void pgstat_die(SIGNAL_ARGS);
127
128 static int      pgstat_add_backend(PgStat_MsgHdr *msg);
129 static void pgstat_sub_backend(int procpid);
130 static void pgstat_drop_database(Oid databaseid);
131 static void pgstat_write_statsfile(void);
132 static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
133                                           PgStat_StatBeEntry **betab,
134                                           int *numbackends);
135
136 static void pgstat_setheader(PgStat_MsgHdr *hdr, int mtype);
137 static void pgstat_send(void *msg, int len);
138
139 static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
140 static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
141 static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
142 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
143 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
144 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
145 static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
146
147
148 /* ------------------------------------------------------------
149  * Public functions called from postmaster follow
150  * ------------------------------------------------------------
151  */
152
153 #ifdef EXEC_BACKEND
154
155 void
156 pgstat_init_forkexec_backend(void)
157 {
158         Assert(DataDir != NULL);
159         snprintf(pgStat_fname, MAXPGPATH,
160                          PGSTAT_STAT_FILENAME, DataDir);
161 }
162
163 #endif
164
/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success, pgStatSock is a non-blocking UDP socket connected to its
 *	own address (stored in pgStatAddr), and the pgStatPmPipe and
 *	pgStatCollectorPmPipe pipes exist.  On failure, pgStatSock is -1 and
 *	all pgstat_collect_* switches except resetonpmstart are turned off.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * Initialize the filenames for the status reports.
	 */
	snprintf(pgStat_tmpfname, MAXPGPATH,
			 PGSTAT_STAT_TMPFILE, DataDir, getpid());
	snprintf(pgStat_fname, MAXPGPATH,
			 PGSTAT_STAT_FILENAME, DataDir);

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		unlink(pgStat_fname);

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination.  We will generate LOG
	 * messages, but no error, for bogus combinations.
	 *
	 * Every failure path inside the loop closes the socket and resets
	 * pgStatSock to -1 before moving on to the next candidate address.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket.
		 * This is to catch situations where the socket can be created but
		 * will not actually pass data (for instance, because kernel packet
		 * filtering rules prevent it).
		 */
		test_byte = TESTBYTEVAL;
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			/* rset and tv are rebuilt on every pass before calling select() */
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 *
			 * The error code must be wrapped in errcode(); a bare constant
			 * in the ereport() argument list is discarded by the comma
			 * operator and no SQLSTATE gets attached to the report.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			/* As above: errcode() is required for the code to take effect */
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
	{
		/*
		 * errno is stale here (each failed attempt was already reported with
		 * its own errno above), so do not derive an error code from it.
		 */
		ereport(LOG,
				(errmsg("disabling statistics collector for lack of working socket")));
		goto startup_failed;
	}

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the
	 * collector falls behind (despite the buffering process), statistics
	 * messages will be discarded; backends won't block waiting to send
	 * messages to the collector.
	 */
	if (!set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	/*
	 * Create the pipe that controls the statistics collector shutdown
	 *
	 * NOTE(review): if the first pgpipe() succeeds and the second fails,
	 * the first pipe's descriptors are not closed on the failure path.
	 */
	if (pgpipe(pgStatPmPipe) < 0 || pgpipe(pgStatCollectorPmPipe) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not create pipe for statistics collector: %m")));
		goto startup_failed;
	}

	freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	if (addrs)
		freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
440
441
442 #ifdef EXEC_BACKEND
443
444 /* ----------
445  * pgstat_forkexec() -
446  *
447  * Used to format up the arglist for, then fork and exec, statistics
448  * (buffer and collector) processes
449  *
450  */
451 static pid_t
452 pgstat_forkexec(STATS_PROCESS_TYPE procType)
453 {
454         pid_t pid;
455         char *av[15];
456         int ac = 0, bufc = 0, i;
457         char pgstatBuf[12][MAXPGPATH];
458
459         av[ac++] = "postgres";
460         switch (procType)
461         {
462                 case STAT_PROC_BUFFER:
463                         av[ac++] = "-statBuf";
464                         break;
465
466                 case STAT_PROC_COLLECTOR:
467                         av[ac++] = "-statCol";
468                         break;
469
470                 default:
471                         Assert(false);
472         }
473
474         /* Sockets + pipes */
475         bufc = 0;
476         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatSock);
477         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatPmPipe[0]);
478         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatPmPipe[1]);
479         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatCollectorPmPipe[0]);
480         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatCollectorPmPipe[1]);
481         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatPipe[0]);
482         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",pgStatPipe[1]);
483
484         /* + misc */
485         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%d",MaxBackends);
486
487         /* + the pstat file names, and postgres pathname */
488         /* FIXME: [fork/exec] whitespaces in directories? */
489         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%s",pgStat_tmpfname);
490         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%s",pgStat_fname);
491         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%s",pg_pathname);
492         snprintf(pgstatBuf[bufc++],MAXPGPATH,"%s",DataDir);
493
494         /* Add to the arg list */
495         Assert(bufc <= lengthof(pgstatBuf));
496         for (i = 0; i < bufc; i++)
497                 av[ac++] = pgstatBuf[i];
498
499         av[ac++] = NULL;
500         Assert(ac <= lengthof(av));
501
502         /* Fire off execv in child */
503 #ifdef WIN32
504         pid = win32_forkexec(pg_pathname,av);
505 #else
506         if ((pid = fork()) == 0 && (execv(pg_pathname,av) == -1))
507                 /* FIXME: [fork/exec] suggestions for what to do here? Can't call elog... */
508                 abort();
509 #endif
510         return pid; /* Parent returns pid */
511 }
512
513
514 /* ----------
515  * pgstat_parseArgs() -
516  *
517  * Used to unformat the arglist for exec'ed statistics
518  * (buffer and collector) processes
519  *
520  */
521 static void
522 pgstat_parseArgs(PGSTAT_FORK_ARGS)
523 {
524         Assert(argc == 12);
525         argc = 0;
526         pgStatSock              = atoi(argv[argc++]);
527         pgStatPmPipe[0] = atoi(argv[argc++]);
528         pgStatPmPipe[1] = atoi(argv[argc++]);
529         pgStatCollectorPmPipe[0] = atoi(argv[argc++]);
530         pgStatCollectorPmPipe[1] = atoi(argv[argc++]);
531         pgStatPipe[0]   = atoi(argv[argc++]);
532         pgStatPipe[1]   = atoi(argv[argc++]);
533         MaxBackends             = atoi(argv[argc++]);
534         strncpy(pgStat_tmpfname,argv[argc++],MAXPGPATH);
535         strncpy(pgStat_fname,   argv[argc++],MAXPGPATH);
536         strncpy(pg_pathname,    argv[argc++],MAXPGPATH);
537         DataDir                 = strdup(argv[argc++]);
538
539         read_nondefault_variables();
540 }
541
542 #endif
543
544 /* ----------
545  * pgstat_start() -
546  *
547  *      Called from postmaster at startup or after an existing collector
548  *      died.  Attempt to fire up a fresh statistics collector.
549  *
550  *      Note: if fail, we will be called again from the postmaster main loop.
551  * ----------
552  */
553 void
554 pgstat_start(void)
555 {
556         time_t          curtime;
557
558         /*
559          * Do nothing if no collector needed
560          */
561         if (pgstat_is_running || !pgstat_collect_startcollector)
562                 return;
563
564         /*
565          * Do nothing if too soon since last collector start.  This is a
566          * safety valve to protect against continuous respawn attempts if the
567          * collector is dying immediately at launch.  Note that since we will
568          * be re-called from the postmaster main loop, we will get another
569          * chance later.
570          */
571         curtime = time(NULL);
572         if ((unsigned int) (curtime - last_pgstat_start_time) <
573                 (unsigned int) PGSTAT_RESTART_INTERVAL)
574                 return;
575         last_pgstat_start_time = curtime;
576
577         /*
578          * Check that the socket is there, else pgstat_init failed.
579          */
580         if (pgStatSock < 0)
581         {
582                 ereport(LOG,
583                                 (errmsg("statistics collector startup skipped")));
584
585                 /*
586                  * We can only get here if someone tries to manually turn
587                  * pgstat_collect_startcollector on after it had been off.
588                  */
589                 pgstat_collect_startcollector = false;
590                 return;
591         }
592
593         /*
594          * Okay, fork off the collector.  Remember its PID for
595          * pgstat_ispgstat.
596          */
597
598         fflush(stdout);
599         fflush(stderr);
600
601 #ifdef __BEOS__
602         /* Specific beos actions before backend startup */
603         beos_before_backend_startup();
604 #endif
605
606 #ifdef EXEC_BACKEND
607         switch ((pgStatPid = (int) pgstat_forkexec(STAT_PROC_BUFFER)))
608 #else
609         switch ((pgStatPid = (int) fork()))
610 #endif
611         {
612                 case -1:
613 #ifdef __BEOS__
614                         /* Specific beos actions */
615                         beos_backend_startup_failed();
616 #endif
617                         ereport(LOG,
618                                         (errmsg("could not fork statistics buffer: %m")));
619                         return;
620
621 #ifndef EXEC_BACKEND
622                 case 0:
623                         /* in postmaster child ... */
624 #ifdef __BEOS__
625                         /* Specific beos actions after backend startup */
626                         beos_backend_startup();
627 #endif
628                         /* Close the postmaster's sockets, except for pgstat link */
629                         ClosePostmasterPorts(false);
630
631                         /* Drop our connection to postmaster's shared memory, as well */
632                         PGSharedMemoryDetach();
633
634                         pgstat_main();
635                         break;
636 #endif
637
638                 default:
639                         pgstat_is_running = true;
640                         return;
641         }
642 }
643
644
645 /* ----------
646  * pgstat_ispgstat() -
647  *
648  *      Called from postmaster to check if a terminated child process
649  *      was the statistics collector.
650  * ----------
651  */
652 bool
653 pgstat_ispgstat(int pid)
654 {
655         if (!pgstat_is_running)
656                 return false;
657
658         if (pgStatPid != pid)
659                 return false;
660
661         /* Oh dear ... */
662         pgstat_is_running = false;
663
664         return true;
665 }
666
667
668 /* ----------
669  * pgstat_close_sockets() -
670  *
671  *      Called when postmaster forks a non-pgstat child process, to close off
672  *      file descriptors that should not be held open in child processes.
673  * ----------
674  */
675 void
676 pgstat_close_sockets(void)
677 {
678         if (pgStatPmPipe[0] >= 0)
679                 closesocket(pgStatPmPipe[0]);
680         pgStatPmPipe[0] = -1;
681         if (pgStatPmPipe[1] >= 0)
682                 closesocket(pgStatPmPipe[1]);
683         pgStatPmPipe[1] = -1;
684         if (pgStatCollectorPmPipe[0] >= 0)
685                 closesocket(pgStatCollectorPmPipe[0]);
686         pgStatCollectorPmPipe[0] = -1;
687         if (pgStatCollectorPmPipe[1] >= 0)
688                 closesocket(pgStatCollectorPmPipe[1]);
689         pgStatCollectorPmPipe[1] = -1;
690 }
691
692
693 /* ----------
694  * pgstat_beterm() -
695  *
696  *      Called from postmaster to tell collector a backend terminated.
697  * ----------
698  */
699 void
700 pgstat_beterm(int pid)
701 {
702         PgStat_MsgBeterm msg;
703
704         if (pgStatSock < 0)
705                 return;
706
707         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
708         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
709         msg.m_hdr.m_procpid = pid;
710
711         pgstat_send(&msg, sizeof(msg));
712 }
713
714
715 /* ------------------------------------------------------------
716  * Public functions used by backends follow
717  *------------------------------------------------------------
718  */
719
720
721 /* ----------
722  * pgstat_bestart() -
723  *
724  *      Tell the collector that this new backend is soon ready to process
725  *      queries. Called from tcop/postgres.c before entering the mainloop.
726  * ----------
727  */
728 void
729 pgstat_bestart(void)
730 {
731         PgStat_MsgBestart msg;
732
733         if (pgStatSock < 0)
734                 return;
735
736         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
737         pgstat_send(&msg, sizeof(msg));
738 }
739
740
741 /* ----------
742  * pgstat_report_activity() -
743  *
744  *      Called in tcop/postgres.c to tell the collector what the backend
745  *      is actually doing (usually "<IDLE>" or the start of the query to
746  *      be executed).
747  * ----------
748  */
749 void
750 pgstat_report_activity(const char *what)
751 {
752         PgStat_MsgActivity msg;
753         int                     len;
754
755         if (!pgstat_collect_querystring || pgStatSock < 0)
756                 return;
757
758         len = strlen(what);
759         len = pg_mbcliplen((const unsigned char *) what, len,
760                                            PGSTAT_ACTIVITY_SIZE - 1);
761
762         memcpy(msg.m_what, what, len);
763         msg.m_what[len] = '\0';
764         len += offsetof(PgStat_MsgActivity, m_what) +1;
765
766         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
767         pgstat_send(&msg, len);
768 }
769
770
771 /* ----------
772  * pgstat_report_tabstat() -
773  *
774  *      Called from tcop/postgres.c to send the so far collected
775  *      per table access statistics to the collector.
776  * ----------
777  */
778 void
779 pgstat_report_tabstat(void)
780 {
781         int                     i;
782
783         if (pgStatSock < 0 ||
784                 !(pgstat_collect_querystring ||
785                   pgstat_collect_tuplelevel ||
786                   pgstat_collect_blocklevel))
787         {
788                 /* Not reporting stats, so just flush whatever we have */
789                 pgStatTabstatUsed = 0;
790                 return;
791         }
792
793         /*
794          * For each message buffer used during the last query set the header
795          * fields and send it out.
796          */
797         for (i = 0; i < pgStatTabstatUsed; i++)
798         {
799                 PgStat_MsgTabstat *tsmsg = pgStatTabstatMessages[i];
800                 int                     n;
801                 int                     len;
802
803                 n = tsmsg->m_nentries;
804                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
805                         n * sizeof(PgStat_TableEntry);
806
807                 tsmsg->m_xact_commit = pgStatXactCommit;
808                 tsmsg->m_xact_rollback = pgStatXactRollback;
809                 pgStatXactCommit = 0;
810                 pgStatXactRollback = 0;
811
812                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
813                 pgstat_send(tsmsg, len);
814         }
815
816         pgStatTabstatUsed = 0;
817 }
818
819
820 /* ----------
821  * pgstat_vacuum_tabstat() -
822  *
823  *      Will tell the collector about objects he can get rid of.
824  * ----------
825  */
826 int
827 pgstat_vacuum_tabstat(void)
828 {
829         Relation        dbrel;
830         HeapScanDesc dbscan;
831         HeapTuple       dbtup;
832         Oid                *dbidlist;
833         int                     dbidalloc;
834         int                     dbidused;
835         HASH_SEQ_STATUS hstat;
836         PgStat_StatDBEntry *dbentry;
837         PgStat_StatTabEntry *tabentry;
838         HeapTuple       reltup;
839         int                     nobjects = 0;
840         PgStat_MsgTabpurge msg;
841         int                     len;
842         int                     i;
843
844         if (pgStatSock < 0)
845                 return 0;
846
847         /*
848          * If not done for this transaction, read the statistics collector
849          * stats file into some hash tables.
850          */
851         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
852         {
853                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
854                                                           &pgStatBeTable, &pgStatNumBackends);
855                 pgStatDBHashXact = GetCurrentTransactionId();
856         }
857
858         /*
859          * Lookup our own database entry
860          */
861         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
862                                                                                                  (void *) &MyDatabaseId,
863                                                                                                  HASH_FIND, NULL);
864         if (dbentry == NULL)
865                 return -1;
866
867         if (dbentry->tables == NULL)
868                 return 0;
869
870         /*
871          * Initialize our messages table counter to zero
872          */
873         msg.m_nentries = 0;
874
875         /*
876          * Check for all tables if they still exist.
877          */
878         hash_seq_init(&hstat, dbentry->tables);
879         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
880         {
881                 /*
882                  * Check if this relation is still alive by looking up its
883                  * pg_class tuple in the system catalog cache.
884                  */
885                 reltup = SearchSysCache(RELOID,
886                                                                 ObjectIdGetDatum(tabentry->tableid),
887                                                                 0, 0, 0);
888                 if (HeapTupleIsValid(reltup))
889                 {
890                         ReleaseSysCache(reltup);
891                         continue;
892                 }
893
894                 /*
895                  * Add this table's Oid to the message
896                  */
897                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
898                 nobjects++;
899
900                 /*
901                  * If the message is full, send it out and reinitialize to zero
902                  */
903                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
904                 {
905                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
906                                 +msg.m_nentries * sizeof(Oid);
907
908                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
909                         pgstat_send(&msg, len);
910
911                         msg.m_nentries = 0;
912                 }
913         }
914
915         /*
916          * Send the rest
917          */
918         if (msg.m_nentries > 0)
919         {
920                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
921                         +msg.m_nentries * sizeof(Oid);
922
923                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
924                 pgstat_send(&msg, len);
925         }
926
927         /*
928          * Read pg_database and remember the Oid's of all existing databases
929          */
930         dbidalloc = 256;
931         dbidused = 0;
932         dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
933
934         dbrel = heap_openr(DatabaseRelationName, AccessShareLock);
935         dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
936         while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
937         {
938                 if (dbidused >= dbidalloc)
939                 {
940                         dbidalloc *= 2;
941                         dbidlist = (Oid *) repalloc((char *) dbidlist,
942                                                                                 sizeof(Oid) * dbidalloc);
943                 }
944                 dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
945         }
946         heap_endscan(dbscan);
947         heap_close(dbrel, AccessShareLock);
948
949         /*
950          * Search the database hash table for dead databases and tell the
951          * collector to drop them as well.
952          */
953         hash_seq_init(&hstat, pgStatDBHash);
954         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
955         {
956                 Oid                     dbid = dbentry->databaseid;
957
958                 for (i = 0; i < dbidused; i++)
959                 {
960                         if (dbidlist[i] == dbid)
961                         {
962                                 dbid = InvalidOid;
963                                 break;
964                         }
965                 }
966
967                 if (dbid != InvalidOid)
968                 {
969                         nobjects++;
970                         pgstat_drop_database(dbid);
971                 }
972         }
973
974         /*
975          * Free the dbid list.
976          */
977         pfree((char *) dbidlist);
978
979         /*
980          * Tell the caller how many removeable objects we found
981          */
982         return nobjects;
983 }
984
985
986 /* ----------
987  * pgstat_drop_database() -
988  *
989  *      Tell the collector that we just dropped a database.
990  *      This is the only message that shouldn't get lost in space. Otherwise
991  *      the collector will keep the statistics for the dead DB until his
992  *      stats file got removed while the postmaster is down.
993  * ----------
994  */
995 static void
996 pgstat_drop_database(Oid databaseid)
997 {
998         PgStat_MsgDropdb msg;
999
1000         if (pgStatSock < 0)
1001                 return;
1002
1003         msg.m_databaseid = databaseid;
1004
1005         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1006         pgstat_send(&msg, sizeof(msg));
1007 }
1008
1009
1010 /* ----------
1011  * pgstat_reset_counters() -
1012  *
1013  *      Tell the statistics collector to reset counters for our database.
1014  * ----------
1015  */
1016 void
1017 pgstat_reset_counters(void)
1018 {
1019         PgStat_MsgResetcounter msg;
1020
1021         if (pgStatSock < 0)
1022                 return;
1023
1024         if (!superuser())
1025                 ereport(ERROR,
1026                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1027                           errmsg("must be superuser to reset statistics counters")));
1028
1029         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1030         pgstat_send(&msg, sizeof(msg));
1031 }
1032
1033
1034 /* ----------
1035  * pgstat_ping() -
1036  *
1037  *      Send some junk data to the collector to increase traffic.
1038  * ----------
1039  */
1040 void
1041 pgstat_ping(void)
1042 {
1043         PgStat_MsgDummy msg;
1044
1045         if (pgStatSock < 0)
1046                 return;
1047
1048         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
1049         pgstat_send(&msg, sizeof(msg));
1050 }
1051
1052 /*
1053  * Create or enlarge the pgStatTabstatMessages array
1054  */
1055 static bool
1056 more_tabstat_space(void)
1057 {
1058         PgStat_MsgTabstat *newMessages;
1059         PgStat_MsgTabstat **msgArray;
1060         int                     newAlloc = pgStatTabstatAlloc + TABSTAT_QUANTUM;
1061         int                     i;
1062
1063         /* Create (another) quantum of message buffers */
1064         newMessages = (PgStat_MsgTabstat *)
1065                 malloc(sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1066         if (newMessages == NULL)
1067         {
1068                 ereport(LOG,
1069                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1070                                  errmsg("out of memory")));
1071                 return false;
1072         }
1073
1074         /* Create or enlarge the pointer array */
1075         if (pgStatTabstatMessages == NULL)
1076                 msgArray = (PgStat_MsgTabstat **)
1077                         malloc(sizeof(PgStat_MsgTabstat *) * newAlloc);
1078         else
1079                 msgArray = (PgStat_MsgTabstat **)
1080                         realloc(pgStatTabstatMessages,
1081                                         sizeof(PgStat_MsgTabstat *) * newAlloc);
1082         if (msgArray == NULL)
1083         {
1084                 free(newMessages);
1085                 ereport(LOG,
1086                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1087                                  errmsg("out of memory")));
1088                 return false;
1089         }
1090
1091         MemSet(newMessages, 0, sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1092         for (i = 0; i < TABSTAT_QUANTUM; i++)
1093                 msgArray[pgStatTabstatAlloc + i] = newMessages++;
1094         pgStatTabstatMessages = msgArray;
1095         pgStatTabstatAlloc = newAlloc;
1096
1097         return true;
1098 }
1099
1100 /* ----------
1101  * pgstat_initstats() -
1102  *
1103  *      Called from various places usually dealing with initialization
1104  *      of Relation or Scan structures. The data placed into these
1105  *      structures from here tell where later to count for buffer reads,
1106  *      scans and tuples fetched.
1107  * ----------
1108  */
1109 void
1110 pgstat_initstats(PgStat_Info *stats, Relation rel)
1111 {
1112         Oid                     rel_id = rel->rd_id;
1113         PgStat_TableEntry *useent;
1114         PgStat_MsgTabstat *tsmsg;
1115         int                     mb;
1116         int                     i;
1117
1118         /*
1119          * Initialize data not to count at all.
1120          */
1121         stats->tabentry = NULL;
1122         stats->no_stats = FALSE;
1123         stats->heap_scan_counted = FALSE;
1124         stats->index_scan_counted = FALSE;
1125
1126         if (pgStatSock < 0 ||
1127                 !(pgstat_collect_tuplelevel ||
1128                   pgstat_collect_blocklevel))
1129         {
1130                 stats->no_stats = TRUE;
1131                 return;
1132         }
1133
1134         /*
1135          * Search the already-used message slots for this relation.
1136          */
1137         for (mb = 0; mb < pgStatTabstatUsed; mb++)
1138         {
1139                 tsmsg = pgStatTabstatMessages[mb];
1140
1141                 for (i = tsmsg->m_nentries; --i >= 0; )
1142                 {
1143                         if (tsmsg->m_entry[i].t_id == rel_id)
1144                         {
1145                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1146                                 return;
1147                         }
1148                 }
1149
1150                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1151                         continue;
1152
1153                 /*
1154                  * Not found, but found a message buffer with an empty slot
1155                  * instead. Fine, let's use this one.
1156                  */
1157                 i = tsmsg->m_nentries++;
1158                 useent = &tsmsg->m_entry[i];
1159                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1160                 useent->t_id = rel_id;
1161                 stats->tabentry = (void *) useent;
1162                 return;
1163         }
1164
1165         /*
1166          * If we ran out of message buffers, we just allocate more.
1167          */
1168         if (pgStatTabstatUsed >= pgStatTabstatAlloc)
1169         {
1170                 if (!more_tabstat_space())
1171                 {
1172                         stats->no_stats = TRUE;
1173                         return;
1174                 }
1175                 Assert(pgStatTabstatUsed < pgStatTabstatAlloc);
1176         }
1177
1178         /*
1179          * Use the first entry of the next message buffer.
1180          */
1181         mb = pgStatTabstatUsed++;
1182         tsmsg = pgStatTabstatMessages[mb];
1183         tsmsg->m_nentries = 1;
1184         useent = &tsmsg->m_entry[0];
1185         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1186         useent->t_id = rel_id;
1187         stats->tabentry = (void *) useent;
1188 }
1189
1190
1191 /* ----------
1192  * pgstat_count_xact_commit() -
1193  *
1194  *      Called from access/transam/xact.c to count transaction commits.
1195  * ----------
1196  */
1197 void
1198 pgstat_count_xact_commit(void)
1199 {
1200         if (!(pgstat_collect_querystring ||
1201                   pgstat_collect_tuplelevel ||
1202                   pgstat_collect_blocklevel))
1203                 return;
1204
1205         pgStatXactCommit++;
1206
1207         /*
1208          * If there was no relation activity yet, just make one existing
1209          * message buffer used without slots, causing the next report to tell
1210          * new xact-counters.
1211          */
1212         if (pgStatTabstatAlloc == 0)
1213         {
1214                 if (!more_tabstat_space())
1215                         return;
1216         }
1217         if (pgStatTabstatUsed == 0)
1218         {
1219                 pgStatTabstatUsed++;
1220                 pgStatTabstatMessages[0]->m_nentries = 0;
1221         }
1222 }
1223
1224
1225 /* ----------
1226  * pgstat_count_xact_rollback() -
1227  *
1228  *      Called from access/transam/xact.c to count transaction rollbacks.
1229  * ----------
1230  */
1231 void
1232 pgstat_count_xact_rollback(void)
1233 {
1234         if (!(pgstat_collect_querystring ||
1235                   pgstat_collect_tuplelevel ||
1236                   pgstat_collect_blocklevel))
1237                 return;
1238
1239         pgStatXactRollback++;
1240
1241         /*
1242          * If there was no relation activity yet, just make one existing
1243          * message buffer used without slots, causing the next report to tell
1244          * new xact-counters.
1245          */
1246         if (pgStatTabstatAlloc == 0)
1247         {
1248                 if (!more_tabstat_space())
1249                         return;
1250         }
1251         if (pgStatTabstatUsed == 0)
1252         {
1253                 pgStatTabstatUsed++;
1254                 pgStatTabstatMessages[0]->m_nentries = 0;
1255         }
1256 }
1257
1258
1259 /* ----------
1260  * pgstat_fetch_stat_dbentry() -
1261  *
1262  *      Support function for the SQL-callable pgstat* functions. Returns
1263  *      the collected statistics for one database or NULL. NULL doesn't mean
1264  *      that the database doesn't exist, it is just not yet known by the
1265  *      collector, so the caller is better off to report ZERO instead.
1266  * ----------
1267  */
1268 PgStat_StatDBEntry *
1269 pgstat_fetch_stat_dbentry(Oid dbid)
1270 {
1271         PgStat_StatDBEntry *dbentry;
1272
1273         /*
1274          * If not done for this transaction, read the statistics collector
1275          * stats file into some hash tables. Be careful with the
1276          * read_statsfile() call below!
1277          */
1278         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1279         {
1280                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1281                                                           &pgStatBeTable, &pgStatNumBackends);
1282                 pgStatDBHashXact = GetCurrentTransactionId();
1283         }
1284
1285         /*
1286          * Lookup the requested database
1287          */
1288         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1289                                                                                                  (void *) &dbid,
1290                                                                                                  HASH_FIND, NULL);
1291         if (dbentry == NULL)
1292                 return NULL;
1293
1294         return dbentry;
1295 }
1296
1297
1298 /* ----------
1299  * pgstat_fetch_stat_tabentry() -
1300  *
1301  *      Support function for the SQL-callable pgstat* functions. Returns
1302  *      the collected statistics for one table or NULL. NULL doesn't mean
1303  *      that the table doesn't exist, it is just not yet known by the
1304  *      collector, so the caller is better off to report ZERO instead.
1305  * ----------
1306  */
1307 PgStat_StatTabEntry *
1308 pgstat_fetch_stat_tabentry(Oid relid)
1309 {
1310         PgStat_StatDBEntry *dbentry;
1311         PgStat_StatTabEntry *tabentry;
1312
1313         /*
1314          * If not done for this transaction, read the statistics collector
1315          * stats file into some hash tables. Be careful with the
1316          * read_statsfile() call below!
1317          */
1318         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1319         {
1320                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1321                                                           &pgStatBeTable, &pgStatNumBackends);
1322                 pgStatDBHashXact = GetCurrentTransactionId();
1323         }
1324
1325         /*
1326          * Lookup our database.
1327          */
1328         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1329                                                                                                  (void *) &MyDatabaseId,
1330                                                                                                  HASH_FIND, NULL);
1331         if (dbentry == NULL)
1332                 return NULL;
1333
1334         /*
1335          * Now inside the DB's table hash table lookup the requested one.
1336          */
1337         if (dbentry->tables == NULL)
1338                 return NULL;
1339         tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1340                                                                                                    (void *) &relid,
1341                                                                                                    HASH_FIND, NULL);
1342         if (tabentry == NULL)
1343                 return NULL;
1344
1345         return tabentry;
1346 }
1347
1348
1349 /* ----------
1350  * pgstat_fetch_stat_beentry() -
1351  *
1352  *      Support function for the SQL-callable pgstat* functions. Returns
1353  *      the actual activity slot of one active backend. The caller is
1354  *      responsible for a check if the actual user is permitted to see
1355  *      that info (especially the querystring).
1356  * ----------
1357  */
1358 PgStat_StatBeEntry *
1359 pgstat_fetch_stat_beentry(int beid)
1360 {
1361         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1362         {
1363                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1364                                                           &pgStatBeTable, &pgStatNumBackends);
1365                 pgStatDBHashXact = GetCurrentTransactionId();
1366         }
1367
1368         if (beid < 1 || beid > pgStatNumBackends)
1369                 return NULL;
1370
1371         return &pgStatBeTable[beid - 1];
1372 }
1373
1374
1375 /* ----------
1376  * pgstat_fetch_stat_numbackends() -
1377  *
1378  *      Support function for the SQL-callable pgstat* functions. Returns
1379  *      the maximum current backend id.
1380  * ----------
1381  */
1382 int
1383 pgstat_fetch_stat_numbackends(void)
1384 {
1385         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1386         {
1387                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1388                                                           &pgStatBeTable, &pgStatNumBackends);
1389                 pgStatDBHashXact = GetCurrentTransactionId();
1390         }
1391
1392         return pgStatNumBackends;
1393 }
1394
1395
1396
1397 /* ------------------------------------------------------------
1398  * Local support functions follow
1399  * ------------------------------------------------------------
1400  */
1401
1402
1403 /* ----------
1404  * pgstat_setheader() -
1405  *
1406  *              Set common header fields in a statistics message
1407  * ----------
1408  */
1409 static void
1410 pgstat_setheader(PgStat_MsgHdr *hdr, int mtype)
1411 {
1412         hdr->m_type = mtype;
1413         hdr->m_backendid = MyBackendId;
1414         hdr->m_procpid = MyProcPid;
1415         hdr->m_databaseid = MyDatabaseId;
1416         hdr->m_userid = GetSessionUserId();
1417 }
1418
1419
1420 /* ----------
1421  * pgstat_send() -
1422  *
1423  *              Send out one statistics message to the collector
1424  * ----------
1425  */
1426 static void
1427 pgstat_send(void *msg, int len)
1428 {
1429         if (pgStatSock < 0)
1430                 return;
1431
1432         ((PgStat_MsgHdr *) msg)->m_size = len;
1433
1434         send(pgStatSock, msg, len, 0);
1435         /* We deliberately ignore any error from send() */
1436 }
1437
1438
1439 /* ------------------------------------------------------------
1440  * Local functions implementing the statistics collector itself follow
1441  *------------------------------------------------------------
1442  */
1443
1444 static void
1445 pgstat_mainInit(void)
1446 {
1447         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1448
1449 #ifdef EXEC_BACKEND
1450         /* In EXEC case we will not have inherited these settings */
1451         IsPostmasterEnvironment = true;
1452         whereToSendOutput = None;
1453
1454         /* Setup global context */
1455         MemoryContextInit(); /* before any elog'ing can occur */
1456         InitializeGUCOptions();
1457 #endif
1458
1459         MyProcPid = getpid();           /* reset MyProcPid */
1460
1461         /* Lose the postmaster's on-exit routines */
1462         on_exit_reset();
1463
1464         /*
1465          * Ignore all signals usually bound to some action in the postmaster,
1466          * except for SIGCHLD --- see pgstat_recvbuffer.
1467          */
1468         pqsignal(SIGHUP, SIG_IGN);
1469         pqsignal(SIGINT, SIG_IGN);
1470         pqsignal(SIGTERM, SIG_IGN);
1471         pqsignal(SIGQUIT, SIG_IGN);
1472         pqsignal(SIGALRM, SIG_IGN);
1473         pqsignal(SIGPIPE, SIG_IGN);
1474         pqsignal(SIGUSR1, SIG_IGN);
1475         pqsignal(SIGUSR2, SIG_IGN);
1476         pqsignal(SIGCHLD, pgstat_die);
1477         pqsignal(SIGTTIN, SIG_DFL);
1478         pqsignal(SIGTTOU, SIG_DFL);
1479         pqsignal(SIGCONT, SIG_DFL);
1480         pqsignal(SIGWINCH, SIG_DFL);
1481 }
1482
1483
1484 /* ----------
1485  * pgstat_main() -
1486  *
1487  *      Start up the statistics collector itself.  This is the body of the
1488  *      postmaster child process.
1489  * ----------
1490  */
1491 NON_EXEC_STATIC void
1492 pgstat_main(PGSTAT_FORK_ARGS)
1493 {
1494         pgstat_mainInit(); /* Note: for *both* EXEC_BACKEND and regular cases */
1495 #ifdef EXEC_BACKEND
1496         pgstat_parseArgs(argc,argv);
1497 #endif
1498
1499         /*
1500          * Close the writing end of the postmaster pipe, so we'll see it
1501          * closing when the postmaster terminates and can terminate as well.
1502          */
1503         closesocket(pgStatPmPipe[1]);
1504         pgStatPmPipe[1] = -1;
1505         closesocket(pgStatCollectorPmPipe[1]);
1506         pgStatCollectorPmPipe[1] = -1;
1507
1508         /*
1509          * Start a buffering process to read from the socket, so we have a
1510          * little more time to process incoming messages.
1511          *
1512          * NOTE: the process structure is: postmaster is parent of buffer process
1513          * is parent of collector process.      This way, the buffer can detect
1514          * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1515          * collector failure until it tried to write on the pipe.  That would
1516          * mean that after the postmaster started a new collector, we'd have
1517          * two buffer processes competing to read from the UDP socket --- not
1518          * good.
1519          */
1520         if (pgpipe(pgStatPipe) < 0)
1521         {
1522                 ereport(LOG,
1523                                 (errcode_for_socket_access(),
1524                          errmsg("could not create pipe for statistics buffer: %m")));
1525                 exit(1);
1526         }
1527
1528 #ifdef EXEC_BACKEND
1529         /* child becomes collector process */
1530         switch (pgstat_forkexec(STAT_PROC_COLLECTOR))
1531 #else
1532         switch (fork())
1533 #endif
1534         {
1535                 case -1:
1536                         ereport(LOG,
1537                                         (errmsg("could not fork statistics collector: %m")));
1538                         exit(1);
1539
1540 #ifndef EXEC_BACKEND
1541                 case 0:
1542                         /* child becomes collector process */
1543                         pgstat_mainChild();
1544                         break;
1545 #endif
1546
1547                 default:
1548                         /* parent becomes buffer process */
1549                         closesocket(pgStatPipe[0]);
1550                         pgstat_recvbuffer();
1551         }
1552         exit(0);
1553 }
1554
1555
1556 NON_EXEC_STATIC void
1557 pgstat_mainChild(PGSTAT_FORK_ARGS)
1558 {
1559         PgStat_Msg      msg;
1560         fd_set          rfds;
1561         int                     readPipe;
1562         int                     pmPipe;
1563         int                     nready;
1564         int                     len = 0;
1565         struct timeval timeout;
1566         struct timeval next_statwrite;
1567         bool            need_statwrite;
1568         HASHCTL         hash_ctl;
1569
1570 #ifdef EXEC_BACKEND
1571         pgstat_mainInit();  /* Note: only in EXEC_BACKEND case */
1572         pgstat_parseArgs(argc,argv);
1573 #else
1574         MyProcPid = getpid();           /* reset MyProcPid */
1575 #endif
1576
1577         closesocket(pgStatPipe[1]);
1578         closesocket(pgStatSock);
1579         pmPipe = pgStatCollectorPmPipe[0];
1580
1581         /*
1582          * In the child we can have default SIGCHLD handling (in case we want
1583          * to call system() here...)
1584          */
1585         pqsignal(SIGCHLD, SIG_DFL);
1586
1587         /*
1588          * Identify myself via ps
1589          */
1590         init_ps_display("stats collector process", "", "");
1591         set_ps_display("");
1592
1593         /*
1594          * Arrange to write the initial status file right away
1595          */
1596         gettimeofday(&next_statwrite, NULL);
1597         need_statwrite = TRUE;
1598
1599         /*
1600          * Read in an existing statistics stats file or initialize the stats
1601          * to zero.
1602          */
1603         pgStatRunningInCollector = TRUE;
1604         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1605
1606         /*
1607          * Create the dead backend hashtable
1608          */
1609         memset(&hash_ctl, 0, sizeof(hash_ctl));
1610         hash_ctl.keysize = sizeof(int);
1611         hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1612         hash_ctl.hash = tag_hash;
1613         pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
1614                                                            &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1615         if (pgStatBeDead == NULL)
1616         {
1617                 /* assume the problem is out-of-memory */
1618                 ereport(LOG,
1619                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1620                                  errmsg("out of memory in statistics collector --- abort")));
1621                 exit(1);
1622         }
1623
1624         /*
1625          * Create the known backends table
1626          */
1627         pgStatBeTable = (PgStat_StatBeEntry *) malloc(
1628                                                            sizeof(PgStat_StatBeEntry) * MaxBackends);
1629         if (pgStatBeTable == NULL)
1630         {
1631                 ereport(LOG,
1632                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1633                                  errmsg("out of memory in statistics collector --- abort")));
1634                 exit(1);
1635         }
1636         memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
1637
1638         readPipe = pgStatPipe[0];
1639
1640         /*
1641          * Process incoming messages and handle all the reporting stuff until
1642          * there are no more messages.
1643          */
1644         for (;;)
1645         {
1646                 /*
1647                  * If we need to write the status file again (there have been
1648                  * changes in the statistics since we wrote it last) calculate the
1649                  * timeout until we have to do so.
1650                  */
1651                 if (need_statwrite)
1652                 {
1653                         struct timeval now;
1654
1655                         gettimeofday(&now, NULL);
1656                         /* avoid assuming that tv_sec is signed */
1657                         if (now.tv_sec > next_statwrite.tv_sec ||
1658                                 (now.tv_sec == next_statwrite.tv_sec &&
1659                                  now.tv_usec >= next_statwrite.tv_usec))
1660                         {
1661                                 timeout.tv_sec = 0;
1662                                 timeout.tv_usec = 0;
1663                         }
1664                         else
1665                         {
1666                                 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
1667                                 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
1668                                 if (timeout.tv_usec < 0)
1669                                 {
1670                                         timeout.tv_sec--;
1671                                         timeout.tv_usec += 1000000;
1672                                 }
1673                         }
1674                 }
1675
1676                 /*
1677                  * Setup the descriptor set for select(2)
1678                  */
1679                 FD_ZERO(&rfds);
1680                 FD_SET(readPipe, &rfds);
1681
1682                 /*
1683                  * Now wait for something to do.
1684                  */
1685                 nready = select(readPipe+1, &rfds, NULL, NULL,
1686                                                 (need_statwrite) ? &timeout : NULL);
1687                 if (nready < 0)
1688                 {
1689                         if (errno == EINTR)
1690                                 continue;
1691                         ereport(LOG,
1692                                         (errcode_for_socket_access(),
1693                                    errmsg("select() failed in statistics collector: %m")));
1694                         exit(1);
1695                 }
1696
1697                 /*
1698                  * If there are no descriptors ready, our timeout for writing the
1699                  * stats file happened.
1700                  */
1701                 if (nready == 0)
1702                 {
1703                         pgstat_write_statsfile();
1704                         need_statwrite = FALSE;
1705
1706                         continue;
1707                 }
1708
1709                 /*
1710                  * Check if there is a new statistics message to collect.
1711                  */
1712                 if (FD_ISSET(readPipe, &rfds))
1713                 {
1714                         /*
1715                          * We may need to issue multiple read calls in case the buffer
1716                          * process didn't write the message in a single write, which
1717                          * is possible since it dumps its buffer bytewise. In any
1718                          * case, we'd need two reads since we don't know the message
1719                          * length initially.
1720                          */
1721                         int                     nread = 0;
1722                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1723                         bool            pipeEOF = false;
1724
1725                         while (nread < targetlen)
1726                         {
1727                                 len = piperead(readPipe, ((char *) &msg) + nread,
1728                                                                 targetlen - nread);
1729                                 if (len < 0)
1730                                 {
1731                                         if (errno == EINTR)
1732                                                 continue;
1733                                         ereport(LOG,
1734                                                         (errcode_for_socket_access(),
1735                                                          errmsg("could not read from statistics collector pipe: %m")));
1736                                         exit(1);
1737                                 }
1738                                 if (len == 0)   /* EOF on the pipe! */
1739                                 {
1740                                         pipeEOF = true;
1741                                         break;
1742                                 }
1743                                 nread += len;
1744                                 if (nread == sizeof(PgStat_MsgHdr))
1745                                 {
1746                                         /* we have the header, compute actual msg length */
1747                                         targetlen = msg.msg_hdr.m_size;
1748                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1749                                                 targetlen > (int) sizeof(msg))
1750                                         {
1751                                                 /*
1752                                                  * Bogus message length implies that we got out of
1753                                                  * sync with the buffer process somehow. Abort so
1754                                                  * that we can restart both processes.
1755                                                  */
1756                                                 ereport(LOG,
1757                                                   (errmsg("invalid statistics message length")));
1758                                                 exit(1);
1759                                         }
1760                                 }
1761                         }
1762
1763                         /*
1764                          * EOF on the pipe implies that the buffer process exited.
1765                          * Fall out of outer loop.
1766                          */
1767                         if (pipeEOF)
1768                                 break;
1769
1770                         /*
1771                          * Distribute the message to the specific function handling
1772                          * it.
1773                          */
1774                         switch (msg.msg_hdr.m_type)
1775                         {
1776                                 case PGSTAT_MTYPE_DUMMY:
1777                                         break;
1778
1779                                 case PGSTAT_MTYPE_BESTART:
1780                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1781                                         break;
1782
1783                                 case PGSTAT_MTYPE_BETERM:
1784                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1785                                         break;
1786
1787                                 case PGSTAT_MTYPE_TABSTAT:
1788                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1789                                         break;
1790
1791                                 case PGSTAT_MTYPE_TABPURGE:
1792                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1793                                         break;
1794
1795                                 case PGSTAT_MTYPE_ACTIVITY:
1796                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1797                                         break;
1798
1799                                 case PGSTAT_MTYPE_DROPDB:
1800                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1801                                         break;
1802
1803                                 case PGSTAT_MTYPE_RESETCOUNTER:
1804                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1805                                                                                          nread);
1806                                         break;
1807
1808                                 default:
1809                                         break;
1810                         }
1811
1812                         /*
1813                          * Globally count messages.
1814                          */
1815                         pgStatNumMessages++;
1816
1817                         /*
1818                          * If this is the first message after we wrote the stats file
1819                          * the last time, setup the timeout that it'd be written.
1820                          */
1821                         if (!need_statwrite)
1822                         {
1823                                 gettimeofday(&next_statwrite, NULL);
1824                                 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
1825                                 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
1826                                 next_statwrite.tv_usec %= 1000000;
1827                                 need_statwrite = TRUE;
1828                         }
1829                 }
1830
1831                 /*
1832                  * Note that we do NOT check for postmaster exit inside the loop;
1833                  * only EOF on the buffer pipe causes us to fall out.  This
1834                  * ensures we don't exit prematurely if there are still a few
1835                  * messages in the buffer or pipe at postmaster shutdown.
1836                  */
1837         }
1838
1839         /*
1840          * Okay, we saw EOF on the buffer pipe, so there are no more messages
1841          * to process.  If the buffer process quit because of postmaster
1842          * shutdown, we want to save the final stats to reuse at next startup.
1843          * But if the buffer process failed, it seems best not to (there may
1844          * even now be a new collector firing up, and we don't want it to read
1845          * a partially- rewritten stats file).  We can tell whether the
1846          * postmaster is still alive by checking to see if the postmaster pipe
1847          * is still open.  If it is read-ready (ie, EOF), the postmaster must
1848          * have quit.
1849          */
1850         FD_ZERO(&rfds);
1851         FD_SET(pmPipe, &rfds);
1852         timeout.tv_sec = 0;
1853         timeout.tv_usec = 0;
1854         nready = select(pmPipe+1,&rfds,NULL,NULL,&timeout);
1855         if (nready > 0 && FD_ISSET(pmPipe, &rfds)) 
1856                 pgstat_write_statsfile();
1857 }
1858
1859
1860 /* ----------
1861  * pgstat_recvbuffer() -
1862  *
1863  *      This is the body of the separate buffering process. Its only
1864  *      purpose is to receive messages from the UDP socket as fast as
1865  *      possible and forward them over a pipe into the collector itself.
1866  *      If the collector is slow to absorb messages, they are buffered here.
1867  * ----------
1868  */
1869 static void
1870 pgstat_recvbuffer(void)
1871 {
1872         fd_set          rfds;
1873         fd_set          wfds;
1874         int                     writePipe = pgStatPipe[1];
1875         int                     pmPipe = pgStatPmPipe[0];
1876         int                     maxfd;
1877         int                     nready;
1878         int                     len;
1879         int                     xfr;
1880         int                     frm;
1881         PgStat_Msg      input_buffer;
1882         char       *msgbuffer;
1883         int                     msg_send = 0;   /* next send index in buffer */
1884         int                     msg_recv = 0;   /* next receive index */
1885         int                     msg_have = 0;   /* number of bytes stored */
1886         bool            overflow = false;
1887
1888         /*
1889          * Identify myself via ps
1890          */
1891         init_ps_display("stats buffer process", "", "");
1892         set_ps_display("");
1893
1894         /*
1895          * We want to die if our child collector process does.  There are two
1896          * ways we might notice that it has died: receive SIGCHLD, or get a
1897          * write failure on the pipe leading to the child.      We can set SIGPIPE
1898          * to kill us here.  Our SIGCHLD handler was already set up before we
1899          * forked (must do it that way, else it's a race condition).
1900          */
1901         pqsignal(SIGPIPE, SIG_DFL);
1902         PG_SETMASK(&UnBlockSig);
1903
1904         /*
1905          * Set the write pipe to nonblock mode, so that we cannot block when
1906          * the collector falls behind.
1907          */
1908         if (!set_noblock(writePipe))
1909         {
1910                 ereport(LOG,
1911                                 (errcode_for_socket_access(),
1912                   errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1913                 exit(1);
1914         }
1915
1916         /*
1917          * Allocate the message buffer
1918          */
1919         msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
1920         if (msgbuffer == NULL)
1921         {
1922                 ereport(LOG,
1923                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1924                          errmsg("out of memory in statistics collector --- abort")));
1925                 exit(1);
1926         }
1927
1928         /*
1929          * Loop forever
1930          */
1931         for (;;)
1932         {
1933                 FD_ZERO(&rfds);
1934                 FD_ZERO(&wfds);
1935                 maxfd = -1;
1936
1937                 /*
1938                  * As long as we have buffer space we add the socket to the read
1939                  * descriptor set.
1940                  */
1941                 if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
1942                 {
1943                         FD_SET(pgStatSock, &rfds);
1944                         maxfd = pgStatSock;
1945                         overflow = false;
1946                 }
1947                 else
1948                 {
1949                         if (!overflow)
1950                         {
1951                                 ereport(LOG,
1952                                                 (errmsg("statistics buffer is full")));
1953                                 overflow = true;
1954                         }
1955                 }
1956
1957                 /*
1958                  * If we have messages to write out, we add the pipe to the write
1959                  * descriptor set. Otherwise, we check if the postmaster might
1960                  * have terminated.
1961                  */
1962                 if (msg_have > 0)
1963                 {
1964                         FD_SET(writePipe, &wfds);
1965                         if (writePipe > maxfd)
1966                                 maxfd = writePipe;
1967                 }
1968                 else
1969                 {
1970                         FD_SET(pmPipe, &rfds);
1971                         if (pmPipe > maxfd)
1972                                 maxfd = pmPipe;
1973                 }
1974
1975                 /*
1976                  * Wait for some work to do.
1977                  */
1978                 nready = select(maxfd + 1, &rfds, &wfds, NULL, NULL);
1979                 if (nready < 0)
1980                 {
1981                         if (errno == EINTR)
1982                                 continue;
1983                         ereport(LOG,
1984                                         (errcode_for_socket_access(),
1985                                          errmsg("select() failed in statistics buffer: %m")));
1986                         exit(1);
1987                 }
1988
1989                 /*
1990                  * If there is a message on the socket, read it and check for
1991                  * validity.
1992                  */
1993                 if (FD_ISSET(pgStatSock, &rfds))
1994                 {
1995                         len = recv(pgStatSock, (char *) &input_buffer,
1996                                            sizeof(PgStat_Msg), 0);
1997                         if (len < 0)
1998                         {
1999                                 ereport(LOG,
2000                                                 (errcode_for_socket_access(),
2001                                            errmsg("could not read statistics message: %m")));
2002                                 exit(1);
2003                         }
2004
2005                         /*
2006                          * We ignore messages that are smaller than our common header
2007                          */
2008                         if (len < sizeof(PgStat_MsgHdr))
2009                                 continue;
2010
2011                         /*
2012                          * The received length must match the length in the header
2013                          */
2014                         if (input_buffer.msg_hdr.m_size != len)
2015                                 continue;
2016
2017                         /*
2018                          * O.K. - we accept this message.  Copy it to the circular
2019                          * msgbuffer.
2020                          */
2021                         frm = 0;
2022                         while (len > 0)
2023                         {
2024                                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
2025                                 if (xfr > len)
2026                                         xfr = len;
2027                                 Assert(xfr > 0);
2028                                 memcpy(msgbuffer + msg_recv,
2029                                            ((char *) &input_buffer) + frm,
2030                                            xfr);
2031                                 msg_recv += xfr;
2032                                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
2033                                         msg_recv = 0;
2034                                 msg_have += xfr;
2035                                 frm += xfr;
2036                                 len -= xfr;
2037                         }
2038                 }
2039
2040                 /*
2041                  * If the collector is ready to receive, write some data into his
2042                  * pipe.  We may or may not be able to write all that we have.
2043                  *
2044                  * NOTE: if what we have is less than PIPE_BUF bytes but more than
2045                  * the space available in the pipe buffer, most kernels will
2046                  * refuse to write any of it, and will return EAGAIN.  This means
2047                  * we will busy-loop until the situation changes (either because
2048                  * the collector caught up, or because more data arrives so that
2049                  * we have more than PIPE_BUF bytes buffered).  This is not good,
2050                  * but is there any way around it?      We have no way to tell when
2051                  * the collector has caught up...
2052                  */
2053                 if (FD_ISSET(writePipe, &wfds))
2054                 {
2055                         xfr = PGSTAT_RECVBUFFERSZ - msg_send;
2056                         if (xfr > msg_have)
2057                                 xfr = msg_have;
2058                         Assert(xfr > 0);
2059                         len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
2060                         if (len < 0)
2061                         {
2062                                 if (errno == EINTR || errno == EAGAIN)
2063                                         continue;       /* not enough space in pipe */
2064                                 ereport(LOG,
2065                                                 (errcode_for_socket_access(),
2066                                                  errmsg("could not write to statistics collector pipe: %m")));
2067                                 exit(1);
2068                         }
2069                         /* NB: len < xfr is okay */
2070                         msg_send += len;
2071                         if (msg_send == PGSTAT_RECVBUFFERSZ)
2072                                 msg_send = 0;
2073                         msg_have -= len;
2074                 }
2075
2076                 /*
2077                  * Make sure we forwarded all messages before we check for
2078                  * Postmaster termination.
2079                  */
2080                 if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
2081                         continue;
2082
2083                 /*
2084                  * If the pipe from the postmaster is ready for reading, the
2085                  * kernel must have closed it on exit() (the postmaster never
2086                  * really writes to it). So we've done our job.
2087                  */
2088                 if (FD_ISSET(pmPipe, &rfds))
2089                         exit(0);
2090         }
2091 }
2092
/* ----------
 * pgstat_die() -
 *
 *	Signal handler that terminates the current process with exit
 *	status 1.  The signal argument is not examined.
 * ----------
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	exit(1);
}
2098
2099
2100 /* ----------
2101  * pgstat_add_backend() -
2102  *
2103  *      Support function to keep our backend list up to date.
2104  * ----------
2105  */
2106 static int
2107 pgstat_add_backend(PgStat_MsgHdr *msg)
2108 {
2109         PgStat_StatDBEntry *dbentry;
2110         PgStat_StatBeEntry *beentry;
2111         PgStat_StatBeDead *deadbe;
2112         bool            found;
2113
2114         /*
2115          * Check that the backend ID is valid
2116          */
2117         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2118         {
2119                 ereport(LOG,
2120                                 (errmsg("invalid server process ID %d", msg->m_backendid)));
2121                 return -1;
2122         }
2123
2124         /*
2125          * Get the slot for this backendid.
2126          */
2127         beentry = &pgStatBeTable[msg->m_backendid - 1];
2128         if (beentry->databaseid != InvalidOid)
2129         {
2130                 /*
2131                  * If the slot contains the PID of this backend, everything is
2132                  * fine and we got nothing to do.
2133                  */
2134                 if (beentry->procpid == msg->m_procpid)
2135                         return 0;
2136         }
2137
2138         /*
2139          * Lookup if this backend is known to be dead. This can be caused due
2140          * to messages arriving in the wrong order - i.e. Postmaster's BETERM
2141          * message might have arrived before we received all the backends
2142          * stats messages, or even a new backend with the same backendid was
2143          * faster in sending his BESTART.
2144          *
2145          * If the backend is known to be dead, we ignore this add.
2146          */
2147         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2148                                                                                            (void *) &(msg->m_procpid),
2149                                                                                            HASH_FIND, NULL);
2150         if (deadbe)
2151                 return 1;
2152
2153         /*
2154          * Backend isn't known to be dead. If it's slot is currently used, we
2155          * have to kick out the old backend.
2156          */
2157         if (beentry->databaseid != InvalidOid)
2158                 pgstat_sub_backend(beentry->procpid);
2159
2160         /*
2161          * Put this new backend into the slot.
2162          */
2163         beentry->databaseid = msg->m_databaseid;
2164         beentry->procpid = msg->m_procpid;
2165         beentry->userid = msg->m_userid;
2166         beentry->activity_start_sec = 0;
2167         beentry->activity_start_usec = 0;
2168         MemSet(beentry->activity, 0, PGSTAT_ACTIVITY_SIZE);
2169
2170         /*
2171          * Lookup or create the database entry for this backends DB.
2172          */
2173         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2174                                                                                    (void *) &(msg->m_databaseid),
2175                                                                                                  HASH_ENTER, &found);
2176         if (dbentry == NULL)
2177         {
2178                 ereport(LOG,
2179                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2180                          errmsg("out of memory in statistics collector --- abort")));
2181                 exit(1);
2182         }
2183
2184         /*
2185          * If not found, initialize the new one.
2186          */
2187         if (!found)
2188         {
2189                 HASHCTL         hash_ctl;
2190
2191                 dbentry->tables = NULL;
2192                 dbentry->n_xact_commit = 0;
2193                 dbentry->n_xact_rollback = 0;
2194                 dbentry->n_blocks_fetched = 0;
2195                 dbentry->n_blocks_hit = 0;
2196                 dbentry->n_connects = 0;
2197                 dbentry->destroy = 0;
2198
2199                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2200                 hash_ctl.keysize = sizeof(Oid);
2201                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2202                 hash_ctl.hash = tag_hash;
2203                 dbentry->tables = hash_create("Per-database table",
2204                                                                           PGSTAT_TAB_HASH_SIZE,
2205                                                                           &hash_ctl,
2206                                                                           HASH_ELEM | HASH_FUNCTION);
2207                 if (dbentry->tables == NULL)
2208                 {
2209                         /* assume the problem is out-of-memory */
2210                         ereport(LOG,
2211                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2212                          errmsg("out of memory in statistics collector --- abort")));
2213                         exit(1);
2214                 }
2215         }
2216
2217         /*
2218          * Count number of connects to the database
2219          */
2220         dbentry->n_connects++;
2221
2222         return 0;
2223 }
2224
2225
2226 /* ----------
2227  * pgstat_sub_backend() -
2228  *
2229  *      Remove a backend from the actual backends list.
2230  * ----------
2231  */
2232 static void
2233 pgstat_sub_backend(int procpid)
2234 {
2235         int                     i;
2236         PgStat_StatBeDead *deadbe;
2237         bool            found;
2238
2239         /*
2240          * Search in the known-backends table for the slot containing this
2241          * PID.
2242          */
2243         for (i = 0; i < MaxBackends; i++)
2244         {
2245                 if (pgStatBeTable[i].databaseid != InvalidOid &&
2246                         pgStatBeTable[i].procpid == procpid)
2247                 {
2248                         /*
2249                          * That's him. Add an entry to the known to be dead backends.
2250                          * Due to possible misorder in the arrival of UDP packets it's
2251                          * possible that even if we know the backend is dead, there
2252                          * could still be messages queued that arrive later. Those
2253                          * messages must not cause our number of backends statistics
2254                          * to get screwed up, so we remember for a couple of seconds
2255                          * that this PID is dead and ignore them (only the counting of
2256                          * backends, not the table access stats they sent).
2257                          */
2258                         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2259                                                                                                            (void *) &procpid,
2260                                                                                                            HASH_ENTER,
2261                                                                                                            &found);
2262                         if (deadbe == NULL)
2263                         {
2264                                 ereport(LOG,
2265                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2266                                                  errmsg("out of memory in statistics collector --- abort")));
2267                                 exit(1);
2268                         }
2269                         if (!found)
2270                         {
2271                                 deadbe->backendid = i + 1;
2272                                 deadbe->destroy = PGSTAT_DESTROY_COUNT;
2273                         }
2274
2275                         /*
2276                          * Declare the backend slot empty.
2277                          */
2278                         pgStatBeTable[i].databaseid = InvalidOid;
2279                         return;
2280                 }
2281         }
2282
2283         /*
2284          * No big problem if not found. This can happen if UDP messages arrive
2285          * out of order here.
2286          */
2287 }
2288
2289
2290 /* ----------
2291  * pgstat_write_statsfile() -
2292  *
2293  *      Tell the news.
2294  * ----------
2295  */
2296 static void
2297 pgstat_write_statsfile(void)
2298 {
2299         HASH_SEQ_STATUS hstat;
2300         HASH_SEQ_STATUS tstat;
2301         PgStat_StatDBEntry *dbentry;
2302         PgStat_StatTabEntry *tabentry;
2303         PgStat_StatBeDead *deadbe;
2304         FILE       *fpout;
2305         int                     i;
2306
2307         /*
2308          * Open the statistics temp file to write out the current values.
2309          */
2310         fpout = fopen(pgStat_tmpfname, PG_BINARY_W);
2311         if (fpout == NULL)
2312         {
2313                 ereport(LOG,
2314                                 (errcode_for_file_access(),
2315                                  errmsg("could not open temporary statistics file \"%s\": %m",
2316                                                 pgStat_tmpfname)));
2317                 return;
2318         }
2319
2320         /*
2321          * Walk through the database table.
2322          */
2323         hash_seq_init(&hstat, pgStatDBHash);
2324         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2325         {
2326                 /*
2327                  * If this database is marked destroyed, count down and do so if
2328                  * it reaches 0.
2329                  */
2330                 if (dbentry->destroy > 0)
2331                 {
2332                         if (--(dbentry->destroy) == 0)
2333                         {
2334                                 if (dbentry->tables != NULL)
2335                                         hash_destroy(dbentry->tables);
2336
2337                                 if (hash_search(pgStatDBHash,
2338                                                                 (void *) &(dbentry->databaseid),
2339                                                                 HASH_REMOVE, NULL) == NULL)
2340                                 {
2341                                         ereport(LOG,
2342                                                         (errmsg("database hash table corrupted "
2343                                                                         "during cleanup --- abort")));
2344                                         exit(1);
2345                                 }
2346                         }
2347
2348                         /*
2349                          * Don't include statistics for it.
2350                          */
2351                         continue;
2352                 }
2353
2354                 /*
2355                  * Write out the DB line including the number of life backends.
2356                  */
2357                 fputc('D', fpout);
2358                 fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout);
2359
2360                 /*
2361                  * Walk through the databases access stats per table.
2362                  */
2363                 hash_seq_init(&tstat, dbentry->tables);
2364                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2365                 {
2366                         /*
2367                          * If table entry marked for destruction, same as above for
2368                          * the database entry.
2369                          */
2370                         if (tabentry->destroy > 0)
2371                         {
2372                                 if (--(tabentry->destroy) == 0)
2373                                 {
2374                                         if (hash_search(dbentry->tables,
2375                                                                         (void *) &(tabentry->tableid),
2376                                                                         HASH_REMOVE, NULL) == NULL)
2377                                         {
2378                                                 ereport(LOG,
2379                                                                 (errmsg("tables hash table for "
2380                                                                                 "database %u corrupted during "
2381                                                                                 "cleanup --- abort",
2382                                                                                 dbentry->databaseid)));
2383                                                 exit(1);
2384                                         }
2385                                 }
2386                                 continue;
2387                         }
2388
2389                         /*
2390                          * At least we think this is still a life table. Print it's
2391                          * access stats.
2392                          */
2393                         fputc('T', fpout);
2394                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2395                 }
2396
2397                 /*
2398                  * Mark the end of this DB
2399                  */
2400                 fputc('d', fpout);
2401         }
2402
2403         /*
2404          * Write out the known running backends to the stats file.
2405          */
2406         i = MaxBackends;
2407         fputc('M', fpout);
2408         fwrite(&i, sizeof(i), 1, fpout);
2409
2410         for (i = 0; i < MaxBackends; i++)
2411         {
2412                 if (pgStatBeTable[i].databaseid != InvalidOid)
2413                 {
2414                         fputc('B', fpout);
2415                         fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
2416                 }
2417         }
2418
2419         /*
2420          * No more output to be done. Close the temp file and replace the old
2421          * pgstat.stat with it.
2422          */
2423         fputc('E', fpout);
2424         if (fclose(fpout) < 0)
2425         {
2426                 ereport(LOG,
2427                                 (errcode_for_file_access(),
2428                                  errmsg("could not close temporary statistics file \"%s\": %m",
2429                                                 pgStat_tmpfname)));
2430         }
2431         else
2432         {
2433                 if (rename(pgStat_tmpfname, pgStat_fname) < 0)
2434                 {
2435                         ereport(LOG,
2436                                         (errcode_for_file_access(),
2437                                          errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2438                                                         pgStat_tmpfname, pgStat_fname)));
2439                 }
2440         }
2441
2442         /*
2443          * Clear out the dead backends table
2444          */
2445         hash_seq_init(&hstat, pgStatBeDead);
2446         while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2447         {
2448                 /*
2449                  * Count down the destroy delay and remove entries where it
2450                  * reaches 0.
2451                  */
2452                 if (--(deadbe->destroy) <= 0)
2453                 {
2454                         if (hash_search(pgStatBeDead,
2455                                                         (void *) &(deadbe->procpid),
2456                                                         HASH_REMOVE, NULL) == NULL)
2457                         {
2458                                 ereport(LOG,
2459                                                 (errmsg("dead-server-process hash table corrupted "
2460                                                                 "during cleanup --- abort")));
2461                                 exit(1);
2462                         }
2463                 }
2464         }
2465 }
2466
2467
2468 /* ----------
2469  * pgstat_read_statsfile() -
2470  *
2471  *      Reads in an existing statistics collector and initializes the
2472  *      databases hash table (who's entries point to the tables hash tables)
2473  *      and the current backend table.
2474  * ----------
2475  */
2476 static void
2477 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2478                                           PgStat_StatBeEntry **betab, int *numbackends)
2479 {
2480         PgStat_StatDBEntry *dbentry;
2481         PgStat_StatDBEntry dbbuf;
2482         PgStat_StatTabEntry *tabentry;
2483         PgStat_StatTabEntry tabbuf;
2484         HASHCTL         hash_ctl;
2485         HTAB       *tabhash = NULL;
2486         FILE       *fpin;
2487         int                     maxbackends = 0;
2488         int                     havebackends = 0;
2489         bool            found;
2490         MemoryContext use_mcxt;
2491         int                     mcxt_flags;
2492
2493         /*
2494          * If running in the collector we use the DynaHashCxt memory context.
2495          * If running in a backend, we use the TopTransactionContext instead,
2496          * so the caller must only know the last XactId when this call
2497          * happened to know if his tables are still valid or already gone!
2498          */
2499         if (pgStatRunningInCollector)
2500         {
2501                 use_mcxt = NULL;
2502                 mcxt_flags = 0;
2503         }
2504         else
2505         {
2506                 use_mcxt = TopTransactionContext;
2507                 mcxt_flags = HASH_CONTEXT;
2508         }
2509
2510         /*
2511          * Create the DB hashtable
2512          */
2513         memset(&hash_ctl, 0, sizeof(hash_ctl));
2514         hash_ctl.keysize = sizeof(Oid);
2515         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2516         hash_ctl.hash = tag_hash;
2517         hash_ctl.hcxt = use_mcxt;
2518         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2519                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2520         if (*dbhash == NULL)
2521         {
2522                 /* assume the problem is out-of-memory */
2523                 if (pgStatRunningInCollector)
2524                 {
2525                         ereport(LOG,
2526                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2527                          errmsg("out of memory in statistics collector --- abort")));
2528                         exit(1);
2529                 }
2530                 /* in backend, can do normal error */
2531                 ereport(ERROR,
2532                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2533                                  errmsg("out of memory")));
2534         }
2535
2536         /*
2537          * Initialize the number of known backends to zero, just in case we do
2538          * a silent error return below.
2539          */
2540         if (numbackends != NULL)
2541                 *numbackends = 0;
2542         if (betab != NULL)
2543                 *betab = NULL;
2544
2545         /*
2546          * Try to open the status file. If it doesn't exist, the backends
2547          * simply return zero for anything and the collector simply starts
2548          * from scratch with empty counters.
2549          */
2550         if ((fpin = fopen(pgStat_fname, PG_BINARY_R)) == NULL)
2551                 return;
2552
2553         /*
2554          * We found an existing collector stats file. Read it and put all the
2555          * hashtable entries into place.
2556          */
2557         for (;;)
2558         {
2559                 switch (fgetc(fpin))
2560                 {
2561                                 /*
2562                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2563                                  * follows. Subsequently, zero to many 'T' entries will
2564                                  * follow until a 'd' is encountered.
2565                                  */
2566                         case 'D':
2567                                 if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
2568                                 {
2569                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2570                                                         (errmsg("corrupted pgstat.stat file")));
2571                                         fclose(fpin);
2572                                         return;
2573                                 }
2574
2575                                 /*
2576                                  * Add to the DB hash
2577                                  */
2578                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2579                                                                                           (void *) &dbbuf.databaseid,
2580                                                                                                                          HASH_ENTER,
2581                                                                                                                          &found);
2582                                 if (dbentry == NULL)
2583                                 {
2584                                         if (pgStatRunningInCollector)
2585                                         {
2586                                                 ereport(LOG,
2587                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2588                                                                  errmsg("out of memory in statistics collector --- abort")));
2589                                                 exit(1);
2590                                         }
2591                                         else
2592                                         {
2593                                                 fclose(fpin);
2594                                                 ereport(ERROR,
2595                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2596                                                                  errmsg("out of memory")));
2597                                         }
2598                                 }
2599                                 if (found)
2600                                 {
2601                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2602                                                         (errmsg("corrupted pgstat.stat file")));
2603                                         fclose(fpin);
2604                                         return;
2605                                 }
2606
2607                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2608                                 dbentry->tables = NULL;
2609                                 dbentry->destroy = 0;
2610                                 dbentry->n_backends = 0;
2611
2612                                 /*
2613                                  * Don't collect tables if not the requested DB
2614                                  */
2615                                 if (onlydb != InvalidOid && onlydb != dbbuf.databaseid)
2616                                         break;
2617
2618                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2619                                 hash_ctl.keysize = sizeof(Oid);
2620                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2621                                 hash_ctl.hash = tag_hash;
2622                                 hash_ctl.hcxt = use_mcxt;
2623                                 dbentry->tables = hash_create("Per-database table",
2624                                                                                           PGSTAT_TAB_HASH_SIZE,
2625                                                                                           &hash_ctl,
2626                                                                  HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2627                                 if (dbentry->tables == NULL)
2628                                 {
2629                                         /* assume the problem is out-of-memory */
2630                                         if (pgStatRunningInCollector)
2631                                         {
2632                                                 ereport(LOG,
2633                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2634                                                                  errmsg("out of memory in statistics collector --- abort")));
2635                                                 exit(1);
2636                                         }
2637                                         /* in backend, can do normal error */
2638                                         fclose(fpin);
2639                                         ereport(ERROR,
2640                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2641                                                          errmsg("out of memory")));
2642                                 }
2643
2644                                 /*
2645                                  * Arrange that following 'T's add entries to this
2646                                  * databases tables hash table.
2647                                  */
2648                                 tabhash = dbentry->tables;
2649                                 break;
2650
2651                                 /*
2652                                  * 'd'  End of this database.
2653                                  */
2654                         case 'd':
2655                                 tabhash = NULL;
2656                                 break;
2657
2658                                 /*
2659                                  * 'T'  A PgStat_StatTabEntry follows.
2660                                  */
2661                         case 'T':
2662                                 if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
2663                                 {
2664                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2665                                                         (errmsg("corrupted pgstat.stat file")));
2666                                         fclose(fpin);
2667                                         return;
2668                                 }
2669
2670                                 /*
2671                                  * Skip if table belongs to a not requested database.
2672                                  */
2673                                 if (tabhash == NULL)
2674                                         break;
2675
2676                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2677                                                                                                 (void *) &tabbuf.tableid,
2678                                                                                                          HASH_ENTER, &found);
2679                                 if (tabentry == NULL)
2680                                 {
2681                                         if (pgStatRunningInCollector)
2682                                         {
2683                                                 ereport(LOG,
2684                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2685                                                                  errmsg("out of memory in statistics collector --- abort")));
2686                                                 exit(1);
2687                                         }
2688                                         /* in backend, can do normal error */
2689                                         fclose(fpin);
2690                                         ereport(ERROR,
2691                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2692                                                          errmsg("out of memory")));
2693                                 }
2694
2695                                 if (found)
2696                                 {
2697                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2698                                                         (errmsg("corrupted pgstat.stat file")));
2699                                         fclose(fpin);
2700                                         return;
2701                                 }
2702
2703                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2704                                 break;
2705
2706                                 /*
2707                                  * 'M'  The maximum number of backends to expect follows.
2708                                  */
2709                         case 'M':
2710                                 if (betab == NULL || numbackends == NULL)
2711                                 {
2712                                         fclose(fpin);
2713                                         return;
2714                                 }
2715                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2716                                         sizeof(maxbackends))
2717                                 {
2718                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2719                                                         (errmsg("corrupted pgstat.stat file")));
2720                                         fclose(fpin);
2721                                         return;
2722                                 }
2723                                 if (maxbackends == 0)
2724                                 {
2725                                         fclose(fpin);
2726                                         return;
2727                                 }
2728
2729                                 /*
2730                                  * Allocate space (in TopTransactionContext too) for the
2731                                  * backend table.
2732                                  */
2733                                 if (use_mcxt == NULL)
2734                                         *betab = (PgStat_StatBeEntry *) malloc(
2735                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2736                                 else
2737                                         *betab = (PgStat_StatBeEntry *) MemoryContextAlloc(
2738                                                                                                                                 use_mcxt,
2739                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2740                                 break;
2741
2742                                 /*
2743                                  * 'B'  A PgStat_StatBeEntry follows.
2744                                  */
2745                         case 'B':
2746                                 if (betab == NULL || numbackends == NULL)
2747                                 {
2748                                         fclose(fpin);
2749                                         return;
2750                                 }
2751                                 if (*betab == NULL)
2752                                 {
2753                                         fclose(fpin);
2754                                         return;
2755                                 }
2756
2757                                 /*
2758                                  * Read it directly into the table.
2759                                  */
2760                                 if (fread(&(*betab)[havebackends], 1,
2761                                                   sizeof(PgStat_StatBeEntry), fpin) !=
2762                                         sizeof(PgStat_StatBeEntry))
2763                                 {
2764                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2765                                                         (errmsg("corrupted pgstat.stat file")));
2766                                         fclose(fpin);
2767                                         return;
2768                                 }
2769
2770                                 /*
2771                                  * Count backends per database here.
2772                                  */
2773                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2774                                                    (void *) &((*betab)[havebackends].databaseid),
2775                                                                                                                 HASH_FIND, NULL);
2776                                 if (dbentry)
2777                                         dbentry->n_backends++;
2778
2779                                 havebackends++;
2780                                 if (numbackends != 0)
2781                                         *numbackends = havebackends;
2782                                 if (havebackends >= maxbackends)
2783                                 {
2784                                         fclose(fpin);
2785                                         return;
2786                                 }
2787                                 break;
2788
2789                                 /*
2790                                  * 'E'  The EOF marker of a complete stats file.
2791                                  */
2792                         case 'E':
2793                                 fclose(fpin);
2794                                 return;
2795
2796                         default:
2797                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2798                                                 (errmsg("corrupted pgstat.stat file")));
2799                                 fclose(fpin);
2800                                 return;
2801                 }
2802         }
2803
2804         fclose(fpin);
2805 }
2806
2807
2808 /* ----------
2809  * pgstat_recv_bestart() -
2810  *
2811  *      Process a backend starup message.
2812  * ----------
2813  */
2814 static void
2815 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2816 {
2817         pgstat_add_backend(&msg->m_hdr);
2818 }
2819
2820
2821 /* ----------
2822  * pgstat_recv_beterm() -
2823  *
2824  *      Process a backend termination message.
2825  * ----------
2826  */
2827 static void
2828 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2829 {
2830         pgstat_sub_backend(msg->m_hdr.m_procpid);
2831 }
2832
2833
2834 /* ----------
2835  * pgstat_recv_activity() -
2836  *
2837  *      Remember what the backend is doing.
2838  * ----------
2839  */
2840 static void
2841 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2842 {
2843         PgStat_StatBeEntry *entry;
2844
2845         /*
2846          * Here we check explicitly for 0 return, since we don't want to
2847          * mangle the activity of an active backend by a delayed packed from a
2848          * dead one.
2849          */
2850         if (pgstat_add_backend(&msg->m_hdr) != 0)
2851                 return;
2852
2853         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2854
2855         strncpy(entry->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE);
2856
2857         entry->activity_start_sec =
2858                 GetCurrentAbsoluteTimeUsec(&entry->activity_start_usec);
2859 }
2860
2861
2862 /* ----------
2863  * pgstat_recv_tabstat() -
2864  *
2865  *      Count what the backend has done.
2866  * ----------
2867  */
2868 static void
2869 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2870 {
2871         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2872         PgStat_StatDBEntry *dbentry;
2873         PgStat_StatTabEntry *tabentry;
2874         int                     i;
2875         bool            found;
2876
2877         /*
2878          * Make sure the backend is counted for.
2879          */
2880         if (pgstat_add_backend(&msg->m_hdr) < 0)
2881                 return;
2882
2883         /*
2884          * Lookup the database in the hashtable.
2885          */
2886         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2887                                                                          (void *) &(msg->m_hdr.m_databaseid),
2888                                                                                                  HASH_FIND, NULL);
2889         if (!dbentry)
2890                 return;
2891
2892         /*
2893          * If the database is marked for destroy, this is a delayed UDP packet
2894          * and not worth being counted.
2895          */
2896         if (dbentry->destroy > 0)
2897                 return;
2898
2899         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2900         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2901
2902         /*
2903          * Process all table entries in the message.
2904          */
2905         for (i = 0; i < msg->m_nentries; i++)
2906         {
2907                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2908                                                                                           (void *) &(tabmsg[i].t_id),
2909                                                                                                          HASH_ENTER, &found);
2910                 if (tabentry == NULL)
2911                 {
2912                         ereport(LOG,
2913                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2914                          errmsg("out of memory in statistics collector --- abort")));
2915                         exit(1);
2916                 }
2917
2918                 if (!found)
2919                 {
2920                         /*
2921                          * If it's a new table entry, initialize counters to the
2922                          * values we just got.
2923                          */
2924                         tabentry->numscans = tabmsg[i].t_numscans;
2925                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2926                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2927                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2928                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2929                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2930                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2931                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2932
2933                         tabentry->destroy = 0;
2934                 }
2935                 else
2936                 {
2937                         /*
2938                          * Otherwise add the values to the existing entry.
2939                          */
2940                         tabentry->numscans += tabmsg[i].t_numscans;
2941                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2942                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2943                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2944                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2945                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2946                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2947                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2948                 }
2949
2950                 /*
2951                  * And add the block IO to the database entry.
2952                  */
2953                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2954                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2955         }
2956 }
2957
2958
2959 /* ----------
2960  * pgstat_recv_tabpurge() -
2961  *
2962  *      Arrange for dead table removal.
2963  * ----------
2964  */
2965 static void
2966 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2967 {
2968         PgStat_StatDBEntry *dbentry;
2969         PgStat_StatTabEntry *tabentry;
2970         int                     i;
2971
2972         /*
2973          * Make sure the backend is counted for.
2974          */
2975         if (pgstat_add_backend(&msg->m_hdr) < 0)
2976                 return;
2977
2978         /*
2979          * Lookup the database in the hashtable.
2980          */
2981         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2982                                                                          (void *) &(msg->m_hdr.m_databaseid),
2983                                                                                                  HASH_FIND, NULL);
2984         if (!dbentry)
2985                 return;
2986
2987         /*
2988          * If the database is marked for destroy, this is a delayed UDP packet
2989          * and the tables will go away at DB destruction.
2990          */
2991         if (dbentry->destroy > 0)
2992                 return;
2993
2994         /*
2995          * Process all table entries in the message.
2996          */
2997         for (i = 0; i < msg->m_nentries; i++)
2998         {
2999                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
3000                                                                                    (void *) &(msg->m_tableid[i]),
3001                                                                                                            HASH_FIND, NULL);
3002                 if (tabentry)
3003                         tabentry->destroy = PGSTAT_DESTROY_COUNT;
3004         }
3005 }
3006
3007
3008 /* ----------
3009  * pgstat_recv_dropdb() -
3010  *
3011  *      Arrange for dead database removal
3012  * ----------
3013  */
3014 static void
3015 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
3016 {
3017         PgStat_StatDBEntry *dbentry;
3018
3019         /*
3020          * Make sure the backend is counted for.
3021          */
3022         if (pgstat_add_backend(&msg->m_hdr) < 0)
3023                 return;
3024
3025         /*
3026          * Lookup the database in the hashtable.
3027          */
3028         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3029                                                                                    (void *) &(msg->m_databaseid),
3030                                                                                                  HASH_FIND, NULL);
3031         if (!dbentry)
3032                 return;
3033
3034         /*
3035          * Mark the database for destruction.
3036          */
3037         dbentry->destroy = PGSTAT_DESTROY_COUNT;
3038 }
3039
3040
3041 /* ----------
3042  * pgstat_recv_dropdb() -
3043  *
3044  *      Arrange for dead database removal
3045  * ----------
3046  */
3047 static void
3048 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
3049 {
3050         HASHCTL         hash_ctl;
3051         PgStat_StatDBEntry *dbentry;
3052
3053         /*
3054          * Make sure the backend is counted for.
3055          */
3056         if (pgstat_add_backend(&msg->m_hdr) < 0)
3057                 return;
3058
3059         /*
3060          * Lookup the database in the hashtable.
3061          */
3062         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3063                                                                          (void *) &(msg->m_hdr.m_databaseid),
3064                                                                                                  HASH_FIND, NULL);
3065         if (!dbentry)
3066                 return;
3067
3068         /*
3069          * We simply throw away all the databases table entries by recreating
3070          * a new hash table for them.
3071          */
3072         if (dbentry->tables != NULL)
3073                 hash_destroy(dbentry->tables);
3074
3075         dbentry->tables = NULL;
3076         dbentry->n_xact_commit = 0;
3077         dbentry->n_xact_rollback = 0;
3078         dbentry->n_blocks_fetched = 0;
3079         dbentry->n_blocks_hit = 0;
3080         dbentry->n_connects = 0;
3081         dbentry->destroy = 0;
3082
3083         memset(&hash_ctl, 0, sizeof(hash_ctl));
3084         hash_ctl.keysize = sizeof(Oid);
3085         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3086         hash_ctl.hash = tag_hash;
3087         dbentry->tables = hash_create("Per-database table",
3088                                                                   PGSTAT_TAB_HASH_SIZE,
3089                                                                   &hash_ctl,
3090                                                                   HASH_ELEM | HASH_FUNCTION);
3091         if (dbentry->tables == NULL)
3092         {
3093                 /* assume the problem is out-of-memory */
3094                 ereport(LOG,
3095                                 (errcode(ERRCODE_OUT_OF_MEMORY),
3096                          errmsg("out of memory in statistics collector --- abort")));
3097                 exit(1);
3098         }
3099 }