]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Nested transactions. There is still much left to do, especially on the
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2003, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.77 2004/07/01 00:50:36 tgl Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31
32 #include "pgstat.h"
33
34 #include "access/heapam.h"
35 #include "access/xact.h"
36 #include "catalog/catname.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_shadow.h"
39 #include "libpq/libpq.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "postmaster/postmaster.h"
44 #include "storage/backendid.h"
45 #include "storage/ipc.h"
46 #include "storage/pg_shmem.h"
47 #include "storage/pmsignal.h"
48 #include "tcop/tcopprot.h"
49 #include "utils/hsearch.h"
50 #include "utils/memutils.h"
51 #include "utils/ps_status.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54
55
/* ----------
 * Paths for the statistics files. The %s is replaced with the
 * installation's $PGDATA.  The temp-file pattern also takes a %d whose
 * argument is not visible in this part of the file (presumably a PID so
 * concurrent writers use distinct names -- TODO confirm).
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"%s/global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"%s/global/pgstat.tmp.%d"

/* ----------
 * Timer definitions.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500		/* How often to write the status
										 * file; in milliseconds. */

#define PGSTAT_DESTROY_DELAY	10000	/* How long to keep destroyed
										 * objects known, to give delayed
										 * UDP packets time to arrive;
										 * in milliseconds. */

/* PGSTAT_DESTROY_DELAY expressed as a count of PGSTAT_STAT_INTERVALs (= 20) */
#define PGSTAT_DESTROY_COUNT	(PGSTAT_DESTROY_DELAY / PGSTAT_STAT_INTERVAL)

#define PGSTAT_RESTART_INTERVAL 60		/* How often to attempt to restart
										 * a failed statistics collector;
										 * in seconds.  Enforced by
										 * pgstat_start(). */

/* ----------
 * Amount of space reserved in pgstat_recvbuffer(): room for 1024
 * objects of size sizeof(PgStat_Msg).
 * ----------
 */
#define PGSTAT_RECVBUFFERSZ		((int) (1024 * sizeof(PgStat_Msg)))

/* ----------
 * The initial size hints for the hash tables used in the collector.
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_BE_HASH_SIZE		512
#define PGSTAT_TAB_HASH_SIZE	512


/* ----------
 * GUC parameters
 *
 * pgstat_init() forces pgstat_collect_startcollector on if any of the
 * three collect_* options is set, and turns all of them off if it cannot
 * set up a working socket.
 * ----------
 */
bool		pgstat_collect_startcollector = true;
bool		pgstat_collect_resetonpmstart = true;
bool		pgstat_collect_querystring = false;
bool		pgstat_collect_tuplelevel = false;
bool		pgstat_collect_blocklevel = false;

/* ----------
 * Local data
 * ----------
 */
/* UDP socket set up by pgstat_init(); -1 means stats messaging is unavailable */
NON_EXEC_STATIC int pgStatSock = -1;
/*
 * Pipe descriptors handed to exec'd stats children on their command line
 * (see pgstat_forkexec/pgstat_parseArgs).  Presumably connects the buffer
 * and collector processes -- TODO confirm where the pipe is created.
 */
static int	pgStatPipe[2];
/* Local address of pgStatSock as reported by getsockname() in pgstat_init() */
static struct sockaddr_storage pgStatAddr;

/* Time of the last collector launch attempt; throttles respawns in pgstat_start() */
static time_t last_pgstat_start_time;

/* NOTE(review): not referenced in the visible part of the file */
static long pgStatNumMessages = 0;

/* NOTE(review): not referenced in the visible part of the file */
static bool pgStatRunningInCollector = FALSE;

/*
 * Per-table access statistics accumulated by this backend and not yet sent.
 * pgstat_report_tabstat() sends the first pgStatTabstatUsed messages and
 * resets the count; pgStatTabstatAlloc is presumably the number of
 * allocated slots (not referenced in the visible code).
 */
static int	pgStatTabstatAlloc = 0;
static int	pgStatTabstatUsed = 0;
static PgStat_MsgTabstat **pgStatTabstatMessages = NULL;

#define TABSTAT_QUANTUM		4	/* we alloc this many at a time */

/* Transaction counts not yet reported; zeroed once pgstat_report_tabstat() sends them */
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;

/*
 * Backend-side copy of the collector's statistics, loaded by
 * backend_read_statsfile().  pgStatDBHashXact presumably records the
 * transaction for which pgStatDBHash was loaded -- TODO confirm.
 */
static TransactionId pgStatDBHashXact = InvalidTransactionId;
static HTAB *pgStatDBHash = NULL;
static HTAB *pgStatBeDead = NULL;
static PgStat_StatBeEntry *pgStatBeTable = NULL;
static int	pgStatNumBackends = 0;

/* Stats file path (filled in by pgstat_init()) and temporary file path */
static char pgStat_fname[MAXPGPATH];
static char pgStat_tmpfname[MAXPGPATH];


/* ----------
 * Local function forward declarations
 * ----------
 */
#ifdef EXEC_BACKEND

/* Which statistics child process pgstat_forkexec() should launch */
typedef enum STATS_PROCESS_TYPE
{
	STAT_PROC_BUFFER,
	STAT_PROC_COLLECTOR
} STATS_PROCESS_TYPE;

static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
static void pgstat_parseArgs(int argc, char *argv[]);

#endif

NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
static void pgstat_recvbuffer(void);
static void pgstat_exit(SIGNAL_ARGS);
static void pgstat_die(SIGNAL_ARGS);

static int	pgstat_add_backend(PgStat_MsgHdr *msg);
static void pgstat_sub_backend(int procpid);
static void pgstat_drop_database(Oid databaseid);
static void pgstat_write_statsfile(void);
static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
					  PgStat_StatBeEntry **betab,
					  int *numbackends);
static void backend_read_statsfile(void);

static void pgstat_setheader(PgStat_MsgHdr *hdr, int mtype);
static void pgstat_send(void *msg, int len);

/* Collector-side handlers, one per message type */
static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
182
183
184 /* ------------------------------------------------------------
185  * Public functions called from postmaster follow
186  * ------------------------------------------------------------
187  */
188
/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success, pgStatSock is a non-blocking UDP socket bound to a
 *	kernel-assigned port on localhost and connected to its own address,
 *	and a one-byte round trip through it has been verified.  On failure,
 *	pgStatSock is -1 and all stats-collection GUC variables are forced
 *	off so that backends do no useless work.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * Initialize the filename for the status reports.  (In the EXEC_BACKEND
	 * case, this only sets the value in the postmaster.  The collector
	 * subprocess will recompute the value for itself, and individual
	 * backends must do so also if they want to access the file.)
	 */
	snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		unlink(pgStat_fname);

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages.
	 * Zero the whole hints struct first: struct addrinfo may carry
	 * platform-specific members that we don't set explicitly below, and
	 * getaddrinfo() requires unused hint fields to be zero.
	 */
	MemSet(&hints, 0, sizeof(hints));
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination.  We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the
		 * assigned port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles
		 * by not having to respecify the target address on every send.
		 * This also provides a kernel-level check that only packets from
		 * this same address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket.
		 * This is to catch situations where the socket can be created but
		 * will not actually pass data (for instance, because kernel packet
		 * filtering rules prevent it).
		 */
		test_byte = TESTBYTEVAL;
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before
		 * deciding it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains
			 * to give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 * The SQLSTATE must be wrapped in errcode(); a bare ERRCODE_xxx
			 * inside the ereport argument list is silently discarded.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("disabling statistics collector for lack of working socket")));
		goto startup_failed;
	}

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the
	 * collector falls behind (despite the buffering process), statistics
	 * messages will be discarded; backends won't block waiting to send
	 * messages to the collector.
	 */
	if (!set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	if (addrs)
		freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
453
454
455 #ifdef EXEC_BACKEND
456
457 /*
458  * pgstat_forkexec() -
459  *
460  * Format up the arglist for, then fork and exec, statistics
461  * (buffer and collector) processes
462  */
463 static pid_t
464 pgstat_forkexec(STATS_PROCESS_TYPE procType)
465 {
466         char *av[10];
467         int ac = 0, bufc = 0, i;
468         char pgstatBuf[2][32];
469
470         av[ac++] = "postgres";
471
472         switch (procType)
473         {
474                 case STAT_PROC_BUFFER:
475                         av[ac++] = "-forkbuf";
476                         break;
477
478                 case STAT_PROC_COLLECTOR:
479                         av[ac++] = "-forkcol";
480                         break;
481
482                 default:
483                         Assert(false);
484         }
485
486         av[ac++] = NULL;                        /* filled in by postmaster_forkexec */
487
488         /* postgres_exec_path is not passed by write_backend_variables */
489         av[ac++] = postgres_exec_path;
490
491         /* Pipe file ids (those not passed by write_backend_variables) */
492         snprintf(pgstatBuf[bufc++],32,"%d",pgStatPipe[0]);
493         snprintf(pgstatBuf[bufc++],32,"%d",pgStatPipe[1]);
494
495         /* Add to the arg list */
496         Assert(bufc <= lengthof(pgstatBuf));
497         for (i = 0; i < bufc; i++)
498                 av[ac++] = pgstatBuf[i];
499
500         av[ac] = NULL;
501         Assert(ac < lengthof(av));
502
503         return postmaster_forkexec(ac, av);
504 }
505
506
507 /*
508  * pgstat_parseArgs() -
509  *
510  * Extract data from the arglist for exec'ed statistics
511  * (buffer and collector) processes
512  */
513 static void
514 pgstat_parseArgs(int argc, char *argv[])
515 {
516         Assert(argc == 6);
517
518         argc = 3;
519         StrNCpy(postgres_exec_path,     argv[argc++], MAXPGPATH);
520         pgStatPipe[0]   = atoi(argv[argc++]);
521         pgStatPipe[1]   = atoi(argv[argc++]);
522 }
523
524 #endif /* EXEC_BACKEND */
525
526
527 /* ----------
528  * pgstat_start() -
529  *
530  *      Called from postmaster at startup or after an existing collector
531  *      died.  Attempt to fire up a fresh statistics collector.
532  *
533  *      Returns PID of child process, or 0 if fail.
534  *
535  *      Note: if fail, we will be called again from the postmaster main loop.
536  * ----------
537  */
538 int
539 pgstat_start(void)
540 {
541         time_t          curtime;
542         pid_t           pgStatPid;
543
544         /*
545          * Do nothing if no collector needed
546          */
547         if (!pgstat_collect_startcollector)
548                 return 0;
549
550         /*
551          * Do nothing if too soon since last collector start.  This is a
552          * safety valve to protect against continuous respawn attempts if the
553          * collector is dying immediately at launch.  Note that since we will
554          * be re-called from the postmaster main loop, we will get another
555          * chance later.
556          */
557         curtime = time(NULL);
558         if ((unsigned int) (curtime - last_pgstat_start_time) <
559                 (unsigned int) PGSTAT_RESTART_INTERVAL)
560                 return 0;
561         last_pgstat_start_time = curtime;
562
563         /*
564          * Check that the socket is there, else pgstat_init failed.
565          */
566         if (pgStatSock < 0)
567         {
568                 ereport(LOG,
569                                 (errmsg("statistics collector startup skipped")));
570
571                 /*
572                  * We can only get here if someone tries to manually turn
573                  * pgstat_collect_startcollector on after it had been off.
574                  */
575                 pgstat_collect_startcollector = false;
576                 return 0;
577         }
578
579         /*
580          * Okay, fork off the collector.
581          */
582
583         fflush(stdout);
584         fflush(stderr);
585
586 #ifdef __BEOS__
587         /* Specific beos actions before backend startup */
588         beos_before_backend_startup();
589 #endif
590
591 #ifdef EXEC_BACKEND
592         switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
593 #else
594         switch ((pgStatPid = fork()))
595 #endif
596         {
597                 case -1:
598 #ifdef __BEOS__
599                         /* Specific beos actions */
600                         beos_backend_startup_failed();
601 #endif
602                         ereport(LOG,
603                                         (errmsg("could not fork statistics buffer: %m")));
604                         return 0;
605
606 #ifndef EXEC_BACKEND
607                 case 0:
608                         /* in postmaster child ... */
609 #ifdef __BEOS__
610                         /* Specific beos actions after backend startup */
611                         beos_backend_startup();
612 #endif
613                         /* Close the postmaster's sockets */
614                         ClosePostmasterPorts();
615
616                         /* Drop our connection to postmaster's shared memory, as well */
617                         PGSharedMemoryDetach();
618
619                         PgstatBufferMain(0, NULL);
620                         break;
621 #endif
622
623                 default:
624                         return (int) pgStatPid;
625         }
626
627         /* shouldn't get here */
628         return 0;
629 }
630
631
632 /* ----------
633  * pgstat_beterm() -
634  *
635  *      Called from postmaster to tell collector a backend terminated.
636  * ----------
637  */
638 void
639 pgstat_beterm(int pid)
640 {
641         PgStat_MsgBeterm msg;
642
643         if (pgStatSock < 0)
644                 return;
645
646         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
647         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
648         msg.m_hdr.m_procpid = pid;
649
650         pgstat_send(&msg, sizeof(msg));
651 }
652
653
654 /* ------------------------------------------------------------
655  * Public functions used by backends follow
656  *------------------------------------------------------------
657  */
658
659
660 /* ----------
661  * pgstat_bestart() -
662  *
663  *      Tell the collector that this new backend is soon ready to process
664  *      queries. Called from tcop/postgres.c before entering the mainloop.
665  * ----------
666  */
667 void
668 pgstat_bestart(void)
669 {
670         PgStat_MsgBestart msg;
671
672         if (pgStatSock < 0)
673                 return;
674
675         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
676         pgstat_send(&msg, sizeof(msg));
677 }
678
679
680 /* ----------
681  * pgstat_report_activity() -
682  *
683  *      Called in tcop/postgres.c to tell the collector what the backend
684  *      is actually doing (usually "<IDLE>" or the start of the query to
685  *      be executed).
686  * ----------
687  */
688 void
689 pgstat_report_activity(const char *what)
690 {
691         PgStat_MsgActivity msg;
692         int                     len;
693
694         if (!pgstat_collect_querystring || pgStatSock < 0)
695                 return;
696
697         len = strlen(what);
698         len = pg_mbcliplen((const unsigned char *) what, len,
699                                            PGSTAT_ACTIVITY_SIZE - 1);
700
701         memcpy(msg.m_what, what, len);
702         msg.m_what[len] = '\0';
703         len += offsetof(PgStat_MsgActivity, m_what) +1;
704
705         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
706         pgstat_send(&msg, len);
707 }
708
709
710 /* ----------
711  * pgstat_report_tabstat() -
712  *
713  *      Called from tcop/postgres.c to send the so far collected
714  *      per table access statistics to the collector.
715  * ----------
716  */
717 void
718 pgstat_report_tabstat(void)
719 {
720         int                     i;
721
722         if (pgStatSock < 0 ||
723                 !(pgstat_collect_querystring ||
724                   pgstat_collect_tuplelevel ||
725                   pgstat_collect_blocklevel))
726         {
727                 /* Not reporting stats, so just flush whatever we have */
728                 pgStatTabstatUsed = 0;
729                 return;
730         }
731
732         /*
733          * For each message buffer used during the last query set the header
734          * fields and send it out.
735          */
736         for (i = 0; i < pgStatTabstatUsed; i++)
737         {
738                 PgStat_MsgTabstat *tsmsg = pgStatTabstatMessages[i];
739                 int                     n;
740                 int                     len;
741
742                 n = tsmsg->m_nentries;
743                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
744                         n * sizeof(PgStat_TableEntry);
745
746                 tsmsg->m_xact_commit = pgStatXactCommit;
747                 tsmsg->m_xact_rollback = pgStatXactRollback;
748                 pgStatXactCommit = 0;
749                 pgStatXactRollback = 0;
750
751                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
752                 pgstat_send(tsmsg, len);
753         }
754
755         pgStatTabstatUsed = 0;
756 }
757
758
759 /* ----------
760  * pgstat_vacuum_tabstat() -
761  *
762  *      Will tell the collector about objects he can get rid of.
763  * ----------
764  */
765 int
766 pgstat_vacuum_tabstat(void)
767 {
768         Relation        dbrel;
769         HeapScanDesc dbscan;
770         HeapTuple       dbtup;
771         Oid                *dbidlist;
772         int                     dbidalloc;
773         int                     dbidused;
774         HASH_SEQ_STATUS hstat;
775         PgStat_StatDBEntry *dbentry;
776         PgStat_StatTabEntry *tabentry;
777         HeapTuple       reltup;
778         int                     nobjects = 0;
779         PgStat_MsgTabpurge msg;
780         int                     len;
781         int                     i;
782
783         if (pgStatSock < 0)
784                 return 0;
785
786         /*
787          * If not done for this transaction, read the statistics collector
788          * stats file into some hash tables.
789          */
790         backend_read_statsfile();
791
792         /*
793          * Lookup our own database entry
794          */
795         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
796                                                                                                  (void *) &MyDatabaseId,
797                                                                                                  HASH_FIND, NULL);
798         if (dbentry == NULL)
799                 return -1;
800
801         if (dbentry->tables == NULL)
802                 return 0;
803
804         /*
805          * Initialize our messages table counter to zero
806          */
807         msg.m_nentries = 0;
808
809         /*
810          * Check for all tables if they still exist.
811          */
812         hash_seq_init(&hstat, dbentry->tables);
813         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
814         {
815                 /*
816                  * Check if this relation is still alive by looking up it's
817                  * pg_class tuple in the system catalog cache.
818                  */
819                 reltup = SearchSysCache(RELOID,
820                                                                 ObjectIdGetDatum(tabentry->tableid),
821                                                                 0, 0, 0);
822                 if (HeapTupleIsValid(reltup))
823                 {
824                         ReleaseSysCache(reltup);
825                         continue;
826                 }
827
828                 /*
829                  * Add this tables Oid to the message
830                  */
831                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
832                 nobjects++;
833
834                 /*
835                  * If the message is full, send it out and reinitialize ot zero
836                  */
837                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
838                 {
839                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
840                                 +msg.m_nentries * sizeof(Oid);
841
842                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
843                         pgstat_send(&msg, len);
844
845                         msg.m_nentries = 0;
846                 }
847         }
848
849         /*
850          * Send the rest
851          */
852         if (msg.m_nentries > 0)
853         {
854                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
855                         +msg.m_nentries * sizeof(Oid);
856
857                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
858                 pgstat_send(&msg, len);
859         }
860
861         /*
862          * Read pg_database and remember the Oid's of all existing databases
863          */
864         dbidalloc = 256;
865         dbidused = 0;
866         dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
867
868         dbrel = heap_openr(DatabaseRelationName, AccessShareLock);
869         dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
870         while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
871         {
872                 if (dbidused >= dbidalloc)
873                 {
874                         dbidalloc *= 2;
875                         dbidlist = (Oid *) repalloc((char *) dbidlist,
876                                                                                 sizeof(Oid) * dbidalloc);
877                 }
878                 dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
879         }
880         heap_endscan(dbscan);
881         heap_close(dbrel, AccessShareLock);
882
883         /*
884          * Search the database hash table for dead databases and tell the
885          * collector to drop them as well.
886          */
887         hash_seq_init(&hstat, pgStatDBHash);
888         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
889         {
890                 Oid                     dbid = dbentry->databaseid;
891
892                 for (i = 0; i < dbidused; i++)
893                 {
894                         if (dbidlist[i] == dbid)
895                         {
896                                 dbid = InvalidOid;
897                                 break;
898                         }
899                 }
900
901                 if (dbid != InvalidOid)
902                 {
903                         nobjects++;
904                         pgstat_drop_database(dbid);
905                 }
906         }
907
908         /*
909          * Free the dbid list.
910          */
911         pfree((char *) dbidlist);
912
913         /*
914          * Tell the caller how many removeable objects we found
915          */
916         return nobjects;
917 }
918
919
920 /* ----------
921  * pgstat_drop_database() -
922  *
923  *      Tell the collector that we just dropped a database.
924  *      This is the only message that shouldn't get lost in space. Otherwise
925  *      the collector will keep the statistics for the dead DB until his
926  *      stats file got removed while the postmaster is down.
927  * ----------
928  */
929 static void
930 pgstat_drop_database(Oid databaseid)
931 {
932         PgStat_MsgDropdb msg;
933
934         if (pgStatSock < 0)
935                 return;
936
937         msg.m_databaseid = databaseid;
938
939         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
940         pgstat_send(&msg, sizeof(msg));
941 }
942
943
944 /* ----------
945  * pgstat_reset_counters() -
946  *
947  *      Tell the statistics collector to reset counters for our database.
948  * ----------
949  */
950 void
951 pgstat_reset_counters(void)
952 {
953         PgStat_MsgResetcounter msg;
954
955         if (pgStatSock < 0)
956                 return;
957
958         if (!superuser())
959                 ereport(ERROR,
960                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
961                           errmsg("must be superuser to reset statistics counters")));
962
963         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
964         pgstat_send(&msg, sizeof(msg));
965 }
966
967
968 /* ----------
969  * pgstat_ping() -
970  *
971  *      Send some junk data to the collector to increase traffic.
972  * ----------
973  */
974 void
975 pgstat_ping(void)
976 {
977         PgStat_MsgDummy msg;
978
979         if (pgStatSock < 0)
980                 return;
981
982         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
983         pgstat_send(&msg, sizeof(msg));
984 }
985
986 /*
987  * Create or enlarge the pgStatTabstatMessages array
988  */
989 static bool
990 more_tabstat_space(void)
991 {
992         PgStat_MsgTabstat *newMessages;
993         PgStat_MsgTabstat **msgArray;
994         int                     newAlloc = pgStatTabstatAlloc + TABSTAT_QUANTUM;
995         int                     i;
996
997         /* Create (another) quantum of message buffers */
998         newMessages = (PgStat_MsgTabstat *)
999                 malloc(sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1000         if (newMessages == NULL)
1001         {
1002                 ereport(LOG,
1003                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1004                                  errmsg("out of memory")));
1005                 return false;
1006         }
1007
1008         /* Create or enlarge the pointer array */
1009         if (pgStatTabstatMessages == NULL)
1010                 msgArray = (PgStat_MsgTabstat **)
1011                         malloc(sizeof(PgStat_MsgTabstat *) * newAlloc);
1012         else
1013                 msgArray = (PgStat_MsgTabstat **)
1014                         realloc(pgStatTabstatMessages,
1015                                         sizeof(PgStat_MsgTabstat *) * newAlloc);
1016         if (msgArray == NULL)
1017         {
1018                 free(newMessages);
1019                 ereport(LOG,
1020                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1021                                  errmsg("out of memory")));
1022                 return false;
1023         }
1024
1025         MemSet(newMessages, 0, sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1026         for (i = 0; i < TABSTAT_QUANTUM; i++)
1027                 msgArray[pgStatTabstatAlloc + i] = newMessages++;
1028         pgStatTabstatMessages = msgArray;
1029         pgStatTabstatAlloc = newAlloc;
1030
1031         return true;
1032 }
1033
1034 /* ----------
1035  * pgstat_initstats() -
1036  *
1037  *      Called from various places usually dealing with initialization
1038  *      of Relation or Scan structures. The data placed into these
1039  *      structures from here tell where later to count for buffer reads,
1040  *      scans and tuples fetched.
1041  * ----------
1042  */
1043 void
1044 pgstat_initstats(PgStat_Info *stats, Relation rel)
1045 {
1046         Oid                     rel_id = rel->rd_id;
1047         PgStat_TableEntry *useent;
1048         PgStat_MsgTabstat *tsmsg;
1049         int                     mb;
1050         int                     i;
1051
1052         /*
1053          * Initialize data not to count at all.
1054          */
1055         stats->tabentry = NULL;
1056         stats->no_stats = FALSE;
1057         stats->heap_scan_counted = FALSE;
1058         stats->index_scan_counted = FALSE;
1059
1060         if (pgStatSock < 0 ||
1061                 !(pgstat_collect_tuplelevel ||
1062                   pgstat_collect_blocklevel))
1063         {
1064                 stats->no_stats = TRUE;
1065                 return;
1066         }
1067
1068         /*
1069          * Search the already-used message slots for this relation.
1070          */
1071         for (mb = 0; mb < pgStatTabstatUsed; mb++)
1072         {
1073                 tsmsg = pgStatTabstatMessages[mb];
1074
1075                 for (i = tsmsg->m_nentries; --i >= 0; )
1076                 {
1077                         if (tsmsg->m_entry[i].t_id == rel_id)
1078                         {
1079                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1080                                 return;
1081                         }
1082                 }
1083
1084                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1085                         continue;
1086
1087                 /*
1088                  * Not found, but found a message buffer with an empty slot
1089                  * instead. Fine, let's use this one.
1090                  */
1091                 i = tsmsg->m_nentries++;
1092                 useent = &tsmsg->m_entry[i];
1093                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1094                 useent->t_id = rel_id;
1095                 stats->tabentry = (void *) useent;
1096                 return;
1097         }
1098
1099         /*
1100          * If we ran out of message buffers, we just allocate more.
1101          */
1102         if (pgStatTabstatUsed >= pgStatTabstatAlloc)
1103         {
1104                 if (!more_tabstat_space())
1105                 {
1106                         stats->no_stats = TRUE;
1107                         return;
1108                 }
1109                 Assert(pgStatTabstatUsed < pgStatTabstatAlloc);
1110         }
1111
1112         /*
1113          * Use the first entry of the next message buffer.
1114          */
1115         mb = pgStatTabstatUsed++;
1116         tsmsg = pgStatTabstatMessages[mb];
1117         tsmsg->m_nentries = 1;
1118         useent = &tsmsg->m_entry[0];
1119         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1120         useent->t_id = rel_id;
1121         stats->tabentry = (void *) useent;
1122 }
1123
1124
1125 /* ----------
1126  * pgstat_count_xact_commit() -
1127  *
1128  *      Called from access/transam/xact.c to count transaction commits.
1129  * ----------
1130  */
1131 void
1132 pgstat_count_xact_commit(void)
1133 {
1134         if (!(pgstat_collect_querystring ||
1135                   pgstat_collect_tuplelevel ||
1136                   pgstat_collect_blocklevel))
1137                 return;
1138
1139         pgStatXactCommit++;
1140
1141         /*
1142          * If there was no relation activity yet, just make one existing
1143          * message buffer used without slots, causing the next report to tell
1144          * new xact-counters.
1145          */
1146         if (pgStatTabstatAlloc == 0)
1147         {
1148                 if (!more_tabstat_space())
1149                         return;
1150         }
1151         if (pgStatTabstatUsed == 0)
1152         {
1153                 pgStatTabstatUsed++;
1154                 pgStatTabstatMessages[0]->m_nentries = 0;
1155         }
1156 }
1157
1158
1159 /* ----------
1160  * pgstat_count_xact_rollback() -
1161  *
1162  *      Called from access/transam/xact.c to count transaction rollbacks.
1163  * ----------
1164  */
1165 void
1166 pgstat_count_xact_rollback(void)
1167 {
1168         if (!(pgstat_collect_querystring ||
1169                   pgstat_collect_tuplelevel ||
1170                   pgstat_collect_blocklevel))
1171                 return;
1172
1173         pgStatXactRollback++;
1174
1175         /*
1176          * If there was no relation activity yet, just make one existing
1177          * message buffer used without slots, causing the next report to tell
1178          * new xact-counters.
1179          */
1180         if (pgStatTabstatAlloc == 0)
1181         {
1182                 if (!more_tabstat_space())
1183                         return;
1184         }
1185         if (pgStatTabstatUsed == 0)
1186         {
1187                 pgStatTabstatUsed++;
1188                 pgStatTabstatMessages[0]->m_nentries = 0;
1189         }
1190 }
1191
1192
1193 /* ----------
1194  * pgstat_fetch_stat_dbentry() -
1195  *
1196  *      Support function for the SQL-callable pgstat* functions. Returns
1197  *      the collected statistics for one database or NULL. NULL doesn't mean
1198  *      that the database doesn't exist, it is just not yet known by the
1199  *      collector, so the caller is better off to report ZERO instead.
1200  * ----------
1201  */
1202 PgStat_StatDBEntry *
1203 pgstat_fetch_stat_dbentry(Oid dbid)
1204 {
1205         PgStat_StatDBEntry *dbentry;
1206
1207         /*
1208          * If not done for this transaction, read the statistics collector
1209          * stats file into some hash tables.
1210          */
1211         backend_read_statsfile();
1212
1213         /*
1214          * Lookup the requested database
1215          */
1216         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1217                                                                                                  (void *) &dbid,
1218                                                                                                  HASH_FIND, NULL);
1219         if (dbentry == NULL)
1220                 return NULL;
1221
1222         return dbentry;
1223 }
1224
1225
1226 /* ----------
1227  * pgstat_fetch_stat_tabentry() -
1228  *
1229  *      Support function for the SQL-callable pgstat* functions. Returns
1230  *      the collected statistics for one table or NULL. NULL doesn't mean
1231  *      that the table doesn't exist, it is just not yet known by the
1232  *      collector, so the caller is better off to report ZERO instead.
1233  * ----------
1234  */
1235 PgStat_StatTabEntry *
1236 pgstat_fetch_stat_tabentry(Oid relid)
1237 {
1238         PgStat_StatDBEntry *dbentry;
1239         PgStat_StatTabEntry *tabentry;
1240
1241         /*
1242          * If not done for this transaction, read the statistics collector
1243          * stats file into some hash tables.
1244          */
1245         backend_read_statsfile();
1246
1247         /*
1248          * Lookup our database.
1249          */
1250         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1251                                                                                                  (void *) &MyDatabaseId,
1252                                                                                                  HASH_FIND, NULL);
1253         if (dbentry == NULL)
1254                 return NULL;
1255
1256         /*
1257          * Now inside the DB's table hash table lookup the requested one.
1258          */
1259         if (dbentry->tables == NULL)
1260                 return NULL;
1261         tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1262                                                                                                    (void *) &relid,
1263                                                                                                    HASH_FIND, NULL);
1264         if (tabentry == NULL)
1265                 return NULL;
1266
1267         return tabentry;
1268 }
1269
1270
1271 /* ----------
1272  * pgstat_fetch_stat_beentry() -
1273  *
1274  *      Support function for the SQL-callable pgstat* functions. Returns
1275  *      the actual activity slot of one active backend. The caller is
1276  *      responsible for a check if the actual user is permitted to see
1277  *      that info (especially the querystring).
1278  * ----------
1279  */
1280 PgStat_StatBeEntry *
1281 pgstat_fetch_stat_beentry(int beid)
1282 {
1283         backend_read_statsfile();
1284
1285         if (beid < 1 || beid > pgStatNumBackends)
1286                 return NULL;
1287
1288         return &pgStatBeTable[beid - 1];
1289 }
1290
1291
1292 /* ----------
1293  * pgstat_fetch_stat_numbackends() -
1294  *
1295  *      Support function for the SQL-callable pgstat* functions. Returns
1296  *      the maximum current backend id.
1297  * ----------
1298  */
1299 int
1300 pgstat_fetch_stat_numbackends(void)
1301 {
1302         backend_read_statsfile();
1303
1304         return pgStatNumBackends;
1305 }
1306
1307
1308
1309 /* ------------------------------------------------------------
1310  * Local support functions follow
1311  * ------------------------------------------------------------
1312  */
1313
1314
1315 /* ----------
1316  * pgstat_setheader() -
1317  *
1318  *              Set common header fields in a statistics message
1319  * ----------
1320  */
1321 static void
1322 pgstat_setheader(PgStat_MsgHdr *hdr, int mtype)
1323 {
1324         hdr->m_type = mtype;
1325         hdr->m_backendid = MyBackendId;
1326         hdr->m_procpid = MyProcPid;
1327         hdr->m_databaseid = MyDatabaseId;
1328         hdr->m_userid = GetSessionUserId();
1329 }
1330
1331
1332 /* ----------
1333  * pgstat_send() -
1334  *
1335  *              Send out one statistics message to the collector
1336  * ----------
1337  */
1338 static void
1339 pgstat_send(void *msg, int len)
1340 {
1341         if (pgStatSock < 0)
1342                 return;
1343
1344         ((PgStat_MsgHdr *) msg)->m_size = len;
1345
1346         send(pgStatSock, msg, len, 0);
1347         /* We deliberately ignore any error from send() */
1348 }
1349
1350
1351 /* ----------
1352  * PgstatBufferMain() -
1353  *
1354  *      Start up the statistics buffer process.  This is the body of the
1355  *      postmaster child process.
1356  *
1357  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1358  * ----------
1359  */
1360 NON_EXEC_STATIC void
1361 PgstatBufferMain(int argc, char *argv[])
1362 {
1363         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1364
1365         MyProcPid = getpid();           /* reset MyProcPid */
1366
1367         /* Lose the postmaster's on-exit routines */
1368         on_exit_reset();
1369
1370         /*
1371          * Ignore all signals usually bound to some action in the postmaster,
1372          * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
1373          */
1374         pqsignal(SIGHUP, SIG_IGN);
1375         pqsignal(SIGINT, SIG_IGN);
1376         pqsignal(SIGTERM, SIG_IGN);
1377         pqsignal(SIGQUIT, pgstat_exit);
1378         pqsignal(SIGALRM, SIG_IGN);
1379         pqsignal(SIGPIPE, SIG_IGN);
1380         pqsignal(SIGUSR1, SIG_IGN);
1381         pqsignal(SIGUSR2, SIG_IGN);
1382         pqsignal(SIGCHLD, pgstat_die);
1383         pqsignal(SIGTTIN, SIG_DFL);
1384         pqsignal(SIGTTOU, SIG_DFL);
1385         pqsignal(SIGCONT, SIG_DFL);
1386         pqsignal(SIGWINCH, SIG_DFL);
1387         /* unblock will happen in pgstat_recvbuffer */
1388
1389 #ifdef EXEC_BACKEND
1390         pgstat_parseArgs(argc,argv);
1391 #endif
1392
1393         /*
1394          * Start a buffering process to read from the socket, so we have a
1395          * little more time to process incoming messages.
1396          *
1397          * NOTE: the process structure is: postmaster is parent of buffer process
1398          * is parent of collector process.      This way, the buffer can detect
1399          * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1400          * collector failure until it tried to write on the pipe.  That would
1401          * mean that after the postmaster started a new collector, we'd have
1402          * two buffer processes competing to read from the UDP socket --- not
1403          * good.
1404          */
1405         if (pgpipe(pgStatPipe) < 0)
1406         {
1407                 ereport(LOG,
1408                                 (errcode_for_socket_access(),
1409                          errmsg("could not create pipe for statistics buffer: %m")));
1410                 exit(1);
1411         }
1412
1413 #ifdef EXEC_BACKEND
1414         /* child becomes collector process */
1415         switch (pgstat_forkexec(STAT_PROC_COLLECTOR))
1416 #else
1417         switch (fork())
1418 #endif
1419         {
1420                 case -1:
1421                         ereport(LOG,
1422                                         (errmsg("could not fork statistics collector: %m")));
1423                         exit(1);
1424
1425 #ifndef EXEC_BACKEND
1426                 case 0:
1427                         /* child becomes collector process */
1428                         PgstatCollectorMain(0, NULL);
1429                         break;
1430 #endif
1431
1432                 default:
1433                         /* parent becomes buffer process */
1434                         closesocket(pgStatPipe[0]);
1435                         pgstat_recvbuffer();
1436         }
1437         exit(0);
1438 }
1439
1440
1441 /* ----------
1442  * PgstatCollectorMain() -
1443  *
1444  *      Start up the statistics collector itself.  This is the body of the
1445  *      postmaster grandchild process.
1446  *
1447  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1448  * ----------
1449  */
1450 NON_EXEC_STATIC void
1451 PgstatCollectorMain(int argc, char *argv[])
1452 {
1453         PgStat_Msg      msg;
1454         fd_set          rfds;
1455         int                     readPipe;
1456         int                     nready;
1457         int                     len = 0;
1458         struct timeval timeout;
1459         struct timeval next_statwrite;
1460         bool            need_statwrite;
1461         HASHCTL         hash_ctl;
1462
1463         MyProcPid = getpid();           /* reset MyProcPid */
1464
1465         /*
1466          * Reset signal handling.  With the exception of restoring default
1467          * SIGCHLD and SIGQUIT handling, this is a no-op in the non-EXEC_BACKEND
1468          * case because we'll have inherited these settings from the buffer
1469          * process; but it's not a no-op for EXEC_BACKEND.
1470          */
1471         pqsignal(SIGHUP, SIG_IGN);
1472         pqsignal(SIGINT, SIG_IGN);
1473         pqsignal(SIGTERM, SIG_IGN);
1474         pqsignal(SIGQUIT, SIG_IGN);
1475         pqsignal(SIGALRM, SIG_IGN);
1476         pqsignal(SIGPIPE, SIG_IGN);
1477         pqsignal(SIGUSR1, SIG_IGN);
1478         pqsignal(SIGUSR2, SIG_IGN);
1479         pqsignal(SIGCHLD, SIG_DFL);
1480         pqsignal(SIGTTIN, SIG_DFL);
1481         pqsignal(SIGTTOU, SIG_DFL);
1482         pqsignal(SIGCONT, SIG_DFL);
1483         pqsignal(SIGWINCH, SIG_DFL);
1484         PG_SETMASK(&UnBlockSig);
1485
1486 #ifdef EXEC_BACKEND
1487         pgstat_parseArgs(argc,argv);
1488 #endif
1489
1490         /* Close unwanted files */
1491         closesocket(pgStatPipe[1]);
1492         closesocket(pgStatSock);
1493
1494         /*
1495          * Identify myself via ps
1496          */
1497         init_ps_display("stats collector process", "", "");
1498         set_ps_display("");
1499
1500         /*
1501          * Initialize filenames needed for status reports.
1502          */
1503         snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
1504         /* tmpfname need only be set correctly in this process */
1505         snprintf(pgStat_tmpfname, MAXPGPATH, PGSTAT_STAT_TMPFILE,
1506                          DataDir, getpid());
1507
1508         /*
1509          * Arrange to write the initial status file right away
1510          */
1511         gettimeofday(&next_statwrite, NULL);
1512         need_statwrite = TRUE;
1513
1514         /*
1515          * Read in an existing statistics stats file or initialize the stats
1516          * to zero.
1517          */
1518         pgStatRunningInCollector = TRUE;
1519         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1520
1521         /*
1522          * Create the dead backend hashtable
1523          */
1524         memset(&hash_ctl, 0, sizeof(hash_ctl));
1525         hash_ctl.keysize = sizeof(int);
1526         hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1527         hash_ctl.hash = tag_hash;
1528         pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
1529                                                            &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1530         if (pgStatBeDead == NULL)
1531         {
1532                 /* assume the problem is out-of-memory */
1533                 ereport(LOG,
1534                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1535                                  errmsg("out of memory in statistics collector --- abort")));
1536                 exit(1);
1537         }
1538
1539         /*
1540          * Create the known backends table
1541          */
1542         pgStatBeTable = (PgStat_StatBeEntry *) malloc(
1543                                                            sizeof(PgStat_StatBeEntry) * MaxBackends);
1544         if (pgStatBeTable == NULL)
1545         {
1546                 ereport(LOG,
1547                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1548                                  errmsg("out of memory in statistics collector --- abort")));
1549                 exit(1);
1550         }
1551         memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
1552
1553         readPipe = pgStatPipe[0];
1554
1555         /*
1556          * Process incoming messages and handle all the reporting stuff until
1557          * there are no more messages.
1558          */
1559         for (;;)
1560         {
1561                 /*
1562                  * If we need to write the status file again (there have been
1563                  * changes in the statistics since we wrote it last) calculate the
1564                  * timeout until we have to do so.
1565                  */
1566                 if (need_statwrite)
1567                 {
1568                         struct timeval now;
1569
1570                         gettimeofday(&now, NULL);
1571                         /* avoid assuming that tv_sec is signed */
1572                         if (now.tv_sec > next_statwrite.tv_sec ||
1573                                 (now.tv_sec == next_statwrite.tv_sec &&
1574                                  now.tv_usec >= next_statwrite.tv_usec))
1575                         {
1576                                 timeout.tv_sec = 0;
1577                                 timeout.tv_usec = 0;
1578                         }
1579                         else
1580                         {
1581                                 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
1582                                 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
1583                                 if (timeout.tv_usec < 0)
1584                                 {
1585                                         timeout.tv_sec--;
1586                                         timeout.tv_usec += 1000000;
1587                                 }
1588                         }
1589                 }
1590
1591                 /*
1592                  * Setup the descriptor set for select(2)
1593                  */
1594                 FD_ZERO(&rfds);
1595                 FD_SET(readPipe, &rfds);
1596
1597                 /*
1598                  * Now wait for something to do.
1599                  */
1600                 nready = select(readPipe+1, &rfds, NULL, NULL,
1601                                                 (need_statwrite) ? &timeout : NULL);
1602                 if (nready < 0)
1603                 {
1604                         if (errno == EINTR)
1605                                 continue;
1606                         ereport(LOG,
1607                                         (errcode_for_socket_access(),
1608                                    errmsg("select() failed in statistics collector: %m")));
1609                         exit(1);
1610                 }
1611
1612                 /*
1613                  * If there are no descriptors ready, our timeout for writing the
1614                  * stats file happened.
1615                  */
1616                 if (nready == 0)
1617                 {
1618                         pgstat_write_statsfile();
1619                         need_statwrite = FALSE;
1620
1621                         continue;
1622                 }
1623
1624                 /*
1625                  * Check if there is a new statistics message to collect.
1626                  */
1627                 if (FD_ISSET(readPipe, &rfds))
1628                 {
1629                         /*
1630                          * We may need to issue multiple read calls in case the buffer
1631                          * process didn't write the message in a single write, which
1632                          * is possible since it dumps its buffer bytewise. In any
1633                          * case, we'd need two reads since we don't know the message
1634                          * length initially.
1635                          */
1636                         int                     nread = 0;
1637                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1638                         bool            pipeEOF = false;
1639
1640                         while (nread < targetlen)
1641                         {
1642                                 len = piperead(readPipe, ((char *) &msg) + nread,
1643                                                                 targetlen - nread);
1644                                 if (len < 0)
1645                                 {
1646                                         if (errno == EINTR)
1647                                                 continue;
1648                                         ereport(LOG,
1649                                                         (errcode_for_socket_access(),
1650                                                          errmsg("could not read from statistics collector pipe: %m")));
1651                                         exit(1);
1652                                 }
1653                                 if (len == 0)   /* EOF on the pipe! */
1654                                 {
1655                                         pipeEOF = true;
1656                                         break;
1657                                 }
1658                                 nread += len;
1659                                 if (nread == sizeof(PgStat_MsgHdr))
1660                                 {
1661                                         /* we have the header, compute actual msg length */
1662                                         targetlen = msg.msg_hdr.m_size;
1663                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1664                                                 targetlen > (int) sizeof(msg))
1665                                         {
1666                                                 /*
1667                                                  * Bogus message length implies that we got out of
1668                                                  * sync with the buffer process somehow. Abort so
1669                                                  * that we can restart both processes.
1670                                                  */
1671                                                 ereport(LOG,
1672                                                   (errmsg("invalid statistics message length")));
1673                                                 exit(1);
1674                                         }
1675                                 }
1676                         }
1677
1678                         /*
1679                          * EOF on the pipe implies that the buffer process exited.
1680                          * Fall out of outer loop.
1681                          */
1682                         if (pipeEOF)
1683                                 break;
1684
1685                         /*
1686                          * Distribute the message to the specific function handling
1687                          * it.
1688                          */
1689                         switch (msg.msg_hdr.m_type)
1690                         {
1691                                 case PGSTAT_MTYPE_DUMMY:
1692                                         break;
1693
1694                                 case PGSTAT_MTYPE_BESTART:
1695                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1696                                         break;
1697
1698                                 case PGSTAT_MTYPE_BETERM:
1699                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1700                                         break;
1701
1702                                 case PGSTAT_MTYPE_TABSTAT:
1703                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1704                                         break;
1705
1706                                 case PGSTAT_MTYPE_TABPURGE:
1707                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1708                                         break;
1709
1710                                 case PGSTAT_MTYPE_ACTIVITY:
1711                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1712                                         break;
1713
1714                                 case PGSTAT_MTYPE_DROPDB:
1715                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1716                                         break;
1717
1718                                 case PGSTAT_MTYPE_RESETCOUNTER:
1719                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1720                                                                                          nread);
1721                                         break;
1722
1723                                 default:
1724                                         break;
1725                         }
1726
1727                         /*
1728                          * Globally count messages.
1729                          */
1730                         pgStatNumMessages++;
1731
1732                         /*
1733                          * If this is the first message after we wrote the stats file
1734                          * the last time, setup the timeout that it'd be written.
1735                          */
1736                         if (!need_statwrite)
1737                         {
1738                                 gettimeofday(&next_statwrite, NULL);
1739                                 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
1740                                 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
1741                                 next_statwrite.tv_usec %= 1000000;
1742                                 need_statwrite = TRUE;
1743                         }
1744                 }
1745
1746                 /*
1747                  * Note that we do NOT check for postmaster exit inside the loop;
1748                  * only EOF on the buffer pipe causes us to fall out.  This
1749                  * ensures we don't exit prematurely if there are still a few
1750                  * messages in the buffer or pipe at postmaster shutdown.
1751                  */
1752         }
1753
1754         /*
1755          * Okay, we saw EOF on the buffer pipe, so there are no more messages
1756          * to process.  If the buffer process quit because of postmaster
1757          * shutdown, we want to save the final stats to reuse at next startup.
1758          * But if the buffer process failed, it seems best not to (there may
1759          * even now be a new collector firing up, and we don't want it to read
1760          * a partially-rewritten stats file).
1761          */
1762         if (!PostmasterIsAlive(false))
1763                 pgstat_write_statsfile();
1764 }
1765
1766
1767 /* ----------
1768  * pgstat_recvbuffer() -
1769  *
1770  *      This is the body of the separate buffering process. Its only
1771  *      purpose is to receive messages from the UDP socket as fast as
1772  *      possible and forward them over a pipe into the collector itself.
1773  *      If the collector is slow to absorb messages, they are buffered here.
1774  * ----------
1775  */
1776 static void
1777 pgstat_recvbuffer(void)
1778 {
1779         fd_set          rfds;
1780         fd_set          wfds;
1781         struct timeval timeout;
1782         int                     writePipe = pgStatPipe[1];
1783         int                     maxfd;
1784         int                     nready;
1785         int                     len;
1786         int                     xfr;
1787         int                     frm;
1788         PgStat_Msg      input_buffer;
1789         char       *msgbuffer;
1790         int                     msg_send = 0;   /* next send index in buffer */
1791         int                     msg_recv = 0;   /* next receive index */
1792         int                     msg_have = 0;   /* number of bytes stored */
1793         bool            overflow = false;
1794
1795         /*
1796          * Identify myself via ps
1797          */
1798         init_ps_display("stats buffer process", "", "");
1799         set_ps_display("");
1800
1801         /*
1802          * We want to die if our child collector process does.  There are two
1803          * ways we might notice that it has died: receive SIGCHLD, or get a
1804          * write failure on the pipe leading to the child.      We can set SIGPIPE
1805          * to kill us here.  Our SIGCHLD handler was already set up before we
1806          * forked (must do it that way, else it's a race condition).
1807          */
1808         pqsignal(SIGPIPE, SIG_DFL);
1809         PG_SETMASK(&UnBlockSig);
1810
1811         /*
1812          * Set the write pipe to nonblock mode, so that we cannot block when
1813          * the collector falls behind.
1814          */
1815         if (!set_noblock(writePipe))
1816         {
1817                 ereport(LOG,
1818                                 (errcode_for_socket_access(),
1819                   errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1820                 exit(1);
1821         }
1822
1823         /*
1824          * Allocate the message buffer
1825          */
1826         msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
1827         if (msgbuffer == NULL)
1828         {
1829                 ereport(LOG,
1830                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1831                          errmsg("out of memory in statistics collector --- abort")));
1832                 exit(1);
1833         }
1834
1835         /*
1836          * Loop forever
1837          */
1838         for (;;)
1839         {
1840                 FD_ZERO(&rfds);
1841                 FD_ZERO(&wfds);
1842                 maxfd = -1;
1843
1844                 /*
1845                  * As long as we have buffer space we add the socket to the read
1846                  * descriptor set.
1847                  */
1848                 if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
1849                 {
1850                         FD_SET(pgStatSock, &rfds);
1851                         maxfd = pgStatSock;
1852                         overflow = false;
1853                 }
1854                 else
1855                 {
1856                         if (!overflow)
1857                         {
1858                                 ereport(LOG,
1859                                                 (errmsg("statistics buffer is full")));
1860                                 overflow = true;
1861                         }
1862                 }
1863
1864                 /*
1865                  * If we have messages to write out, we add the pipe to the write
1866                  * descriptor set.
1867                  */
1868                 if (msg_have > 0)
1869                 {
1870                         FD_SET(writePipe, &wfds);
1871                         if (writePipe > maxfd)
1872                                 maxfd = writePipe;
1873                 }
1874
1875                 /*
1876                  * Wait for some work to do; but not for more than 10 seconds.
1877                  * (This determines how quickly we will shut down after an
1878                  * ungraceful postmaster termination; so it needn't be very fast.)
1879                  */
1880                 timeout.tv_sec = 10;
1881                 timeout.tv_usec = 0;
1882
1883                 nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
1884                 if (nready < 0)
1885                 {
1886                         if (errno == EINTR)
1887                                 continue;
1888                         ereport(LOG,
1889                                         (errcode_for_socket_access(),
1890                                          errmsg("select() failed in statistics buffer: %m")));
1891                         exit(1);
1892                 }
1893
1894                 /*
1895                  * If there is a message on the socket, read it and check for
1896                  * validity.
1897                  */
1898                 if (FD_ISSET(pgStatSock, &rfds))
1899                 {
1900                         len = recv(pgStatSock, (char *) &input_buffer,
1901                                            sizeof(PgStat_Msg), 0);
1902                         if (len < 0)
1903                         {
1904                                 ereport(LOG,
1905                                                 (errcode_for_socket_access(),
1906                                            errmsg("could not read statistics message: %m")));
1907                                 exit(1);
1908                         }
1909
1910                         /*
1911                          * We ignore messages that are smaller than our common header
1912                          */
1913                         if (len < sizeof(PgStat_MsgHdr))
1914                                 continue;
1915
1916                         /*
1917                          * The received length must match the length in the header
1918                          */
1919                         if (input_buffer.msg_hdr.m_size != len)
1920                                 continue;
1921
1922                         /*
1923                          * O.K. - we accept this message.  Copy it to the circular
1924                          * msgbuffer.
1925                          */
1926                         frm = 0;
1927                         while (len > 0)
1928                         {
1929                                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
1930                                 if (xfr > len)
1931                                         xfr = len;
1932                                 Assert(xfr > 0);
1933                                 memcpy(msgbuffer + msg_recv,
1934                                            ((char *) &input_buffer) + frm,
1935                                            xfr);
1936                                 msg_recv += xfr;
1937                                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
1938                                         msg_recv = 0;
1939                                 msg_have += xfr;
1940                                 frm += xfr;
1941                                 len -= xfr;
1942                         }
1943                 }
1944
1945                 /*
1946                  * If the collector is ready to receive, write some data into his
1947                  * pipe.  We may or may not be able to write all that we have.
1948                  *
1949                  * NOTE: if what we have is less than PIPE_BUF bytes but more than
1950                  * the space available in the pipe buffer, most kernels will
1951                  * refuse to write any of it, and will return EAGAIN.  This means
1952                  * we will busy-loop until the situation changes (either because
1953                  * the collector caught up, or because more data arrives so that
1954                  * we have more than PIPE_BUF bytes buffered).  This is not good,
1955                  * but is there any way around it?      We have no way to tell when
1956                  * the collector has caught up...
1957                  */
1958                 if (FD_ISSET(writePipe, &wfds))
1959                 {
1960                         xfr = PGSTAT_RECVBUFFERSZ - msg_send;
1961                         if (xfr > msg_have)
1962                                 xfr = msg_have;
1963                         Assert(xfr > 0);
1964                         len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
1965                         if (len < 0)
1966                         {
1967                                 if (errno == EINTR || errno == EAGAIN)
1968                                         continue;       /* not enough space in pipe */
1969                                 ereport(LOG,
1970                                                 (errcode_for_socket_access(),
1971                                                  errmsg("could not write to statistics collector pipe: %m")));
1972                                 exit(1);
1973                         }
1974                         /* NB: len < xfr is okay */
1975                         msg_send += len;
1976                         if (msg_send == PGSTAT_RECVBUFFERSZ)
1977                                 msg_send = 0;
1978                         msg_have -= len;
1979                 }
1980
1981                 /*
1982                  * Make sure we forwarded all messages before we check for
1983                  * postmaster termination.
1984                  */
1985                 if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
1986                         continue;
1987
1988                 /*
1989                  * If the postmaster has terminated, we die too.  (This is no longer
1990                  * the normal exit path, however.)
1991                  */
1992                 if (!PostmasterIsAlive(true))
1993                         exit(0);
1994         }
1995 }
1996
/*
 * pgstat_exit() -
 *
 *	SIGQUIT signal handler for buffer process.
 *
 *	Terminates the buffer process immediately with exit status 0.  Any
 *	messages still buffered in the process and not yet written to the
 *	collector pipe are not forwarded.
 *
 *	NOTE(review): exit() is not on POSIX's list of async-signal-safe
 *	functions (_exit() is).  Confirm this is acceptable here; switching to
 *	_exit() would also skip atexit handlers and stdio flushing.
 */
static void
pgstat_exit(SIGNAL_ARGS)
{
	/*
	 * For now, we just nail the doors shut and get out of town.  It might
	 * be cleaner to allow any pending messages to be sent, but that creates
	 * a tradeoff against speed of exit.
	 */
	exit(0);
}
2008
/*
 * pgstat_die() -
 *
 *	SIGCHLD signal handler for buffer process.
 *
 *	The buffer process is meant to die whenever its child collector
 *	process does, so receipt of SIGCHLD simply terminates it with a
 *	nonzero exit status (1).
 *
 *	NOTE(review): exit() is not on POSIX's list of async-signal-safe
 *	functions (_exit() is).  Confirm this is acceptable here; switching to
 *	_exit() would also skip atexit handlers and stdio flushing.
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	exit(1);
}
2015
2016
2017 /* ----------
2018  * pgstat_add_backend() -
2019  *
2020  *      Support function to keep our backend list up to date.
2021  * ----------
2022  */
2023 static int
2024 pgstat_add_backend(PgStat_MsgHdr *msg)
2025 {
2026         PgStat_StatDBEntry *dbentry;
2027         PgStat_StatBeEntry *beentry;
2028         PgStat_StatBeDead *deadbe;
2029         bool            found;
2030
2031         /*
2032          * Check that the backend ID is valid
2033          */
2034         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2035         {
2036                 ereport(LOG,
2037                                 (errmsg("invalid server process ID %d", msg->m_backendid)));
2038                 return -1;
2039         }
2040
2041         /*
2042          * Get the slot for this backendid.
2043          */
2044         beentry = &pgStatBeTable[msg->m_backendid - 1];
2045         if (beentry->databaseid != InvalidOid)
2046         {
2047                 /*
2048                  * If the slot contains the PID of this backend, everything is
2049                  * fine and we got nothing to do.
2050                  */
2051                 if (beentry->procpid == msg->m_procpid)
2052                         return 0;
2053         }
2054
2055         /*
2056          * Lookup if this backend is known to be dead. This can be caused due
2057          * to messages arriving in the wrong order - i.e. Postmaster's BETERM
2058          * message might have arrived before we received all the backends
2059          * stats messages, or even a new backend with the same backendid was
2060          * faster in sending his BESTART.
2061          *
2062          * If the backend is known to be dead, we ignore this add.
2063          */
2064         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2065                                                                                            (void *) &(msg->m_procpid),
2066                                                                                            HASH_FIND, NULL);
2067         if (deadbe)
2068                 return 1;
2069
2070         /*
2071          * Backend isn't known to be dead. If it's slot is currently used, we
2072          * have to kick out the old backend.
2073          */
2074         if (beentry->databaseid != InvalidOid)
2075                 pgstat_sub_backend(beentry->procpid);
2076
2077         /*
2078          * Put this new backend into the slot.
2079          */
2080         beentry->databaseid = msg->m_databaseid;
2081         beentry->procpid = msg->m_procpid;
2082         beentry->userid = msg->m_userid;
2083         beentry->activity_start_sec = 0;
2084         beentry->activity_start_usec = 0;
2085         MemSet(beentry->activity, 0, PGSTAT_ACTIVITY_SIZE);
2086
2087         /*
2088          * Lookup or create the database entry for this backends DB.
2089          */
2090         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2091                                                                                    (void *) &(msg->m_databaseid),
2092                                                                                                  HASH_ENTER, &found);
2093         if (dbentry == NULL)
2094         {
2095                 ereport(LOG,
2096                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2097                          errmsg("out of memory in statistics collector --- abort")));
2098                 exit(1);
2099         }
2100
2101         /*
2102          * If not found, initialize the new one.
2103          */
2104         if (!found)
2105         {
2106                 HASHCTL         hash_ctl;
2107
2108                 dbentry->tables = NULL;
2109                 dbentry->n_xact_commit = 0;
2110                 dbentry->n_xact_rollback = 0;
2111                 dbentry->n_blocks_fetched = 0;
2112                 dbentry->n_blocks_hit = 0;
2113                 dbentry->n_connects = 0;
2114                 dbentry->destroy = 0;
2115
2116                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2117                 hash_ctl.keysize = sizeof(Oid);
2118                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2119                 hash_ctl.hash = tag_hash;
2120                 dbentry->tables = hash_create("Per-database table",
2121                                                                           PGSTAT_TAB_HASH_SIZE,
2122                                                                           &hash_ctl,
2123                                                                           HASH_ELEM | HASH_FUNCTION);
2124                 if (dbentry->tables == NULL)
2125                 {
2126                         /* assume the problem is out-of-memory */
2127                         ereport(LOG,
2128                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2129                          errmsg("out of memory in statistics collector --- abort")));
2130                         exit(1);
2131                 }
2132         }
2133
2134         /*
2135          * Count number of connects to the database
2136          */
2137         dbentry->n_connects++;
2138
2139         return 0;
2140 }
2141
2142
2143 /* ----------
2144  * pgstat_sub_backend() -
2145  *
2146  *      Remove a backend from the actual backends list.
2147  * ----------
2148  */
2149 static void
2150 pgstat_sub_backend(int procpid)
2151 {
2152         int                     i;
2153         PgStat_StatBeDead *deadbe;
2154         bool            found;
2155
2156         /*
2157          * Search in the known-backends table for the slot containing this
2158          * PID.
2159          */
2160         for (i = 0; i < MaxBackends; i++)
2161         {
2162                 if (pgStatBeTable[i].databaseid != InvalidOid &&
2163                         pgStatBeTable[i].procpid == procpid)
2164                 {
2165                         /*
2166                          * That's him. Add an entry to the known to be dead backends.
2167                          * Due to possible misorder in the arrival of UDP packets it's
2168                          * possible that even if we know the backend is dead, there
2169                          * could still be messages queued that arrive later. Those
2170                          * messages must not cause our number of backends statistics
2171                          * to get screwed up, so we remember for a couple of seconds
2172                          * that this PID is dead and ignore them (only the counting of
2173                          * backends, not the table access stats they sent).
2174                          */
2175                         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2176                                                                                                            (void *) &procpid,
2177                                                                                                            HASH_ENTER,
2178                                                                                                            &found);
2179                         if (deadbe == NULL)
2180                         {
2181                                 ereport(LOG,
2182                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2183                                                  errmsg("out of memory in statistics collector --- abort")));
2184                                 exit(1);
2185                         }
2186                         if (!found)
2187                         {
2188                                 deadbe->backendid = i + 1;
2189                                 deadbe->destroy = PGSTAT_DESTROY_COUNT;
2190                         }
2191
2192                         /*
2193                          * Declare the backend slot empty.
2194                          */
2195                         pgStatBeTable[i].databaseid = InvalidOid;
2196                         return;
2197                 }
2198         }
2199
2200         /*
2201          * No big problem if not found. This can happen if UDP messages arrive
2202          * out of order here.
2203          */
2204 }
2205
2206
2207 /* ----------
2208  * pgstat_write_statsfile() -
2209  *
2210  *      Tell the news.
2211  * ----------
2212  */
2213 static void
2214 pgstat_write_statsfile(void)
2215 {
2216         HASH_SEQ_STATUS hstat;
2217         HASH_SEQ_STATUS tstat;
2218         PgStat_StatDBEntry *dbentry;
2219         PgStat_StatTabEntry *tabentry;
2220         PgStat_StatBeDead *deadbe;
2221         FILE       *fpout;
2222         int                     i;
2223
2224         /*
2225          * Open the statistics temp file to write out the current values.
2226          */
2227         fpout = fopen(pgStat_tmpfname, PG_BINARY_W);
2228         if (fpout == NULL)
2229         {
2230                 ereport(LOG,
2231                                 (errcode_for_file_access(),
2232                                  errmsg("could not open temporary statistics file \"%s\": %m",
2233                                                 pgStat_tmpfname)));
2234                 return;
2235         }
2236
2237         /*
2238          * Walk through the database table.
2239          */
2240         hash_seq_init(&hstat, pgStatDBHash);
2241         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2242         {
2243                 /*
2244                  * If this database is marked destroyed, count down and do so if
2245                  * it reaches 0.
2246                  */
2247                 if (dbentry->destroy > 0)
2248                 {
2249                         if (--(dbentry->destroy) == 0)
2250                         {
2251                                 if (dbentry->tables != NULL)
2252                                         hash_destroy(dbentry->tables);
2253
2254                                 if (hash_search(pgStatDBHash,
2255                                                                 (void *) &(dbentry->databaseid),
2256                                                                 HASH_REMOVE, NULL) == NULL)
2257                                 {
2258                                         ereport(LOG,
2259                                                         (errmsg("database hash table corrupted "
2260                                                                         "during cleanup --- abort")));
2261                                         exit(1);
2262                                 }
2263                         }
2264
2265                         /*
2266                          * Don't include statistics for it.
2267                          */
2268                         continue;
2269                 }
2270
2271                 /*
2272                  * Write out the DB line including the number of life backends.
2273                  */
2274                 fputc('D', fpout);
2275                 fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout);
2276
2277                 /*
2278                  * Walk through the databases access stats per table.
2279                  */
2280                 hash_seq_init(&tstat, dbentry->tables);
2281                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2282                 {
2283                         /*
2284                          * If table entry marked for destruction, same as above for
2285                          * the database entry.
2286                          */
2287                         if (tabentry->destroy > 0)
2288                         {
2289                                 if (--(tabentry->destroy) == 0)
2290                                 {
2291                                         if (hash_search(dbentry->tables,
2292                                                                         (void *) &(tabentry->tableid),
2293                                                                         HASH_REMOVE, NULL) == NULL)
2294                                         {
2295                                                 ereport(LOG,
2296                                                                 (errmsg("tables hash table for "
2297                                                                                 "database %u corrupted during "
2298                                                                                 "cleanup --- abort",
2299                                                                                 dbentry->databaseid)));
2300                                                 exit(1);
2301                                         }
2302                                 }
2303                                 continue;
2304                         }
2305
2306                         /*
2307                          * At least we think this is still a life table. Print it's
2308                          * access stats.
2309                          */
2310                         fputc('T', fpout);
2311                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2312                 }
2313
2314                 /*
2315                  * Mark the end of this DB
2316                  */
2317                 fputc('d', fpout);
2318         }
2319
2320         /*
2321          * Write out the known running backends to the stats file.
2322          */
2323         i = MaxBackends;
2324         fputc('M', fpout);
2325         fwrite(&i, sizeof(i), 1, fpout);
2326
2327         for (i = 0; i < MaxBackends; i++)
2328         {
2329                 if (pgStatBeTable[i].databaseid != InvalidOid)
2330                 {
2331                         fputc('B', fpout);
2332                         fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
2333                 }
2334         }
2335
2336         /*
2337          * No more output to be done. Close the temp file and replace the old
2338          * pgstat.stat with it.
2339          */
2340         fputc('E', fpout);
2341         if (fclose(fpout) < 0)
2342         {
2343                 ereport(LOG,
2344                                 (errcode_for_file_access(),
2345                                  errmsg("could not close temporary statistics file \"%s\": %m",
2346                                                 pgStat_tmpfname)));
2347         }
2348         else
2349         {
2350                 if (rename(pgStat_tmpfname, pgStat_fname) < 0)
2351                 {
2352                         ereport(LOG,
2353                                         (errcode_for_file_access(),
2354                                          errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2355                                                         pgStat_tmpfname, pgStat_fname)));
2356                 }
2357         }
2358
2359         /*
2360          * Clear out the dead backends table
2361          */
2362         hash_seq_init(&hstat, pgStatBeDead);
2363         while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2364         {
2365                 /*
2366                  * Count down the destroy delay and remove entries where it
2367                  * reaches 0.
2368                  */
2369                 if (--(deadbe->destroy) <= 0)
2370                 {
2371                         if (hash_search(pgStatBeDead,
2372                                                         (void *) &(deadbe->procpid),
2373                                                         HASH_REMOVE, NULL) == NULL)
2374                         {
2375                                 ereport(LOG,
2376                                                 (errmsg("dead-server-process hash table corrupted "
2377                                                                 "during cleanup --- abort")));
2378                                 exit(1);
2379                         }
2380                 }
2381         }
2382 }
2383
2384
2385 /* ----------
2386  * pgstat_read_statsfile() -
2387  *
2388  *      Reads in an existing statistics collector and initializes the
2389  *      databases hash table (who's entries point to the tables hash tables)
2390  *      and the current backend table.
2391  * ----------
2392  */
2393 static void
2394 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2395                                           PgStat_StatBeEntry **betab, int *numbackends)
2396 {
2397         PgStat_StatDBEntry *dbentry;
2398         PgStat_StatDBEntry dbbuf;
2399         PgStat_StatTabEntry *tabentry;
2400         PgStat_StatTabEntry tabbuf;
2401         HASHCTL         hash_ctl;
2402         HTAB       *tabhash = NULL;
2403         FILE       *fpin;
2404         int                     maxbackends = 0;
2405         int                     havebackends = 0;
2406         bool            found;
2407         MemoryContext use_mcxt;
2408         int                     mcxt_flags;
2409
2410         /*
2411          * If running in the collector we use the DynaHashCxt memory context.
2412          * If running in a backend, we use the TopTransactionContext instead,
2413          * so the caller must only know the last XactId when this call
2414          * happened to know if his tables are still valid or already gone!
2415          */
2416         if (pgStatRunningInCollector)
2417         {
2418                 use_mcxt = NULL;
2419                 mcxt_flags = 0;
2420         }
2421         else
2422         {
2423                 use_mcxt = TopTransactionContext;
2424                 mcxt_flags = HASH_CONTEXT;
2425         }
2426
2427         /*
2428          * Create the DB hashtable
2429          */
2430         memset(&hash_ctl, 0, sizeof(hash_ctl));
2431         hash_ctl.keysize = sizeof(Oid);
2432         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2433         hash_ctl.hash = tag_hash;
2434         hash_ctl.hcxt = use_mcxt;
2435         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2436                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2437         if (*dbhash == NULL)
2438         {
2439                 /* assume the problem is out-of-memory */
2440                 if (pgStatRunningInCollector)
2441                 {
2442                         ereport(LOG,
2443                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2444                          errmsg("out of memory in statistics collector --- abort")));
2445                         exit(1);
2446                 }
2447                 /* in backend, can do normal error */
2448                 ereport(ERROR,
2449                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2450                                  errmsg("out of memory")));
2451         }
2452
2453         /*
2454          * Initialize the number of known backends to zero, just in case we do
2455          * a silent error return below.
2456          */
2457         if (numbackends != NULL)
2458                 *numbackends = 0;
2459         if (betab != NULL)
2460                 *betab = NULL;
2461
2462         /*
2463          * In EXEC_BACKEND case, we won't have inherited pgStat_fname from
2464          * postmaster, so compute it first time through.
2465          */
2466 #ifdef EXEC_BACKEND
2467         if (pgStat_fname[0] == '\0')
2468         {
2469                 Assert(DataDir != NULL);
2470                 snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
2471         }
2472 #endif
2473
2474         /*
2475          * Try to open the status file. If it doesn't exist, the backends
2476          * simply return zero for anything and the collector simply starts
2477          * from scratch with empty counters.
2478          */
2479         if ((fpin = fopen(pgStat_fname, PG_BINARY_R)) == NULL)
2480                 return;
2481
2482         /*
2483          * We found an existing collector stats file. Read it and put all the
2484          * hashtable entries into place.
2485          */
2486         for (;;)
2487         {
2488                 switch (fgetc(fpin))
2489                 {
2490                                 /*
2491                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2492                                  * follows. Subsequently, zero to many 'T' entries will
2493                                  * follow until a 'd' is encountered.
2494                                  */
2495                         case 'D':
2496                                 if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
2497                                 {
2498                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2499                                                         (errmsg("corrupted pgstat.stat file")));
2500                                         fclose(fpin);
2501                                         return;
2502                                 }
2503
2504                                 /*
2505                                  * Add to the DB hash
2506                                  */
2507                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2508                                                                                           (void *) &dbbuf.databaseid,
2509                                                                                                                          HASH_ENTER,
2510                                                                                                                          &found);
2511                                 if (dbentry == NULL)
2512                                 {
2513                                         if (pgStatRunningInCollector)
2514                                         {
2515                                                 ereport(LOG,
2516                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2517                                                                  errmsg("out of memory in statistics collector --- abort")));
2518                                                 exit(1);
2519                                         }
2520                                         else
2521                                         {
2522                                                 fclose(fpin);
2523                                                 ereport(ERROR,
2524                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2525                                                                  errmsg("out of memory")));
2526                                         }
2527                                 }
2528                                 if (found)
2529                                 {
2530                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2531                                                         (errmsg("corrupted pgstat.stat file")));
2532                                         fclose(fpin);
2533                                         return;
2534                                 }
2535
2536                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2537                                 dbentry->tables = NULL;
2538                                 dbentry->destroy = 0;
2539                                 dbentry->n_backends = 0;
2540
2541                                 /*
2542                                  * Don't collect tables if not the requested DB
2543                                  */
2544                                 if (onlydb != InvalidOid && onlydb != dbbuf.databaseid)
2545                                         break;
2546
2547                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2548                                 hash_ctl.keysize = sizeof(Oid);
2549                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2550                                 hash_ctl.hash = tag_hash;
2551                                 hash_ctl.hcxt = use_mcxt;
2552                                 dbentry->tables = hash_create("Per-database table",
2553                                                                                           PGSTAT_TAB_HASH_SIZE,
2554                                                                                           &hash_ctl,
2555                                                                  HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2556                                 if (dbentry->tables == NULL)
2557                                 {
2558                                         /* assume the problem is out-of-memory */
2559                                         if (pgStatRunningInCollector)
2560                                         {
2561                                                 ereport(LOG,
2562                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2563                                                                  errmsg("out of memory in statistics collector --- abort")));
2564                                                 exit(1);
2565                                         }
2566                                         /* in backend, can do normal error */
2567                                         fclose(fpin);
2568                                         ereport(ERROR,
2569                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2570                                                          errmsg("out of memory")));
2571                                 }
2572
2573                                 /*
2574                                  * Arrange that following 'T's add entries to this
2575                                  * databases tables hash table.
2576                                  */
2577                                 tabhash = dbentry->tables;
2578                                 break;
2579
2580                                 /*
2581                                  * 'd'  End of this database.
2582                                  */
2583                         case 'd':
2584                                 tabhash = NULL;
2585                                 break;
2586
2587                                 /*
2588                                  * 'T'  A PgStat_StatTabEntry follows.
2589                                  */
2590                         case 'T':
2591                                 if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
2592                                 {
2593                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2594                                                         (errmsg("corrupted pgstat.stat file")));
2595                                         fclose(fpin);
2596                                         return;
2597                                 }
2598
2599                                 /*
2600                                  * Skip if table belongs to a not requested database.
2601                                  */
2602                                 if (tabhash == NULL)
2603                                         break;
2604
2605                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2606                                                                                                 (void *) &tabbuf.tableid,
2607                                                                                                          HASH_ENTER, &found);
2608                                 if (tabentry == NULL)
2609                                 {
2610                                         if (pgStatRunningInCollector)
2611                                         {
2612                                                 ereport(LOG,
2613                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2614                                                                  errmsg("out of memory in statistics collector --- abort")));
2615                                                 exit(1);
2616                                         }
2617                                         /* in backend, can do normal error */
2618                                         fclose(fpin);
2619                                         ereport(ERROR,
2620                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2621                                                          errmsg("out of memory")));
2622                                 }
2623
2624                                 if (found)
2625                                 {
2626                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2627                                                         (errmsg("corrupted pgstat.stat file")));
2628                                         fclose(fpin);
2629                                         return;
2630                                 }
2631
2632                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2633                                 break;
2634
2635                                 /*
2636                                  * 'M'  The maximum number of backends to expect follows.
2637                                  */
2638                         case 'M':
2639                                 if (betab == NULL || numbackends == NULL)
2640                                 {
2641                                         fclose(fpin);
2642                                         return;
2643                                 }
2644                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2645                                         sizeof(maxbackends))
2646                                 {
2647                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2648                                                         (errmsg("corrupted pgstat.stat file")));
2649                                         fclose(fpin);
2650                                         return;
2651                                 }
2652                                 if (maxbackends == 0)
2653                                 {
2654                                         fclose(fpin);
2655                                         return;
2656                                 }
2657
2658                                 /*
2659                                  * Allocate space (in TopTransactionContext too) for the
2660                                  * backend table.
2661                                  */
2662                                 if (use_mcxt == NULL)
2663                                         *betab = (PgStat_StatBeEntry *) malloc(
2664                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2665                                 else
2666                                         *betab = (PgStat_StatBeEntry *) MemoryContextAlloc(
2667                                                                                                                                 use_mcxt,
2668                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2669                                 break;
2670
2671                                 /*
2672                                  * 'B'  A PgStat_StatBeEntry follows.
2673                                  */
2674                         case 'B':
2675                                 if (betab == NULL || numbackends == NULL)
2676                                 {
2677                                         fclose(fpin);
2678                                         return;
2679                                 }
2680                                 if (*betab == NULL)
2681                                 {
2682                                         fclose(fpin);
2683                                         return;
2684                                 }
2685
2686                                 /*
2687                                  * Read it directly into the table.
2688                                  */
2689                                 if (fread(&(*betab)[havebackends], 1,
2690                                                   sizeof(PgStat_StatBeEntry), fpin) !=
2691                                         sizeof(PgStat_StatBeEntry))
2692                                 {
2693                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2694                                                         (errmsg("corrupted pgstat.stat file")));
2695                                         fclose(fpin);
2696                                         return;
2697                                 }
2698
2699                                 /*
2700                                  * Count backends per database here.
2701                                  */
2702                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2703                                                    (void *) &((*betab)[havebackends].databaseid),
2704                                                                                                                 HASH_FIND, NULL);
2705                                 if (dbentry)
2706                                         dbentry->n_backends++;
2707
2708                                 havebackends++;
2709                                 if (numbackends != 0)
2710                                         *numbackends = havebackends;
2711                                 if (havebackends >= maxbackends)
2712                                 {
2713                                         fclose(fpin);
2714                                         return;
2715                                 }
2716                                 break;
2717
2718                                 /*
2719                                  * 'E'  The EOF marker of a complete stats file.
2720                                  */
2721                         case 'E':
2722                                 fclose(fpin);
2723                                 return;
2724
2725                         default:
2726                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2727                                                 (errmsg("corrupted pgstat.stat file")));
2728                                 fclose(fpin);
2729                                 return;
2730                 }
2731         }
2732
2733         fclose(fpin);
2734 }
2735
2736 /*
2737  * If not done for this transaction, read the statistics collector
2738  * stats file into some hash tables.
2739  *
2740  * Because we store the hash tables in TopTransactionContext, the result
2741  * is good for the entire current main transaction.
2742  */
2743 static void
2744 backend_read_statsfile(void)
2745 {
2746         TransactionId   topXid = GetTopTransactionId();
2747
2748         if (!TransactionIdEquals(pgStatDBHashXact, topXid))
2749         {
2750                 Assert(!pgStatRunningInCollector);
2751                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
2752                                                           &pgStatBeTable, &pgStatNumBackends);
2753                 pgStatDBHashXact = topXid;
2754         }
2755 }
2756
2757
2758 /* ----------
2759  * pgstat_recv_bestart() -
2760  *
2761  *      Process a backend startup message.
2762  * ----------
2763  */
2764 static void
2765 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2766 {
2767         pgstat_add_backend(&msg->m_hdr);
2768 }
2769
2770
2771 /* ----------
2772  * pgstat_recv_beterm() -
2773  *
2774  *      Process a backend termination message.
2775  * ----------
2776  */
2777 static void
2778 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2779 {
2780         pgstat_sub_backend(msg->m_hdr.m_procpid);
2781 }
2782
2783
2784 /* ----------
2785  * pgstat_recv_activity() -
2786  *
2787  *      Remember what the backend is doing.
2788  * ----------
2789  */
2790 static void
2791 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2792 {
2793         PgStat_StatBeEntry *entry;
2794
2795         /*
2796          * Here we check explicitly for 0 return, since we don't want to
2797          * mangle the activity of an active backend by a delayed packet from a
2798          * dead one.
2799          */
2800         if (pgstat_add_backend(&msg->m_hdr) != 0)
2801                 return;
2802
2803         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2804
2805         StrNCpy(entry->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE);
2806
2807         entry->activity_start_sec =
2808                 GetCurrentAbsoluteTimeUsec(&entry->activity_start_usec);
2809 }
2810
2811
2812 /* ----------
2813  * pgstat_recv_tabstat() -
2814  *
2815  *      Count what the backend has done.
2816  * ----------
2817  */
2818 static void
2819 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2820 {
2821         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2822         PgStat_StatDBEntry *dbentry;
2823         PgStat_StatTabEntry *tabentry;
2824         int                     i;
2825         bool            found;
2826
2827         /*
2828          * Make sure the backend is counted for.
2829          */
2830         if (pgstat_add_backend(&msg->m_hdr) < 0)
2831                 return;
2832
2833         /*
2834          * Lookup the database in the hashtable.
2835          */
2836         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2837                                                                          (void *) &(msg->m_hdr.m_databaseid),
2838                                                                                                  HASH_FIND, NULL);
2839         if (!dbentry)
2840                 return;
2841
2842         /*
2843          * If the database is marked for destroy, this is a delayed UDP packet
2844          * and not worth being counted.
2845          */
2846         if (dbentry->destroy > 0)
2847                 return;
2848
2849         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2850         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2851
2852         /*
2853          * Process all table entries in the message.
2854          */
2855         for (i = 0; i < msg->m_nentries; i++)
2856         {
2857                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2858                                                                                           (void *) &(tabmsg[i].t_id),
2859                                                                                                          HASH_ENTER, &found);
2860                 if (tabentry == NULL)
2861                 {
2862                         ereport(LOG,
2863                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2864                          errmsg("out of memory in statistics collector --- abort")));
2865                         exit(1);
2866                 }
2867
2868                 if (!found)
2869                 {
2870                         /*
2871                          * If it's a new table entry, initialize counters to the
2872                          * values we just got.
2873                          */
2874                         tabentry->numscans = tabmsg[i].t_numscans;
2875                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2876                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2877                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2878                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2879                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2880                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2881                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2882
2883                         tabentry->destroy = 0;
2884                 }
2885                 else
2886                 {
2887                         /*
2888                          * Otherwise add the values to the existing entry.
2889                          */
2890                         tabentry->numscans += tabmsg[i].t_numscans;
2891                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2892                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2893                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2894                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2895                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2896                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2897                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2898                 }
2899
2900                 /*
2901                  * And add the block IO to the database entry.
2902                  */
2903                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2904                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2905         }
2906 }
2907
2908
2909 /* ----------
2910  * pgstat_recv_tabpurge() -
2911  *
2912  *      Arrange for dead table removal.
2913  * ----------
2914  */
2915 static void
2916 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2917 {
2918         PgStat_StatDBEntry *dbentry;
2919         PgStat_StatTabEntry *tabentry;
2920         int                     i;
2921
2922         /*
2923          * Make sure the backend is counted for.
2924          */
2925         if (pgstat_add_backend(&msg->m_hdr) < 0)
2926                 return;
2927
2928         /*
2929          * Lookup the database in the hashtable.
2930          */
2931         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2932                                                                          (void *) &(msg->m_hdr.m_databaseid),
2933                                                                                                  HASH_FIND, NULL);
2934         if (!dbentry)
2935                 return;
2936
2937         /*
2938          * If the database is marked for destroy, this is a delayed UDP packet
2939          * and the tables will go away at DB destruction.
2940          */
2941         if (dbentry->destroy > 0)
2942                 return;
2943
2944         /*
2945          * Process all table entries in the message.
2946          */
2947         for (i = 0; i < msg->m_nentries; i++)
2948         {
2949                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2950                                                                                    (void *) &(msg->m_tableid[i]),
2951                                                                                                            HASH_FIND, NULL);
2952                 if (tabentry)
2953                         tabentry->destroy = PGSTAT_DESTROY_COUNT;
2954         }
2955 }
2956
2957
2958 /* ----------
2959  * pgstat_recv_dropdb() -
2960  *
2961  *      Arrange for dead database removal
2962  * ----------
2963  */
2964 static void
2965 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
2966 {
2967         PgStat_StatDBEntry *dbentry;
2968
2969         /*
2970          * Make sure the backend is counted for.
2971          */
2972         if (pgstat_add_backend(&msg->m_hdr) < 0)
2973                 return;
2974
2975         /*
2976          * Lookup the database in the hashtable.
2977          */
2978         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2979                                                                                    (void *) &(msg->m_databaseid),
2980                                                                                                  HASH_FIND, NULL);
2981         if (!dbentry)
2982                 return;
2983
2984         /*
2985          * Mark the database for destruction.
2986          */
2987         dbentry->destroy = PGSTAT_DESTROY_COUNT;
2988 }
2989
2990
2991 /* ----------
2992  * pgstat_recv_dropdb() -
2993  *
2994  *      Arrange for dead database removal
2995  * ----------
2996  */
2997 static void
2998 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
2999 {
3000         HASHCTL         hash_ctl;
3001         PgStat_StatDBEntry *dbentry;
3002
3003         /*
3004          * Make sure the backend is counted for.
3005          */
3006         if (pgstat_add_backend(&msg->m_hdr) < 0)
3007                 return;
3008
3009         /*
3010          * Lookup the database in the hashtable.
3011          */
3012         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3013                                                                          (void *) &(msg->m_hdr.m_databaseid),
3014                                                                                                  HASH_FIND, NULL);
3015         if (!dbentry)
3016                 return;
3017
3018         /*
3019          * We simply throw away all the databases table entries by recreating
3020          * a new hash table for them.
3021          */
3022         if (dbentry->tables != NULL)
3023                 hash_destroy(dbentry->tables);
3024
3025         dbentry->tables = NULL;
3026         dbentry->n_xact_commit = 0;
3027         dbentry->n_xact_rollback = 0;
3028         dbentry->n_blocks_fetched = 0;
3029         dbentry->n_blocks_hit = 0;
3030         dbentry->n_connects = 0;
3031         dbentry->destroy = 0;
3032
3033         memset(&hash_ctl, 0, sizeof(hash_ctl));
3034         hash_ctl.keysize = sizeof(Oid);
3035         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3036         hash_ctl.hash = tag_hash;
3037         dbentry->tables = hash_create("Per-database table",
3038                                                                   PGSTAT_TAB_HASH_SIZE,
3039                                                                   &hash_ctl,
3040                                                                   HASH_ELEM | HASH_FUNCTION);
3041         if (dbentry->tables == NULL)
3042         {
3043                 /* assume the problem is out-of-memory */
3044                 ereport(LOG,
3045                                 (errcode(ERRCODE_OUT_OF_MEMORY),
3046                          errmsg("out of memory in statistics collector --- abort")));
3047                 exit(1);
3048         }
3049 }