]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Add 'int' cast for getpid() because some Solaris releases return long
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2004, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.81 2004/10/14 20:23:44 momjian Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31
32 #include "pgstat.h"
33
34 #include "access/heapam.h"
35 #include "access/xact.h"
36 #include "catalog/catname.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_shadow.h"
39 #include "libpq/libpq.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "postmaster/postmaster.h"
44 #include "storage/backendid.h"
45 #include "storage/ipc.h"
46 #include "storage/pg_shmem.h"
47 #include "storage/pmsignal.h"
48 #include "tcop/tcopprot.h"
49 #include "utils/hsearch.h"
50 #include "utils/memutils.h"
51 #include "utils/ps_status.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54
55
/* ----------
 * Statistics file locations.  In both patterns the %s is replaced with
 * the installation's $PGDATA directory.
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"%s/global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"%s/global/pgstat.tmp.%d"

/* ----------
 * Timing parameters.
 * ----------
 */

/* Interval between writes of the status file, in milliseconds. */
#define PGSTAT_STAT_INTERVAL	500

/*
 * How long (milliseconds) destroyed objects stay known, so that UDP
 * packets which arrive late still find their target.
 */
#define PGSTAT_DESTROY_DELAY	10000

/* The destroy delay expressed as a number of status-file write intervals. */
#define PGSTAT_DESTROY_COUNT	(PGSTAT_DESTROY_DELAY / PGSTAT_STAT_INTERVAL)

/* Minimum spacing, in seconds, between attempts to restart a dead collector. */
#define PGSTAT_RESTART_INTERVAL 60

/* ----------
 * Space reserved in pgstat_recvbuffer(): room for 1024 messages.
 * ----------
 */
#define PGSTAT_RECVBUFFERSZ		((int) (1024 * sizeof(PgStat_Msg)))

/* ----------
 * Initial size hints for the collector's hash tables.
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_BE_HASH_SIZE		512
#define PGSTAT_TAB_HASH_SIZE	512
95
96
97 /* ----------
98  * GUC parameters
99  * ----------
100  */
101 bool            pgstat_collect_startcollector = true;
102 bool            pgstat_collect_resetonpmstart = true;
103 bool            pgstat_collect_querystring = false;
104 bool            pgstat_collect_tuplelevel = false;
105 bool            pgstat_collect_blocklevel = false;
106
107 /* ----------
108  * Local data
109  * ----------
110  */
111 NON_EXEC_STATIC int pgStatSock = -1;
112 static int      pgStatPipe[2];
113 static struct sockaddr_storage pgStatAddr;
114
115 static time_t last_pgstat_start_time;
116
117 static long pgStatNumMessages = 0;
118
119 static bool pgStatRunningInCollector = FALSE;
120
121 static int      pgStatTabstatAlloc = 0;
122 static int      pgStatTabstatUsed = 0;
123 static PgStat_MsgTabstat **pgStatTabstatMessages = NULL;
124
125 #define TABSTAT_QUANTUM         4       /* we alloc this many at a time */
126
127 static int      pgStatXactCommit = 0;
128 static int      pgStatXactRollback = 0;
129
130 static TransactionId pgStatDBHashXact = InvalidTransactionId;
131 static HTAB *pgStatDBHash = NULL;
132 static HTAB *pgStatBeDead = NULL;
133 static PgStat_StatBeEntry *pgStatBeTable = NULL;
134 static int      pgStatNumBackends = 0;
135
136 static char pgStat_fname[MAXPGPATH];
137 static char pgStat_tmpfname[MAXPGPATH];
138
139
140 /* ----------
141  * Local function forward declarations
142  * ----------
143  */
144 #ifdef EXEC_BACKEND
145
146 typedef enum STATS_PROCESS_TYPE
147 {
148         STAT_PROC_BUFFER,
149         STAT_PROC_COLLECTOR
150 }       STATS_PROCESS_TYPE;
151
152 static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
153 static void pgstat_parseArgs(int argc, char *argv[]);
154 #endif
155
156 NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
157 NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
158 static void pgstat_recvbuffer(void);
159 static void pgstat_exit(SIGNAL_ARGS);
160 static void pgstat_die(SIGNAL_ARGS);
161
162 static int      pgstat_add_backend(PgStat_MsgHdr *msg);
163 static void pgstat_sub_backend(int procpid);
164 static void pgstat_drop_database(Oid databaseid);
165 static void pgstat_write_statsfile(void);
166 static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
167                                           PgStat_StatBeEntry **betab,
168                                           int *numbackends);
169 static void backend_read_statsfile(void);
170
171 static void pgstat_setheader(PgStat_MsgHdr *hdr, int mtype);
172 static void pgstat_send(void *msg, int len);
173
174 static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
175 static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
176 static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
177 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
178 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
179 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
180 static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
181
182
183 /* ------------------------------------------------------------
184  * Public functions called from postmaster follow
185  * ------------------------------------------------------------
186  */
187
/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success pgStatSock is a UDP socket with these properties:
 *	  - bound to a kernel-assigned port on "localhost";
 *	  - connected to its own address, which is saved in pgStatAddr;
 *	  - verified with a one-byte loopback test;
 *	  - switched to non-blocking mode.
 *	On failure pgStatSock is -1 and the collection GUC variables are
 *	forced off.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * Initialize the filename for the status reports.  (In the
	 * EXEC_BACKEND case, this only sets the value in the postmaster.  The
	 * collector subprocess will recompute the value for itself, and
	 * individual backends must do so also if they want to access the
	 * file.)
	 */
	snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		unlink(pgStat_fname);

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, getaddrinfo_all() may return multiple addresses,
	 * only one of which will actually work (eg, both IPv6 and IPv4
	 * addresses when kernel will reject IPv6).  Worse, the failure may
	 * occur at the bind() or perhaps even connect() stage.  So we must
	 * loop through the results till we find a working combination.  We
	 * will generate LOG messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the
		 * assigned port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles
		 * by not having to respecify the target address on every send.
		 * This also provides a kernel-level check that only packets from
		 * this same address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket.
		 * This is to catch situations where the socket can be created but
		 * will not actually pass data (for instance, because kernel
		 * packet filtering rules prevent it).
		 */
		test_byte = TESTBYTEVAL;
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can
		 * be received.  We arbitrarily allow up to half a second before
		 * deciding it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains
			 * to give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 * The SQLSTATE must be wrapped in errcode(); a bare
			 * ERRCODE_xxx constant in the ereport argument list is
			 * silently discarded by the comma operator.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("disabling statistics collector for lack of working socket")));
		goto startup_failed;
	}

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the
	 * collector falls behind (despite the buffering process), statistics
	 * messages will be discarded; backends won't block waiting to send
	 * messages to the collector.
	 */
	if (!set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	if (addrs)
		freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
454
455
#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Build the argument vector for a statistics child process (buffer or
 * collector, per procType) and pass it to postmaster_forkexec().  Returns
 * whatever postmaster_forkexec() returns.
 *
 * Resulting layout (consumed by pgstat_parseArgs):
 *	[0] "postgres"  [1] -forkbuf/-forkcol  [2] reserved (NULL)
 *	[3] postgres_exec_path  [4] pgStatPipe[0]  [5] pgStatPipe[1]
 */
static pid_t
pgstat_forkexec(STATS_PROCESS_TYPE procType)
{
	char	   *args[10];
	char		pipeArgs[2][32];
	int			nargs = 0;
	int			npipe = 0;
	int			j;

	args[nargs++] = "postgres";

	if (procType == STAT_PROC_BUFFER)
		args[nargs++] = "-forkbuf";
	else if (procType == STAT_PROC_COLLECTOR)
		args[nargs++] = "-forkcol";
	else
		Assert(false);

	/* Placeholder slot; postmaster_forkexec fills it in */
	args[nargs++] = NULL;

	/* write_backend_variables does not carry postgres_exec_path ... */
	args[nargs++] = postgres_exec_path;

	/* ... nor the pipe descriptors, so send them as decimal strings */
	snprintf(pipeArgs[npipe++], sizeof(pipeArgs[0]), "%d", pgStatPipe[0]);
	snprintf(pipeArgs[npipe++], sizeof(pipeArgs[0]), "%d", pgStatPipe[1]);

	Assert(npipe <= lengthof(pipeArgs));
	for (j = 0; j < npipe; j++)
		args[nargs++] = pipeArgs[j];

	args[nargs] = NULL;
	Assert(nargs < lengthof(args));

	return postmaster_forkexec(nargs, args);
}


/*
 * pgstat_parseArgs() -
 *
 * In an exec'd statistics child, recover the values pgstat_forkexec()
 * placed in argv[3..5]: the executable path and the two pipe descriptors.
 */
static void
pgstat_parseArgs(int argc, char *argv[])
{
	Assert(argc == 6);

	StrNCpy(postgres_exec_path, argv[3], MAXPGPATH);
	pgStatPipe[0] = atoi(argv[4]);
	pgStatPipe[1] = atoi(argv[5]);
}
#endif   /* EXEC_BACKEND */
527
528
529 /* ----------
530  * pgstat_start() -
531  *
532  *      Called from postmaster at startup or after an existing collector
533  *      died.  Attempt to fire up a fresh statistics collector.
534  *
535  *      Returns PID of child process, or 0 if fail.
536  *
537  *      Note: if fail, we will be called again from the postmaster main loop.
538  * ----------
539  */
540 int
541 pgstat_start(void)
542 {
543         time_t          curtime;
544         pid_t           pgStatPid;
545
546         /*
547          * Do nothing if no collector needed
548          */
549         if (!pgstat_collect_startcollector)
550                 return 0;
551
552         /*
553          * Do nothing if too soon since last collector start.  This is a
554          * safety valve to protect against continuous respawn attempts if the
555          * collector is dying immediately at launch.  Note that since we will
556          * be re-called from the postmaster main loop, we will get another
557          * chance later.
558          */
559         curtime = time(NULL);
560         if ((unsigned int) (curtime - last_pgstat_start_time) <
561                 (unsigned int) PGSTAT_RESTART_INTERVAL)
562                 return 0;
563         last_pgstat_start_time = curtime;
564
565         /*
566          * Check that the socket is there, else pgstat_init failed.
567          */
568         if (pgStatSock < 0)
569         {
570                 ereport(LOG,
571                                 (errmsg("statistics collector startup skipped")));
572
573                 /*
574                  * We can only get here if someone tries to manually turn
575                  * pgstat_collect_startcollector on after it had been off.
576                  */
577                 pgstat_collect_startcollector = false;
578                 return 0;
579         }
580
581         /*
582          * Okay, fork off the collector.
583          */
584
585         fflush(stdout);
586         fflush(stderr);
587
588 #ifdef __BEOS__
589         /* Specific beos actions before backend startup */
590         beos_before_backend_startup();
591 #endif
592
593 #ifdef EXEC_BACKEND
594         switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
595 #else
596         switch ((pgStatPid = fork()))
597 #endif
598         {
599                 case -1:
600 #ifdef __BEOS__
601                         /* Specific beos actions */
602                         beos_backend_startup_failed();
603 #endif
604                         ereport(LOG,
605                                         (errmsg("could not fork statistics buffer: %m")));
606                         return 0;
607
608 #ifndef EXEC_BACKEND
609                 case 0:
610                         /* in postmaster child ... */
611 #ifdef __BEOS__
612                         /* Specific beos actions after backend startup */
613                         beos_backend_startup();
614 #endif
615                         /* Close the postmaster's sockets */
616                         ClosePostmasterPorts(false);
617
618                         /* Drop our connection to postmaster's shared memory, as well */
619                         PGSharedMemoryDetach();
620
621                         PgstatBufferMain(0, NULL);
622                         break;
623 #endif
624
625                 default:
626                         return (int) pgStatPid;
627         }
628
629         /* shouldn't get here */
630         return 0;
631 }
632
633
634 /* ----------
635  * pgstat_beterm() -
636  *
637  *      Called from postmaster to tell collector a backend terminated.
638  * ----------
639  */
640 void
641 pgstat_beterm(int pid)
642 {
643         PgStat_MsgBeterm msg;
644
645         if (pgStatSock < 0)
646                 return;
647
648         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
649         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
650         msg.m_hdr.m_procpid = pid;
651
652         pgstat_send(&msg, sizeof(msg));
653 }
654
655
656 /* ------------------------------------------------------------
657  * Public functions used by backends follow
658  *------------------------------------------------------------
659  */
660
661
662 /* ----------
663  * pgstat_bestart() -
664  *
665  *      Tell the collector that this new backend is soon ready to process
666  *      queries. Called from tcop/postgres.c before entering the mainloop.
667  * ----------
668  */
669 void
670 pgstat_bestart(void)
671 {
672         PgStat_MsgBestart msg;
673
674         if (pgStatSock < 0)
675                 return;
676
677         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
678         pgstat_send(&msg, sizeof(msg));
679 }
680
681
682 /* ----------
683  * pgstat_report_activity() -
684  *
685  *      Called in tcop/postgres.c to tell the collector what the backend
686  *      is actually doing (usually "<IDLE>" or the start of the query to
687  *      be executed).
688  * ----------
689  */
690 void
691 pgstat_report_activity(const char *what)
692 {
693         PgStat_MsgActivity msg;
694         int                     len;
695
696         if (!pgstat_collect_querystring || pgStatSock < 0)
697                 return;
698
699         len = strlen(what);
700         len = pg_mbcliplen((const unsigned char *) what, len,
701                                            PGSTAT_ACTIVITY_SIZE - 1);
702
703         memcpy(msg.m_what, what, len);
704         msg.m_what[len] = '\0';
705         len += offsetof(PgStat_MsgActivity, m_what) +1;
706
707         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
708         pgstat_send(&msg, len);
709 }
710
711
712 /* ----------
713  * pgstat_report_tabstat() -
714  *
715  *      Called from tcop/postgres.c to send the so far collected
716  *      per table access statistics to the collector.
717  * ----------
718  */
719 void
720 pgstat_report_tabstat(void)
721 {
722         int                     i;
723
724         if (pgStatSock < 0 ||
725                 !(pgstat_collect_querystring ||
726                   pgstat_collect_tuplelevel ||
727                   pgstat_collect_blocklevel))
728         {
729                 /* Not reporting stats, so just flush whatever we have */
730                 pgStatTabstatUsed = 0;
731                 return;
732         }
733
734         /*
735          * For each message buffer used during the last query set the header
736          * fields and send it out.
737          */
738         for (i = 0; i < pgStatTabstatUsed; i++)
739         {
740                 PgStat_MsgTabstat *tsmsg = pgStatTabstatMessages[i];
741                 int                     n;
742                 int                     len;
743
744                 n = tsmsg->m_nentries;
745                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
746                         n * sizeof(PgStat_TableEntry);
747
748                 tsmsg->m_xact_commit = pgStatXactCommit;
749                 tsmsg->m_xact_rollback = pgStatXactRollback;
750                 pgStatXactCommit = 0;
751                 pgStatXactRollback = 0;
752
753                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
754                 pgstat_send(tsmsg, len);
755         }
756
757         pgStatTabstatUsed = 0;
758 }
759
760
761 /* ----------
762  * pgstat_vacuum_tabstat() -
763  *
764  *      Will tell the collector about objects he can get rid of.
765  * ----------
766  */
767 int
768 pgstat_vacuum_tabstat(void)
769 {
770         Relation        dbrel;
771         HeapScanDesc dbscan;
772         HeapTuple       dbtup;
773         Oid                *dbidlist;
774         int                     dbidalloc;
775         int                     dbidused;
776         HASH_SEQ_STATUS hstat;
777         PgStat_StatDBEntry *dbentry;
778         PgStat_StatTabEntry *tabentry;
779         HeapTuple       reltup;
780         int                     nobjects = 0;
781         PgStat_MsgTabpurge msg;
782         int                     len;
783         int                     i;
784
785         if (pgStatSock < 0)
786                 return 0;
787
788         /*
789          * If not done for this transaction, read the statistics collector
790          * stats file into some hash tables.
791          */
792         backend_read_statsfile();
793
794         /*
795          * Lookup our own database entry
796          */
797         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
798                                                                                                  (void *) &MyDatabaseId,
799                                                                                                  HASH_FIND, NULL);
800         if (dbentry == NULL)
801                 return -1;
802
803         if (dbentry->tables == NULL)
804                 return 0;
805
806         /*
807          * Initialize our messages table counter to zero
808          */
809         msg.m_nentries = 0;
810
811         /*
812          * Check for all tables if they still exist.
813          */
814         hash_seq_init(&hstat, dbentry->tables);
815         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
816         {
817                 /*
818                  * Check if this relation is still alive by looking up it's
819                  * pg_class tuple in the system catalog cache.
820                  */
821                 reltup = SearchSysCache(RELOID,
822                                                                 ObjectIdGetDatum(tabentry->tableid),
823                                                                 0, 0, 0);
824                 if (HeapTupleIsValid(reltup))
825                 {
826                         ReleaseSysCache(reltup);
827                         continue;
828                 }
829
830                 /*
831                  * Add this tables Oid to the message
832                  */
833                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
834                 nobjects++;
835
836                 /*
837                  * If the message is full, send it out and reinitialize ot zero
838                  */
839                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
840                 {
841                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
842                                 +msg.m_nentries * sizeof(Oid);
843
844                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
845                         pgstat_send(&msg, len);
846
847                         msg.m_nentries = 0;
848                 }
849         }
850
851         /*
852          * Send the rest
853          */
854         if (msg.m_nentries > 0)
855         {
856                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
857                         +msg.m_nentries * sizeof(Oid);
858
859                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
860                 pgstat_send(&msg, len);
861         }
862
863         /*
864          * Read pg_database and remember the Oid's of all existing databases
865          */
866         dbidalloc = 256;
867         dbidused = 0;
868         dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
869
870         dbrel = heap_openr(DatabaseRelationName, AccessShareLock);
871         dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
872         while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
873         {
874                 if (dbidused >= dbidalloc)
875                 {
876                         dbidalloc *= 2;
877                         dbidlist = (Oid *) repalloc((char *) dbidlist,
878                                                                                 sizeof(Oid) * dbidalloc);
879                 }
880                 dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
881         }
882         heap_endscan(dbscan);
883         heap_close(dbrel, AccessShareLock);
884
885         /*
886          * Search the database hash table for dead databases and tell the
887          * collector to drop them as well.
888          */
889         hash_seq_init(&hstat, pgStatDBHash);
890         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
891         {
892                 Oid                     dbid = dbentry->databaseid;
893
894                 for (i = 0; i < dbidused; i++)
895                 {
896                         if (dbidlist[i] == dbid)
897                         {
898                                 dbid = InvalidOid;
899                                 break;
900                         }
901                 }
902
903                 if (dbid != InvalidOid)
904                 {
905                         nobjects++;
906                         pgstat_drop_database(dbid);
907                 }
908         }
909
910         /*
911          * Free the dbid list.
912          */
913         pfree((char *) dbidlist);
914
915         /*
916          * Tell the caller how many removeable objects we found
917          */
918         return nobjects;
919 }
920
921
922 /* ----------
923  * pgstat_drop_database() -
924  *
925  *      Tell the collector that we just dropped a database.
926  *      This is the only message that shouldn't get lost in space. Otherwise
927  *      the collector will keep the statistics for the dead DB until his
928  *      stats file got removed while the postmaster is down.
929  * ----------
930  */
931 static void
932 pgstat_drop_database(Oid databaseid)
933 {
934         PgStat_MsgDropdb msg;
935
936         if (pgStatSock < 0)
937                 return;
938
939         msg.m_databaseid = databaseid;
940
941         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
942         pgstat_send(&msg, sizeof(msg));
943 }
944
945
946 /* ----------
947  * pgstat_reset_counters() -
948  *
949  *      Tell the statistics collector to reset counters for our database.
950  * ----------
951  */
952 void
953 pgstat_reset_counters(void)
954 {
955         PgStat_MsgResetcounter msg;
956
957         if (pgStatSock < 0)
958                 return;
959
960         if (!superuser())
961                 ereport(ERROR,
962                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
963                           errmsg("must be superuser to reset statistics counters")));
964
965         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
966         pgstat_send(&msg, sizeof(msg));
967 }
968
969
970 /* ----------
971  * pgstat_ping() -
972  *
973  *      Send some junk data to the collector to increase traffic.
974  * ----------
975  */
976 void
977 pgstat_ping(void)
978 {
979         PgStat_MsgDummy msg;
980
981         if (pgStatSock < 0)
982                 return;
983
984         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
985         pgstat_send(&msg, sizeof(msg));
986 }
987
988 /*
989  * Create or enlarge the pgStatTabstatMessages array
990  */
991 static bool
992 more_tabstat_space(void)
993 {
994         PgStat_MsgTabstat *newMessages;
995         PgStat_MsgTabstat **msgArray;
996         int                     newAlloc = pgStatTabstatAlloc + TABSTAT_QUANTUM;
997         int                     i;
998
999         /* Create (another) quantum of message buffers */
1000         newMessages = (PgStat_MsgTabstat *)
1001                 malloc(sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1002         if (newMessages == NULL)
1003         {
1004                 ereport(LOG,
1005                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1006                                  errmsg("out of memory")));
1007                 return false;
1008         }
1009
1010         /* Create or enlarge the pointer array */
1011         if (pgStatTabstatMessages == NULL)
1012                 msgArray = (PgStat_MsgTabstat **)
1013                         malloc(sizeof(PgStat_MsgTabstat *) * newAlloc);
1014         else
1015                 msgArray = (PgStat_MsgTabstat **)
1016                         realloc(pgStatTabstatMessages,
1017                                         sizeof(PgStat_MsgTabstat *) * newAlloc);
1018         if (msgArray == NULL)
1019         {
1020                 free(newMessages);
1021                 ereport(LOG,
1022                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1023                                  errmsg("out of memory")));
1024                 return false;
1025         }
1026
1027         MemSet(newMessages, 0, sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1028         for (i = 0; i < TABSTAT_QUANTUM; i++)
1029                 msgArray[pgStatTabstatAlloc + i] = newMessages++;
1030         pgStatTabstatMessages = msgArray;
1031         pgStatTabstatAlloc = newAlloc;
1032
1033         return true;
1034 }
1035
1036 /* ----------
1037  * pgstat_initstats() -
1038  *
1039  *      Called from various places usually dealing with initialization
1040  *      of Relation or Scan structures. The data placed into these
1041  *      structures from here tell where later to count for buffer reads,
1042  *      scans and tuples fetched.
1043  * ----------
1044  */
1045 void
1046 pgstat_initstats(PgStat_Info *stats, Relation rel)
1047 {
1048         Oid                     rel_id = rel->rd_id;
1049         PgStat_TableEntry *useent;
1050         PgStat_MsgTabstat *tsmsg;
1051         int                     mb;
1052         int                     i;
1053
1054         /*
1055          * Initialize data not to count at all.
1056          */
1057         stats->tabentry = NULL;
1058         stats->no_stats = FALSE;
1059         stats->heap_scan_counted = FALSE;
1060         stats->index_scan_counted = FALSE;
1061
1062         if (pgStatSock < 0 ||
1063                 !(pgstat_collect_tuplelevel ||
1064                   pgstat_collect_blocklevel))
1065         {
1066                 stats->no_stats = TRUE;
1067                 return;
1068         }
1069
1070         /*
1071          * Search the already-used message slots for this relation.
1072          */
1073         for (mb = 0; mb < pgStatTabstatUsed; mb++)
1074         {
1075                 tsmsg = pgStatTabstatMessages[mb];
1076
1077                 for (i = tsmsg->m_nentries; --i >= 0;)
1078                 {
1079                         if (tsmsg->m_entry[i].t_id == rel_id)
1080                         {
1081                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1082                                 return;
1083                         }
1084                 }
1085
1086                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1087                         continue;
1088
1089                 /*
1090                  * Not found, but found a message buffer with an empty slot
1091                  * instead. Fine, let's use this one.
1092                  */
1093                 i = tsmsg->m_nentries++;
1094                 useent = &tsmsg->m_entry[i];
1095                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1096                 useent->t_id = rel_id;
1097                 stats->tabentry = (void *) useent;
1098                 return;
1099         }
1100
1101         /*
1102          * If we ran out of message buffers, we just allocate more.
1103          */
1104         if (pgStatTabstatUsed >= pgStatTabstatAlloc)
1105         {
1106                 if (!more_tabstat_space())
1107                 {
1108                         stats->no_stats = TRUE;
1109                         return;
1110                 }
1111                 Assert(pgStatTabstatUsed < pgStatTabstatAlloc);
1112         }
1113
1114         /*
1115          * Use the first entry of the next message buffer.
1116          */
1117         mb = pgStatTabstatUsed++;
1118         tsmsg = pgStatTabstatMessages[mb];
1119         tsmsg->m_nentries = 1;
1120         useent = &tsmsg->m_entry[0];
1121         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1122         useent->t_id = rel_id;
1123         stats->tabentry = (void *) useent;
1124 }
1125
1126
1127 /* ----------
1128  * pgstat_count_xact_commit() -
1129  *
1130  *      Called from access/transam/xact.c to count transaction commits.
1131  * ----------
1132  */
1133 void
1134 pgstat_count_xact_commit(void)
1135 {
1136         if (!(pgstat_collect_querystring ||
1137                   pgstat_collect_tuplelevel ||
1138                   pgstat_collect_blocklevel))
1139                 return;
1140
1141         pgStatXactCommit++;
1142
1143         /*
1144          * If there was no relation activity yet, just make one existing
1145          * message buffer used without slots, causing the next report to tell
1146          * new xact-counters.
1147          */
1148         if (pgStatTabstatAlloc == 0)
1149         {
1150                 if (!more_tabstat_space())
1151                         return;
1152         }
1153         if (pgStatTabstatUsed == 0)
1154         {
1155                 pgStatTabstatUsed++;
1156                 pgStatTabstatMessages[0]->m_nentries = 0;
1157         }
1158 }
1159
1160
1161 /* ----------
1162  * pgstat_count_xact_rollback() -
1163  *
1164  *      Called from access/transam/xact.c to count transaction rollbacks.
1165  * ----------
1166  */
1167 void
1168 pgstat_count_xact_rollback(void)
1169 {
1170         if (!(pgstat_collect_querystring ||
1171                   pgstat_collect_tuplelevel ||
1172                   pgstat_collect_blocklevel))
1173                 return;
1174
1175         pgStatXactRollback++;
1176
1177         /*
1178          * If there was no relation activity yet, just make one existing
1179          * message buffer used without slots, causing the next report to tell
1180          * new xact-counters.
1181          */
1182         if (pgStatTabstatAlloc == 0)
1183         {
1184                 if (!more_tabstat_space())
1185                         return;
1186         }
1187         if (pgStatTabstatUsed == 0)
1188         {
1189                 pgStatTabstatUsed++;
1190                 pgStatTabstatMessages[0]->m_nentries = 0;
1191         }
1192 }
1193
1194
1195 /* ----------
1196  * pgstat_fetch_stat_dbentry() -
1197  *
1198  *      Support function for the SQL-callable pgstat* functions. Returns
1199  *      the collected statistics for one database or NULL. NULL doesn't mean
1200  *      that the database doesn't exist, it is just not yet known by the
1201  *      collector, so the caller is better off to report ZERO instead.
1202  * ----------
1203  */
1204 PgStat_StatDBEntry *
1205 pgstat_fetch_stat_dbentry(Oid dbid)
1206 {
1207         PgStat_StatDBEntry *dbentry;
1208
1209         /*
1210          * If not done for this transaction, read the statistics collector
1211          * stats file into some hash tables.
1212          */
1213         backend_read_statsfile();
1214
1215         /*
1216          * Lookup the requested database
1217          */
1218         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1219                                                                                                  (void *) &dbid,
1220                                                                                                  HASH_FIND, NULL);
1221         if (dbentry == NULL)
1222                 return NULL;
1223
1224         return dbentry;
1225 }
1226
1227
1228 /* ----------
1229  * pgstat_fetch_stat_tabentry() -
1230  *
1231  *      Support function for the SQL-callable pgstat* functions. Returns
1232  *      the collected statistics for one table or NULL. NULL doesn't mean
1233  *      that the table doesn't exist, it is just not yet known by the
1234  *      collector, so the caller is better off to report ZERO instead.
1235  * ----------
1236  */
1237 PgStat_StatTabEntry *
1238 pgstat_fetch_stat_tabentry(Oid relid)
1239 {
1240         PgStat_StatDBEntry *dbentry;
1241         PgStat_StatTabEntry *tabentry;
1242
1243         /*
1244          * If not done for this transaction, read the statistics collector
1245          * stats file into some hash tables.
1246          */
1247         backend_read_statsfile();
1248
1249         /*
1250          * Lookup our database.
1251          */
1252         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1253                                                                                                  (void *) &MyDatabaseId,
1254                                                                                                  HASH_FIND, NULL);
1255         if (dbentry == NULL)
1256                 return NULL;
1257
1258         /*
1259          * Now inside the DB's table hash table lookup the requested one.
1260          */
1261         if (dbentry->tables == NULL)
1262                 return NULL;
1263         tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1264                                                                                                    (void *) &relid,
1265                                                                                                    HASH_FIND, NULL);
1266         if (tabentry == NULL)
1267                 return NULL;
1268
1269         return tabentry;
1270 }
1271
1272
1273 /* ----------
1274  * pgstat_fetch_stat_beentry() -
1275  *
1276  *      Support function for the SQL-callable pgstat* functions. Returns
1277  *      the actual activity slot of one active backend. The caller is
1278  *      responsible for a check if the actual user is permitted to see
1279  *      that info (especially the querystring).
1280  * ----------
1281  */
1282 PgStat_StatBeEntry *
1283 pgstat_fetch_stat_beentry(int beid)
1284 {
1285         backend_read_statsfile();
1286
1287         if (beid < 1 || beid > pgStatNumBackends)
1288                 return NULL;
1289
1290         return &pgStatBeTable[beid - 1];
1291 }
1292
1293
1294 /* ----------
1295  * pgstat_fetch_stat_numbackends() -
1296  *
1297  *      Support function for the SQL-callable pgstat* functions. Returns
1298  *      the maximum current backend id.
1299  * ----------
1300  */
1301 int
1302 pgstat_fetch_stat_numbackends(void)
1303 {
1304         backend_read_statsfile();
1305
1306         return pgStatNumBackends;
1307 }
1308
1309
1310
1311 /* ------------------------------------------------------------
1312  * Local support functions follow
1313  * ------------------------------------------------------------
1314  */
1315
1316
1317 /* ----------
1318  * pgstat_setheader() -
1319  *
1320  *              Set common header fields in a statistics message
1321  * ----------
1322  */
1323 static void
1324 pgstat_setheader(PgStat_MsgHdr *hdr, int mtype)
1325 {
1326         hdr->m_type = mtype;
1327         hdr->m_backendid = MyBackendId;
1328         hdr->m_procpid = MyProcPid;
1329         hdr->m_databaseid = MyDatabaseId;
1330         hdr->m_userid = GetSessionUserId();
1331 }
1332
1333
1334 /* ----------
1335  * pgstat_send() -
1336  *
1337  *              Send out one statistics message to the collector
1338  * ----------
1339  */
1340 static void
1341 pgstat_send(void *msg, int len)
1342 {
1343         if (pgStatSock < 0)
1344                 return;
1345
1346         ((PgStat_MsgHdr *) msg)->m_size = len;
1347
1348         send(pgStatSock, msg, len, 0);
1349         /* We deliberately ignore any error from send() */
1350 }
1351
1352
1353 /* ----------
1354  * PgstatBufferMain() -
1355  *
1356  *      Start up the statistics buffer process.  This is the body of the
1357  *      postmaster child process.
1358  *
1359  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1360  * ----------
1361  */
1362 NON_EXEC_STATIC void
1363 PgstatBufferMain(int argc, char *argv[])
1364 {
1365         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1366
1367         MyProcPid = getpid();           /* reset MyProcPid */
1368
1369         /* Lose the postmaster's on-exit routines */
1370         on_exit_reset();
1371
1372         /*
1373          * Ignore all signals usually bound to some action in the postmaster,
1374          * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
1375          */
1376         pqsignal(SIGHUP, SIG_IGN);
1377         pqsignal(SIGINT, SIG_IGN);
1378         pqsignal(SIGTERM, SIG_IGN);
1379         pqsignal(SIGQUIT, pgstat_exit);
1380         pqsignal(SIGALRM, SIG_IGN);
1381         pqsignal(SIGPIPE, SIG_IGN);
1382         pqsignal(SIGUSR1, SIG_IGN);
1383         pqsignal(SIGUSR2, SIG_IGN);
1384         pqsignal(SIGCHLD, pgstat_die);
1385         pqsignal(SIGTTIN, SIG_DFL);
1386         pqsignal(SIGTTOU, SIG_DFL);
1387         pqsignal(SIGCONT, SIG_DFL);
1388         pqsignal(SIGWINCH, SIG_DFL);
1389         /* unblock will happen in pgstat_recvbuffer */
1390
1391 #ifdef EXEC_BACKEND
1392         pgstat_parseArgs(argc, argv);
1393 #endif
1394
1395         /*
1396          * Start a buffering process to read from the socket, so we have a
1397          * little more time to process incoming messages.
1398          *
1399          * NOTE: the process structure is: postmaster is parent of buffer process
1400          * is parent of collector process.      This way, the buffer can detect
1401          * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1402          * collector failure until it tried to write on the pipe.  That would
1403          * mean that after the postmaster started a new collector, we'd have
1404          * two buffer processes competing to read from the UDP socket --- not
1405          * good.
1406          */
1407         if (pgpipe(pgStatPipe) < 0)
1408         {
1409                 ereport(LOG,
1410                                 (errcode_for_socket_access(),
1411                          errmsg("could not create pipe for statistics buffer: %m")));
1412                 exit(1);
1413         }
1414
1415 #ifdef EXEC_BACKEND
1416         /* child becomes collector process */
1417         switch (pgstat_forkexec(STAT_PROC_COLLECTOR))
1418 #else
1419         switch (fork())
1420 #endif
1421         {
1422                 case -1:
1423                         ereport(LOG,
1424                                         (errmsg("could not fork statistics collector: %m")));
1425                         exit(1);
1426
1427 #ifndef EXEC_BACKEND
1428                 case 0:
1429                         /* child becomes collector process */
1430                         PgstatCollectorMain(0, NULL);
1431                         break;
1432 #endif
1433
1434                 default:
1435                         /* parent becomes buffer process */
1436                         closesocket(pgStatPipe[0]);
1437                         pgstat_recvbuffer();
1438         }
1439         exit(0);
1440 }
1441
1442
1443 /* ----------
1444  * PgstatCollectorMain() -
1445  *
1446  *      Start up the statistics collector itself.  This is the body of the
1447  *      postmaster grandchild process.
1448  *
1449  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1450  * ----------
1451  */
1452 NON_EXEC_STATIC void
1453 PgstatCollectorMain(int argc, char *argv[])
1454 {
1455         PgStat_Msg      msg;
1456         fd_set          rfds;
1457         int                     readPipe;
1458         int                     nready;
1459         int                     len = 0;
1460         struct timeval timeout;
1461         struct timeval next_statwrite;
1462         bool            need_statwrite;
1463         HASHCTL         hash_ctl;
1464
1465         MyProcPid = getpid();           /* reset MyProcPid */
1466
1467         /*
1468          * Reset signal handling.  With the exception of restoring default
1469          * SIGCHLD and SIGQUIT handling, this is a no-op in the
1470          * non-EXEC_BACKEND case because we'll have inherited these settings
1471          * from the buffer process; but it's not a no-op for EXEC_BACKEND.
1472          */
1473         pqsignal(SIGHUP, SIG_IGN);
1474         pqsignal(SIGINT, SIG_IGN);
1475         pqsignal(SIGTERM, SIG_IGN);
1476         pqsignal(SIGQUIT, SIG_IGN);
1477         pqsignal(SIGALRM, SIG_IGN);
1478         pqsignal(SIGPIPE, SIG_IGN);
1479         pqsignal(SIGUSR1, SIG_IGN);
1480         pqsignal(SIGUSR2, SIG_IGN);
1481         pqsignal(SIGCHLD, SIG_DFL);
1482         pqsignal(SIGTTIN, SIG_DFL);
1483         pqsignal(SIGTTOU, SIG_DFL);
1484         pqsignal(SIGCONT, SIG_DFL);
1485         pqsignal(SIGWINCH, SIG_DFL);
1486         PG_SETMASK(&UnBlockSig);
1487
1488 #ifdef EXEC_BACKEND
1489         pgstat_parseArgs(argc, argv);
1490 #endif
1491
1492         /* Close unwanted files */
1493         closesocket(pgStatPipe[1]);
1494         closesocket(pgStatSock);
1495
1496         /*
1497          * Identify myself via ps
1498          */
1499         init_ps_display("stats collector process", "", "");
1500         set_ps_display("");
1501
1502         /*
1503          * Initialize filenames needed for status reports.
1504          */
1505         snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
1506         /* tmpfname need only be set correctly in this process */
1507         snprintf(pgStat_tmpfname, MAXPGPATH, PGSTAT_STAT_TMPFILE,
1508                          DataDir, (int)getpid());
1509
1510         /*
1511          * Arrange to write the initial status file right away
1512          */
1513         gettimeofday(&next_statwrite, NULL);
1514         need_statwrite = TRUE;
1515
1516         /*
1517          * Read in an existing statistics stats file or initialize the stats
1518          * to zero.
1519          */
1520         pgStatRunningInCollector = TRUE;
1521         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1522
1523         /*
1524          * Create the dead backend hashtable
1525          */
1526         memset(&hash_ctl, 0, sizeof(hash_ctl));
1527         hash_ctl.keysize = sizeof(int);
1528         hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1529         hash_ctl.hash = tag_hash;
1530         pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
1531                                                            &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1532         if (pgStatBeDead == NULL)
1533         {
1534                 /* assume the problem is out-of-memory */
1535                 ereport(LOG,
1536                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1537                          errmsg("out of memory in statistics collector --- abort")));
1538                 exit(1);
1539         }
1540
1541         /*
1542          * Create the known backends table
1543          */
1544         pgStatBeTable = (PgStat_StatBeEntry *) malloc(
1545                                                            sizeof(PgStat_StatBeEntry) * MaxBackends);
1546         if (pgStatBeTable == NULL)
1547         {
1548                 ereport(LOG,
1549                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1550                          errmsg("out of memory in statistics collector --- abort")));
1551                 exit(1);
1552         }
1553         memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
1554
1555         readPipe = pgStatPipe[0];
1556
1557         /*
1558          * Process incoming messages and handle all the reporting stuff until
1559          * there are no more messages.
1560          */
1561         for (;;)
1562         {
1563                 /*
1564                  * If we need to write the status file again (there have been
1565                  * changes in the statistics since we wrote it last) calculate the
1566                  * timeout until we have to do so.
1567                  */
1568                 if (need_statwrite)
1569                 {
1570                         struct timeval now;
1571
1572                         gettimeofday(&now, NULL);
1573                         /* avoid assuming that tv_sec is signed */
1574                         if (now.tv_sec > next_statwrite.tv_sec ||
1575                                 (now.tv_sec == next_statwrite.tv_sec &&
1576                                  now.tv_usec >= next_statwrite.tv_usec))
1577                         {
1578                                 timeout.tv_sec = 0;
1579                                 timeout.tv_usec = 0;
1580                         }
1581                         else
1582                         {
1583                                 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
1584                                 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
1585                                 if (timeout.tv_usec < 0)
1586                                 {
1587                                         timeout.tv_sec--;
1588                                         timeout.tv_usec += 1000000;
1589                                 }
1590                         }
1591                 }
1592
1593                 /*
1594                  * Setup the descriptor set for select(2)
1595                  */
1596                 FD_ZERO(&rfds);
1597                 FD_SET(readPipe, &rfds);
1598
1599                 /*
1600                  * Now wait for something to do.
1601                  */
1602                 nready = select(readPipe + 1, &rfds, NULL, NULL,
1603                                                 (need_statwrite) ? &timeout : NULL);
1604                 if (nready < 0)
1605                 {
1606                         if (errno == EINTR)
1607                                 continue;
1608                         ereport(LOG,
1609                                         (errcode_for_socket_access(),
1610                                  errmsg("select() failed in statistics collector: %m")));
1611                         exit(1);
1612                 }
1613
1614                 /*
1615                  * If there are no descriptors ready, our timeout for writing the
1616                  * stats file happened.
1617                  */
1618                 if (nready == 0)
1619                 {
1620                         pgstat_write_statsfile();
1621                         need_statwrite = FALSE;
1622
1623                         continue;
1624                 }
1625
1626                 /*
1627                  * Check if there is a new statistics message to collect.
1628                  */
1629                 if (FD_ISSET(readPipe, &rfds))
1630                 {
1631                         /*
1632                          * We may need to issue multiple read calls in case the buffer
1633                          * process didn't write the message in a single write, which
1634                          * is possible since it dumps its buffer bytewise. In any
1635                          * case, we'd need two reads since we don't know the message
1636                          * length initially.
1637                          */
1638                         int                     nread = 0;
1639                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1640                         bool            pipeEOF = false;
1641
1642                         while (nread < targetlen)
1643                         {
1644                                 len = piperead(readPipe, ((char *) &msg) + nread,
1645                                                            targetlen - nread);
1646                                 if (len < 0)
1647                                 {
1648                                         if (errno == EINTR)
1649                                                 continue;
1650                                         ereport(LOG,
1651                                                         (errcode_for_socket_access(),
1652                                                          errmsg("could not read from statistics collector pipe: %m")));
1653                                         exit(1);
1654                                 }
1655                                 if (len == 0)   /* EOF on the pipe! */
1656                                 {
1657                                         pipeEOF = true;
1658                                         break;
1659                                 }
1660                                 nread += len;
1661                                 if (nread == sizeof(PgStat_MsgHdr))
1662                                 {
1663                                         /* we have the header, compute actual msg length */
1664                                         targetlen = msg.msg_hdr.m_size;
1665                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1666                                                 targetlen > (int) sizeof(msg))
1667                                         {
1668                                                 /*
1669                                                  * Bogus message length implies that we got out of
1670                                                  * sync with the buffer process somehow. Abort so
1671                                                  * that we can restart both processes.
1672                                                  */
1673                                                 ereport(LOG,
1674                                                   (errmsg("invalid statistics message length")));
1675                                                 exit(1);
1676                                         }
1677                                 }
1678                         }
1679
1680                         /*
1681                          * EOF on the pipe implies that the buffer process exited.
1682                          * Fall out of outer loop.
1683                          */
1684                         if (pipeEOF)
1685                                 break;
1686
1687                         /*
1688                          * Distribute the message to the specific function handling
1689                          * it.
1690                          */
1691                         switch (msg.msg_hdr.m_type)
1692                         {
1693                                 case PGSTAT_MTYPE_DUMMY:
1694                                         break;
1695
1696                                 case PGSTAT_MTYPE_BESTART:
1697                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1698                                         break;
1699
1700                                 case PGSTAT_MTYPE_BETERM:
1701                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1702                                         break;
1703
1704                                 case PGSTAT_MTYPE_TABSTAT:
1705                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1706                                         break;
1707
1708                                 case PGSTAT_MTYPE_TABPURGE:
1709                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1710                                         break;
1711
1712                                 case PGSTAT_MTYPE_ACTIVITY:
1713                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1714                                         break;
1715
1716                                 case PGSTAT_MTYPE_DROPDB:
1717                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1718                                         break;
1719
1720                                 case PGSTAT_MTYPE_RESETCOUNTER:
1721                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1722                                                                                          nread);
1723                                         break;
1724
1725                                 default:
1726                                         break;
1727                         }
1728
1729                         /*
1730                          * Globally count messages.
1731                          */
1732                         pgStatNumMessages++;
1733
1734                         /*
1735                          * If this is the first message after we wrote the stats file
1736                          * the last time, setup the timeout that it'd be written.
1737                          */
1738                         if (!need_statwrite)
1739                         {
1740                                 gettimeofday(&next_statwrite, NULL);
1741                                 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
1742                                 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
1743                                 next_statwrite.tv_usec %= 1000000;
1744                                 need_statwrite = TRUE;
1745                         }
1746                 }
1747
1748                 /*
1749                  * Note that we do NOT check for postmaster exit inside the loop;
1750                  * only EOF on the buffer pipe causes us to fall out.  This
1751                  * ensures we don't exit prematurely if there are still a few
1752                  * messages in the buffer or pipe at postmaster shutdown.
1753                  */
1754         }
1755
1756         /*
1757          * Okay, we saw EOF on the buffer pipe, so there are no more messages
1758          * to process.  If the buffer process quit because of postmaster
1759          * shutdown, we want to save the final stats to reuse at next startup.
1760          * But if the buffer process failed, it seems best not to (there may
1761          * even now be a new collector firing up, and we don't want it to read
1762          * a partially-rewritten stats file).
1763          */
1764         if (!PostmasterIsAlive(false))
1765                 pgstat_write_statsfile();
1766 }
1767
1768
1769 /* ----------
1770  * pgstat_recvbuffer() -
1771  *
1772  *      This is the body of the separate buffering process. Its only
1773  *      purpose is to receive messages from the UDP socket as fast as
1774  *      possible and forward them over a pipe into the collector itself.
1775  *      If the collector is slow to absorb messages, they are buffered here.
1776  * ----------
1777  */
1778 static void
1779 pgstat_recvbuffer(void)
1780 {
1781         fd_set          rfds;
1782         fd_set          wfds;
1783         struct timeval timeout;
1784         int                     writePipe = pgStatPipe[1];
1785         int                     maxfd;
1786         int                     nready;
1787         int                     len;
1788         int                     xfr;
1789         int                     frm;
1790         PgStat_Msg      input_buffer;
1791         char       *msgbuffer;
1792         int                     msg_send = 0;   /* next send index in buffer */
1793         int                     msg_recv = 0;   /* next receive index */
1794         int                     msg_have = 0;   /* number of bytes stored */
1795         bool            overflow = false;
1796
1797         /*
1798          * Identify myself via ps
1799          */
1800         init_ps_display("stats buffer process", "", "");
1801         set_ps_display("");
1802
1803         /*
1804          * We want to die if our child collector process does.  There are two
1805          * ways we might notice that it has died: receive SIGCHLD, or get a
1806          * write failure on the pipe leading to the child.      We can set SIGPIPE
1807          * to kill us here.  Our SIGCHLD handler was already set up before we
1808          * forked (must do it that way, else it's a race condition).
1809          */
1810         pqsignal(SIGPIPE, SIG_DFL);
1811         PG_SETMASK(&UnBlockSig);
1812
1813         /*
1814          * Set the write pipe to nonblock mode, so that we cannot block when
1815          * the collector falls behind.
1816          */
1817         if (!set_noblock(writePipe))
1818         {
1819                 ereport(LOG,
1820                                 (errcode_for_socket_access(),
1821                                  errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1822                 exit(1);
1823         }
1824
1825         /*
1826          * Allocate the message buffer
1827          */
1828         msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
1829         if (msgbuffer == NULL)
1830         {
1831                 ereport(LOG,
1832                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1833                          errmsg("out of memory in statistics collector --- abort")));
1834                 exit(1);
1835         }
1836
1837         /*
1838          * Loop forever
1839          */
1840         for (;;)
1841         {
1842                 FD_ZERO(&rfds);
1843                 FD_ZERO(&wfds);
1844                 maxfd = -1;
1845
1846                 /*
1847                  * As long as we have buffer space we add the socket to the read
1848                  * descriptor set.
1849                  */
1850                 if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
1851                 {
1852                         FD_SET(pgStatSock, &rfds);
1853                         maxfd = pgStatSock;
1854                         overflow = false;
1855                 }
1856                 else
1857                 {
1858                         if (!overflow)
1859                         {
1860                                 ereport(LOG,
1861                                                 (errmsg("statistics buffer is full")));
1862                                 overflow = true;
1863                         }
1864                 }
1865
1866                 /*
1867                  * If we have messages to write out, we add the pipe to the write
1868                  * descriptor set.
1869                  */
1870                 if (msg_have > 0)
1871                 {
1872                         FD_SET(writePipe, &wfds);
1873                         if (writePipe > maxfd)
1874                                 maxfd = writePipe;
1875                 }
1876
1877                 /*
1878                  * Wait for some work to do; but not for more than 10 seconds.
1879                  * (This determines how quickly we will shut down after an
1880                  * ungraceful postmaster termination; so it needn't be very fast.)
1881                  */
1882                 timeout.tv_sec = 10;
1883                 timeout.tv_usec = 0;
1884
1885                 nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
1886                 if (nready < 0)
1887                 {
1888                         if (errno == EINTR)
1889                                 continue;
1890                         ereport(LOG,
1891                                         (errcode_for_socket_access(),
1892                                          errmsg("select() failed in statistics buffer: %m")));
1893                         exit(1);
1894                 }
1895
1896                 /*
1897                  * If there is a message on the socket, read it and check for
1898                  * validity.
1899                  */
1900                 if (FD_ISSET(pgStatSock, &rfds))
1901                 {
1902                         len = recv(pgStatSock, (char *) &input_buffer,
1903                                            sizeof(PgStat_Msg), 0);
1904                         if (len < 0)
1905                         {
1906                                 ereport(LOG,
1907                                                 (errcode_for_socket_access(),
1908                                            errmsg("could not read statistics message: %m")));
1909                                 exit(1);
1910                         }
1911
1912                         /*
1913                          * We ignore messages that are smaller than our common header
1914                          */
1915                         if (len < sizeof(PgStat_MsgHdr))
1916                                 continue;
1917
1918                         /*
1919                          * The received length must match the length in the header
1920                          */
1921                         if (input_buffer.msg_hdr.m_size != len)
1922                                 continue;
1923
1924                         /*
1925                          * O.K. - we accept this message.  Copy it to the circular
1926                          * msgbuffer.
1927                          */
1928                         frm = 0;
1929                         while (len > 0)
1930                         {
1931                                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
1932                                 if (xfr > len)
1933                                         xfr = len;
1934                                 Assert(xfr > 0);
1935                                 memcpy(msgbuffer + msg_recv,
1936                                            ((char *) &input_buffer) + frm,
1937                                            xfr);
1938                                 msg_recv += xfr;
1939                                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
1940                                         msg_recv = 0;
1941                                 msg_have += xfr;
1942                                 frm += xfr;
1943                                 len -= xfr;
1944                         }
1945                 }
1946
1947                 /*
1948                  * If the collector is ready to receive, write some data into his
1949                  * pipe.  We may or may not be able to write all that we have.
1950                  *
1951                  * NOTE: if what we have is less than PIPE_BUF bytes but more than
1952                  * the space available in the pipe buffer, most kernels will
1953                  * refuse to write any of it, and will return EAGAIN.  This means
1954                  * we will busy-loop until the situation changes (either because
1955                  * the collector caught up, or because more data arrives so that
1956                  * we have more than PIPE_BUF bytes buffered).  This is not good,
1957                  * but is there any way around it?      We have no way to tell when
1958                  * the collector has caught up...
1959                  */
1960                 if (FD_ISSET(writePipe, &wfds))
1961                 {
1962                         xfr = PGSTAT_RECVBUFFERSZ - msg_send;
1963                         if (xfr > msg_have)
1964                                 xfr = msg_have;
1965                         Assert(xfr > 0);
1966                         len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
1967                         if (len < 0)
1968                         {
1969                                 if (errno == EINTR || errno == EAGAIN)
1970                                         continue;       /* not enough space in pipe */
1971                                 ereport(LOG,
1972                                                 (errcode_for_socket_access(),
1973                                                  errmsg("could not write to statistics collector pipe: %m")));
1974                                 exit(1);
1975                         }
1976                         /* NB: len < xfr is okay */
1977                         msg_send += len;
1978                         if (msg_send == PGSTAT_RECVBUFFERSZ)
1979                                 msg_send = 0;
1980                         msg_have -= len;
1981                 }
1982
1983                 /*
1984                  * Make sure we forwarded all messages before we check for
1985                  * postmaster termination.
1986                  */
1987                 if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
1988                         continue;
1989
1990                 /*
1991                  * If the postmaster has terminated, we die too.  (This is no
1992                  * longer the normal exit path, however.)
1993                  */
1994                 if (!PostmasterIsAlive(true))
1995                         exit(0);
1996         }
1997 }
1998
/*
 * pgstat_exit() -
 *
 *	SIGQUIT handler for the stats buffer process.
 *
 *	Terminates immediately with exit status 0.  Messages still held in the
 *	buffer are deliberately not forwarded first: a quick exit is preferred
 *	over the cleanliness of draining them.
 */
static void
pgstat_exit(SIGNAL_ARGS)
{
	exit(0);
}
2010
/*
 * pgstat_die() -
 *
 *	SIGCHLD handler for the stats buffer process.  A SIGCHLD here is
 *	taken to mean the child collector process has died, and the buffer
 *	process must not outlive it, so exit with failure status 1.
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	exit(1);
}
2017
2018
2019 /* ----------
2020  * pgstat_add_backend() -
2021  *
2022  *      Support function to keep our backend list up to date.
2023  * ----------
2024  */
2025 static int
2026 pgstat_add_backend(PgStat_MsgHdr *msg)
2027 {
2028         PgStat_StatDBEntry *dbentry;
2029         PgStat_StatBeEntry *beentry;
2030         PgStat_StatBeDead *deadbe;
2031         bool            found;
2032
2033         /*
2034          * Check that the backend ID is valid
2035          */
2036         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2037         {
2038                 ereport(LOG,
2039                          (errmsg("invalid server process ID %d", msg->m_backendid)));
2040                 return -1;
2041         }
2042
2043         /*
2044          * Get the slot for this backendid.
2045          */
2046         beentry = &pgStatBeTable[msg->m_backendid - 1];
2047         if (beentry->databaseid != InvalidOid)
2048         {
2049                 /*
2050                  * If the slot contains the PID of this backend, everything is
2051                  * fine and we got nothing to do.
2052                  */
2053                 if (beentry->procpid == msg->m_procpid)
2054                         return 0;
2055         }
2056
2057         /*
2058          * Lookup if this backend is known to be dead. This can be caused due
2059          * to messages arriving in the wrong order - i.e. Postmaster's BETERM
2060          * message might have arrived before we received all the backends
2061          * stats messages, or even a new backend with the same backendid was
2062          * faster in sending his BESTART.
2063          *
2064          * If the backend is known to be dead, we ignore this add.
2065          */
2066         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2067                                                                                            (void *) &(msg->m_procpid),
2068                                                                                            HASH_FIND, NULL);
2069         if (deadbe)
2070                 return 1;
2071
2072         /*
2073          * Backend isn't known to be dead. If it's slot is currently used, we
2074          * have to kick out the old backend.
2075          */
2076         if (beentry->databaseid != InvalidOid)
2077                 pgstat_sub_backend(beentry->procpid);
2078
2079         /*
2080          * Put this new backend into the slot.
2081          */
2082         beentry->databaseid = msg->m_databaseid;
2083         beentry->procpid = msg->m_procpid;
2084         beentry->userid = msg->m_userid;
2085         beentry->activity_start_sec = 0;
2086         beentry->activity_start_usec = 0;
2087         MemSet(beentry->activity, 0, PGSTAT_ACTIVITY_SIZE);
2088
2089         /*
2090          * Lookup or create the database entry for this backends DB.
2091          */
2092         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2093                                                                                    (void *) &(msg->m_databaseid),
2094                                                                                                  HASH_ENTER, &found);
2095         if (dbentry == NULL)
2096         {
2097                 ereport(LOG,
2098                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2099                          errmsg("out of memory in statistics collector --- abort")));
2100                 exit(1);
2101         }
2102
2103         /*
2104          * If not found, initialize the new one.
2105          */
2106         if (!found)
2107         {
2108                 HASHCTL         hash_ctl;
2109
2110                 dbentry->tables = NULL;
2111                 dbentry->n_xact_commit = 0;
2112                 dbentry->n_xact_rollback = 0;
2113                 dbentry->n_blocks_fetched = 0;
2114                 dbentry->n_blocks_hit = 0;
2115                 dbentry->n_connects = 0;
2116                 dbentry->destroy = 0;
2117
2118                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2119                 hash_ctl.keysize = sizeof(Oid);
2120                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2121                 hash_ctl.hash = tag_hash;
2122                 dbentry->tables = hash_create("Per-database table",
2123                                                                           PGSTAT_TAB_HASH_SIZE,
2124                                                                           &hash_ctl,
2125                                                                           HASH_ELEM | HASH_FUNCTION);
2126                 if (dbentry->tables == NULL)
2127                 {
2128                         /* assume the problem is out-of-memory */
2129                         ereport(LOG,
2130                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2131                          errmsg("out of memory in statistics collector --- abort")));
2132                         exit(1);
2133                 }
2134         }
2135
2136         /*
2137          * Count number of connects to the database
2138          */
2139         dbentry->n_connects++;
2140
2141         return 0;
2142 }
2143
2144
2145 /* ----------
2146  * pgstat_sub_backend() -
2147  *
2148  *      Remove a backend from the actual backends list.
2149  * ----------
2150  */
2151 static void
2152 pgstat_sub_backend(int procpid)
2153 {
2154         int                     i;
2155         PgStat_StatBeDead *deadbe;
2156         bool            found;
2157
2158         /*
2159          * Search in the known-backends table for the slot containing this
2160          * PID.
2161          */
2162         for (i = 0; i < MaxBackends; i++)
2163         {
2164                 if (pgStatBeTable[i].databaseid != InvalidOid &&
2165                         pgStatBeTable[i].procpid == procpid)
2166                 {
2167                         /*
2168                          * That's him. Add an entry to the known to be dead backends.
2169                          * Due to possible misorder in the arrival of UDP packets it's
2170                          * possible that even if we know the backend is dead, there
2171                          * could still be messages queued that arrive later. Those
2172                          * messages must not cause our number of backends statistics
2173                          * to get screwed up, so we remember for a couple of seconds
2174                          * that this PID is dead and ignore them (only the counting of
2175                          * backends, not the table access stats they sent).
2176                          */
2177                         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2178                                                                                                            (void *) &procpid,
2179                                                                                                            HASH_ENTER,
2180                                                                                                            &found);
2181                         if (deadbe == NULL)
2182                         {
2183                                 ereport(LOG,
2184                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2185                                                  errmsg("out of memory in statistics collector --- abort")));
2186                                 exit(1);
2187                         }
2188                         if (!found)
2189                         {
2190                                 deadbe->backendid = i + 1;
2191                                 deadbe->destroy = PGSTAT_DESTROY_COUNT;
2192                         }
2193
2194                         /*
2195                          * Declare the backend slot empty.
2196                          */
2197                         pgStatBeTable[i].databaseid = InvalidOid;
2198                         return;
2199                 }
2200         }
2201
2202         /*
2203          * No big problem if not found. This can happen if UDP messages arrive
2204          * out of order here.
2205          */
2206 }
2207
2208
2209 /* ----------
2210  * pgstat_write_statsfile() -
2211  *
2212  *      Tell the news.
2213  * ----------
2214  */
2215 static void
2216 pgstat_write_statsfile(void)
2217 {
2218         HASH_SEQ_STATUS hstat;
2219         HASH_SEQ_STATUS tstat;
2220         PgStat_StatDBEntry *dbentry;
2221         PgStat_StatTabEntry *tabentry;
2222         PgStat_StatBeDead *deadbe;
2223         FILE       *fpout;
2224         int                     i;
2225
2226         /*
2227          * Open the statistics temp file to write out the current values.
2228          */
2229         fpout = fopen(pgStat_tmpfname, PG_BINARY_W);
2230         if (fpout == NULL)
2231         {
2232                 ereport(LOG,
2233                                 (errcode_for_file_access(),
2234                         errmsg("could not open temporary statistics file \"%s\": %m",
2235                                    pgStat_tmpfname)));
2236                 return;
2237         }
2238
2239         /*
2240          * Walk through the database table.
2241          */
2242         hash_seq_init(&hstat, pgStatDBHash);
2243         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2244         {
2245                 /*
2246                  * If this database is marked destroyed, count down and do so if
2247                  * it reaches 0.
2248                  */
2249                 if (dbentry->destroy > 0)
2250                 {
2251                         if (--(dbentry->destroy) == 0)
2252                         {
2253                                 if (dbentry->tables != NULL)
2254                                         hash_destroy(dbentry->tables);
2255
2256                                 if (hash_search(pgStatDBHash,
2257                                                                 (void *) &(dbentry->databaseid),
2258                                                                 HASH_REMOVE, NULL) == NULL)
2259                                 {
2260                                         ereport(LOG,
2261                                                         (errmsg("database hash table corrupted "
2262                                                                         "during cleanup --- abort")));
2263                                         exit(1);
2264                                 }
2265                         }
2266
2267                         /*
2268                          * Don't include statistics for it.
2269                          */
2270                         continue;
2271                 }
2272
2273                 /*
2274                  * Write out the DB line including the number of life backends.
2275                  */
2276                 fputc('D', fpout);
2277                 fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout);
2278
2279                 /*
2280                  * Walk through the databases access stats per table.
2281                  */
2282                 hash_seq_init(&tstat, dbentry->tables);
2283                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2284                 {
2285                         /*
2286                          * If table entry marked for destruction, same as above for
2287                          * the database entry.
2288                          */
2289                         if (tabentry->destroy > 0)
2290                         {
2291                                 if (--(tabentry->destroy) == 0)
2292                                 {
2293                                         if (hash_search(dbentry->tables,
2294                                                                         (void *) &(tabentry->tableid),
2295                                                                         HASH_REMOVE, NULL) == NULL)
2296                                         {
2297                                                 ereport(LOG,
2298                                                                 (errmsg("tables hash table for "
2299                                                                                 "database %u corrupted during "
2300                                                                                 "cleanup --- abort",
2301                                                                                 dbentry->databaseid)));
2302                                                 exit(1);
2303                                         }
2304                                 }
2305                                 continue;
2306                         }
2307
2308                         /*
2309                          * At least we think this is still a life table. Print it's
2310                          * access stats.
2311                          */
2312                         fputc('T', fpout);
2313                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2314                 }
2315
2316                 /*
2317                  * Mark the end of this DB
2318                  */
2319                 fputc('d', fpout);
2320         }
2321
2322         /*
2323          * Write out the known running backends to the stats file.
2324          */
2325         i = MaxBackends;
2326         fputc('M', fpout);
2327         fwrite(&i, sizeof(i), 1, fpout);
2328
2329         for (i = 0; i < MaxBackends; i++)
2330         {
2331                 if (pgStatBeTable[i].databaseid != InvalidOid)
2332                 {
2333                         fputc('B', fpout);
2334                         fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
2335                 }
2336         }
2337
2338         /*
2339          * No more output to be done. Close the temp file and replace the old
2340          * pgstat.stat with it.
2341          */
2342         fputc('E', fpout);
2343         if (fclose(fpout) < 0)
2344         {
2345                 ereport(LOG,
2346                                 (errcode_for_file_access(),
2347                    errmsg("could not close temporary statistics file \"%s\": %m",
2348                                   pgStat_tmpfname)));
2349         }
2350         else
2351         {
2352                 if (rename(pgStat_tmpfname, pgStat_fname) < 0)
2353                 {
2354                         ereport(LOG,
2355                                         (errcode_for_file_access(),
2356                                          errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2357                                                         pgStat_tmpfname, pgStat_fname)));
2358                 }
2359         }
2360
2361         /*
2362          * Clear out the dead backends table
2363          */
2364         hash_seq_init(&hstat, pgStatBeDead);
2365         while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2366         {
2367                 /*
2368                  * Count down the destroy delay and remove entries where it
2369                  * reaches 0.
2370                  */
2371                 if (--(deadbe->destroy) <= 0)
2372                 {
2373                         if (hash_search(pgStatBeDead,
2374                                                         (void *) &(deadbe->procpid),
2375                                                         HASH_REMOVE, NULL) == NULL)
2376                         {
2377                                 ereport(LOG,
2378                                           (errmsg("dead-server-process hash table corrupted "
2379                                                           "during cleanup --- abort")));
2380                                 exit(1);
2381                         }
2382                 }
2383         }
2384 }
2385
2386
2387 /* ----------
2388  * pgstat_read_statsfile() -
2389  *
2390  *      Reads in an existing statistics collector and initializes the
2391  *      databases hash table (who's entries point to the tables hash tables)
2392  *      and the current backend table.
2393  * ----------
2394  */
2395 static void
2396 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2397                                           PgStat_StatBeEntry **betab, int *numbackends)
2398 {
2399         PgStat_StatDBEntry *dbentry;
2400         PgStat_StatDBEntry dbbuf;
2401         PgStat_StatTabEntry *tabentry;
2402         PgStat_StatTabEntry tabbuf;
2403         HASHCTL         hash_ctl;
2404         HTAB       *tabhash = NULL;
2405         FILE       *fpin;
2406         int                     maxbackends = 0;
2407         int                     havebackends = 0;
2408         bool            found;
2409         MemoryContext use_mcxt;
2410         int                     mcxt_flags;
2411
2412         /*
2413          * If running in the collector we use the DynaHashCxt memory context.
2414          * If running in a backend, we use the TopTransactionContext instead,
2415          * so the caller must only know the last XactId when this call
2416          * happened to know if his tables are still valid or already gone!
2417          */
2418         if (pgStatRunningInCollector)
2419         {
2420                 use_mcxt = NULL;
2421                 mcxt_flags = 0;
2422         }
2423         else
2424         {
2425                 use_mcxt = TopTransactionContext;
2426                 mcxt_flags = HASH_CONTEXT;
2427         }
2428
2429         /*
2430          * Create the DB hashtable
2431          */
2432         memset(&hash_ctl, 0, sizeof(hash_ctl));
2433         hash_ctl.keysize = sizeof(Oid);
2434         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2435         hash_ctl.hash = tag_hash;
2436         hash_ctl.hcxt = use_mcxt;
2437         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2438                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2439         if (*dbhash == NULL)
2440         {
2441                 /* assume the problem is out-of-memory */
2442                 if (pgStatRunningInCollector)
2443                 {
2444                         ereport(LOG,
2445                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2446                          errmsg("out of memory in statistics collector --- abort")));
2447                         exit(1);
2448                 }
2449                 /* in backend, can do normal error */
2450                 ereport(ERROR,
2451                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2452                                  errmsg("out of memory")));
2453         }
2454
2455         /*
2456          * Initialize the number of known backends to zero, just in case we do
2457          * a silent error return below.
2458          */
2459         if (numbackends != NULL)
2460                 *numbackends = 0;
2461         if (betab != NULL)
2462                 *betab = NULL;
2463
2464         /*
2465          * In EXEC_BACKEND case, we won't have inherited pgStat_fname from
2466          * postmaster, so compute it first time through.
2467          */
2468 #ifdef EXEC_BACKEND
2469         if (pgStat_fname[0] == '\0')
2470         {
2471                 Assert(DataDir != NULL);
2472                 snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
2473         }
2474 #endif
2475
2476         /*
2477          * Try to open the status file. If it doesn't exist, the backends
2478          * simply return zero for anything and the collector simply starts
2479          * from scratch with empty counters.
2480          */
2481         if ((fpin = fopen(pgStat_fname, PG_BINARY_R)) == NULL)
2482                 return;
2483
2484         /*
2485          * We found an existing collector stats file. Read it and put all the
2486          * hashtable entries into place.
2487          */
2488         for (;;)
2489         {
2490                 switch (fgetc(fpin))
2491                 {
2492                                 /*
2493                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2494                                  * follows. Subsequently, zero to many 'T' entries will
2495                                  * follow until a 'd' is encountered.
2496                                  */
2497                         case 'D':
2498                                 if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
2499                                 {
2500                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2501                                                         (errmsg("corrupted pgstat.stat file")));
2502                                         fclose(fpin);
2503                                         return;
2504                                 }
2505
2506                                 /*
2507                                  * Add to the DB hash
2508                                  */
2509                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2510                                                                                           (void *) &dbbuf.databaseid,
2511                                                                                                                          HASH_ENTER,
2512                                                                                                                          &found);
2513                                 if (dbentry == NULL)
2514                                 {
2515                                         if (pgStatRunningInCollector)
2516                                         {
2517                                                 ereport(LOG,
2518                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2519                                                                  errmsg("out of memory in statistics collector --- abort")));
2520                                                 exit(1);
2521                                         }
2522                                         else
2523                                         {
2524                                                 fclose(fpin);
2525                                                 ereport(ERROR,
2526                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2527                                                                  errmsg("out of memory")));
2528                                         }
2529                                 }
2530                                 if (found)
2531                                 {
2532                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2533                                                         (errmsg("corrupted pgstat.stat file")));
2534                                         fclose(fpin);
2535                                         return;
2536                                 }
2537
2538                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2539                                 dbentry->tables = NULL;
2540                                 dbentry->destroy = 0;
2541                                 dbentry->n_backends = 0;
2542
2543                                 /*
2544                                  * Don't collect tables if not the requested DB
2545                                  */
2546                                 if (onlydb != InvalidOid && onlydb != dbbuf.databaseid)
2547                                         break;
2548
2549                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2550                                 hash_ctl.keysize = sizeof(Oid);
2551                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2552                                 hash_ctl.hash = tag_hash;
2553                                 hash_ctl.hcxt = use_mcxt;
2554                                 dbentry->tables = hash_create("Per-database table",
2555                                                                                           PGSTAT_TAB_HASH_SIZE,
2556                                                                                           &hash_ctl,
2557                                                                  HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2558                                 if (dbentry->tables == NULL)
2559                                 {
2560                                         /* assume the problem is out-of-memory */
2561                                         if (pgStatRunningInCollector)
2562                                         {
2563                                                 ereport(LOG,
2564                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2565                                                                  errmsg("out of memory in statistics collector --- abort")));
2566                                                 exit(1);
2567                                         }
2568                                         /* in backend, can do normal error */
2569                                         fclose(fpin);
2570                                         ereport(ERROR,
2571                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2572                                                          errmsg("out of memory")));
2573                                 }
2574
2575                                 /*
2576                                  * Arrange that following 'T's add entries to this
2577                                  * databases tables hash table.
2578                                  */
2579                                 tabhash = dbentry->tables;
2580                                 break;
2581
2582                                 /*
2583                                  * 'd'  End of this database.
2584                                  */
2585                         case 'd':
2586                                 tabhash = NULL;
2587                                 break;
2588
2589                                 /*
2590                                  * 'T'  A PgStat_StatTabEntry follows.
2591                                  */
2592                         case 'T':
2593                                 if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
2594                                 {
2595                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2596                                                         (errmsg("corrupted pgstat.stat file")));
2597                                         fclose(fpin);
2598                                         return;
2599                                 }
2600
2601                                 /*
2602                                  * Skip if table belongs to a not requested database.
2603                                  */
2604                                 if (tabhash == NULL)
2605                                         break;
2606
2607                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2608                                                                                                 (void *) &tabbuf.tableid,
2609                                                                                                          HASH_ENTER, &found);
2610                                 if (tabentry == NULL)
2611                                 {
2612                                         if (pgStatRunningInCollector)
2613                                         {
2614                                                 ereport(LOG,
2615                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2616                                                                  errmsg("out of memory in statistics collector --- abort")));
2617                                                 exit(1);
2618                                         }
2619                                         /* in backend, can do normal error */
2620                                         fclose(fpin);
2621                                         ereport(ERROR,
2622                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2623                                                          errmsg("out of memory")));
2624                                 }
2625
2626                                 if (found)
2627                                 {
2628                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2629                                                         (errmsg("corrupted pgstat.stat file")));
2630                                         fclose(fpin);
2631                                         return;
2632                                 }
2633
2634                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2635                                 break;
2636
2637                                 /*
2638                                  * 'M'  The maximum number of backends to expect follows.
2639                                  */
2640                         case 'M':
2641                                 if (betab == NULL || numbackends == NULL)
2642                                 {
2643                                         fclose(fpin);
2644                                         return;
2645                                 }
2646                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2647                                         sizeof(maxbackends))
2648                                 {
2649                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2650                                                         (errmsg("corrupted pgstat.stat file")));
2651                                         fclose(fpin);
2652                                         return;
2653                                 }
2654                                 if (maxbackends == 0)
2655                                 {
2656                                         fclose(fpin);
2657                                         return;
2658                                 }
2659
2660                                 /*
2661                                  * Allocate space (in TopTransactionContext too) for the
2662                                  * backend table.
2663                                  */
2664                                 if (use_mcxt == NULL)
2665                                         *betab = (PgStat_StatBeEntry *) malloc(
2666                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2667                                 else
2668                                         *betab = (PgStat_StatBeEntry *) MemoryContextAlloc(
2669                                                                                                                                 use_mcxt,
2670                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2671                                 break;
2672
2673                                 /*
2674                                  * 'B'  A PgStat_StatBeEntry follows.
2675                                  */
2676                         case 'B':
2677                                 if (betab == NULL || numbackends == NULL)
2678                                 {
2679                                         fclose(fpin);
2680                                         return;
2681                                 }
2682                                 if (*betab == NULL)
2683                                 {
2684                                         fclose(fpin);
2685                                         return;
2686                                 }
2687
2688                                 /*
2689                                  * Read it directly into the table.
2690                                  */
2691                                 if (fread(&(*betab)[havebackends], 1,
2692                                                   sizeof(PgStat_StatBeEntry), fpin) !=
2693                                         sizeof(PgStat_StatBeEntry))
2694                                 {
2695                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2696                                                         (errmsg("corrupted pgstat.stat file")));
2697                                         fclose(fpin);
2698                                         return;
2699                                 }
2700
2701                                 /*
2702                                  * Count backends per database here.
2703                                  */
2704                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2705                                                    (void *) &((*betab)[havebackends].databaseid),
2706                                                                                                                 HASH_FIND, NULL);
2707                                 if (dbentry)
2708                                         dbentry->n_backends++;
2709
2710                                 havebackends++;
2711                                 if (numbackends != 0)
2712                                         *numbackends = havebackends;
2713                                 if (havebackends >= maxbackends)
2714                                 {
2715                                         fclose(fpin);
2716                                         return;
2717                                 }
2718                                 break;
2719
2720                                 /*
2721                                  * 'E'  The EOF marker of a complete stats file.
2722                                  */
2723                         case 'E':
2724                                 fclose(fpin);
2725                                 return;
2726
2727                         default:
2728                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2729                                                 (errmsg("corrupted pgstat.stat file")));
2730                                 fclose(fpin);
2731                                 return;
2732                 }
2733         }
2734
2735         fclose(fpin);
2736 }
2737
2738 /*
2739  * If not done for this transaction, read the statistics collector
2740  * stats file into some hash tables.
2741  *
2742  * Because we store the hash tables in TopTransactionContext, the result
2743  * is good for the entire current main transaction.
2744  */
2745 static void
2746 backend_read_statsfile(void)
2747 {
2748         TransactionId topXid = GetTopTransactionId();
2749
2750         if (!TransactionIdEquals(pgStatDBHashXact, topXid))
2751         {
2752                 Assert(!pgStatRunningInCollector);
2753                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
2754                                                           &pgStatBeTable, &pgStatNumBackends);
2755                 pgStatDBHashXact = topXid;
2756         }
2757 }
2758
2759
2760 /* ----------
2761  * pgstat_recv_bestart() -
2762  *
2763  *      Process a backend startup message.
2764  * ----------
2765  */
2766 static void
2767 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2768 {
2769         pgstat_add_backend(&msg->m_hdr);
2770 }
2771
2772
2773 /* ----------
2774  * pgstat_recv_beterm() -
2775  *
2776  *      Process a backend termination message.
2777  * ----------
2778  */
2779 static void
2780 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2781 {
2782         pgstat_sub_backend(msg->m_hdr.m_procpid);
2783 }
2784
2785
2786 /* ----------
2787  * pgstat_recv_activity() -
2788  *
2789  *      Remember what the backend is doing.
2790  * ----------
2791  */
2792 static void
2793 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2794 {
2795         PgStat_StatBeEntry *entry;
2796
2797         /*
2798          * Here we check explicitly for 0 return, since we don't want to
2799          * mangle the activity of an active backend by a delayed packet from a
2800          * dead one.
2801          */
2802         if (pgstat_add_backend(&msg->m_hdr) != 0)
2803                 return;
2804
2805         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2806
2807         StrNCpy(entry->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE);
2808
2809         entry->activity_start_sec =
2810                 GetCurrentAbsoluteTimeUsec(&entry->activity_start_usec);
2811 }
2812
2813
2814 /* ----------
2815  * pgstat_recv_tabstat() -
2816  *
2817  *      Count what the backend has done.
2818  * ----------
2819  */
2820 static void
2821 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2822 {
2823         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2824         PgStat_StatDBEntry *dbentry;
2825         PgStat_StatTabEntry *tabentry;
2826         int                     i;
2827         bool            found;
2828
2829         /*
2830          * Make sure the backend is counted for.
2831          */
2832         if (pgstat_add_backend(&msg->m_hdr) < 0)
2833                 return;
2834
2835         /*
2836          * Lookup the database in the hashtable.
2837          */
2838         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2839                                                                          (void *) &(msg->m_hdr.m_databaseid),
2840                                                                                                  HASH_FIND, NULL);
2841         if (!dbentry)
2842                 return;
2843
2844         /*
2845          * If the database is marked for destroy, this is a delayed UDP packet
2846          * and not worth being counted.
2847          */
2848         if (dbentry->destroy > 0)
2849                 return;
2850
2851         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2852         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2853
2854         /*
2855          * Process all table entries in the message.
2856          */
2857         for (i = 0; i < msg->m_nentries; i++)
2858         {
2859                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2860                                                                                           (void *) &(tabmsg[i].t_id),
2861                                                                                                          HASH_ENTER, &found);
2862                 if (tabentry == NULL)
2863                 {
2864                         ereport(LOG,
2865                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2866                          errmsg("out of memory in statistics collector --- abort")));
2867                         exit(1);
2868                 }
2869
2870                 if (!found)
2871                 {
2872                         /*
2873                          * If it's a new table entry, initialize counters to the
2874                          * values we just got.
2875                          */
2876                         tabentry->numscans = tabmsg[i].t_numscans;
2877                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2878                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2879                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2880                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2881                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2882                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2883                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2884
2885                         tabentry->destroy = 0;
2886                 }
2887                 else
2888                 {
2889                         /*
2890                          * Otherwise add the values to the existing entry.
2891                          */
2892                         tabentry->numscans += tabmsg[i].t_numscans;
2893                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2894                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2895                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2896                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2897                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2898                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2899                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2900                 }
2901
2902                 /*
2903                  * And add the block IO to the database entry.
2904                  */
2905                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2906                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2907         }
2908 }
2909
2910
2911 /* ----------
2912  * pgstat_recv_tabpurge() -
2913  *
2914  *      Arrange for dead table removal.
2915  * ----------
2916  */
2917 static void
2918 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2919 {
2920         PgStat_StatDBEntry *dbentry;
2921         PgStat_StatTabEntry *tabentry;
2922         int                     i;
2923
2924         /*
2925          * Make sure the backend is counted for.
2926          */
2927         if (pgstat_add_backend(&msg->m_hdr) < 0)
2928                 return;
2929
2930         /*
2931          * Lookup the database in the hashtable.
2932          */
2933         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2934                                                                          (void *) &(msg->m_hdr.m_databaseid),
2935                                                                                                  HASH_FIND, NULL);
2936         if (!dbentry)
2937                 return;
2938
2939         /*
2940          * If the database is marked for destroy, this is a delayed UDP packet
2941          * and the tables will go away at DB destruction.
2942          */
2943         if (dbentry->destroy > 0)
2944                 return;
2945
2946         /*
2947          * Process all table entries in the message.
2948          */
2949         for (i = 0; i < msg->m_nentries; i++)
2950         {
2951                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2952                                                                                    (void *) &(msg->m_tableid[i]),
2953                                                                                                            HASH_FIND, NULL);
2954                 if (tabentry)
2955                         tabentry->destroy = PGSTAT_DESTROY_COUNT;
2956         }
2957 }
2958
2959
2960 /* ----------
2961  * pgstat_recv_dropdb() -
2962  *
2963  *      Arrange for dead database removal
2964  * ----------
2965  */
2966 static void
2967 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
2968 {
2969         PgStat_StatDBEntry *dbentry;
2970
2971         /*
2972          * Make sure the backend is counted for.
2973          */
2974         if (pgstat_add_backend(&msg->m_hdr) < 0)
2975                 return;
2976
2977         /*
2978          * Lookup the database in the hashtable.
2979          */
2980         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2981                                                                                    (void *) &(msg->m_databaseid),
2982                                                                                                  HASH_FIND, NULL);
2983         if (!dbentry)
2984                 return;
2985
2986         /*
2987          * Mark the database for destruction.
2988          */
2989         dbentry->destroy = PGSTAT_DESTROY_COUNT;
2990 }
2991
2992
2993 /* ----------
2994  * pgstat_recv_dropdb() -
2995  *
2996  *      Arrange for dead database removal
2997  * ----------
2998  */
2999 static void
3000 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
3001 {
3002         HASHCTL         hash_ctl;
3003         PgStat_StatDBEntry *dbentry;
3004
3005         /*
3006          * Make sure the backend is counted for.
3007          */
3008         if (pgstat_add_backend(&msg->m_hdr) < 0)
3009                 return;
3010
3011         /*
3012          * Lookup the database in the hashtable.
3013          */
3014         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3015                                                                          (void *) &(msg->m_hdr.m_databaseid),
3016                                                                                                  HASH_FIND, NULL);
3017         if (!dbentry)
3018                 return;
3019
3020         /*
3021          * We simply throw away all the databases table entries by recreating
3022          * a new hash table for them.
3023          */
3024         if (dbentry->tables != NULL)
3025                 hash_destroy(dbentry->tables);
3026
3027         dbentry->tables = NULL;
3028         dbentry->n_xact_commit = 0;
3029         dbentry->n_xact_rollback = 0;
3030         dbentry->n_blocks_fetched = 0;
3031         dbentry->n_blocks_hit = 0;
3032         dbentry->n_connects = 0;
3033         dbentry->destroy = 0;
3034
3035         memset(&hash_ctl, 0, sizeof(hash_ctl));
3036         hash_ctl.keysize = sizeof(Oid);
3037         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3038         hash_ctl.hash = tag_hash;
3039         dbentry->tables = hash_create("Per-database table",
3040                                                                   PGSTAT_TAB_HASH_SIZE,
3041                                                                   &hash_ctl,
3042                                                                   HASH_ELEM | HASH_FUNCTION);
3043         if (dbentry->tables == NULL)
3044         {
3045                 /* assume the problem is out-of-memory */
3046                 ereport(LOG,
3047                                 (errcode(ERRCODE_OUT_OF_MEMORY),
3048                          errmsg("out of memory in statistics collector --- abort")));
3049                 exit(1);
3050         }
3051 }