]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Modify hash_create() to elog(ERROR) if an error occurs, rather than
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2004, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.82 2004/10/25 00:46:41 neilc Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31
32 #include "pgstat.h"
33
34 #include "access/heapam.h"
35 #include "access/xact.h"
36 #include "catalog/catname.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_shadow.h"
39 #include "libpq/libpq.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "postmaster/postmaster.h"
44 #include "storage/backendid.h"
45 #include "storage/ipc.h"
46 #include "storage/pg_shmem.h"
47 #include "storage/pmsignal.h"
48 #include "tcop/tcopprot.h"
49 #include "utils/hsearch.h"
50 #include "utils/memutils.h"
51 #include "utils/ps_status.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54
55
56 /* ----------
57  * Paths for the statistics files. The %s is replaced with the
58  * installation's $PGDATA.
59  * ----------
60  */
61 #define PGSTAT_STAT_FILENAME    "%s/global/pgstat.stat"
62 #define PGSTAT_STAT_TMPFILE             "%s/global/pgstat.tmp.%d"
63
64 /* ----------
65  * Timer definitions.
66  * ----------
67  */
68 #define PGSTAT_STAT_INTERVAL    500             /* How often to write the status
69                                                                                  * file; in milliseconds. */
70
71 #define PGSTAT_DESTROY_DELAY    10000   /* How long to keep destroyed
72                                                                                  * objects known, to give delayed
73                                                                                  * UDP packets time to arrive; in
74                                                                                  * milliseconds. */
75
76 #define PGSTAT_DESTROY_COUNT    (PGSTAT_DESTROY_DELAY / PGSTAT_STAT_INTERVAL)
77
78 #define PGSTAT_RESTART_INTERVAL 60              /* How often to attempt to restart
79                                                                                  * a failed statistics collector;
80                                                                                  * in seconds. */
81
82 /* ----------
83  * Amount of space reserved in pgstat_recvbuffer().
84  * ----------
85  */
86 #define PGSTAT_RECVBUFFERSZ             ((int) (1024 * sizeof(PgStat_Msg)))
87
88 /* ----------
89  * The initial size hints for the hash tables used in the collector.
90  * ----------
91  */
92 #define PGSTAT_DB_HASH_SIZE             16
93 #define PGSTAT_BE_HASH_SIZE             512
94 #define PGSTAT_TAB_HASH_SIZE    512
95
96
97 /* ----------
98  * GUC parameters
99  * ----------
100  */
101 bool            pgstat_collect_startcollector = true;
102 bool            pgstat_collect_resetonpmstart = true;
103 bool            pgstat_collect_querystring = false;
104 bool            pgstat_collect_tuplelevel = false;
105 bool            pgstat_collect_blocklevel = false;
106
107 /* ----------
108  * Local data
109  * ----------
110  */
111 NON_EXEC_STATIC int pgStatSock = -1;
112 static int      pgStatPipe[2];
113 static struct sockaddr_storage pgStatAddr;
114
115 static time_t last_pgstat_start_time;
116
117 static long pgStatNumMessages = 0;
118
119 static bool pgStatRunningInCollector = FALSE;
120
121 static int      pgStatTabstatAlloc = 0;
122 static int      pgStatTabstatUsed = 0;
123 static PgStat_MsgTabstat **pgStatTabstatMessages = NULL;
124
125 #define TABSTAT_QUANTUM         4       /* we alloc this many at a time */
126
127 static int      pgStatXactCommit = 0;
128 static int      pgStatXactRollback = 0;
129
130 static TransactionId pgStatDBHashXact = InvalidTransactionId;
131 static HTAB *pgStatDBHash = NULL;
132 static HTAB *pgStatBeDead = NULL;
133 static PgStat_StatBeEntry *pgStatBeTable = NULL;
134 static int      pgStatNumBackends = 0;
135
136 static char pgStat_fname[MAXPGPATH];
137 static char pgStat_tmpfname[MAXPGPATH];
138
139
140 /* ----------
141  * Local function forward declarations
142  * ----------
143  */
144 #ifdef EXEC_BACKEND
145
146 typedef enum STATS_PROCESS_TYPE
147 {
148         STAT_PROC_BUFFER,
149         STAT_PROC_COLLECTOR
150 }       STATS_PROCESS_TYPE;
151
152 static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
153 static void pgstat_parseArgs(int argc, char *argv[]);
154 #endif
155
156 NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
157 NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
158 static void pgstat_recvbuffer(void);
159 static void pgstat_exit(SIGNAL_ARGS);
160 static void pgstat_die(SIGNAL_ARGS);
161
162 static int      pgstat_add_backend(PgStat_MsgHdr *msg);
163 static void pgstat_sub_backend(int procpid);
164 static void pgstat_drop_database(Oid databaseid);
165 static void pgstat_write_statsfile(void);
166 static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
167                                           PgStat_StatBeEntry **betab,
168                                           int *numbackends);
169 static void backend_read_statsfile(void);
170
171 static void pgstat_setheader(PgStat_MsgHdr *hdr, int mtype);
172 static void pgstat_send(void *msg, int len);
173
174 static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
175 static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
176 static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
177 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
178 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
179 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
180 static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
181
182
183 /* ------------------------------------------------------------
184  * Public functions called from postmaster follow
185  * ------------------------------------------------------------
186  */
187
188 /* ----------
189  * pgstat_init() -
190  *
191  *      Called from postmaster at startup. Create the resources required
192  *      by the statistics collector process.  If unable to do so, do not
193  *      fail --- better to let the postmaster start with stats collection
194  *      disabled.
195  * ----------
196  */
197 void
198 pgstat_init(void)
199 {
200         ACCEPT_TYPE_ARG3 alen;
201         struct addrinfo *addrs = NULL,
202                            *addr,
203                                 hints;
204         int                     ret;
205         fd_set          rset;
206         struct timeval tv;
207         char            test_byte;
208         int                     sel_res;
209
210 #define TESTBYTEVAL ((char) 199)
211
212         /*
213          * Force start of collector daemon if something to collect
214          */
215         if (pgstat_collect_querystring ||
216                 pgstat_collect_tuplelevel ||
217                 pgstat_collect_blocklevel)
218                 pgstat_collect_startcollector = true;
219
220         /*
221          * Initialize the filename for the status reports.      (In the
222          * EXEC_BACKEND case, this only sets the value in the postmaster.  The
223          * collector subprocess will recompute the value for itself, and
224          * individual backends must do so also if they want to access the
225          * file.)
226          */
227         snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
228
229         /*
230          * If we don't have to start a collector or should reset the collected
231          * statistics on postmaster start, simply remove the file.
232          */
233         if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
234                 unlink(pgStat_fname);
235
236         /*
237          * Nothing else required if collector will not get started
238          */
239         if (!pgstat_collect_startcollector)
240                 return;
241
242         /*
243          * Create the UDP socket for sending and receiving statistic messages
244          */
245         hints.ai_flags = AI_PASSIVE;
246         hints.ai_family = PF_UNSPEC;
247         hints.ai_socktype = SOCK_DGRAM;
248         hints.ai_protocol = 0;
249         hints.ai_addrlen = 0;
250         hints.ai_addr = NULL;
251         hints.ai_canonname = NULL;
252         hints.ai_next = NULL;
253         ret = getaddrinfo_all("localhost", NULL, &hints, &addrs);
254         if (ret || !addrs)
255         {
256                 ereport(LOG,
257                                 (errmsg("could not resolve \"localhost\": %s",
258                                                 gai_strerror(ret))));
259                 goto startup_failed;
260         }
261
262         /*
263          * On some platforms, getaddrinfo_all() may return multiple addresses
264          * only one of which will actually work (eg, both IPv6 and IPv4
265          * addresses when kernel will reject IPv6).  Worse, the failure may
266          * occur at the bind() or perhaps even connect() stage.  So we must
267          * loop through the results till we find a working combination.  We
268          * will generate LOG messages, but no error, for bogus combinations.
269          */
270         for (addr = addrs; addr; addr = addr->ai_next)
271         {
272 #ifdef HAVE_UNIX_SOCKETS
273                 /* Ignore AF_UNIX sockets, if any are returned. */
274                 if (addr->ai_family == AF_UNIX)
275                         continue;
276 #endif
277
278                 /*
279                  * Create the socket.
280                  */
281                 if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
282                 {
283                         ereport(LOG,
284                                         (errcode_for_socket_access(),
285                                          errmsg("could not create socket for statistics collector: %m")));
286                         continue;
287                 }
288
289                 /*
290                  * Bind it to a kernel assigned port on localhost and get the
291                  * assigned port via getsockname().
292                  */
293                 if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
294                 {
295                         ereport(LOG,
296                                         (errcode_for_socket_access(),
297                                          errmsg("could not bind socket for statistics collector: %m")));
298                         closesocket(pgStatSock);
299                         pgStatSock = -1;
300                         continue;
301                 }
302
303                 alen = sizeof(pgStatAddr);
304                 if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0)
305                 {
306                         ereport(LOG,
307                                         (errcode_for_socket_access(),
308                                          errmsg("could not get address of socket for statistics collector: %m")));
309                         closesocket(pgStatSock);
310                         pgStatSock = -1;
311                         continue;
312                 }
313
314                 /*
315                  * Connect the socket to its own address.  This saves a few cycles
316                  * by not having to respecify the target address on every send.
317                  * This also provides a kernel-level check that only packets from
318                  * this same address will be received.
319                  */
320                 if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0)
321                 {
322                         ereport(LOG,
323                                         (errcode_for_socket_access(),
324                                          errmsg("could not connect socket for statistics collector: %m")));
325                         closesocket(pgStatSock);
326                         pgStatSock = -1;
327                         continue;
328                 }
329
330                 /*
331                  * Try to send and receive a one-byte test message on the socket.
332                  * This is to catch situations where the socket can be created but
333                  * will not actually pass data (for instance, because kernel
334                  * packet filtering rules prevent it).
335                  */
336                 test_byte = TESTBYTEVAL;
337                 if (send(pgStatSock, &test_byte, 1, 0) != 1)
338                 {
339                         ereport(LOG,
340                                         (errcode_for_socket_access(),
341                                          errmsg("could not send test message on socket for statistics collector: %m")));
342                         closesocket(pgStatSock);
343                         pgStatSock = -1;
344                         continue;
345                 }
346
347                 /*
348                  * There could possibly be a little delay before the message can
349                  * be received.  We arbitrarily allow up to half a second before
350                  * deciding it's broken.
351                  */
352                 for (;;)                                /* need a loop to handle EINTR */
353                 {
354                         FD_ZERO(&rset);
355                         FD_SET(pgStatSock, &rset);
356                         tv.tv_sec = 0;
357                         tv.tv_usec = 500000;
358                         sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
359                         if (sel_res >= 0 || errno != EINTR)
360                                 break;
361                 }
362                 if (sel_res < 0)
363                 {
364                         ereport(LOG,
365                                         (errcode_for_socket_access(),
366                                  errmsg("select() failed in statistics collector: %m")));
367                         closesocket(pgStatSock);
368                         pgStatSock = -1;
369                         continue;
370                 }
371                 if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
372                 {
373                         /*
374                          * This is the case we actually think is likely, so take pains
375                          * to give a specific message for it.
376                          *
377                          * errno will not be set meaningfully here, so don't use it.
378                          */
379                         ereport(LOG,
380                                         (ERRCODE_CONNECTION_FAILURE,
381                                          errmsg("test message did not get through on socket for statistics collector")));
382                         closesocket(pgStatSock);
383                         pgStatSock = -1;
384                         continue;
385                 }
386
387                 test_byte++;                    /* just make sure variable is changed */
388
389                 if (recv(pgStatSock, &test_byte, 1, 0) != 1)
390                 {
391                         ereport(LOG,
392                                         (errcode_for_socket_access(),
393                                          errmsg("could not receive test message on socket for statistics collector: %m")));
394                         closesocket(pgStatSock);
395                         pgStatSock = -1;
396                         continue;
397                 }
398
399                 if (test_byte != TESTBYTEVAL)   /* strictly paranoia ... */
400                 {
401                         ereport(LOG,
402                                         (ERRCODE_INTERNAL_ERROR,
403                                          errmsg("incorrect test message transmission on socket for statistics collector")));
404                         closesocket(pgStatSock);
405                         pgStatSock = -1;
406                         continue;
407                 }
408
409                 /* If we get here, we have a working socket */
410                 break;
411         }
412
413         /* Did we find a working address? */
414         if (!addr || pgStatSock < 0)
415         {
416                 ereport(LOG,
417                                 (errcode_for_socket_access(),
418                                  errmsg("disabling statistics collector for lack of working socket")));
419                 goto startup_failed;
420         }
421
422         /*
423          * Set the socket to non-blocking IO.  This ensures that if the
424          * collector falls behind (despite the buffering process), statistics
425          * messages will be discarded; backends won't block waiting to send
426          * messages to the collector.
427          */
428         if (!set_noblock(pgStatSock))
429         {
430                 ereport(LOG,
431                                 (errcode_for_socket_access(),
432                                  errmsg("could not set statistics collector socket to nonblocking mode: %m")));
433                 goto startup_failed;
434         }
435
436         freeaddrinfo_all(hints.ai_family, addrs);
437
438         return;
439
440 startup_failed:
441         if (addrs)
442                 freeaddrinfo_all(hints.ai_family, addrs);
443
444         if (pgStatSock >= 0)
445                 closesocket(pgStatSock);
446         pgStatSock = -1;
447
448         /* Adjust GUC variables to suppress useless activity */
449         pgstat_collect_startcollector = false;
450         pgstat_collect_querystring = false;
451         pgstat_collect_tuplelevel = false;
452         pgstat_collect_blocklevel = false;
453 }
454
455
456 #ifdef EXEC_BACKEND
457
458 /*
459  * pgstat_forkexec() -
460  *
461  * Format up the arglist for, then fork and exec, statistics
462  * (buffer and collector) processes
463  */
464 static pid_t
465 pgstat_forkexec(STATS_PROCESS_TYPE procType)
466 {
467         char       *av[10];
468         int                     ac = 0,
469                                 bufc = 0,
470                                 i;
471         char            pgstatBuf[2][32];
472
473         av[ac++] = "postgres";
474
475         switch (procType)
476         {
477                 case STAT_PROC_BUFFER:
478                         av[ac++] = "-forkbuf";
479                         break;
480
481                 case STAT_PROC_COLLECTOR:
482                         av[ac++] = "-forkcol";
483                         break;
484
485                 default:
486                         Assert(false);
487         }
488
489         av[ac++] = NULL;                        /* filled in by postmaster_forkexec */
490
491         /* postgres_exec_path is not passed by write_backend_variables */
492         av[ac++] = postgres_exec_path;
493
494         /* Pipe file ids (those not passed by write_backend_variables) */
495         snprintf(pgstatBuf[bufc++], 32, "%d", pgStatPipe[0]);
496         snprintf(pgstatBuf[bufc++], 32, "%d", pgStatPipe[1]);
497
498         /* Add to the arg list */
499         Assert(bufc <= lengthof(pgstatBuf));
500         for (i = 0; i < bufc; i++)
501                 av[ac++] = pgstatBuf[i];
502
503         av[ac] = NULL;
504         Assert(ac < lengthof(av));
505
506         return postmaster_forkexec(ac, av);
507 }
508
509
510 /*
511  * pgstat_parseArgs() -
512  *
513  * Extract data from the arglist for exec'ed statistics
514  * (buffer and collector) processes
515  */
516 static void
517 pgstat_parseArgs(int argc, char *argv[])
518 {
519         Assert(argc == 6);
520
521         argc = 3;
522         StrNCpy(postgres_exec_path, argv[argc++], MAXPGPATH);
523         pgStatPipe[0] = atoi(argv[argc++]);
524         pgStatPipe[1] = atoi(argv[argc++]);
525 }
526 #endif   /* EXEC_BACKEND */
527
528
529 /* ----------
530  * pgstat_start() -
531  *
532  *      Called from postmaster at startup or after an existing collector
533  *      died.  Attempt to fire up a fresh statistics collector.
534  *
535  *      Returns PID of child process, or 0 if fail.
536  *
537  *      Note: if fail, we will be called again from the postmaster main loop.
538  * ----------
539  */
540 int
541 pgstat_start(void)
542 {
543         time_t          curtime;
544         pid_t           pgStatPid;
545
546         /*
547          * Do nothing if no collector needed
548          */
549         if (!pgstat_collect_startcollector)
550                 return 0;
551
552         /*
553          * Do nothing if too soon since last collector start.  This is a
554          * safety valve to protect against continuous respawn attempts if the
555          * collector is dying immediately at launch.  Note that since we will
556          * be re-called from the postmaster main loop, we will get another
557          * chance later.
558          */
559         curtime = time(NULL);
560         if ((unsigned int) (curtime - last_pgstat_start_time) <
561                 (unsigned int) PGSTAT_RESTART_INTERVAL)
562                 return 0;
563         last_pgstat_start_time = curtime;
564
565         /*
566          * Check that the socket is there, else pgstat_init failed.
567          */
568         if (pgStatSock < 0)
569         {
570                 ereport(LOG,
571                                 (errmsg("statistics collector startup skipped")));
572
573                 /*
574                  * We can only get here if someone tries to manually turn
575                  * pgstat_collect_startcollector on after it had been off.
576                  */
577                 pgstat_collect_startcollector = false;
578                 return 0;
579         }
580
581         /*
582          * Okay, fork off the collector.
583          */
584
585         fflush(stdout);
586         fflush(stderr);
587
588 #ifdef __BEOS__
589         /* Specific beos actions before backend startup */
590         beos_before_backend_startup();
591 #endif
592
593 #ifdef EXEC_BACKEND
594         switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
595 #else
596         switch ((pgStatPid = fork()))
597 #endif
598         {
599                 case -1:
600 #ifdef __BEOS__
601                         /* Specific beos actions */
602                         beos_backend_startup_failed();
603 #endif
604                         ereport(LOG,
605                                         (errmsg("could not fork statistics buffer: %m")));
606                         return 0;
607
608 #ifndef EXEC_BACKEND
609                 case 0:
610                         /* in postmaster child ... */
611 #ifdef __BEOS__
612                         /* Specific beos actions after backend startup */
613                         beos_backend_startup();
614 #endif
615                         /* Close the postmaster's sockets */
616                         ClosePostmasterPorts(false);
617
618                         /* Drop our connection to postmaster's shared memory, as well */
619                         PGSharedMemoryDetach();
620
621                         PgstatBufferMain(0, NULL);
622                         break;
623 #endif
624
625                 default:
626                         return (int) pgStatPid;
627         }
628
629         /* shouldn't get here */
630         return 0;
631 }
632
633
634 /* ----------
635  * pgstat_beterm() -
636  *
637  *      Called from postmaster to tell collector a backend terminated.
638  * ----------
639  */
640 void
641 pgstat_beterm(int pid)
642 {
643         PgStat_MsgBeterm msg;
644
645         if (pgStatSock < 0)
646                 return;
647
648         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
649         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
650         msg.m_hdr.m_procpid = pid;
651
652         pgstat_send(&msg, sizeof(msg));
653 }
654
655
656 /* ------------------------------------------------------------
657  * Public functions used by backends follow
658  *------------------------------------------------------------
659  */
660
661
662 /* ----------
663  * pgstat_bestart() -
664  *
665  *      Tell the collector that this new backend is soon ready to process
666  *      queries. Called from tcop/postgres.c before entering the mainloop.
667  * ----------
668  */
669 void
670 pgstat_bestart(void)
671 {
672         PgStat_MsgBestart msg;
673
674         if (pgStatSock < 0)
675                 return;
676
677         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
678         pgstat_send(&msg, sizeof(msg));
679 }
680
681
682 /* ----------
683  * pgstat_report_activity() -
684  *
685  *      Called in tcop/postgres.c to tell the collector what the backend
686  *      is actually doing (usually "<IDLE>" or the start of the query to
687  *      be executed).
688  * ----------
689  */
690 void
691 pgstat_report_activity(const char *what)
692 {
693         PgStat_MsgActivity msg;
694         int                     len;
695
696         if (!pgstat_collect_querystring || pgStatSock < 0)
697                 return;
698
699         len = strlen(what);
700         len = pg_mbcliplen((const unsigned char *) what, len,
701                                            PGSTAT_ACTIVITY_SIZE - 1);
702
703         memcpy(msg.m_what, what, len);
704         msg.m_what[len] = '\0';
705         len += offsetof(PgStat_MsgActivity, m_what) +1;
706
707         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
708         pgstat_send(&msg, len);
709 }
710
711
712 /* ----------
713  * pgstat_report_tabstat() -
714  *
715  *      Called from tcop/postgres.c to send the so far collected
716  *      per table access statistics to the collector.
717  * ----------
718  */
719 void
720 pgstat_report_tabstat(void)
721 {
722         int                     i;
723
724         if (pgStatSock < 0 ||
725                 !(pgstat_collect_querystring ||
726                   pgstat_collect_tuplelevel ||
727                   pgstat_collect_blocklevel))
728         {
729                 /* Not reporting stats, so just flush whatever we have */
730                 pgStatTabstatUsed = 0;
731                 return;
732         }
733
734         /*
735          * For each message buffer used during the last query set the header
736          * fields and send it out.
737          */
738         for (i = 0; i < pgStatTabstatUsed; i++)
739         {
740                 PgStat_MsgTabstat *tsmsg = pgStatTabstatMessages[i];
741                 int                     n;
742                 int                     len;
743
744                 n = tsmsg->m_nentries;
745                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
746                         n * sizeof(PgStat_TableEntry);
747
748                 tsmsg->m_xact_commit = pgStatXactCommit;
749                 tsmsg->m_xact_rollback = pgStatXactRollback;
750                 pgStatXactCommit = 0;
751                 pgStatXactRollback = 0;
752
753                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
754                 pgstat_send(tsmsg, len);
755         }
756
757         pgStatTabstatUsed = 0;
758 }
759
760
761 /* ----------
762  * pgstat_vacuum_tabstat() -
763  *
764  *      Will tell the collector about objects he can get rid of.
765  * ----------
766  */
767 int
768 pgstat_vacuum_tabstat(void)
769 {
770         Relation        dbrel;
771         HeapScanDesc dbscan;
772         HeapTuple       dbtup;
773         Oid                *dbidlist;
774         int                     dbidalloc;
775         int                     dbidused;
776         HASH_SEQ_STATUS hstat;
777         PgStat_StatDBEntry *dbentry;
778         PgStat_StatTabEntry *tabentry;
779         HeapTuple       reltup;
780         int                     nobjects = 0;
781         PgStat_MsgTabpurge msg;
782         int                     len;
783         int                     i;
784
785         if (pgStatSock < 0)
786                 return 0;
787
788         /*
789          * If not done for this transaction, read the statistics collector
790          * stats file into some hash tables.
791          */
792         backend_read_statsfile();
793
794         /*
795          * Lookup our own database entry
796          */
797         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
798                                                                                                  (void *) &MyDatabaseId,
799                                                                                                  HASH_FIND, NULL);
800         if (dbentry == NULL)
801                 return -1;
802
803         if (dbentry->tables == NULL)
804                 return 0;
805
806         /*
807          * Initialize our messages table counter to zero
808          */
809         msg.m_nentries = 0;
810
811         /*
812          * Check for all tables if they still exist.
813          */
814         hash_seq_init(&hstat, dbentry->tables);
815         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
816         {
817                 /*
818                  * Check if this relation is still alive by looking up it's
819                  * pg_class tuple in the system catalog cache.
820                  */
821                 reltup = SearchSysCache(RELOID,
822                                                                 ObjectIdGetDatum(tabentry->tableid),
823                                                                 0, 0, 0);
824                 if (HeapTupleIsValid(reltup))
825                 {
826                         ReleaseSysCache(reltup);
827                         continue;
828                 }
829
830                 /*
831                  * Add this tables Oid to the message
832                  */
833                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
834                 nobjects++;
835
836                 /*
837                  * If the message is full, send it out and reinitialize ot zero
838                  */
839                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
840                 {
841                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
842                                 +msg.m_nentries * sizeof(Oid);
843
844                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
845                         pgstat_send(&msg, len);
846
847                         msg.m_nentries = 0;
848                 }
849         }
850
851         /*
852          * Send the rest
853          */
854         if (msg.m_nentries > 0)
855         {
856                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
857                         +msg.m_nentries * sizeof(Oid);
858
859                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
860                 pgstat_send(&msg, len);
861         }
862
863         /*
864          * Read pg_database and remember the Oid's of all existing databases
865          */
866         dbidalloc = 256;
867         dbidused = 0;
868         dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
869
870         dbrel = heap_openr(DatabaseRelationName, AccessShareLock);
871         dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
872         while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
873         {
874                 if (dbidused >= dbidalloc)
875                 {
876                         dbidalloc *= 2;
877                         dbidlist = (Oid *) repalloc((char *) dbidlist,
878                                                                                 sizeof(Oid) * dbidalloc);
879                 }
880                 dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
881         }
882         heap_endscan(dbscan);
883         heap_close(dbrel, AccessShareLock);
884
885         /*
886          * Search the database hash table for dead databases and tell the
887          * collector to drop them as well.
888          */
889         hash_seq_init(&hstat, pgStatDBHash);
890         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
891         {
892                 Oid                     dbid = dbentry->databaseid;
893
894                 for (i = 0; i < dbidused; i++)
895                 {
896                         if (dbidlist[i] == dbid)
897                         {
898                                 dbid = InvalidOid;
899                                 break;
900                         }
901                 }
902
903                 if (dbid != InvalidOid)
904                 {
905                         nobjects++;
906                         pgstat_drop_database(dbid);
907                 }
908         }
909
910         /*
911          * Free the dbid list.
912          */
913         pfree((char *) dbidlist);
914
915         /*
916          * Tell the caller how many removeable objects we found
917          */
918         return nobjects;
919 }
920
921
922 /* ----------
923  * pgstat_drop_database() -
924  *
925  *      Tell the collector that we just dropped a database.
926  *      This is the only message that shouldn't get lost in space. Otherwise
927  *      the collector will keep the statistics for the dead DB until his
928  *      stats file got removed while the postmaster is down.
929  * ----------
930  */
931 static void
932 pgstat_drop_database(Oid databaseid)
933 {
934         PgStat_MsgDropdb msg;
935
936         if (pgStatSock < 0)
937                 return;
938
939         msg.m_databaseid = databaseid;
940
941         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
942         pgstat_send(&msg, sizeof(msg));
943 }
944
945
946 /* ----------
947  * pgstat_reset_counters() -
948  *
949  *      Tell the statistics collector to reset counters for our database.
950  * ----------
951  */
952 void
953 pgstat_reset_counters(void)
954 {
955         PgStat_MsgResetcounter msg;
956
957         if (pgStatSock < 0)
958                 return;
959
960         if (!superuser())
961                 ereport(ERROR,
962                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
963                           errmsg("must be superuser to reset statistics counters")));
964
965         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
966         pgstat_send(&msg, sizeof(msg));
967 }
968
969
970 /* ----------
971  * pgstat_ping() -
972  *
973  *      Send some junk data to the collector to increase traffic.
974  * ----------
975  */
976 void
977 pgstat_ping(void)
978 {
979         PgStat_MsgDummy msg;
980
981         if (pgStatSock < 0)
982                 return;
983
984         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
985         pgstat_send(&msg, sizeof(msg));
986 }
987
988 /*
989  * Create or enlarge the pgStatTabstatMessages array
990  */
991 static bool
992 more_tabstat_space(void)
993 {
994         PgStat_MsgTabstat *newMessages;
995         PgStat_MsgTabstat **msgArray;
996         int                     newAlloc = pgStatTabstatAlloc + TABSTAT_QUANTUM;
997         int                     i;
998
999         /* Create (another) quantum of message buffers */
1000         newMessages = (PgStat_MsgTabstat *)
1001                 malloc(sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1002         if (newMessages == NULL)
1003         {
1004                 ereport(LOG,
1005                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1006                                  errmsg("out of memory")));
1007                 return false;
1008         }
1009
1010         /* Create or enlarge the pointer array */
1011         if (pgStatTabstatMessages == NULL)
1012                 msgArray = (PgStat_MsgTabstat **)
1013                         malloc(sizeof(PgStat_MsgTabstat *) * newAlloc);
1014         else
1015                 msgArray = (PgStat_MsgTabstat **)
1016                         realloc(pgStatTabstatMessages,
1017                                         sizeof(PgStat_MsgTabstat *) * newAlloc);
1018         if (msgArray == NULL)
1019         {
1020                 free(newMessages);
1021                 ereport(LOG,
1022                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1023                                  errmsg("out of memory")));
1024                 return false;
1025         }
1026
1027         MemSet(newMessages, 0, sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1028         for (i = 0; i < TABSTAT_QUANTUM; i++)
1029                 msgArray[pgStatTabstatAlloc + i] = newMessages++;
1030         pgStatTabstatMessages = msgArray;
1031         pgStatTabstatAlloc = newAlloc;
1032
1033         return true;
1034 }
1035
1036 /* ----------
1037  * pgstat_initstats() -
1038  *
1039  *      Called from various places usually dealing with initialization
1040  *      of Relation or Scan structures. The data placed into these
1041  *      structures from here tell where later to count for buffer reads,
1042  *      scans and tuples fetched.
1043  * ----------
1044  */
1045 void
1046 pgstat_initstats(PgStat_Info *stats, Relation rel)
1047 {
1048         Oid                     rel_id = rel->rd_id;
1049         PgStat_TableEntry *useent;
1050         PgStat_MsgTabstat *tsmsg;
1051         int                     mb;
1052         int                     i;
1053
1054         /*
1055          * Initialize data not to count at all.
1056          */
1057         stats->tabentry = NULL;
1058         stats->no_stats = FALSE;
1059         stats->heap_scan_counted = FALSE;
1060         stats->index_scan_counted = FALSE;
1061
1062         if (pgStatSock < 0 ||
1063                 !(pgstat_collect_tuplelevel ||
1064                   pgstat_collect_blocklevel))
1065         {
1066                 stats->no_stats = TRUE;
1067                 return;
1068         }
1069
1070         /*
1071          * Search the already-used message slots for this relation.
1072          */
1073         for (mb = 0; mb < pgStatTabstatUsed; mb++)
1074         {
1075                 tsmsg = pgStatTabstatMessages[mb];
1076
1077                 for (i = tsmsg->m_nentries; --i >= 0;)
1078                 {
1079                         if (tsmsg->m_entry[i].t_id == rel_id)
1080                         {
1081                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1082                                 return;
1083                         }
1084                 }
1085
1086                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1087                         continue;
1088
1089                 /*
1090                  * Not found, but found a message buffer with an empty slot
1091                  * instead. Fine, let's use this one.
1092                  */
1093                 i = tsmsg->m_nentries++;
1094                 useent = &tsmsg->m_entry[i];
1095                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1096                 useent->t_id = rel_id;
1097                 stats->tabentry = (void *) useent;
1098                 return;
1099         }
1100
1101         /*
1102          * If we ran out of message buffers, we just allocate more.
1103          */
1104         if (pgStatTabstatUsed >= pgStatTabstatAlloc)
1105         {
1106                 if (!more_tabstat_space())
1107                 {
1108                         stats->no_stats = TRUE;
1109                         return;
1110                 }
1111                 Assert(pgStatTabstatUsed < pgStatTabstatAlloc);
1112         }
1113
1114         /*
1115          * Use the first entry of the next message buffer.
1116          */
1117         mb = pgStatTabstatUsed++;
1118         tsmsg = pgStatTabstatMessages[mb];
1119         tsmsg->m_nentries = 1;
1120         useent = &tsmsg->m_entry[0];
1121         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1122         useent->t_id = rel_id;
1123         stats->tabentry = (void *) useent;
1124 }
1125
1126
1127 /* ----------
1128  * pgstat_count_xact_commit() -
1129  *
1130  *      Called from access/transam/xact.c to count transaction commits.
1131  * ----------
1132  */
1133 void
1134 pgstat_count_xact_commit(void)
1135 {
1136         if (!(pgstat_collect_querystring ||
1137                   pgstat_collect_tuplelevel ||
1138                   pgstat_collect_blocklevel))
1139                 return;
1140
1141         pgStatXactCommit++;
1142
1143         /*
1144          * If there was no relation activity yet, just make one existing
1145          * message buffer used without slots, causing the next report to tell
1146          * new xact-counters.
1147          */
1148         if (pgStatTabstatAlloc == 0)
1149         {
1150                 if (!more_tabstat_space())
1151                         return;
1152         }
1153         if (pgStatTabstatUsed == 0)
1154         {
1155                 pgStatTabstatUsed++;
1156                 pgStatTabstatMessages[0]->m_nentries = 0;
1157         }
1158 }
1159
1160
1161 /* ----------
1162  * pgstat_count_xact_rollback() -
1163  *
1164  *      Called from access/transam/xact.c to count transaction rollbacks.
1165  * ----------
1166  */
1167 void
1168 pgstat_count_xact_rollback(void)
1169 {
1170         if (!(pgstat_collect_querystring ||
1171                   pgstat_collect_tuplelevel ||
1172                   pgstat_collect_blocklevel))
1173                 return;
1174
1175         pgStatXactRollback++;
1176
1177         /*
1178          * If there was no relation activity yet, just make one existing
1179          * message buffer used without slots, causing the next report to tell
1180          * new xact-counters.
1181          */
1182         if (pgStatTabstatAlloc == 0)
1183         {
1184                 if (!more_tabstat_space())
1185                         return;
1186         }
1187         if (pgStatTabstatUsed == 0)
1188         {
1189                 pgStatTabstatUsed++;
1190                 pgStatTabstatMessages[0]->m_nentries = 0;
1191         }
1192 }
1193
1194
1195 /* ----------
1196  * pgstat_fetch_stat_dbentry() -
1197  *
1198  *      Support function for the SQL-callable pgstat* functions. Returns
1199  *      the collected statistics for one database or NULL. NULL doesn't mean
1200  *      that the database doesn't exist, it is just not yet known by the
1201  *      collector, so the caller is better off to report ZERO instead.
1202  * ----------
1203  */
1204 PgStat_StatDBEntry *
1205 pgstat_fetch_stat_dbentry(Oid dbid)
1206 {
1207         PgStat_StatDBEntry *dbentry;
1208
1209         /*
1210          * If not done for this transaction, read the statistics collector
1211          * stats file into some hash tables.
1212          */
1213         backend_read_statsfile();
1214
1215         /*
1216          * Lookup the requested database
1217          */
1218         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1219                                                                                                  (void *) &dbid,
1220                                                                                                  HASH_FIND, NULL);
1221         if (dbentry == NULL)
1222                 return NULL;
1223
1224         return dbentry;
1225 }
1226
1227
1228 /* ----------
1229  * pgstat_fetch_stat_tabentry() -
1230  *
1231  *      Support function for the SQL-callable pgstat* functions. Returns
1232  *      the collected statistics for one table or NULL. NULL doesn't mean
1233  *      that the table doesn't exist, it is just not yet known by the
1234  *      collector, so the caller is better off to report ZERO instead.
1235  * ----------
1236  */
1237 PgStat_StatTabEntry *
1238 pgstat_fetch_stat_tabentry(Oid relid)
1239 {
1240         PgStat_StatDBEntry *dbentry;
1241         PgStat_StatTabEntry *tabentry;
1242
1243         /*
1244          * If not done for this transaction, read the statistics collector
1245          * stats file into some hash tables.
1246          */
1247         backend_read_statsfile();
1248
1249         /*
1250          * Lookup our database.
1251          */
1252         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1253                                                                                                  (void *) &MyDatabaseId,
1254                                                                                                  HASH_FIND, NULL);
1255         if (dbentry == NULL)
1256                 return NULL;
1257
1258         /*
1259          * Now inside the DB's table hash table lookup the requested one.
1260          */
1261         if (dbentry->tables == NULL)
1262                 return NULL;
1263         tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1264                                                                                                    (void *) &relid,
1265                                                                                                    HASH_FIND, NULL);
1266         if (tabentry == NULL)
1267                 return NULL;
1268
1269         return tabentry;
1270 }
1271
1272
1273 /* ----------
1274  * pgstat_fetch_stat_beentry() -
1275  *
1276  *      Support function for the SQL-callable pgstat* functions. Returns
1277  *      the actual activity slot of one active backend. The caller is
1278  *      responsible for a check if the actual user is permitted to see
1279  *      that info (especially the querystring).
1280  * ----------
1281  */
1282 PgStat_StatBeEntry *
1283 pgstat_fetch_stat_beentry(int beid)
1284 {
1285         backend_read_statsfile();
1286
1287         if (beid < 1 || beid > pgStatNumBackends)
1288                 return NULL;
1289
1290         return &pgStatBeTable[beid - 1];
1291 }
1292
1293
1294 /* ----------
1295  * pgstat_fetch_stat_numbackends() -
1296  *
1297  *      Support function for the SQL-callable pgstat* functions. Returns
1298  *      the maximum current backend id.
1299  * ----------
1300  */
1301 int
1302 pgstat_fetch_stat_numbackends(void)
1303 {
1304         backend_read_statsfile();
1305
1306         return pgStatNumBackends;
1307 }
1308
1309
1310
1311 /* ------------------------------------------------------------
1312  * Local support functions follow
1313  * ------------------------------------------------------------
1314  */
1315
1316
1317 /* ----------
1318  * pgstat_setheader() -
1319  *
1320  *              Set common header fields in a statistics message
1321  * ----------
1322  */
1323 static void
1324 pgstat_setheader(PgStat_MsgHdr *hdr, int mtype)
1325 {
1326         hdr->m_type = mtype;
1327         hdr->m_backendid = MyBackendId;
1328         hdr->m_procpid = MyProcPid;
1329         hdr->m_databaseid = MyDatabaseId;
1330         hdr->m_userid = GetSessionUserId();
1331 }
1332
1333
1334 /* ----------
1335  * pgstat_send() -
1336  *
1337  *              Send out one statistics message to the collector
1338  * ----------
1339  */
1340 static void
1341 pgstat_send(void *msg, int len)
1342 {
1343         if (pgStatSock < 0)
1344                 return;
1345
1346         ((PgStat_MsgHdr *) msg)->m_size = len;
1347
1348         send(pgStatSock, msg, len, 0);
1349         /* We deliberately ignore any error from send() */
1350 }
1351
1352
1353 /* ----------
1354  * PgstatBufferMain() -
1355  *
1356  *      Start up the statistics buffer process.  This is the body of the
1357  *      postmaster child process.
1358  *
1359  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1360  * ----------
1361  */
1362 NON_EXEC_STATIC void
1363 PgstatBufferMain(int argc, char *argv[])
1364 {
1365         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1366
1367         MyProcPid = getpid();           /* reset MyProcPid */
1368
1369         /* Lose the postmaster's on-exit routines */
1370         on_exit_reset();
1371
1372         /*
1373          * Ignore all signals usually bound to some action in the postmaster,
1374          * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
1375          */
1376         pqsignal(SIGHUP, SIG_IGN);
1377         pqsignal(SIGINT, SIG_IGN);
1378         pqsignal(SIGTERM, SIG_IGN);
1379         pqsignal(SIGQUIT, pgstat_exit);
1380         pqsignal(SIGALRM, SIG_IGN);
1381         pqsignal(SIGPIPE, SIG_IGN);
1382         pqsignal(SIGUSR1, SIG_IGN);
1383         pqsignal(SIGUSR2, SIG_IGN);
1384         pqsignal(SIGCHLD, pgstat_die);
1385         pqsignal(SIGTTIN, SIG_DFL);
1386         pqsignal(SIGTTOU, SIG_DFL);
1387         pqsignal(SIGCONT, SIG_DFL);
1388         pqsignal(SIGWINCH, SIG_DFL);
1389         /* unblock will happen in pgstat_recvbuffer */
1390
1391 #ifdef EXEC_BACKEND
1392         pgstat_parseArgs(argc, argv);
1393 #endif
1394
1395         /*
1396          * Start a buffering process to read from the socket, so we have a
1397          * little more time to process incoming messages.
1398          *
1399          * NOTE: the process structure is: postmaster is parent of buffer process
1400          * is parent of collector process.      This way, the buffer can detect
1401          * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1402          * collector failure until it tried to write on the pipe.  That would
1403          * mean that after the postmaster started a new collector, we'd have
1404          * two buffer processes competing to read from the UDP socket --- not
1405          * good.
1406          */
1407         if (pgpipe(pgStatPipe) < 0)
1408                 ereport(ERROR,
1409                                 (errcode_for_socket_access(),
1410                          errmsg("could not create pipe for statistics buffer: %m")));
1411
1412 #ifdef EXEC_BACKEND
1413         /* child becomes collector process */
1414         switch (pgstat_forkexec(STAT_PROC_COLLECTOR))
1415 #else
1416         switch (fork())
1417 #endif
1418         {
1419                 case -1:
1420                         ereport(ERROR,
1421                                         (errmsg("could not fork statistics collector: %m")));
1422
1423 #ifndef EXEC_BACKEND
1424                 case 0:
1425                         /* child becomes collector process */
1426                         PgstatCollectorMain(0, NULL);
1427                         break;
1428 #endif
1429
1430                 default:
1431                         /* parent becomes buffer process */
1432                         closesocket(pgStatPipe[0]);
1433                         pgstat_recvbuffer();
1434         }
1435         exit(0);
1436 }
1437
1438
1439 /* ----------
1440  * PgstatCollectorMain() -
1441  *
1442  *      Start up the statistics collector itself.  This is the body of the
1443  *      postmaster grandchild process.
1444  *
1445  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1446  * ----------
1447  */
1448 NON_EXEC_STATIC void
1449 PgstatCollectorMain(int argc, char *argv[])
1450 {
1451         PgStat_Msg      msg;
1452         fd_set          rfds;
1453         int                     readPipe;
1454         int                     nready;
1455         int                     len = 0;
1456         struct timeval timeout;
1457         struct timeval next_statwrite;
1458         bool            need_statwrite;
1459         HASHCTL         hash_ctl;
1460
1461         MyProcPid = getpid();           /* reset MyProcPid */
1462
1463         /*
1464          * Reset signal handling.  With the exception of restoring default
1465          * SIGCHLD and SIGQUIT handling, this is a no-op in the
1466          * non-EXEC_BACKEND case because we'll have inherited these settings
1467          * from the buffer process; but it's not a no-op for EXEC_BACKEND.
1468          */
1469         pqsignal(SIGHUP, SIG_IGN);
1470         pqsignal(SIGINT, SIG_IGN);
1471         pqsignal(SIGTERM, SIG_IGN);
1472         pqsignal(SIGQUIT, SIG_IGN);
1473         pqsignal(SIGALRM, SIG_IGN);
1474         pqsignal(SIGPIPE, SIG_IGN);
1475         pqsignal(SIGUSR1, SIG_IGN);
1476         pqsignal(SIGUSR2, SIG_IGN);
1477         pqsignal(SIGCHLD, SIG_DFL);
1478         pqsignal(SIGTTIN, SIG_DFL);
1479         pqsignal(SIGTTOU, SIG_DFL);
1480         pqsignal(SIGCONT, SIG_DFL);
1481         pqsignal(SIGWINCH, SIG_DFL);
1482         PG_SETMASK(&UnBlockSig);
1483
1484 #ifdef EXEC_BACKEND
1485         pgstat_parseArgs(argc, argv);
1486 #endif
1487
1488         /* Close unwanted files */
1489         closesocket(pgStatPipe[1]);
1490         closesocket(pgStatSock);
1491
1492         /*
1493          * Identify myself via ps
1494          */
1495         init_ps_display("stats collector process", "", "");
1496         set_ps_display("");
1497
1498         /*
1499          * Initialize filenames needed for status reports.
1500          */
1501         snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
1502         /* tmpfname need only be set correctly in this process */
1503         snprintf(pgStat_tmpfname, MAXPGPATH, PGSTAT_STAT_TMPFILE,
1504                          DataDir, (int)getpid());
1505
1506         /*
1507          * Arrange to write the initial status file right away
1508          */
1509         gettimeofday(&next_statwrite, NULL);
1510         need_statwrite = TRUE;
1511
1512         /*
1513          * Read in an existing statistics stats file or initialize the stats
1514          * to zero.
1515          */
1516         pgStatRunningInCollector = TRUE;
1517         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1518
1519         /*
1520          * Create the dead backend hashtable
1521          */
1522         memset(&hash_ctl, 0, sizeof(hash_ctl));
1523         hash_ctl.keysize = sizeof(int);
1524         hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1525         hash_ctl.hash = tag_hash;
1526         pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
1527                                                            &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1528
1529         /*
1530          * Create the known backends table
1531          */
1532         pgStatBeTable = (PgStat_StatBeEntry *) malloc(
1533                                                            sizeof(PgStat_StatBeEntry) * MaxBackends);
1534         if (pgStatBeTable == NULL)
1535                 ereport(ERROR,
1536                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1537                          errmsg("out of memory in statistics collector --- abort")));
1538         memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
1539
1540         readPipe = pgStatPipe[0];
1541
1542         /*
1543          * Process incoming messages and handle all the reporting stuff until
1544          * there are no more messages.
1545          */
1546         for (;;)
1547         {
1548                 /*
1549                  * If we need to write the status file again (there have been
1550                  * changes in the statistics since we wrote it last) calculate the
1551                  * timeout until we have to do so.
1552                  */
1553                 if (need_statwrite)
1554                 {
1555                         struct timeval now;
1556
1557                         gettimeofday(&now, NULL);
1558                         /* avoid assuming that tv_sec is signed */
1559                         if (now.tv_sec > next_statwrite.tv_sec ||
1560                                 (now.tv_sec == next_statwrite.tv_sec &&
1561                                  now.tv_usec >= next_statwrite.tv_usec))
1562                         {
1563                                 timeout.tv_sec = 0;
1564                                 timeout.tv_usec = 0;
1565                         }
1566                         else
1567                         {
1568                                 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
1569                                 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
1570                                 if (timeout.tv_usec < 0)
1571                                 {
1572                                         timeout.tv_sec--;
1573                                         timeout.tv_usec += 1000000;
1574                                 }
1575                         }
1576                 }
1577
1578                 /*
1579                  * Setup the descriptor set for select(2)
1580                  */
1581                 FD_ZERO(&rfds);
1582                 FD_SET(readPipe, &rfds);
1583
1584                 /*
1585                  * Now wait for something to do.
1586                  */
1587                 nready = select(readPipe + 1, &rfds, NULL, NULL,
1588                                                 (need_statwrite) ? &timeout : NULL);
1589                 if (nready < 0)
1590                 {
1591                         if (errno == EINTR)
1592                                 continue;
1593                         ereport(ERROR,
1594                                         (errcode_for_socket_access(),
1595                                  errmsg("select() failed in statistics collector: %m")));
1596                 }
1597
1598                 /*
1599                  * If there are no descriptors ready, our timeout for writing the
1600                  * stats file happened.
1601                  */
1602                 if (nready == 0)
1603                 {
1604                         pgstat_write_statsfile();
1605                         need_statwrite = FALSE;
1606
1607                         continue;
1608                 }
1609
1610                 /*
1611                  * Check if there is a new statistics message to collect.
1612                  */
1613                 if (FD_ISSET(readPipe, &rfds))
1614                 {
1615                         /*
1616                          * We may need to issue multiple read calls in case the buffer
1617                          * process didn't write the message in a single write, which
1618                          * is possible since it dumps its buffer bytewise. In any
1619                          * case, we'd need two reads since we don't know the message
1620                          * length initially.
1621                          */
1622                         int                     nread = 0;
1623                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1624                         bool            pipeEOF = false;
1625
1626                         while (nread < targetlen)
1627                         {
1628                                 len = piperead(readPipe, ((char *) &msg) + nread,
1629                                                            targetlen - nread);
1630                                 if (len < 0)
1631                                 {
1632                                         if (errno == EINTR)
1633                                                 continue;
1634                                         ereport(ERROR,
1635                                                         (errcode_for_socket_access(),
1636                                                          errmsg("could not read from statistics collector pipe: %m")));
1637                                 }
1638                                 if (len == 0)   /* EOF on the pipe! */
1639                                 {
1640                                         pipeEOF = true;
1641                                         break;
1642                                 }
1643                                 nread += len;
1644                                 if (nread == sizeof(PgStat_MsgHdr))
1645                                 {
1646                                         /* we have the header, compute actual msg length */
1647                                         targetlen = msg.msg_hdr.m_size;
1648                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1649                                                 targetlen > (int) sizeof(msg))
1650                                         {
1651                                                 /*
1652                                                  * Bogus message length implies that we got out of
1653                                                  * sync with the buffer process somehow. Abort so
1654                                                  * that we can restart both processes.
1655                                                  */
1656                                                 ereport(ERROR,
1657                                                   (errmsg("invalid statistics message length")));
1658                                         }
1659                                 }
1660                         }
1661
1662                         /*
1663                          * EOF on the pipe implies that the buffer process exited.
1664                          * Fall out of outer loop.
1665                          */
1666                         if (pipeEOF)
1667                                 break;
1668
1669                         /*
1670                          * Distribute the message to the specific function handling
1671                          * it.
1672                          */
1673                         switch (msg.msg_hdr.m_type)
1674                         {
1675                                 case PGSTAT_MTYPE_DUMMY:
1676                                         break;
1677
1678                                 case PGSTAT_MTYPE_BESTART:
1679                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1680                                         break;
1681
1682                                 case PGSTAT_MTYPE_BETERM:
1683                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1684                                         break;
1685
1686                                 case PGSTAT_MTYPE_TABSTAT:
1687                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1688                                         break;
1689
1690                                 case PGSTAT_MTYPE_TABPURGE:
1691                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1692                                         break;
1693
1694                                 case PGSTAT_MTYPE_ACTIVITY:
1695                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1696                                         break;
1697
1698                                 case PGSTAT_MTYPE_DROPDB:
1699                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1700                                         break;
1701
1702                                 case PGSTAT_MTYPE_RESETCOUNTER:
1703                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1704                                                                                          nread);
1705                                         break;
1706
1707                                 default:
1708                                         break;
1709                         }
1710
1711                         /*
1712                          * Globally count messages.
1713                          */
1714                         pgStatNumMessages++;
1715
1716                         /*
1717                          * If this is the first message after we wrote the stats file
1718                          * the last time, setup the timeout that it'd be written.
1719                          */
1720                         if (!need_statwrite)
1721                         {
1722                                 gettimeofday(&next_statwrite, NULL);
1723                                 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
1724                                 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
1725                                 next_statwrite.tv_usec %= 1000000;
1726                                 need_statwrite = TRUE;
1727                         }
1728                 }
1729
1730                 /*
1731                  * Note that we do NOT check for postmaster exit inside the loop;
1732                  * only EOF on the buffer pipe causes us to fall out.  This
1733                  * ensures we don't exit prematurely if there are still a few
1734                  * messages in the buffer or pipe at postmaster shutdown.
1735                  */
1736         }
1737
1738         /*
1739          * Okay, we saw EOF on the buffer pipe, so there are no more messages
1740          * to process.  If the buffer process quit because of postmaster
1741          * shutdown, we want to save the final stats to reuse at next startup.
1742          * But if the buffer process failed, it seems best not to (there may
1743          * even now be a new collector firing up, and we don't want it to read
1744          * a partially-rewritten stats file).
1745          */
1746         if (!PostmasterIsAlive(false))
1747                 pgstat_write_statsfile();
1748 }
1749
1750
1751 /* ----------
1752  * pgstat_recvbuffer() -
1753  *
1754  *      This is the body of the separate buffering process. Its only
1755  *      purpose is to receive messages from the UDP socket as fast as
1756  *      possible and forward them over a pipe into the collector itself.
1757  *      If the collector is slow to absorb messages, they are buffered here.
1758  * ----------
1759  */
1760 static void
1761 pgstat_recvbuffer(void)
1762 {
1763         fd_set          rfds;
1764         fd_set          wfds;
1765         struct timeval timeout;
1766         int                     writePipe = pgStatPipe[1];
1767         int                     maxfd;
1768         int                     nready;
1769         int                     len;
1770         int                     xfr;
1771         int                     frm;
1772         PgStat_Msg      input_buffer;
1773         char       *msgbuffer;
1774         int                     msg_send = 0;   /* next send index in buffer */
1775         int                     msg_recv = 0;   /* next receive index */
1776         int                     msg_have = 0;   /* number of bytes stored */
1777         bool            overflow = false;
1778
1779         /*
1780          * Identify myself via ps
1781          */
1782         init_ps_display("stats buffer process", "", "");
1783         set_ps_display("");
1784
1785         /*
1786          * We want to die if our child collector process does.  There are two
1787          * ways we might notice that it has died: receive SIGCHLD, or get a
1788          * write failure on the pipe leading to the child.      We can set SIGPIPE
1789          * to kill us here.  Our SIGCHLD handler was already set up before we
1790          * forked (must do it that way, else it's a race condition).
1791          */
1792         pqsignal(SIGPIPE, SIG_DFL);
1793         PG_SETMASK(&UnBlockSig);
1794
1795         /*
1796          * Set the write pipe to nonblock mode, so that we cannot block when
1797          * the collector falls behind.
1798          */
1799         if (!set_noblock(writePipe))
1800                 ereport(ERROR,
1801                                 (errcode_for_socket_access(),
1802                                  errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1803
1804         /*
1805          * Allocate the message buffer
1806          */
1807         msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
1808         if (msgbuffer == NULL)
1809                 ereport(ERROR,
1810                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1811                          errmsg("out of memory in statistics collector --- abort")));
1812
1813         /*
1814          * Loop forever
1815          */
1816         for (;;)
1817         {
1818                 FD_ZERO(&rfds);
1819                 FD_ZERO(&wfds);
1820                 maxfd = -1;
1821
1822                 /*
1823                  * As long as we have buffer space we add the socket to the read
1824                  * descriptor set.
1825                  */
1826                 if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
1827                 {
1828                         FD_SET(pgStatSock, &rfds);
1829                         maxfd = pgStatSock;
1830                         overflow = false;
1831                 }
1832                 else
1833                 {
1834                         if (!overflow)
1835                         {
1836                                 ereport(LOG,
1837                                                 (errmsg("statistics buffer is full")));
1838                                 overflow = true;
1839                         }
1840                 }
1841
1842                 /*
1843                  * If we have messages to write out, we add the pipe to the write
1844                  * descriptor set.
1845                  */
1846                 if (msg_have > 0)
1847                 {
1848                         FD_SET(writePipe, &wfds);
1849                         if (writePipe > maxfd)
1850                                 maxfd = writePipe;
1851                 }
1852
1853                 /*
1854                  * Wait for some work to do; but not for more than 10 seconds.
1855                  * (This determines how quickly we will shut down after an
1856                  * ungraceful postmaster termination; so it needn't be very fast.)
1857                  */
1858                 timeout.tv_sec = 10;
1859                 timeout.tv_usec = 0;
1860
1861                 nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
1862                 if (nready < 0)
1863                 {
1864                         if (errno == EINTR)
1865                                 continue;
1866                         ereport(ERROR,
1867                                         (errcode_for_socket_access(),
1868                                          errmsg("select() failed in statistics buffer: %m")));
1869                 }
1870
1871                 /*
1872                  * If there is a message on the socket, read it and check for
1873                  * validity.
1874                  */
1875                 if (FD_ISSET(pgStatSock, &rfds))
1876                 {
1877                         len = recv(pgStatSock, (char *) &input_buffer,
1878                                            sizeof(PgStat_Msg), 0);
1879                         if (len < 0)
1880                                 ereport(ERROR,
1881                                                 (errcode_for_socket_access(),
1882                                            errmsg("could not read statistics message: %m")));
1883
1884                         /*
1885                          * We ignore messages that are smaller than our common header
1886                          */
1887                         if (len < sizeof(PgStat_MsgHdr))
1888                                 continue;
1889
1890                         /*
1891                          * The received length must match the length in the header
1892                          */
1893                         if (input_buffer.msg_hdr.m_size != len)
1894                                 continue;
1895
1896                         /*
1897                          * O.K. - we accept this message.  Copy it to the circular
1898                          * msgbuffer.
1899                          */
1900                         frm = 0;
1901                         while (len > 0)
1902                         {
1903                                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
1904                                 if (xfr > len)
1905                                         xfr = len;
1906                                 Assert(xfr > 0);
1907                                 memcpy(msgbuffer + msg_recv,
1908                                            ((char *) &input_buffer) + frm,
1909                                            xfr);
1910                                 msg_recv += xfr;
1911                                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
1912                                         msg_recv = 0;
1913                                 msg_have += xfr;
1914                                 frm += xfr;
1915                                 len -= xfr;
1916                         }
1917                 }
1918
1919                 /*
1920                  * If the collector is ready to receive, write some data into his
1921                  * pipe.  We may or may not be able to write all that we have.
1922                  *
1923                  * NOTE: if what we have is less than PIPE_BUF bytes but more than
1924                  * the space available in the pipe buffer, most kernels will
1925                  * refuse to write any of it, and will return EAGAIN.  This means
1926                  * we will busy-loop until the situation changes (either because
1927                  * the collector caught up, or because more data arrives so that
1928                  * we have more than PIPE_BUF bytes buffered).  This is not good,
1929                  * but is there any way around it?      We have no way to tell when
1930                  * the collector has caught up...
1931                  */
1932                 if (FD_ISSET(writePipe, &wfds))
1933                 {
1934                         xfr = PGSTAT_RECVBUFFERSZ - msg_send;
1935                         if (xfr > msg_have)
1936                                 xfr = msg_have;
1937                         Assert(xfr > 0);
1938                         len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
1939                         if (len < 0)
1940                         {
1941                                 if (errno == EINTR || errno == EAGAIN)
1942                                         continue;       /* not enough space in pipe */
1943                                 ereport(ERROR,
1944                                                 (errcode_for_socket_access(),
1945                                                  errmsg("could not write to statistics collector pipe: %m")));
1946                         }
1947                         /* NB: len < xfr is okay */
1948                         msg_send += len;
1949                         if (msg_send == PGSTAT_RECVBUFFERSZ)
1950                                 msg_send = 0;
1951                         msg_have -= len;
1952                 }
1953
1954                 /*
1955                  * Make sure we forwarded all messages before we check for
1956                  * postmaster termination.
1957                  */
1958                 if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
1959                         continue;
1960
1961                 /*
1962                  * If the postmaster has terminated, we die too.  (This is no
1963                  * longer the normal exit path, however.)
1964                  */
1965                 if (!PostmasterIsAlive(true))
1966                         exit(0);
1967         }
1968 }
1969
1970 /* SIGQUIT signal handler for buffer process */
1971 static void
1972 pgstat_exit(SIGNAL_ARGS)
1973 {
1974         /*
1975          * For now, we just nail the doors shut and get out of town.  It might
1976          * be cleaner to allow any pending messages to be sent, but that
1977          * creates a tradeoff against speed of exit.
1978          */
1979         exit(0);
1980 }
1981
1982 /* SIGCHLD signal handler for buffer process */
1983 static void
1984 pgstat_die(SIGNAL_ARGS)
1985 {
1986         exit(1);
1987 }
1988
1989
1990 /* ----------
1991  * pgstat_add_backend() -
1992  *
1993  *      Support function to keep our backend list up to date.
1994  * ----------
1995  */
1996 static int
1997 pgstat_add_backend(PgStat_MsgHdr *msg)
1998 {
1999         PgStat_StatDBEntry *dbentry;
2000         PgStat_StatBeEntry *beentry;
2001         PgStat_StatBeDead *deadbe;
2002         bool            found;
2003
2004         /*
2005          * Check that the backend ID is valid
2006          */
2007         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2008         {
2009                 ereport(LOG,
2010                          (errmsg("invalid server process ID %d", msg->m_backendid)));
2011                 return -1;
2012         }
2013
2014         /*
2015          * Get the slot for this backendid.
2016          */
2017         beentry = &pgStatBeTable[msg->m_backendid - 1];
2018         if (beentry->databaseid != InvalidOid)
2019         {
2020                 /*
2021                  * If the slot contains the PID of this backend, everything is
2022                  * fine and we got nothing to do.
2023                  */
2024                 if (beentry->procpid == msg->m_procpid)
2025                         return 0;
2026         }
2027
2028         /*
2029          * Lookup if this backend is known to be dead. This can be caused due
2030          * to messages arriving in the wrong order - i.e. Postmaster's BETERM
2031          * message might have arrived before we received all the backends
2032          * stats messages, or even a new backend with the same backendid was
2033          * faster in sending his BESTART.
2034          *
2035          * If the backend is known to be dead, we ignore this add.
2036          */
2037         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2038                                                                                            (void *) &(msg->m_procpid),
2039                                                                                            HASH_FIND, NULL);
2040         if (deadbe)
2041                 return 1;
2042
2043         /*
2044          * Backend isn't known to be dead. If it's slot is currently used, we
2045          * have to kick out the old backend.
2046          */
2047         if (beentry->databaseid != InvalidOid)
2048                 pgstat_sub_backend(beentry->procpid);
2049
2050         /*
2051          * Put this new backend into the slot.
2052          */
2053         beentry->databaseid = msg->m_databaseid;
2054         beentry->procpid = msg->m_procpid;
2055         beentry->userid = msg->m_userid;
2056         beentry->activity_start_sec = 0;
2057         beentry->activity_start_usec = 0;
2058         MemSet(beentry->activity, 0, PGSTAT_ACTIVITY_SIZE);
2059
2060         /*
2061          * Lookup or create the database entry for this backends DB.
2062          */
2063         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2064                                                                                    (void *) &(msg->m_databaseid),
2065                                                                                                  HASH_ENTER, &found);
2066         if (dbentry == NULL)
2067                 ereport(ERROR,
2068                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2069                          errmsg("out of memory in statistics collector --- abort")));
2070
2071         /*
2072          * If not found, initialize the new one.
2073          */
2074         if (!found)
2075         {
2076                 HASHCTL         hash_ctl;
2077
2078                 dbentry->tables = NULL;
2079                 dbentry->n_xact_commit = 0;
2080                 dbentry->n_xact_rollback = 0;
2081                 dbentry->n_blocks_fetched = 0;
2082                 dbentry->n_blocks_hit = 0;
2083                 dbentry->n_connects = 0;
2084                 dbentry->destroy = 0;
2085
2086                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2087                 hash_ctl.keysize = sizeof(Oid);
2088                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2089                 hash_ctl.hash = tag_hash;
2090                 dbentry->tables = hash_create("Per-database table",
2091                                                                           PGSTAT_TAB_HASH_SIZE,
2092                                                                           &hash_ctl,
2093                                                                           HASH_ELEM | HASH_FUNCTION);
2094         }
2095
2096         /*
2097          * Count number of connects to the database
2098          */
2099         dbentry->n_connects++;
2100
2101         return 0;
2102 }
2103
2104
2105 /* ----------
2106  * pgstat_sub_backend() -
2107  *
2108  *      Remove a backend from the actual backends list.
2109  * ----------
2110  */
2111 static void
2112 pgstat_sub_backend(int procpid)
2113 {
2114         int                     i;
2115         PgStat_StatBeDead *deadbe;
2116         bool            found;
2117
2118         /*
2119          * Search in the known-backends table for the slot containing this
2120          * PID.
2121          */
2122         for (i = 0; i < MaxBackends; i++)
2123         {
2124                 if (pgStatBeTable[i].databaseid != InvalidOid &&
2125                         pgStatBeTable[i].procpid == procpid)
2126                 {
2127                         /*
2128                          * That's him. Add an entry to the known to be dead backends.
2129                          * Due to possible misorder in the arrival of UDP packets it's
2130                          * possible that even if we know the backend is dead, there
2131                          * could still be messages queued that arrive later. Those
2132                          * messages must not cause our number of backends statistics
2133                          * to get screwed up, so we remember for a couple of seconds
2134                          * that this PID is dead and ignore them (only the counting of
2135                          * backends, not the table access stats they sent).
2136                          */
2137                         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2138                                                                                                            (void *) &procpid,
2139                                                                                                            HASH_ENTER,
2140                                                                                                            &found);
2141                         if (deadbe == NULL)
2142                                 ereport(ERROR,
2143                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2144                                                  errmsg("out of memory in statistics collector --- abort")));
2145
2146                         if (!found)
2147                         {
2148                                 deadbe->backendid = i + 1;
2149                                 deadbe->destroy = PGSTAT_DESTROY_COUNT;
2150                         }
2151
2152                         /*
2153                          * Declare the backend slot empty.
2154                          */
2155                         pgStatBeTable[i].databaseid = InvalidOid;
2156                         return;
2157                 }
2158         }
2159
2160         /*
2161          * No big problem if not found. This can happen if UDP messages arrive
2162          * out of order here.
2163          */
2164 }
2165
2166
2167 /* ----------
2168  * pgstat_write_statsfile() -
2169  *
2170  *      Tell the news.
2171  * ----------
2172  */
2173 static void
2174 pgstat_write_statsfile(void)
2175 {
2176         HASH_SEQ_STATUS hstat;
2177         HASH_SEQ_STATUS tstat;
2178         PgStat_StatDBEntry *dbentry;
2179         PgStat_StatTabEntry *tabentry;
2180         PgStat_StatBeDead *deadbe;
2181         FILE       *fpout;
2182         int                     i;
2183
2184         /*
2185          * Open the statistics temp file to write out the current values.
2186          */
2187         fpout = fopen(pgStat_tmpfname, PG_BINARY_W);
2188         if (fpout == NULL)
2189         {
2190                 ereport(LOG,
2191                                 (errcode_for_file_access(),
2192                         errmsg("could not open temporary statistics file \"%s\": %m",
2193                                    pgStat_tmpfname)));
2194                 return;
2195         }
2196
2197         /*
2198          * Walk through the database table.
2199          */
2200         hash_seq_init(&hstat, pgStatDBHash);
2201         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2202         {
2203                 /*
2204                  * If this database is marked destroyed, count down and do so if
2205                  * it reaches 0.
2206                  */
2207                 if (dbentry->destroy > 0)
2208                 {
2209                         if (--(dbentry->destroy) == 0)
2210                         {
2211                                 if (dbentry->tables != NULL)
2212                                         hash_destroy(dbentry->tables);
2213
2214                                 if (hash_search(pgStatDBHash,
2215                                                                 (void *) &(dbentry->databaseid),
2216                                                                 HASH_REMOVE, NULL) == NULL)
2217                                         ereport(ERROR,
2218                                                         (errmsg("database hash table corrupted "
2219                                                                         "during cleanup --- abort")));
2220                         }
2221
2222                         /*
2223                          * Don't include statistics for it.
2224                          */
2225                         continue;
2226                 }
2227
2228                 /*
2229                  * Write out the DB line including the number of life backends.
2230                  */
2231                 fputc('D', fpout);
2232                 fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout);
2233
2234                 /*
2235                  * Walk through the databases access stats per table.
2236                  */
2237                 hash_seq_init(&tstat, dbentry->tables);
2238                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2239                 {
2240                         /*
2241                          * If table entry marked for destruction, same as above for
2242                          * the database entry.
2243                          */
2244                         if (tabentry->destroy > 0)
2245                         {
2246                                 if (--(tabentry->destroy) == 0)
2247                                 {
2248                                         if (hash_search(dbentry->tables,
2249                                                                         (void *) &(tabentry->tableid),
2250                                                                         HASH_REMOVE, NULL) == NULL)
2251                                         {
2252                                                 ereport(ERROR,
2253                                                                 (errmsg("tables hash table for "
2254                                                                                 "database %u corrupted during "
2255                                                                                 "cleanup --- abort",
2256                                                                                 dbentry->databaseid)));
2257                                         }
2258                                 }
2259                                 continue;
2260                         }
2261
2262                         /*
2263                          * At least we think this is still a life table. Print it's
2264                          * access stats.
2265                          */
2266                         fputc('T', fpout);
2267                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2268                 }
2269
2270                 /*
2271                  * Mark the end of this DB
2272                  */
2273                 fputc('d', fpout);
2274         }
2275
2276         /*
2277          * Write out the known running backends to the stats file.
2278          */
2279         i = MaxBackends;
2280         fputc('M', fpout);
2281         fwrite(&i, sizeof(i), 1, fpout);
2282
2283         for (i = 0; i < MaxBackends; i++)
2284         {
2285                 if (pgStatBeTable[i].databaseid != InvalidOid)
2286                 {
2287                         fputc('B', fpout);
2288                         fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
2289                 }
2290         }
2291
2292         /*
2293          * No more output to be done. Close the temp file and replace the old
2294          * pgstat.stat with it.
2295          */
2296         fputc('E', fpout);
2297         if (fclose(fpout) < 0)
2298         {
2299                 ereport(LOG,
2300                                 (errcode_for_file_access(),
2301                    errmsg("could not close temporary statistics file \"%s\": %m",
2302                                   pgStat_tmpfname)));
2303         }
2304         else
2305         {
2306                 if (rename(pgStat_tmpfname, pgStat_fname) < 0)
2307                 {
2308                         ereport(LOG,
2309                                         (errcode_for_file_access(),
2310                                          errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2311                                                         pgStat_tmpfname, pgStat_fname)));
2312                 }
2313         }
2314
2315         /*
2316          * Clear out the dead backends table
2317          */
2318         hash_seq_init(&hstat, pgStatBeDead);
2319         while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2320         {
2321                 /*
2322                  * Count down the destroy delay and remove entries where it
2323                  * reaches 0.
2324                  */
2325                 if (--(deadbe->destroy) <= 0)
2326                 {
2327                         if (hash_search(pgStatBeDead,
2328                                                         (void *) &(deadbe->procpid),
2329                                                         HASH_REMOVE, NULL) == NULL)
2330                         {
2331                                 ereport(ERROR,
2332                                           (errmsg("dead-server-process hash table corrupted "
2333                                                           "during cleanup --- abort")));
2334                         }
2335                 }
2336         }
2337 }
2338
2339
2340 /* ----------
2341  * pgstat_read_statsfile() -
2342  *
2343  *      Reads in an existing statistics collector and initializes the
2344  *      databases hash table (who's entries point to the tables hash tables)
2345  *      and the current backend table.
2346  * ----------
2347  */
2348 static void
2349 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2350                                           PgStat_StatBeEntry **betab, int *numbackends)
2351 {
2352         PgStat_StatDBEntry *dbentry;
2353         PgStat_StatDBEntry dbbuf;
2354         PgStat_StatTabEntry *tabentry;
2355         PgStat_StatTabEntry tabbuf;
2356         HASHCTL         hash_ctl;
2357         HTAB       *tabhash = NULL;
2358         FILE       *fpin;
2359         int                     maxbackends = 0;
2360         int                     havebackends = 0;
2361         bool            found;
2362         MemoryContext use_mcxt;
2363         int                     mcxt_flags;
2364
2365         /*
2366          * If running in the collector we use the DynaHashCxt memory context.
2367          * If running in a backend, we use the TopTransactionContext instead,
2368          * so the caller must only know the last XactId when this call
2369          * happened to know if his tables are still valid or already gone!
2370          */
2371         if (pgStatRunningInCollector)
2372         {
2373                 use_mcxt = NULL;
2374                 mcxt_flags = 0;
2375         }
2376         else
2377         {
2378                 use_mcxt = TopTransactionContext;
2379                 mcxt_flags = HASH_CONTEXT;
2380         }
2381
2382         /*
2383          * Create the DB hashtable
2384          */
2385         memset(&hash_ctl, 0, sizeof(hash_ctl));
2386         hash_ctl.keysize = sizeof(Oid);
2387         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2388         hash_ctl.hash = tag_hash;
2389         hash_ctl.hcxt = use_mcxt;
2390         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2391                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2392
2393         /*
2394          * Initialize the number of known backends to zero, just in case we do
2395          * a silent error return below.
2396          */
2397         if (numbackends != NULL)
2398                 *numbackends = 0;
2399         if (betab != NULL)
2400                 *betab = NULL;
2401
2402         /*
2403          * In EXEC_BACKEND case, we won't have inherited pgStat_fname from
2404          * postmaster, so compute it first time through.
2405          */
2406 #ifdef EXEC_BACKEND
2407         if (pgStat_fname[0] == '\0')
2408         {
2409                 Assert(DataDir != NULL);
2410                 snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
2411         }
2412 #endif
2413
2414         /*
2415          * Try to open the status file. If it doesn't exist, the backends
2416          * simply return zero for anything and the collector simply starts
2417          * from scratch with empty counters.
2418          */
2419         if ((fpin = fopen(pgStat_fname, PG_BINARY_R)) == NULL)
2420                 return;
2421
2422         /*
2423          * We found an existing collector stats file. Read it and put all the
2424          * hashtable entries into place.
2425          */
2426         for (;;)
2427         {
2428                 switch (fgetc(fpin))
2429                 {
2430                                 /*
2431                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2432                                  * follows. Subsequently, zero to many 'T' entries will
2433                                  * follow until a 'd' is encountered.
2434                                  */
2435                         case 'D':
2436                                 if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
2437                                 {
2438                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2439                                                         (errmsg("corrupted pgstat.stat file")));
2440                                         fclose(fpin);
2441                                         return;
2442                                 }
2443
2444                                 /*
2445                                  * Add to the DB hash
2446                                  */
2447                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2448                                                                                           (void *) &dbbuf.databaseid,
2449                                                                                                                          HASH_ENTER,
2450                                                                                                                          &found);
2451                                 if (dbentry == NULL)
2452                                 {
2453                                         fclose(fpin);
2454                                         ereport(ERROR,
2455                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2456                                                          errmsg("out of memory")));
2457                                 }
2458                                 if (found)
2459                                 {
2460                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2461                                                         (errmsg("corrupted pgstat.stat file")));
2462                                         fclose(fpin);
2463                                         return;
2464                                 }
2465
2466                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2467                                 dbentry->tables = NULL;
2468                                 dbentry->destroy = 0;
2469                                 dbentry->n_backends = 0;
2470
2471                                 /*
2472                                  * Don't collect tables if not the requested DB
2473                                  */
2474                                 if (onlydb != InvalidOid && onlydb != dbbuf.databaseid)
2475                                         break;
2476
2477                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2478                                 hash_ctl.keysize = sizeof(Oid);
2479                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2480                                 hash_ctl.hash = tag_hash;
2481                                 hash_ctl.hcxt = use_mcxt;
2482                                 PG_TRY();
2483                                 {
2484                                         dbentry->tables = hash_create("Per-database table",
2485                                                                                                   PGSTAT_TAB_HASH_SIZE,
2486                                                                                                   &hash_ctl,
2487                                                                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2488                                 }
2489                                 PG_CATCH();
2490                                 {
2491                                         fclose(fpin);
2492                                         PG_RE_THROW();
2493                                 }
2494                                 PG_END_TRY();
2495
2496                                 /*
2497                                  * Arrange that following 'T's add entries to this
2498                                  * databases tables hash table.
2499                                  */
2500                                 tabhash = dbentry->tables;
2501                                 break;
2502
2503                                 /*
2504                                  * 'd'  End of this database.
2505                                  */
2506                         case 'd':
2507                                 tabhash = NULL;
2508                                 break;
2509
2510                                 /*
2511                                  * 'T'  A PgStat_StatTabEntry follows.
2512                                  */
2513                         case 'T':
2514                                 if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
2515                                 {
2516                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2517                                                         (errmsg("corrupted pgstat.stat file")));
2518                                         fclose(fpin);
2519                                         return;
2520                                 }
2521
2522                                 /*
2523                                  * Skip if table belongs to a not requested database.
2524                                  */
2525                                 if (tabhash == NULL)
2526                                         break;
2527
2528                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2529                                                                                                 (void *) &tabbuf.tableid,
2530                                                                                                          HASH_ENTER, &found);
2531                                 if (tabentry == NULL)
2532                                 {
2533                                         fclose(fpin);
2534                                         ereport(ERROR,
2535                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2536                                                          errmsg("out of memory")));
2537                                 }
2538
2539                                 if (found)
2540                                 {
2541                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2542                                                         (errmsg("corrupted pgstat.stat file")));
2543                                         fclose(fpin);
2544                                         return;
2545                                 }
2546
2547                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2548                                 break;
2549
2550                                 /*
2551                                  * 'M'  The maximum number of backends to expect follows.
2552                                  */
2553                         case 'M':
2554                                 if (betab == NULL || numbackends == NULL)
2555                                 {
2556                                         fclose(fpin);
2557                                         return;
2558                                 }
2559                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2560                                         sizeof(maxbackends))
2561                                 {
2562                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2563                                                         (errmsg("corrupted pgstat.stat file")));
2564                                         fclose(fpin);
2565                                         return;
2566                                 }
2567                                 if (maxbackends == 0)
2568                                 {
2569                                         fclose(fpin);
2570                                         return;
2571                                 }
2572
2573                                 /*
2574                                  * Allocate space (in TopTransactionContext too) for the
2575                                  * backend table.
2576                                  */
2577                                 if (use_mcxt == NULL)
2578                                         *betab = (PgStat_StatBeEntry *) malloc(
2579                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2580                                 else
2581                                         *betab = (PgStat_StatBeEntry *) MemoryContextAlloc(
2582                                                                                                                                 use_mcxt,
2583                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2584                                 break;
2585
2586                                 /*
2587                                  * 'B'  A PgStat_StatBeEntry follows.
2588                                  */
2589                         case 'B':
2590                                 if (betab == NULL || numbackends == NULL)
2591                                 {
2592                                         fclose(fpin);
2593                                         return;
2594                                 }
2595                                 if (*betab == NULL)
2596                                 {
2597                                         fclose(fpin);
2598                                         return;
2599                                 }
2600
2601                                 /*
2602                                  * Read it directly into the table.
2603                                  */
2604                                 if (fread(&(*betab)[havebackends], 1,
2605                                                   sizeof(PgStat_StatBeEntry), fpin) !=
2606                                         sizeof(PgStat_StatBeEntry))
2607                                 {
2608                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2609                                                         (errmsg("corrupted pgstat.stat file")));
2610                                         fclose(fpin);
2611                                         return;
2612                                 }
2613
2614                                 /*
2615                                  * Count backends per database here.
2616                                  */
2617                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2618                                                    (void *) &((*betab)[havebackends].databaseid),
2619                                                                                                                 HASH_FIND, NULL);
2620                                 if (dbentry)
2621                                         dbentry->n_backends++;
2622
2623                                 havebackends++;
2624                                 if (numbackends != 0)
2625                                         *numbackends = havebackends;
2626                                 if (havebackends >= maxbackends)
2627                                 {
2628                                         fclose(fpin);
2629                                         return;
2630                                 }
2631                                 break;
2632
2633                                 /*
2634                                  * 'E'  The EOF marker of a complete stats file.
2635                                  */
2636                         case 'E':
2637                                 fclose(fpin);
2638                                 return;
2639
2640                         default:
2641                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2642                                                 (errmsg("corrupted pgstat.stat file")));
2643                                 fclose(fpin);
2644                                 return;
2645                 }
2646         }
2647
2648         fclose(fpin);
2649 }
2650
2651 /*
2652  * If not done for this transaction, read the statistics collector
2653  * stats file into some hash tables.
2654  *
2655  * Because we store the hash tables in TopTransactionContext, the result
2656  * is good for the entire current main transaction.
2657  */
2658 static void
2659 backend_read_statsfile(void)
2660 {
2661         TransactionId topXid = GetTopTransactionId();
2662
2663         if (!TransactionIdEquals(pgStatDBHashXact, topXid))
2664         {
2665                 Assert(!pgStatRunningInCollector);
2666                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
2667                                                           &pgStatBeTable, &pgStatNumBackends);
2668                 pgStatDBHashXact = topXid;
2669         }
2670 }
2671
2672
2673 /* ----------
2674  * pgstat_recv_bestart() -
2675  *
2676  *      Process a backend startup message.
2677  * ----------
2678  */
2679 static void
2680 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2681 {
2682         pgstat_add_backend(&msg->m_hdr);
2683 }
2684
2685
2686 /* ----------
2687  * pgstat_recv_beterm() -
2688  *
2689  *      Process a backend termination message.
2690  * ----------
2691  */
2692 static void
2693 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2694 {
2695         pgstat_sub_backend(msg->m_hdr.m_procpid);
2696 }
2697
2698
2699 /* ----------
2700  * pgstat_recv_activity() -
2701  *
2702  *      Remember what the backend is doing.
2703  * ----------
2704  */
2705 static void
2706 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2707 {
2708         PgStat_StatBeEntry *entry;
2709
2710         /*
2711          * Here we check explicitly for 0 return, since we don't want to
2712          * mangle the activity of an active backend by a delayed packet from a
2713          * dead one.
2714          */
2715         if (pgstat_add_backend(&msg->m_hdr) != 0)
2716                 return;
2717
2718         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2719
2720         StrNCpy(entry->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE);
2721
2722         entry->activity_start_sec =
2723                 GetCurrentAbsoluteTimeUsec(&entry->activity_start_usec);
2724 }
2725
2726
2727 /* ----------
2728  * pgstat_recv_tabstat() -
2729  *
2730  *      Count what the backend has done.
2731  * ----------
2732  */
2733 static void
2734 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2735 {
2736         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2737         PgStat_StatDBEntry *dbentry;
2738         PgStat_StatTabEntry *tabentry;
2739         int                     i;
2740         bool            found;
2741
2742         /*
2743          * Make sure the backend is counted for.
2744          */
2745         if (pgstat_add_backend(&msg->m_hdr) < 0)
2746                 return;
2747
2748         /*
2749          * Lookup the database in the hashtable.
2750          */
2751         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2752                                                                          (void *) &(msg->m_hdr.m_databaseid),
2753                                                                                                  HASH_FIND, NULL);
2754         if (!dbentry)
2755                 return;
2756
2757         /*
2758          * If the database is marked for destroy, this is a delayed UDP packet
2759          * and not worth being counted.
2760          */
2761         if (dbentry->destroy > 0)
2762                 return;
2763
2764         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2765         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2766
2767         /*
2768          * Process all table entries in the message.
2769          */
2770         for (i = 0; i < msg->m_nentries; i++)
2771         {
2772                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2773                                                                                           (void *) &(tabmsg[i].t_id),
2774                                                                                                          HASH_ENTER, &found);
2775                 if (tabentry == NULL)
2776                         ereport(ERROR,
2777                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2778                          errmsg("out of memory in statistics collector --- abort")));
2779
2780                 if (!found)
2781                 {
2782                         /*
2783                          * If it's a new table entry, initialize counters to the
2784                          * values we just got.
2785                          */
2786                         tabentry->numscans = tabmsg[i].t_numscans;
2787                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2788                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2789                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2790                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2791                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2792                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2793                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2794
2795                         tabentry->destroy = 0;
2796                 }
2797                 else
2798                 {
2799                         /*
2800                          * Otherwise add the values to the existing entry.
2801                          */
2802                         tabentry->numscans += tabmsg[i].t_numscans;
2803                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2804                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2805                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2806                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2807                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2808                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2809                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2810                 }
2811
2812                 /*
2813                  * And add the block IO to the database entry.
2814                  */
2815                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2816                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2817         }
2818 }
2819
2820
2821 /* ----------
2822  * pgstat_recv_tabpurge() -
2823  *
2824  *      Arrange for dead table removal.
2825  * ----------
2826  */
2827 static void
2828 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2829 {
2830         PgStat_StatDBEntry *dbentry;
2831         PgStat_StatTabEntry *tabentry;
2832         int                     i;
2833
2834         /*
2835          * Make sure the backend is counted for.
2836          */
2837         if (pgstat_add_backend(&msg->m_hdr) < 0)
2838                 return;
2839
2840         /*
2841          * Lookup the database in the hashtable.
2842          */
2843         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2844                                                                          (void *) &(msg->m_hdr.m_databaseid),
2845                                                                                                  HASH_FIND, NULL);
2846         if (!dbentry)
2847                 return;
2848
2849         /*
2850          * If the database is marked for destroy, this is a delayed UDP packet
2851          * and the tables will go away at DB destruction.
2852          */
2853         if (dbentry->destroy > 0)
2854                 return;
2855
2856         /*
2857          * Process all table entries in the message.
2858          */
2859         for (i = 0; i < msg->m_nentries; i++)
2860         {
2861                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2862                                                                                    (void *) &(msg->m_tableid[i]),
2863                                                                                                            HASH_FIND, NULL);
2864                 if (tabentry)
2865                         tabentry->destroy = PGSTAT_DESTROY_COUNT;
2866         }
2867 }
2868
2869
2870 /* ----------
2871  * pgstat_recv_dropdb() -
2872  *
2873  *      Arrange for dead database removal
2874  * ----------
2875  */
2876 static void
2877 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
2878 {
2879         PgStat_StatDBEntry *dbentry;
2880
2881         /*
2882          * Make sure the backend is counted for.
2883          */
2884         if (pgstat_add_backend(&msg->m_hdr) < 0)
2885                 return;
2886
2887         /*
2888          * Lookup the database in the hashtable.
2889          */
2890         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2891                                                                                    (void *) &(msg->m_databaseid),
2892                                                                                                  HASH_FIND, NULL);
2893         if (!dbentry)
2894                 return;
2895
2896         /*
2897          * Mark the database for destruction.
2898          */
2899         dbentry->destroy = PGSTAT_DESTROY_COUNT;
2900 }
2901
2902
2903 /* ----------
2904  * pgstat_recv_dropdb() -
2905  *
2906  *      Arrange for dead database removal
2907  * ----------
2908  */
2909 static void
2910 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
2911 {
2912         HASHCTL         hash_ctl;
2913         PgStat_StatDBEntry *dbentry;
2914
2915         /*
2916          * Make sure the backend is counted for.
2917          */
2918         if (pgstat_add_backend(&msg->m_hdr) < 0)
2919                 return;
2920
2921         /*
2922          * Lookup the database in the hashtable.
2923          */
2924         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2925                                                                          (void *) &(msg->m_hdr.m_databaseid),
2926                                                                                                  HASH_FIND, NULL);
2927         if (!dbentry)
2928                 return;
2929
2930         /*
2931          * We simply throw away all the databases table entries by recreating
2932          * a new hash table for them.
2933          */
2934         if (dbentry->tables != NULL)
2935                 hash_destroy(dbentry->tables);
2936
2937         dbentry->tables = NULL;
2938         dbentry->n_xact_commit = 0;
2939         dbentry->n_xact_rollback = 0;
2940         dbentry->n_blocks_fetched = 0;
2941         dbentry->n_blocks_hit = 0;
2942         dbentry->n_connects = 0;
2943         dbentry->destroy = 0;
2944
2945         memset(&hash_ctl, 0, sizeof(hash_ctl));
2946         hash_ctl.keysize = sizeof(Oid);
2947         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2948         hash_ctl.hash = tag_hash;
2949         dbentry->tables = hash_create("Per-database table",
2950                                                                   PGSTAT_TAB_HASH_SIZE,
2951                                                                   &hash_ctl,
2952                                                                   HASH_ELEM | HASH_FUNCTION);
2953 }