]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Adjust pgstat message definitions so that the target message size is
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2003, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.76 2004/06/26 16:32:02 tgl Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31
32 #include "pgstat.h"
33
34 #include "access/heapam.h"
35 #include "access/xact.h"
36 #include "catalog/catname.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_shadow.h"
39 #include "libpq/libpq.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "postmaster/postmaster.h"
44 #include "storage/backendid.h"
45 #include "storage/ipc.h"
46 #include "storage/pg_shmem.h"
47 #include "storage/pmsignal.h"
48 #include "tcop/tcopprot.h"
49 #include "utils/hsearch.h"
50 #include "utils/memutils.h"
51 #include "utils/ps_status.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54
55
/* ----------
 * Paths for the statistics files. The %s is replaced with the
 * installation's $PGDATA.  The temporary file name additionally carries
 * a %d whose value is supplied where the name is formatted (not shown
 * here; presumably a PID so concurrent writers do not collide -- TODO
 * confirm at the snprintf site).
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"%s/global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"%s/global/pgstat.tmp.%d"

/* ----------
 * Timer definitions.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500		/* How often to write the status
										 * file; in milliseconds. */

#define PGSTAT_DESTROY_DELAY	10000	/* How long to keep destroyed
										 * objects known, to give delayed
										 * UDP packets time to arrive;
										 * in milliseconds. */

/*
 * PGSTAT_DESTROY_DELAY expressed as a number of PGSTAT_STAT_INTERVAL
 * periods (integer division: 10000 / 500 = 20 with the values above).
 */
#define PGSTAT_DESTROY_COUNT	(PGSTAT_DESTROY_DELAY / PGSTAT_STAT_INTERVAL)

#define PGSTAT_RESTART_INTERVAL 60		/* How often to attempt to restart
										 * a failed statistics collector;
										 * in seconds.  Checked by
										 * pgstat_start(). */

/* ----------
 * Amount of space reserved in pgstat_recvbuffer(): room for 1024 objects
 * of sizeof(PgStat_Msg).  (PgStat_Msg is declared in pgstat.h; presumably
 * it is sized to hold any single message -- confirm there.)
 * ----------
 */
#define PGSTAT_RECVBUFFERSZ		((int) (1024 * sizeof(PgStat_Msg)))

/* ----------
 * The initial size hints for the hash tables used in the collector
 * (database, backend and table hashes, judging by the names).
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_BE_HASH_SIZE		512
#define PGSTAT_TAB_HASH_SIZE	512
95
96
/* ----------
 * GUC parameters
 *
 * pgstat_init() forces pgstat_collect_startcollector on whenever any of
 * the querystring/tuplelevel/blocklevel flags is set, and clears those
 * four flags if it cannot set up a working socket.
 * pgstat_collect_resetonpmstart makes pgstat_init() remove the existing
 * stats file at postmaster start.
 * ----------
 */
bool		pgstat_collect_startcollector = true;
bool		pgstat_collect_resetonpmstart = true;
bool		pgstat_collect_querystring = false;
bool		pgstat_collect_tuplelevel = false;
bool		pgstat_collect_blocklevel = false;

/* ----------
 * Local data
 * ----------
 */

/* UDP socket for stats messages; -1 when pgstat_init() could not set one up */
NON_EXEC_STATIC int pgStatSock = -1;

/*
 * Pipe descriptors passed to exec'd statistics subprocesses on their
 * command line (see pgstat_forkexec / pgstat_parseArgs).  Presumably the
 * link between buffer and collector processes -- TODO confirm where the
 * pipe is created.
 */
static int	pgStatPipe[2];

/* Address pgStatSock was bound to (from getsockname) and connected to */
static struct sockaddr_storage pgStatAddr;

/* Time of the last collector start attempt; used to throttle pgstat_start() */
static time_t last_pgstat_start_time;

/* NOTE(review): not referenced in the portion of the file shown; purpose unverified */
static long pgStatNumMessages = 0;

/* NOTE(review): not referenced in the portion of the file shown; purpose unverified */
static bool pgStatRunningInCollector = FALSE;

/*
 * Per-table statistics messages buffered in this backend.
 * pgstat_report_tabstat() sends the first pgStatTabstatUsed entries of
 * pgStatTabstatMessages and then resets pgStatTabstatUsed to zero;
 * pgStatTabstatAlloc is the allocated length of that pointer array.
 */
static int	pgStatTabstatAlloc = 0;
static int	pgStatTabstatUsed = 0;
static PgStat_MsgTabstat **pgStatTabstatMessages = NULL;

#define TABSTAT_QUANTUM		4	/* we alloc this many at a time */

/*
 * Transaction commit/rollback counts not yet reported.
 * pgstat_report_tabstat() copies them into the first message it sends
 * and then zeroes them.
 */
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;

/*
 * Backend-local copy of the stats file, filled by pgstat_read_statsfile().
 * pgStatDBHashXact records the transaction in which it was loaded, so
 * pgstat_vacuum_tabstat() re-reads it at most once per transaction.
 */
static TransactionId pgStatDBHashXact = InvalidTransactionId;
static HTAB *pgStatDBHash = NULL;
static HTAB *pgStatBeDead = NULL;	/* NOTE(review): not used in the code shown */
static PgStat_StatBeEntry *pgStatBeTable = NULL;
static int	pgStatNumBackends = 0;

/* Stats file name; pgstat_init() builds it from PGSTAT_STAT_FILENAME */
static char pgStat_fname[MAXPGPATH];

/* Temporary stats file name; presumably built from PGSTAT_STAT_TMPFILE elsewhere */
static char pgStat_tmpfname[MAXPGPATH];
138
139
/* ----------
 * Local function forward declarations
 * ----------
 */
#ifdef EXEC_BACKEND

/* Which statistics subprocess pgstat_forkexec() should launch */
typedef enum STATS_PROCESS_TYPE
{
	STAT_PROC_BUFFER,
	STAT_PROC_COLLECTOR
} STATS_PROCESS_TYPE;

static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
static void pgstat_parseArgs(int argc, char *argv[]);

#endif

/* Subprocess entry points, receive loop and signal handlers */
NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
static void pgstat_recvbuffer(void);
static void pgstat_exit(SIGNAL_ARGS);
static void pgstat_die(SIGNAL_ARGS);

/* Backend/database bookkeeping and stats file I/O */
static int	pgstat_add_backend(PgStat_MsgHdr *msg);
static void pgstat_sub_backend(int procpid);
static void pgstat_drop_database(Oid databaseid);
static void pgstat_write_statsfile(void);
static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
					  PgStat_StatBeEntry **betab,
					  int *numbackends);

/* Message header setup and transmission over pgStatSock */
static void pgstat_setheader(PgStat_MsgHdr *hdr, int mtype);
static void pgstat_send(void *msg, int len);

/* Handlers for received messages, one per message type (presumably run in the collector) */
static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
181
182
183 /* ------------------------------------------------------------
184  * Public functions called from postmaster follow
185  * ------------------------------------------------------------
186  */
187
/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success, pgStatSock is a non-blocking UDP socket bound to a
 *	kernel-assigned localhost port, connected to its own address, and
 *	known to pass a one-byte loopback test.  On failure, pgStatSock is
 *	-1 and pgstat_collect_startcollector/querystring/tuplelevel/blocklevel
 *	are all cleared.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * Initialize the filename for the status reports.  (In the EXEC_BACKEND
	 * case, this only sets the value in the postmaster.  The collector
	 * subprocess will recompute the value for itself, and individual
	 * backends must do so also if they want to access the file.)
	 */
	snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		unlink(pgStat_fname);

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages.
	 * Zero the whole hints struct first so that any platform-specific
	 * members beyond the standard ones are not left uninitialized.
	 */
	MemSet(&hints, 0, sizeof(hints));
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination.  We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket.
		 * This is to catch situations where the socket can be created but
		 * will not actually pass data (for instance, because kernel packet
		 * filtering rules prevent it).
		 */
		test_byte = TESTBYTEVAL;
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 *
			 * The SQLSTATE must go through errcode(): a bare ERRCODE_* value
			 * in the ereport argument list is just a discarded operand of the
			 * comma operator and would leave the error code unset.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("disabling statistics collector for lack of working socket")));
		goto startup_failed;
	}

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the
	 * collector falls behind (despite the buffering process), statistics
	 * messages will be discarded; backends won't block waiting to send
	 * messages to the collector.
	 */
	if (!set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	if (addrs)
		freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
452
453
454 #ifdef EXEC_BACKEND
455
456 /*
457  * pgstat_forkexec() -
458  *
459  * Format up the arglist for, then fork and exec, statistics
460  * (buffer and collector) processes
461  */
462 static pid_t
463 pgstat_forkexec(STATS_PROCESS_TYPE procType)
464 {
465         char *av[10];
466         int ac = 0, bufc = 0, i;
467         char pgstatBuf[2][32];
468
469         av[ac++] = "postgres";
470
471         switch (procType)
472         {
473                 case STAT_PROC_BUFFER:
474                         av[ac++] = "-forkbuf";
475                         break;
476
477                 case STAT_PROC_COLLECTOR:
478                         av[ac++] = "-forkcol";
479                         break;
480
481                 default:
482                         Assert(false);
483         }
484
485         av[ac++] = NULL;                        /* filled in by postmaster_forkexec */
486
487         /* postgres_exec_path is not passed by write_backend_variables */
488         av[ac++] = postgres_exec_path;
489
490         /* Pipe file ids (those not passed by write_backend_variables) */
491         snprintf(pgstatBuf[bufc++],32,"%d",pgStatPipe[0]);
492         snprintf(pgstatBuf[bufc++],32,"%d",pgStatPipe[1]);
493
494         /* Add to the arg list */
495         Assert(bufc <= lengthof(pgstatBuf));
496         for (i = 0; i < bufc; i++)
497                 av[ac++] = pgstatBuf[i];
498
499         av[ac] = NULL;
500         Assert(ac < lengthof(av));
501
502         return postmaster_forkexec(ac, av);
503 }
504
505
506 /*
507  * pgstat_parseArgs() -
508  *
509  * Extract data from the arglist for exec'ed statistics
510  * (buffer and collector) processes
511  */
512 static void
513 pgstat_parseArgs(int argc, char *argv[])
514 {
515         Assert(argc == 6);
516
517         argc = 3;
518         StrNCpy(postgres_exec_path,     argv[argc++], MAXPGPATH);
519         pgStatPipe[0]   = atoi(argv[argc++]);
520         pgStatPipe[1]   = atoi(argv[argc++]);
521 }
522
523 #endif /* EXEC_BACKEND */
524
525
526 /* ----------
527  * pgstat_start() -
528  *
529  *      Called from postmaster at startup or after an existing collector
530  *      died.  Attempt to fire up a fresh statistics collector.
531  *
532  *      Returns PID of child process, or 0 if fail.
533  *
534  *      Note: if fail, we will be called again from the postmaster main loop.
535  * ----------
536  */
537 int
538 pgstat_start(void)
539 {
540         time_t          curtime;
541         pid_t           pgStatPid;
542
543         /*
544          * Do nothing if no collector needed
545          */
546         if (!pgstat_collect_startcollector)
547                 return 0;
548
549         /*
550          * Do nothing if too soon since last collector start.  This is a
551          * safety valve to protect against continuous respawn attempts if the
552          * collector is dying immediately at launch.  Note that since we will
553          * be re-called from the postmaster main loop, we will get another
554          * chance later.
555          */
556         curtime = time(NULL);
557         if ((unsigned int) (curtime - last_pgstat_start_time) <
558                 (unsigned int) PGSTAT_RESTART_INTERVAL)
559                 return 0;
560         last_pgstat_start_time = curtime;
561
562         /*
563          * Check that the socket is there, else pgstat_init failed.
564          */
565         if (pgStatSock < 0)
566         {
567                 ereport(LOG,
568                                 (errmsg("statistics collector startup skipped")));
569
570                 /*
571                  * We can only get here if someone tries to manually turn
572                  * pgstat_collect_startcollector on after it had been off.
573                  */
574                 pgstat_collect_startcollector = false;
575                 return 0;
576         }
577
578         /*
579          * Okay, fork off the collector.
580          */
581
582         fflush(stdout);
583         fflush(stderr);
584
585 #ifdef __BEOS__
586         /* Specific beos actions before backend startup */
587         beos_before_backend_startup();
588 #endif
589
590 #ifdef EXEC_BACKEND
591         switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
592 #else
593         switch ((pgStatPid = fork()))
594 #endif
595         {
596                 case -1:
597 #ifdef __BEOS__
598                         /* Specific beos actions */
599                         beos_backend_startup_failed();
600 #endif
601                         ereport(LOG,
602                                         (errmsg("could not fork statistics buffer: %m")));
603                         return 0;
604
605 #ifndef EXEC_BACKEND
606                 case 0:
607                         /* in postmaster child ... */
608 #ifdef __BEOS__
609                         /* Specific beos actions after backend startup */
610                         beos_backend_startup();
611 #endif
612                         /* Close the postmaster's sockets */
613                         ClosePostmasterPorts();
614
615                         /* Drop our connection to postmaster's shared memory, as well */
616                         PGSharedMemoryDetach();
617
618                         PgstatBufferMain(0, NULL);
619                         break;
620 #endif
621
622                 default:
623                         return (int) pgStatPid;
624         }
625
626         /* shouldn't get here */
627         return 0;
628 }
629
630
631 /* ----------
632  * pgstat_beterm() -
633  *
634  *      Called from postmaster to tell collector a backend terminated.
635  * ----------
636  */
637 void
638 pgstat_beterm(int pid)
639 {
640         PgStat_MsgBeterm msg;
641
642         if (pgStatSock < 0)
643                 return;
644
645         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
646         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
647         msg.m_hdr.m_procpid = pid;
648
649         pgstat_send(&msg, sizeof(msg));
650 }
651
652
653 /* ------------------------------------------------------------
654  * Public functions used by backends follow
655  *------------------------------------------------------------
656  */
657
658
659 /* ----------
660  * pgstat_bestart() -
661  *
662  *      Tell the collector that this new backend is soon ready to process
663  *      queries. Called from tcop/postgres.c before entering the mainloop.
664  * ----------
665  */
666 void
667 pgstat_bestart(void)
668 {
669         PgStat_MsgBestart msg;
670
671         if (pgStatSock < 0)
672                 return;
673
674         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
675         pgstat_send(&msg, sizeof(msg));
676 }
677
678
679 /* ----------
680  * pgstat_report_activity() -
681  *
682  *      Called in tcop/postgres.c to tell the collector what the backend
683  *      is actually doing (usually "<IDLE>" or the start of the query to
684  *      be executed).
685  * ----------
686  */
687 void
688 pgstat_report_activity(const char *what)
689 {
690         PgStat_MsgActivity msg;
691         int                     len;
692
693         if (!pgstat_collect_querystring || pgStatSock < 0)
694                 return;
695
696         len = strlen(what);
697         len = pg_mbcliplen((const unsigned char *) what, len,
698                                            PGSTAT_ACTIVITY_SIZE - 1);
699
700         memcpy(msg.m_what, what, len);
701         msg.m_what[len] = '\0';
702         len += offsetof(PgStat_MsgActivity, m_what) +1;
703
704         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
705         pgstat_send(&msg, len);
706 }
707
708
709 /* ----------
710  * pgstat_report_tabstat() -
711  *
712  *      Called from tcop/postgres.c to send the so far collected
713  *      per table access statistics to the collector.
714  * ----------
715  */
716 void
717 pgstat_report_tabstat(void)
718 {
719         int                     i;
720
721         if (pgStatSock < 0 ||
722                 !(pgstat_collect_querystring ||
723                   pgstat_collect_tuplelevel ||
724                   pgstat_collect_blocklevel))
725         {
726                 /* Not reporting stats, so just flush whatever we have */
727                 pgStatTabstatUsed = 0;
728                 return;
729         }
730
731         /*
732          * For each message buffer used during the last query set the header
733          * fields and send it out.
734          */
735         for (i = 0; i < pgStatTabstatUsed; i++)
736         {
737                 PgStat_MsgTabstat *tsmsg = pgStatTabstatMessages[i];
738                 int                     n;
739                 int                     len;
740
741                 n = tsmsg->m_nentries;
742                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
743                         n * sizeof(PgStat_TableEntry);
744
745                 tsmsg->m_xact_commit = pgStatXactCommit;
746                 tsmsg->m_xact_rollback = pgStatXactRollback;
747                 pgStatXactCommit = 0;
748                 pgStatXactRollback = 0;
749
750                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
751                 pgstat_send(tsmsg, len);
752         }
753
754         pgStatTabstatUsed = 0;
755 }
756
757
758 /* ----------
759  * pgstat_vacuum_tabstat() -
760  *
761  *      Will tell the collector about objects he can get rid of.
762  * ----------
763  */
764 int
765 pgstat_vacuum_tabstat(void)
766 {
767         Relation        dbrel;
768         HeapScanDesc dbscan;
769         HeapTuple       dbtup;
770         Oid                *dbidlist;
771         int                     dbidalloc;
772         int                     dbidused;
773         HASH_SEQ_STATUS hstat;
774         PgStat_StatDBEntry *dbentry;
775         PgStat_StatTabEntry *tabentry;
776         HeapTuple       reltup;
777         int                     nobjects = 0;
778         PgStat_MsgTabpurge msg;
779         int                     len;
780         int                     i;
781
782         if (pgStatSock < 0)
783                 return 0;
784
785         /*
786          * If not done for this transaction, read the statistics collector
787          * stats file into some hash tables.
788          */
789         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
790         {
791                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
792                                                           &pgStatBeTable, &pgStatNumBackends);
793                 pgStatDBHashXact = GetCurrentTransactionId();
794         }
795
796         /*
797          * Lookup our own database entry
798          */
799         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
800                                                                                                  (void *) &MyDatabaseId,
801                                                                                                  HASH_FIND, NULL);
802         if (dbentry == NULL)
803                 return -1;
804
805         if (dbentry->tables == NULL)
806                 return 0;
807
808         /*
809          * Initialize our messages table counter to zero
810          */
811         msg.m_nentries = 0;
812
813         /*
814          * Check for all tables if they still exist.
815          */
816         hash_seq_init(&hstat, dbentry->tables);
817         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
818         {
819                 /*
820                  * Check if this relation is still alive by looking up it's
821                  * pg_class tuple in the system catalog cache.
822                  */
823                 reltup = SearchSysCache(RELOID,
824                                                                 ObjectIdGetDatum(tabentry->tableid),
825                                                                 0, 0, 0);
826                 if (HeapTupleIsValid(reltup))
827                 {
828                         ReleaseSysCache(reltup);
829                         continue;
830                 }
831
832                 /*
833                  * Add this tables Oid to the message
834                  */
835                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
836                 nobjects++;
837
838                 /*
839                  * If the message is full, send it out and reinitialize ot zero
840                  */
841                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
842                 {
843                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
844                                 +msg.m_nentries * sizeof(Oid);
845
846                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
847                         pgstat_send(&msg, len);
848
849                         msg.m_nentries = 0;
850                 }
851         }
852
853         /*
854          * Send the rest
855          */
856         if (msg.m_nentries > 0)
857         {
858                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
859                         +msg.m_nentries * sizeof(Oid);
860
861                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
862                 pgstat_send(&msg, len);
863         }
864
865         /*
866          * Read pg_database and remember the Oid's of all existing databases
867          */
868         dbidalloc = 256;
869         dbidused = 0;
870         dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
871
872         dbrel = heap_openr(DatabaseRelationName, AccessShareLock);
873         dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
874         while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
875         {
876                 if (dbidused >= dbidalloc)
877                 {
878                         dbidalloc *= 2;
879                         dbidlist = (Oid *) repalloc((char *) dbidlist,
880                                                                                 sizeof(Oid) * dbidalloc);
881                 }
882                 dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
883         }
884         heap_endscan(dbscan);
885         heap_close(dbrel, AccessShareLock);
886
887         /*
888          * Search the database hash table for dead databases and tell the
889          * collector to drop them as well.
890          */
891         hash_seq_init(&hstat, pgStatDBHash);
892         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
893         {
894                 Oid                     dbid = dbentry->databaseid;
895
896                 for (i = 0; i < dbidused; i++)
897                 {
898                         if (dbidlist[i] == dbid)
899                         {
900                                 dbid = InvalidOid;
901                                 break;
902                         }
903                 }
904
905                 if (dbid != InvalidOid)
906                 {
907                         nobjects++;
908                         pgstat_drop_database(dbid);
909                 }
910         }
911
912         /*
913          * Free the dbid list.
914          */
915         pfree((char *) dbidlist);
916
917         /*
918          * Tell the caller how many removeable objects we found
919          */
920         return nobjects;
921 }
922
923
924 /* ----------
925  * pgstat_drop_database() -
926  *
927  *      Tell the collector that we just dropped a database.
928  *      This is the only message that shouldn't get lost in space. Otherwise
929  *      the collector will keep the statistics for the dead DB until his
930  *      stats file got removed while the postmaster is down.
931  * ----------
932  */
933 static void
934 pgstat_drop_database(Oid databaseid)
935 {
936         PgStat_MsgDropdb msg;
937
938         if (pgStatSock < 0)
939                 return;
940
941         msg.m_databaseid = databaseid;
942
943         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
944         pgstat_send(&msg, sizeof(msg));
945 }
946
947
948 /* ----------
949  * pgstat_reset_counters() -
950  *
951  *      Tell the statistics collector to reset counters for our database.
952  * ----------
953  */
954 void
955 pgstat_reset_counters(void)
956 {
957         PgStat_MsgResetcounter msg;
958
959         if (pgStatSock < 0)
960                 return;
961
962         if (!superuser())
963                 ereport(ERROR,
964                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
965                           errmsg("must be superuser to reset statistics counters")));
966
967         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
968         pgstat_send(&msg, sizeof(msg));
969 }
970
971
972 /* ----------
973  * pgstat_ping() -
974  *
975  *      Send some junk data to the collector to increase traffic.
976  * ----------
977  */
978 void
979 pgstat_ping(void)
980 {
981         PgStat_MsgDummy msg;
982
983         if (pgStatSock < 0)
984                 return;
985
986         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
987         pgstat_send(&msg, sizeof(msg));
988 }
989
990 /*
991  * Create or enlarge the pgStatTabstatMessages array
992  */
993 static bool
994 more_tabstat_space(void)
995 {
996         PgStat_MsgTabstat *newMessages;
997         PgStat_MsgTabstat **msgArray;
998         int                     newAlloc = pgStatTabstatAlloc + TABSTAT_QUANTUM;
999         int                     i;
1000
1001         /* Create (another) quantum of message buffers */
1002         newMessages = (PgStat_MsgTabstat *)
1003                 malloc(sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1004         if (newMessages == NULL)
1005         {
1006                 ereport(LOG,
1007                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1008                                  errmsg("out of memory")));
1009                 return false;
1010         }
1011
1012         /* Create or enlarge the pointer array */
1013         if (pgStatTabstatMessages == NULL)
1014                 msgArray = (PgStat_MsgTabstat **)
1015                         malloc(sizeof(PgStat_MsgTabstat *) * newAlloc);
1016         else
1017                 msgArray = (PgStat_MsgTabstat **)
1018                         realloc(pgStatTabstatMessages,
1019                                         sizeof(PgStat_MsgTabstat *) * newAlloc);
1020         if (msgArray == NULL)
1021         {
1022                 free(newMessages);
1023                 ereport(LOG,
1024                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1025                                  errmsg("out of memory")));
1026                 return false;
1027         }
1028
1029         MemSet(newMessages, 0, sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1030         for (i = 0; i < TABSTAT_QUANTUM; i++)
1031                 msgArray[pgStatTabstatAlloc + i] = newMessages++;
1032         pgStatTabstatMessages = msgArray;
1033         pgStatTabstatAlloc = newAlloc;
1034
1035         return true;
1036 }
1037
/* ----------
 * pgstat_initstats() -
 *
 *	Called from various places usually dealing with initialization
 *	of Relation or Scan structures.  Points stats->tabentry at the
 *	per-relation counter slot (a PgStat_TableEntry inside one of the
 *	pending PgStat_MsgTabstat message buffers) that later code uses to
 *	count buffer reads, scans and tuples fetched for rel.
 *
 *	An existing slot for rel is reused; otherwise a zeroed slot is taken
 *	from the buffer that still has room, or from a fresh buffer (growing
 *	the buffer array via more_tabstat_space() if needed).
 *
 *	If the statistics socket is not open, neither tuple- nor block-level
 *	collection is enabled, or buffer space cannot be allocated, then
 *	stats->no_stats is set to TRUE and stats->tabentry is left NULL.
 * ----------
 */
void
pgstat_initstats(PgStat_Info *stats, Relation rel)
{
	Oid			rel_id = rel->rd_id;
	PgStat_TableEntry *useent;
	PgStat_MsgTabstat *tsmsg;
	int			mb;
	int			i;

	/*
	 * Start from a clean state: no counter slot assigned yet, collection
	 * assumed on, and both per-structure scan-counted flags cleared.
	 */
	stats->tabentry = NULL;
	stats->no_stats = FALSE;
	stats->heap_scan_counted = FALSE;
	stats->index_scan_counted = FALSE;

	/* Nothing to count into if stats are off or the collector is absent */
	if (pgStatSock < 0 ||
		!(pgstat_collect_tuplelevel ||
		  pgstat_collect_blocklevel))
	{
		stats->no_stats = TRUE;
		return;
	}

	/*
	 * Search the already-used message slots for this relation.
	 *
	 * NOTE(review): taking the first buffer with a free slot (below) is
	 * only safe because buffers are filled in order -- this function opens
	 * a new buffer only after every used one is full -- so a non-full
	 * buffer is always the last used one and no later buffer can already
	 * hold rel_id.  Any other code that changes pgStatTabstatUsed or
	 * m_nentries must preserve that invariant, or duplicate entries for
	 * one relation could appear.
	 */
	for (mb = 0; mb < pgStatTabstatUsed; mb++)
	{
		tsmsg = pgStatTabstatMessages[mb];

		/* Scan this buffer's entries, last to first */
		for (i = tsmsg->m_nentries; --i >= 0; )
		{
			if (tsmsg->m_entry[i].t_id == rel_id)
			{
				stats->tabentry = (void *) &(tsmsg->m_entry[i]);
				return;
			}
		}

		if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
			continue;

		/*
		 * Not found, but found a message buffer with an empty slot
		 * instead. Fine, let's use this one.
		 */
		i = tsmsg->m_nentries++;
		useent = &tsmsg->m_entry[i];
		MemSet(useent, 0, sizeof(PgStat_TableEntry));
		useent->t_id = rel_id;
		stats->tabentry = (void *) useent;
		return;
	}

	/*
	 * Every used buffer is full.  If no unused buffer is left either,
	 * allocate more; on failure, give up counting for this structure.
	 */
	if (pgStatTabstatUsed >= pgStatTabstatAlloc)
	{
		if (!more_tabstat_space())
		{
			stats->no_stats = TRUE;
			return;
		}
		Assert(pgStatTabstatUsed < pgStatTabstatAlloc);
	}

	/*
	 * Use the first entry of the next message buffer.  Setting m_nentries
	 * to 1 discards whatever that buffer held before.
	 */
	mb = pgStatTabstatUsed++;
	tsmsg = pgStatTabstatMessages[mb];
	tsmsg->m_nentries = 1;
	useent = &tsmsg->m_entry[0];
	MemSet(useent, 0, sizeof(PgStat_TableEntry));
	useent->t_id = rel_id;
	stats->tabentry = (void *) useent;
}
1127
1128
1129 /* ----------
1130  * pgstat_count_xact_commit() -
1131  *
1132  *      Called from access/transam/xact.c to count transaction commits.
1133  * ----------
1134  */
1135 void
1136 pgstat_count_xact_commit(void)
1137 {
1138         if (!(pgstat_collect_querystring ||
1139                   pgstat_collect_tuplelevel ||
1140                   pgstat_collect_blocklevel))
1141                 return;
1142
1143         pgStatXactCommit++;
1144
1145         /*
1146          * If there was no relation activity yet, just make one existing
1147          * message buffer used without slots, causing the next report to tell
1148          * new xact-counters.
1149          */
1150         if (pgStatTabstatAlloc == 0)
1151         {
1152                 if (!more_tabstat_space())
1153                         return;
1154         }
1155         if (pgStatTabstatUsed == 0)
1156         {
1157                 pgStatTabstatUsed++;
1158                 pgStatTabstatMessages[0]->m_nentries = 0;
1159         }
1160 }
1161
1162
1163 /* ----------
1164  * pgstat_count_xact_rollback() -
1165  *
1166  *      Called from access/transam/xact.c to count transaction rollbacks.
1167  * ----------
1168  */
1169 void
1170 pgstat_count_xact_rollback(void)
1171 {
1172         if (!(pgstat_collect_querystring ||
1173                   pgstat_collect_tuplelevel ||
1174                   pgstat_collect_blocklevel))
1175                 return;
1176
1177         pgStatXactRollback++;
1178
1179         /*
1180          * If there was no relation activity yet, just make one existing
1181          * message buffer used without slots, causing the next report to tell
1182          * new xact-counters.
1183          */
1184         if (pgStatTabstatAlloc == 0)
1185         {
1186                 if (!more_tabstat_space())
1187                         return;
1188         }
1189         if (pgStatTabstatUsed == 0)
1190         {
1191                 pgStatTabstatUsed++;
1192                 pgStatTabstatMessages[0]->m_nentries = 0;
1193         }
1194 }
1195
1196
1197 /* ----------
1198  * pgstat_fetch_stat_dbentry() -
1199  *
1200  *      Support function for the SQL-callable pgstat* functions. Returns
1201  *      the collected statistics for one database or NULL. NULL doesn't mean
1202  *      that the database doesn't exist, it is just not yet known by the
1203  *      collector, so the caller is better off to report ZERO instead.
1204  * ----------
1205  */
1206 PgStat_StatDBEntry *
1207 pgstat_fetch_stat_dbentry(Oid dbid)
1208 {
1209         PgStat_StatDBEntry *dbentry;
1210
1211         /*
1212          * If not done for this transaction, read the statistics collector
1213          * stats file into some hash tables. Be careful with the
1214          * read_statsfile() call below!
1215          */
1216         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1217         {
1218                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1219                                                           &pgStatBeTable, &pgStatNumBackends);
1220                 pgStatDBHashXact = GetCurrentTransactionId();
1221         }
1222
1223         /*
1224          * Lookup the requested database
1225          */
1226         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1227                                                                                                  (void *) &dbid,
1228                                                                                                  HASH_FIND, NULL);
1229         if (dbentry == NULL)
1230                 return NULL;
1231
1232         return dbentry;
1233 }
1234
1235
1236 /* ----------
1237  * pgstat_fetch_stat_tabentry() -
1238  *
1239  *      Support function for the SQL-callable pgstat* functions. Returns
1240  *      the collected statistics for one table or NULL. NULL doesn't mean
1241  *      that the table doesn't exist, it is just not yet known by the
1242  *      collector, so the caller is better off to report ZERO instead.
1243  * ----------
1244  */
1245 PgStat_StatTabEntry *
1246 pgstat_fetch_stat_tabentry(Oid relid)
1247 {
1248         PgStat_StatDBEntry *dbentry;
1249         PgStat_StatTabEntry *tabentry;
1250
1251         /*
1252          * If not done for this transaction, read the statistics collector
1253          * stats file into some hash tables. Be careful with the
1254          * read_statsfile() call below!
1255          */
1256         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1257         {
1258                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1259                                                           &pgStatBeTable, &pgStatNumBackends);
1260                 pgStatDBHashXact = GetCurrentTransactionId();
1261         }
1262
1263         /*
1264          * Lookup our database.
1265          */
1266         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1267                                                                                                  (void *) &MyDatabaseId,
1268                                                                                                  HASH_FIND, NULL);
1269         if (dbentry == NULL)
1270                 return NULL;
1271
1272         /*
1273          * Now inside the DB's table hash table lookup the requested one.
1274          */
1275         if (dbentry->tables == NULL)
1276                 return NULL;
1277         tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1278                                                                                                    (void *) &relid,
1279                                                                                                    HASH_FIND, NULL);
1280         if (tabentry == NULL)
1281                 return NULL;
1282
1283         return tabentry;
1284 }
1285
1286
1287 /* ----------
1288  * pgstat_fetch_stat_beentry() -
1289  *
1290  *      Support function for the SQL-callable pgstat* functions. Returns
1291  *      the actual activity slot of one active backend. The caller is
1292  *      responsible for a check if the actual user is permitted to see
1293  *      that info (especially the querystring).
1294  * ----------
1295  */
1296 PgStat_StatBeEntry *
1297 pgstat_fetch_stat_beentry(int beid)
1298 {
1299         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1300         {
1301                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1302                                                           &pgStatBeTable, &pgStatNumBackends);
1303                 pgStatDBHashXact = GetCurrentTransactionId();
1304         }
1305
1306         if (beid < 1 || beid > pgStatNumBackends)
1307                 return NULL;
1308
1309         return &pgStatBeTable[beid - 1];
1310 }
1311
1312
1313 /* ----------
1314  * pgstat_fetch_stat_numbackends() -
1315  *
1316  *      Support function for the SQL-callable pgstat* functions. Returns
1317  *      the maximum current backend id.
1318  * ----------
1319  */
1320 int
1321 pgstat_fetch_stat_numbackends(void)
1322 {
1323         if (!TransactionIdEquals(pgStatDBHashXact, GetCurrentTransactionId()))
1324         {
1325                 pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
1326                                                           &pgStatBeTable, &pgStatNumBackends);
1327                 pgStatDBHashXact = GetCurrentTransactionId();
1328         }
1329
1330         return pgStatNumBackends;
1331 }
1332
1333
1334
1335 /* ------------------------------------------------------------
1336  * Local support functions follow
1337  * ------------------------------------------------------------
1338  */
1339
1340
1341 /* ----------
1342  * pgstat_setheader() -
1343  *
1344  *              Set common header fields in a statistics message
1345  * ----------
1346  */
1347 static void
1348 pgstat_setheader(PgStat_MsgHdr *hdr, int mtype)
1349 {
1350         hdr->m_type = mtype;
1351         hdr->m_backendid = MyBackendId;
1352         hdr->m_procpid = MyProcPid;
1353         hdr->m_databaseid = MyDatabaseId;
1354         hdr->m_userid = GetSessionUserId();
1355 }
1356
1357
1358 /* ----------
1359  * pgstat_send() -
1360  *
1361  *              Send out one statistics message to the collector
1362  * ----------
1363  */
1364 static void
1365 pgstat_send(void *msg, int len)
1366 {
1367         if (pgStatSock < 0)
1368                 return;
1369
1370         ((PgStat_MsgHdr *) msg)->m_size = len;
1371
1372         send(pgStatSock, msg, len, 0);
1373         /* We deliberately ignore any error from send() */
1374 }
1375
1376
/* ----------
 * PgstatBufferMain() -
 *
 *	Start up the statistics buffer process.  This is the body of the
 *	postmaster child process.
 *
 *	It resets its process state and signal handling, creates the pipe to
 *	the collector, and forks the collector as its own child.  The parent
 *	side then runs pgstat_recvbuffer(); if that returns, the process exits
 *	with status 0.  Failure to create the pipe or to fork is logged and
 *	the process exits with status 1.
 *
 *	The argc/argv parameters are valid only in EXEC_BACKEND case.
 * ----------
 */
NON_EXEC_STATIC void
PgstatBufferMain(int argc, char *argv[])
{
	IsUnderPostmaster = true;	/* we are a postmaster subprocess now */

	MyProcPid = getpid();		/* reset MyProcPid */

	/* Lose the postmaster's on-exit routines */
	on_exit_reset();

	/*
	 * Ignore all signals usually bound to some action in the postmaster,
	 * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
	 * SIGQUIT goes to pgstat_exit and SIGCHLD (collector death) goes to
	 * pgstat_die; job-control and window-size signals get default handling.
	 */
	pqsignal(SIGHUP, SIG_IGN);
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
	pqsignal(SIGQUIT, pgstat_exit);
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, SIG_IGN);
	pqsignal(SIGUSR2, SIG_IGN);
	pqsignal(SIGCHLD, pgstat_die);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
	/* unblock will happen in pgstat_recvbuffer */

#ifdef EXEC_BACKEND
	/* Recover the state passed down on the command line */
	pgstat_parseArgs(argc,argv);
#endif

	/*
	 * Start a buffering process to read from the socket, so we have a
	 * little more time to process incoming messages.
	 *
	 * NOTE: the process structure is: postmaster is parent of buffer process
	 * is parent of collector process.  This way, the buffer can detect
	 * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
	 * collector failure until it tried to write on the pipe.  That would
	 * mean that after the postmaster started a new collector, we'd have
	 * two buffer processes competing to read from the UDP socket --- not
	 * good.
	 */
	if (pgpipe(pgStatPipe) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
			 errmsg("could not create pipe for statistics buffer: %m")));
		exit(1);
	}

	/*
	 * Launch the collector as our child.  In the EXEC_BACKEND build the
	 * child is started through pgstat_forkexec(), so this process only
	 * ever sees -1 or the child's PID here (hence no "case 0" in that
	 * build).  Otherwise a plain fork() is used and the child branch runs
	 * PgstatCollectorMain() directly.
	 */
#ifdef EXEC_BACKEND
	/* child becomes collector process */
	switch (pgstat_forkexec(STAT_PROC_COLLECTOR))
#else
	switch (fork())
#endif
	{
		case -1:
			ereport(LOG,
					(errmsg("could not fork statistics collector: %m")));
			exit(1);

#ifndef EXEC_BACKEND
		case 0:
			/* child becomes collector process */
			PgstatCollectorMain(0, NULL);
			break;
#endif

		default:
			/* parent becomes buffer process */
			/* the buffer only writes to the pipe; close its read end */
			closesocket(pgStatPipe[0]);
			pgstat_recvbuffer();
	}
	exit(0);
}
1465
1466
1467 /* ----------
1468  * PgstatCollectorMain() -
1469  *
1470  *      Start up the statistics collector itself.  This is the body of the
1471  *      postmaster grandchild process.
1472  *
1473  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1474  * ----------
1475  */
1476 NON_EXEC_STATIC void
1477 PgstatCollectorMain(int argc, char *argv[])
1478 {
1479         PgStat_Msg      msg;
1480         fd_set          rfds;
1481         int                     readPipe;
1482         int                     nready;
1483         int                     len = 0;
1484         struct timeval timeout;
1485         struct timeval next_statwrite;
1486         bool            need_statwrite;
1487         HASHCTL         hash_ctl;
1488
1489         MyProcPid = getpid();           /* reset MyProcPid */
1490
1491         /*
1492          * Reset signal handling.  With the exception of restoring default
1493          * SIGCHLD and SIGQUIT handling, this is a no-op in the non-EXEC_BACKEND
1494          * case because we'll have inherited these settings from the buffer
1495          * process; but it's not a no-op for EXEC_BACKEND.
1496          */
1497         pqsignal(SIGHUP, SIG_IGN);
1498         pqsignal(SIGINT, SIG_IGN);
1499         pqsignal(SIGTERM, SIG_IGN);
1500         pqsignal(SIGQUIT, SIG_IGN);
1501         pqsignal(SIGALRM, SIG_IGN);
1502         pqsignal(SIGPIPE, SIG_IGN);
1503         pqsignal(SIGUSR1, SIG_IGN);
1504         pqsignal(SIGUSR2, SIG_IGN);
1505         pqsignal(SIGCHLD, SIG_DFL);
1506         pqsignal(SIGTTIN, SIG_DFL);
1507         pqsignal(SIGTTOU, SIG_DFL);
1508         pqsignal(SIGCONT, SIG_DFL);
1509         pqsignal(SIGWINCH, SIG_DFL);
1510         PG_SETMASK(&UnBlockSig);
1511
1512 #ifdef EXEC_BACKEND
1513         pgstat_parseArgs(argc,argv);
1514 #endif
1515
1516         /* Close unwanted files */
1517         closesocket(pgStatPipe[1]);
1518         closesocket(pgStatSock);
1519
1520         /*
1521          * Identify myself via ps
1522          */
1523         init_ps_display("stats collector process", "", "");
1524         set_ps_display("");
1525
1526         /*
1527          * Initialize filenames needed for status reports.
1528          */
1529         snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
1530         /* tmpfname need only be set correctly in this process */
1531         snprintf(pgStat_tmpfname, MAXPGPATH, PGSTAT_STAT_TMPFILE,
1532                          DataDir, getpid());
1533
1534         /*
1535          * Arrange to write the initial status file right away
1536          */
1537         gettimeofday(&next_statwrite, NULL);
1538         need_statwrite = TRUE;
1539
1540         /*
1541          * Read in an existing statistics stats file or initialize the stats
1542          * to zero.
1543          */
1544         pgStatRunningInCollector = TRUE;
1545         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1546
1547         /*
1548          * Create the dead backend hashtable
1549          */
1550         memset(&hash_ctl, 0, sizeof(hash_ctl));
1551         hash_ctl.keysize = sizeof(int);
1552         hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1553         hash_ctl.hash = tag_hash;
1554         pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
1555                                                            &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1556         if (pgStatBeDead == NULL)
1557         {
1558                 /* assume the problem is out-of-memory */
1559                 ereport(LOG,
1560                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1561                                  errmsg("out of memory in statistics collector --- abort")));
1562                 exit(1);
1563         }
1564
1565         /*
1566          * Create the known backends table
1567          */
1568         pgStatBeTable = (PgStat_StatBeEntry *) malloc(
1569                                                            sizeof(PgStat_StatBeEntry) * MaxBackends);
1570         if (pgStatBeTable == NULL)
1571         {
1572                 ereport(LOG,
1573                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1574                                  errmsg("out of memory in statistics collector --- abort")));
1575                 exit(1);
1576         }
1577         memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
1578
1579         readPipe = pgStatPipe[0];
1580
1581         /*
1582          * Process incoming messages and handle all the reporting stuff until
1583          * there are no more messages.
1584          */
1585         for (;;)
1586         {
1587                 /*
1588                  * If we need to write the status file again (there have been
1589                  * changes in the statistics since we wrote it last) calculate the
1590                  * timeout until we have to do so.
1591                  */
1592                 if (need_statwrite)
1593                 {
1594                         struct timeval now;
1595
1596                         gettimeofday(&now, NULL);
1597                         /* avoid assuming that tv_sec is signed */
1598                         if (now.tv_sec > next_statwrite.tv_sec ||
1599                                 (now.tv_sec == next_statwrite.tv_sec &&
1600                                  now.tv_usec >= next_statwrite.tv_usec))
1601                         {
1602                                 timeout.tv_sec = 0;
1603                                 timeout.tv_usec = 0;
1604                         }
1605                         else
1606                         {
1607                                 timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
1608                                 timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
1609                                 if (timeout.tv_usec < 0)
1610                                 {
1611                                         timeout.tv_sec--;
1612                                         timeout.tv_usec += 1000000;
1613                                 }
1614                         }
1615                 }
1616
1617                 /*
1618                  * Setup the descriptor set for select(2)
1619                  */
1620                 FD_ZERO(&rfds);
1621                 FD_SET(readPipe, &rfds);
1622
1623                 /*
1624                  * Now wait for something to do.
1625                  */
1626                 nready = select(readPipe+1, &rfds, NULL, NULL,
1627                                                 (need_statwrite) ? &timeout : NULL);
1628                 if (nready < 0)
1629                 {
1630                         if (errno == EINTR)
1631                                 continue;
1632                         ereport(LOG,
1633                                         (errcode_for_socket_access(),
1634                                    errmsg("select() failed in statistics collector: %m")));
1635                         exit(1);
1636                 }
1637
1638                 /*
1639                  * If there are no descriptors ready, our timeout for writing the
1640                  * stats file happened.
1641                  */
1642                 if (nready == 0)
1643                 {
1644                         pgstat_write_statsfile();
1645                         need_statwrite = FALSE;
1646
1647                         continue;
1648                 }
1649
1650                 /*
1651                  * Check if there is a new statistics message to collect.
1652                  */
1653                 if (FD_ISSET(readPipe, &rfds))
1654                 {
1655                         /*
1656                          * We may need to issue multiple read calls in case the buffer
1657                          * process didn't write the message in a single write, which
1658                          * is possible since it dumps its buffer bytewise. In any
1659                          * case, we'd need two reads since we don't know the message
1660                          * length initially.
1661                          */
1662                         int                     nread = 0;
1663                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1664                         bool            pipeEOF = false;
1665
1666                         while (nread < targetlen)
1667                         {
1668                                 len = piperead(readPipe, ((char *) &msg) + nread,
1669                                                                 targetlen - nread);
1670                                 if (len < 0)
1671                                 {
1672                                         if (errno == EINTR)
1673                                                 continue;
1674                                         ereport(LOG,
1675                                                         (errcode_for_socket_access(),
1676                                                          errmsg("could not read from statistics collector pipe: %m")));
1677                                         exit(1);
1678                                 }
1679                                 if (len == 0)   /* EOF on the pipe! */
1680                                 {
1681                                         pipeEOF = true;
1682                                         break;
1683                                 }
1684                                 nread += len;
1685                                 if (nread == sizeof(PgStat_MsgHdr))
1686                                 {
1687                                         /* we have the header, compute actual msg length */
1688                                         targetlen = msg.msg_hdr.m_size;
1689                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1690                                                 targetlen > (int) sizeof(msg))
1691                                         {
1692                                                 /*
1693                                                  * Bogus message length implies that we got out of
1694                                                  * sync with the buffer process somehow. Abort so
1695                                                  * that we can restart both processes.
1696                                                  */
1697                                                 ereport(LOG,
1698                                                   (errmsg("invalid statistics message length")));
1699                                                 exit(1);
1700                                         }
1701                                 }
1702                         }
1703
1704                         /*
1705                          * EOF on the pipe implies that the buffer process exited.
1706                          * Fall out of outer loop.
1707                          */
1708                         if (pipeEOF)
1709                                 break;
1710
1711                         /*
1712                          * Distribute the message to the specific function handling
1713                          * it.
1714                          */
1715                         switch (msg.msg_hdr.m_type)
1716                         {
1717                                 case PGSTAT_MTYPE_DUMMY:
1718                                         break;
1719
1720                                 case PGSTAT_MTYPE_BESTART:
1721                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1722                                         break;
1723
1724                                 case PGSTAT_MTYPE_BETERM:
1725                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1726                                         break;
1727
1728                                 case PGSTAT_MTYPE_TABSTAT:
1729                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1730                                         break;
1731
1732                                 case PGSTAT_MTYPE_TABPURGE:
1733                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1734                                         break;
1735
1736                                 case PGSTAT_MTYPE_ACTIVITY:
1737                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1738                                         break;
1739
1740                                 case PGSTAT_MTYPE_DROPDB:
1741                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1742                                         break;
1743
1744                                 case PGSTAT_MTYPE_RESETCOUNTER:
1745                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1746                                                                                          nread);
1747                                         break;
1748
1749                                 default:
1750                                         break;
1751                         }
1752
1753                         /*
1754                          * Globally count messages.
1755                          */
1756                         pgStatNumMessages++;
1757
1758                         /*
1759                          * If this is the first message after we wrote the stats file
1760                          * the last time, setup the timeout that it'd be written.
1761                          */
1762                         if (!need_statwrite)
1763                         {
1764                                 gettimeofday(&next_statwrite, NULL);
1765                                 next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
1766                                 next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
1767                                 next_statwrite.tv_usec %= 1000000;
1768                                 need_statwrite = TRUE;
1769                         }
1770                 }
1771
1772                 /*
1773                  * Note that we do NOT check for postmaster exit inside the loop;
1774                  * only EOF on the buffer pipe causes us to fall out.  This
1775                  * ensures we don't exit prematurely if there are still a few
1776                  * messages in the buffer or pipe at postmaster shutdown.
1777                  */
1778         }
1779
1780         /*
1781          * Okay, we saw EOF on the buffer pipe, so there are no more messages
1782          * to process.  If the buffer process quit because of postmaster
1783          * shutdown, we want to save the final stats to reuse at next startup.
1784          * But if the buffer process failed, it seems best not to (there may
1785          * even now be a new collector firing up, and we don't want it to read
1786          * a partially-rewritten stats file).
1787          */
1788         if (!PostmasterIsAlive(false))
1789                 pgstat_write_statsfile();
1790 }
1791
1792
1793 /* ----------
1794  * pgstat_recvbuffer() -
1795  *
1796  *      This is the body of the separate buffering process. Its only
1797  *      purpose is to receive messages from the UDP socket as fast as
1798  *      possible and forward them over a pipe into the collector itself.
1799  *      If the collector is slow to absorb messages, they are buffered here.
1800  * ----------
1801  */
1802 static void
1803 pgstat_recvbuffer(void)
1804 {
1805         fd_set          rfds;
1806         fd_set          wfds;
1807         struct timeval timeout;
1808         int                     writePipe = pgStatPipe[1];
1809         int                     maxfd;
1810         int                     nready;
1811         int                     len;
1812         int                     xfr;
1813         int                     frm;
1814         PgStat_Msg      input_buffer;
1815         char       *msgbuffer;
1816         int                     msg_send = 0;   /* next send index in buffer */
1817         int                     msg_recv = 0;   /* next receive index */
1818         int                     msg_have = 0;   /* number of bytes stored */
1819         bool            overflow = false;
1820
1821         /*
1822          * Identify myself via ps
1823          */
1824         init_ps_display("stats buffer process", "", "");
1825         set_ps_display("");
1826
1827         /*
1828          * We want to die if our child collector process does.  There are two
1829          * ways we might notice that it has died: receive SIGCHLD, or get a
1830          * write failure on the pipe leading to the child.      We can set SIGPIPE
1831          * to kill us here.  Our SIGCHLD handler was already set up before we
1832          * forked (must do it that way, else it's a race condition).
1833          */
1834         pqsignal(SIGPIPE, SIG_DFL);
1835         PG_SETMASK(&UnBlockSig);
1836
1837         /*
1838          * Set the write pipe to nonblock mode, so that we cannot block when
1839          * the collector falls behind.
1840          */
1841         if (!set_noblock(writePipe))
1842         {
1843                 ereport(LOG,
1844                                 (errcode_for_socket_access(),
1845                   errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1846                 exit(1);
1847         }
1848
1849         /*
1850          * Allocate the message buffer
1851          */
1852         msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
1853         if (msgbuffer == NULL)
1854         {
1855                 ereport(LOG,
1856                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1857                          errmsg("out of memory in statistics collector --- abort")));
1858                 exit(1);
1859         }
1860
1861         /*
1862          * Loop forever
1863          */
1864         for (;;)
1865         {
1866                 FD_ZERO(&rfds);
1867                 FD_ZERO(&wfds);
1868                 maxfd = -1;
1869
1870                 /*
1871                  * As long as we have buffer space we add the socket to the read
1872                  * descriptor set.
1873                  */
1874                 if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
1875                 {
1876                         FD_SET(pgStatSock, &rfds);
1877                         maxfd = pgStatSock;
1878                         overflow = false;
1879                 }
1880                 else
1881                 {
1882                         if (!overflow)
1883                         {
1884                                 ereport(LOG,
1885                                                 (errmsg("statistics buffer is full")));
1886                                 overflow = true;
1887                         }
1888                 }
1889
1890                 /*
1891                  * If we have messages to write out, we add the pipe to the write
1892                  * descriptor set.
1893                  */
1894                 if (msg_have > 0)
1895                 {
1896                         FD_SET(writePipe, &wfds);
1897                         if (writePipe > maxfd)
1898                                 maxfd = writePipe;
1899                 }
1900
1901                 /*
1902                  * Wait for some work to do; but not for more than 10 seconds.
1903                  * (This determines how quickly we will shut down after an
1904                  * ungraceful postmaster termination; so it needn't be very fast.)
1905                  */
1906                 timeout.tv_sec = 10;
1907                 timeout.tv_usec = 0;
1908
1909                 nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
1910                 if (nready < 0)
1911                 {
1912                         if (errno == EINTR)
1913                                 continue;
1914                         ereport(LOG,
1915                                         (errcode_for_socket_access(),
1916                                          errmsg("select() failed in statistics buffer: %m")));
1917                         exit(1);
1918                 }
1919
1920                 /*
1921                  * If there is a message on the socket, read it and check for
1922                  * validity.
1923                  */
1924                 if (FD_ISSET(pgStatSock, &rfds))
1925                 {
1926                         len = recv(pgStatSock, (char *) &input_buffer,
1927                                            sizeof(PgStat_Msg), 0);
1928                         if (len < 0)
1929                         {
1930                                 ereport(LOG,
1931                                                 (errcode_for_socket_access(),
1932                                            errmsg("could not read statistics message: %m")));
1933                                 exit(1);
1934                         }
1935
1936                         /*
1937                          * We ignore messages that are smaller than our common header
1938                          */
1939                         if (len < sizeof(PgStat_MsgHdr))
1940                                 continue;
1941
1942                         /*
1943                          * The received length must match the length in the header
1944                          */
1945                         if (input_buffer.msg_hdr.m_size != len)
1946                                 continue;
1947
1948                         /*
1949                          * O.K. - we accept this message.  Copy it to the circular
1950                          * msgbuffer.
1951                          */
1952                         frm = 0;
1953                         while (len > 0)
1954                         {
1955                                 xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
1956                                 if (xfr > len)
1957                                         xfr = len;
1958                                 Assert(xfr > 0);
1959                                 memcpy(msgbuffer + msg_recv,
1960                                            ((char *) &input_buffer) + frm,
1961                                            xfr);
1962                                 msg_recv += xfr;
1963                                 if (msg_recv == PGSTAT_RECVBUFFERSZ)
1964                                         msg_recv = 0;
1965                                 msg_have += xfr;
1966                                 frm += xfr;
1967                                 len -= xfr;
1968                         }
1969                 }
1970
1971                 /*
1972                  * If the collector is ready to receive, write some data into his
1973                  * pipe.  We may or may not be able to write all that we have.
1974                  *
1975                  * NOTE: if what we have is less than PIPE_BUF bytes but more than
1976                  * the space available in the pipe buffer, most kernels will
1977                  * refuse to write any of it, and will return EAGAIN.  This means
1978                  * we will busy-loop until the situation changes (either because
1979                  * the collector caught up, or because more data arrives so that
1980                  * we have more than PIPE_BUF bytes buffered).  This is not good,
1981                  * but is there any way around it?      We have no way to tell when
1982                  * the collector has caught up...
1983                  */
1984                 if (FD_ISSET(writePipe, &wfds))
1985                 {
1986                         xfr = PGSTAT_RECVBUFFERSZ - msg_send;
1987                         if (xfr > msg_have)
1988                                 xfr = msg_have;
1989                         Assert(xfr > 0);
1990                         len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
1991                         if (len < 0)
1992                         {
1993                                 if (errno == EINTR || errno == EAGAIN)
1994                                         continue;       /* not enough space in pipe */
1995                                 ereport(LOG,
1996                                                 (errcode_for_socket_access(),
1997                                                  errmsg("could not write to statistics collector pipe: %m")));
1998                                 exit(1);
1999                         }
2000                         /* NB: len < xfr is okay */
2001                         msg_send += len;
2002                         if (msg_send == PGSTAT_RECVBUFFERSZ)
2003                                 msg_send = 0;
2004                         msg_have -= len;
2005                 }
2006
2007                 /*
2008                  * Make sure we forwarded all messages before we check for
2009                  * postmaster termination.
2010                  */
2011                 if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
2012                         continue;
2013
2014                 /*
2015                  * If the postmaster has terminated, we die too.  (This is no longer
2016                  * the normal exit path, however.)
2017                  */
2018                 if (!PostmasterIsAlive(true))
2019                         exit(0);
2020         }
2021 }
2022
/*
 * pgstat_exit() -
 *
 *	SIGQUIT handler for the stats buffer process.  Leaves at once with
 *	status 0.  Messages still sitting in the buffer are deliberately
 *	thrown away: flushing them first would be tidier, but would slow
 *	down shutdown.
 *
 *	NOTE(review): exit() is not on POSIX's async-signal-safe list (_exit()
 *	is); calling it from a handler is only safe if the interrupted code
 *	holds no stdio/malloc locks.  Left unchanged to preserve behavior.
 */
static void
pgstat_exit(SIGNAL_ARGS)
{
	exit(0);
}
2034
/*
 * pgstat_die() -
 *
 *	SIGCHLD handler for the stats buffer process.  A child exiting here
 *	presumably means the collector process the buffer feeds has died, and
 *	the buffer has no reason to live on without it, so it exits with
 *	failure status 1.
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	exit(1);
}
2041
2042
2043 /* ----------
2044  * pgstat_add_backend() -
2045  *
2046  *      Support function to keep our backend list up to date.
2047  * ----------
2048  */
2049 static int
2050 pgstat_add_backend(PgStat_MsgHdr *msg)
2051 {
2052         PgStat_StatDBEntry *dbentry;
2053         PgStat_StatBeEntry *beentry;
2054         PgStat_StatBeDead *deadbe;
2055         bool            found;
2056
2057         /*
2058          * Check that the backend ID is valid
2059          */
2060         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2061         {
2062                 ereport(LOG,
2063                                 (errmsg("invalid server process ID %d", msg->m_backendid)));
2064                 return -1;
2065         }
2066
2067         /*
2068          * Get the slot for this backendid.
2069          */
2070         beentry = &pgStatBeTable[msg->m_backendid - 1];
2071         if (beentry->databaseid != InvalidOid)
2072         {
2073                 /*
2074                  * If the slot contains the PID of this backend, everything is
2075                  * fine and we got nothing to do.
2076                  */
2077                 if (beentry->procpid == msg->m_procpid)
2078                         return 0;
2079         }
2080
2081         /*
2082          * Lookup if this backend is known to be dead. This can be caused due
2083          * to messages arriving in the wrong order - i.e. Postmaster's BETERM
2084          * message might have arrived before we received all the backends
2085          * stats messages, or even a new backend with the same backendid was
2086          * faster in sending his BESTART.
2087          *
2088          * If the backend is known to be dead, we ignore this add.
2089          */
2090         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2091                                                                                            (void *) &(msg->m_procpid),
2092                                                                                            HASH_FIND, NULL);
2093         if (deadbe)
2094                 return 1;
2095
2096         /*
2097          * Backend isn't known to be dead. If it's slot is currently used, we
2098          * have to kick out the old backend.
2099          */
2100         if (beentry->databaseid != InvalidOid)
2101                 pgstat_sub_backend(beentry->procpid);
2102
2103         /*
2104          * Put this new backend into the slot.
2105          */
2106         beentry->databaseid = msg->m_databaseid;
2107         beentry->procpid = msg->m_procpid;
2108         beentry->userid = msg->m_userid;
2109         beentry->activity_start_sec = 0;
2110         beentry->activity_start_usec = 0;
2111         MemSet(beentry->activity, 0, PGSTAT_ACTIVITY_SIZE);
2112
2113         /*
2114          * Lookup or create the database entry for this backends DB.
2115          */
2116         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2117                                                                                    (void *) &(msg->m_databaseid),
2118                                                                                                  HASH_ENTER, &found);
2119         if (dbentry == NULL)
2120         {
2121                 ereport(LOG,
2122                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2123                          errmsg("out of memory in statistics collector --- abort")));
2124                 exit(1);
2125         }
2126
2127         /*
2128          * If not found, initialize the new one.
2129          */
2130         if (!found)
2131         {
2132                 HASHCTL         hash_ctl;
2133
2134                 dbentry->tables = NULL;
2135                 dbentry->n_xact_commit = 0;
2136                 dbentry->n_xact_rollback = 0;
2137                 dbentry->n_blocks_fetched = 0;
2138                 dbentry->n_blocks_hit = 0;
2139                 dbentry->n_connects = 0;
2140                 dbentry->destroy = 0;
2141
2142                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2143                 hash_ctl.keysize = sizeof(Oid);
2144                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2145                 hash_ctl.hash = tag_hash;
2146                 dbentry->tables = hash_create("Per-database table",
2147                                                                           PGSTAT_TAB_HASH_SIZE,
2148                                                                           &hash_ctl,
2149                                                                           HASH_ELEM | HASH_FUNCTION);
2150                 if (dbentry->tables == NULL)
2151                 {
2152                         /* assume the problem is out-of-memory */
2153                         ereport(LOG,
2154                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2155                          errmsg("out of memory in statistics collector --- abort")));
2156                         exit(1);
2157                 }
2158         }
2159
2160         /*
2161          * Count number of connects to the database
2162          */
2163         dbentry->n_connects++;
2164
2165         return 0;
2166 }
2167
2168
2169 /* ----------
2170  * pgstat_sub_backend() -
2171  *
2172  *      Remove a backend from the actual backends list.
2173  * ----------
2174  */
2175 static void
2176 pgstat_sub_backend(int procpid)
2177 {
2178         int                     i;
2179         PgStat_StatBeDead *deadbe;
2180         bool            found;
2181
2182         /*
2183          * Search in the known-backends table for the slot containing this
2184          * PID.
2185          */
2186         for (i = 0; i < MaxBackends; i++)
2187         {
2188                 if (pgStatBeTable[i].databaseid != InvalidOid &&
2189                         pgStatBeTable[i].procpid == procpid)
2190                 {
2191                         /*
2192                          * That's him. Add an entry to the known to be dead backends.
2193                          * Due to possible misorder in the arrival of UDP packets it's
2194                          * possible that even if we know the backend is dead, there
2195                          * could still be messages queued that arrive later. Those
2196                          * messages must not cause our number of backends statistics
2197                          * to get screwed up, so we remember for a couple of seconds
2198                          * that this PID is dead and ignore them (only the counting of
2199                          * backends, not the table access stats they sent).
2200                          */
2201                         deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
2202                                                                                                            (void *) &procpid,
2203                                                                                                            HASH_ENTER,
2204                                                                                                            &found);
2205                         if (deadbe == NULL)
2206                         {
2207                                 ereport(LOG,
2208                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2209                                                  errmsg("out of memory in statistics collector --- abort")));
2210                                 exit(1);
2211                         }
2212                         if (!found)
2213                         {
2214                                 deadbe->backendid = i + 1;
2215                                 deadbe->destroy = PGSTAT_DESTROY_COUNT;
2216                         }
2217
2218                         /*
2219                          * Declare the backend slot empty.
2220                          */
2221                         pgStatBeTable[i].databaseid = InvalidOid;
2222                         return;
2223                 }
2224         }
2225
2226         /*
2227          * No big problem if not found. This can happen if UDP messages arrive
2228          * out of order here.
2229          */
2230 }
2231
2232
2233 /* ----------
2234  * pgstat_write_statsfile() -
2235  *
2236  *      Tell the news.
2237  * ----------
2238  */
2239 static void
2240 pgstat_write_statsfile(void)
2241 {
2242         HASH_SEQ_STATUS hstat;
2243         HASH_SEQ_STATUS tstat;
2244         PgStat_StatDBEntry *dbentry;
2245         PgStat_StatTabEntry *tabentry;
2246         PgStat_StatBeDead *deadbe;
2247         FILE       *fpout;
2248         int                     i;
2249
2250         /*
2251          * Open the statistics temp file to write out the current values.
2252          */
2253         fpout = fopen(pgStat_tmpfname, PG_BINARY_W);
2254         if (fpout == NULL)
2255         {
2256                 ereport(LOG,
2257                                 (errcode_for_file_access(),
2258                                  errmsg("could not open temporary statistics file \"%s\": %m",
2259                                                 pgStat_tmpfname)));
2260                 return;
2261         }
2262
2263         /*
2264          * Walk through the database table.
2265          */
2266         hash_seq_init(&hstat, pgStatDBHash);
2267         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2268         {
2269                 /*
2270                  * If this database is marked destroyed, count down and do so if
2271                  * it reaches 0.
2272                  */
2273                 if (dbentry->destroy > 0)
2274                 {
2275                         if (--(dbentry->destroy) == 0)
2276                         {
2277                                 if (dbentry->tables != NULL)
2278                                         hash_destroy(dbentry->tables);
2279
2280                                 if (hash_search(pgStatDBHash,
2281                                                                 (void *) &(dbentry->databaseid),
2282                                                                 HASH_REMOVE, NULL) == NULL)
2283                                 {
2284                                         ereport(LOG,
2285                                                         (errmsg("database hash table corrupted "
2286                                                                         "during cleanup --- abort")));
2287                                         exit(1);
2288                                 }
2289                         }
2290
2291                         /*
2292                          * Don't include statistics for it.
2293                          */
2294                         continue;
2295                 }
2296
2297                 /*
2298                  * Write out the DB line including the number of life backends.
2299                  */
2300                 fputc('D', fpout);
2301                 fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout);
2302
2303                 /*
2304                  * Walk through the databases access stats per table.
2305                  */
2306                 hash_seq_init(&tstat, dbentry->tables);
2307                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2308                 {
2309                         /*
2310                          * If table entry marked for destruction, same as above for
2311                          * the database entry.
2312                          */
2313                         if (tabentry->destroy > 0)
2314                         {
2315                                 if (--(tabentry->destroy) == 0)
2316                                 {
2317                                         if (hash_search(dbentry->tables,
2318                                                                         (void *) &(tabentry->tableid),
2319                                                                         HASH_REMOVE, NULL) == NULL)
2320                                         {
2321                                                 ereport(LOG,
2322                                                                 (errmsg("tables hash table for "
2323                                                                                 "database %u corrupted during "
2324                                                                                 "cleanup --- abort",
2325                                                                                 dbentry->databaseid)));
2326                                                 exit(1);
2327                                         }
2328                                 }
2329                                 continue;
2330                         }
2331
2332                         /*
2333                          * At least we think this is still a life table. Print it's
2334                          * access stats.
2335                          */
2336                         fputc('T', fpout);
2337                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2338                 }
2339
2340                 /*
2341                  * Mark the end of this DB
2342                  */
2343                 fputc('d', fpout);
2344         }
2345
2346         /*
2347          * Write out the known running backends to the stats file.
2348          */
2349         i = MaxBackends;
2350         fputc('M', fpout);
2351         fwrite(&i, sizeof(i), 1, fpout);
2352
2353         for (i = 0; i < MaxBackends; i++)
2354         {
2355                 if (pgStatBeTable[i].databaseid != InvalidOid)
2356                 {
2357                         fputc('B', fpout);
2358                         fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
2359                 }
2360         }
2361
2362         /*
2363          * No more output to be done. Close the temp file and replace the old
2364          * pgstat.stat with it.
2365          */
2366         fputc('E', fpout);
2367         if (fclose(fpout) < 0)
2368         {
2369                 ereport(LOG,
2370                                 (errcode_for_file_access(),
2371                                  errmsg("could not close temporary statistics file \"%s\": %m",
2372                                                 pgStat_tmpfname)));
2373         }
2374         else
2375         {
2376                 if (rename(pgStat_tmpfname, pgStat_fname) < 0)
2377                 {
2378                         ereport(LOG,
2379                                         (errcode_for_file_access(),
2380                                          errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2381                                                         pgStat_tmpfname, pgStat_fname)));
2382                 }
2383         }
2384
2385         /*
2386          * Clear out the dead backends table
2387          */
2388         hash_seq_init(&hstat, pgStatBeDead);
2389         while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2390         {
2391                 /*
2392                  * Count down the destroy delay and remove entries where it
2393                  * reaches 0.
2394                  */
2395                 if (--(deadbe->destroy) <= 0)
2396                 {
2397                         if (hash_search(pgStatBeDead,
2398                                                         (void *) &(deadbe->procpid),
2399                                                         HASH_REMOVE, NULL) == NULL)
2400                         {
2401                                 ereport(LOG,
2402                                                 (errmsg("dead-server-process hash table corrupted "
2403                                                                 "during cleanup --- abort")));
2404                                 exit(1);
2405                         }
2406                 }
2407         }
2408 }
2409
2410
2411 /* ----------
2412  * pgstat_read_statsfile() -
2413  *
2414  *      Reads in an existing statistics collector and initializes the
2415  *      databases hash table (who's entries point to the tables hash tables)
2416  *      and the current backend table.
2417  * ----------
2418  */
2419 static void
2420 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2421                                           PgStat_StatBeEntry **betab, int *numbackends)
2422 {
2423         PgStat_StatDBEntry *dbentry;
2424         PgStat_StatDBEntry dbbuf;
2425         PgStat_StatTabEntry *tabentry;
2426         PgStat_StatTabEntry tabbuf;
2427         HASHCTL         hash_ctl;
2428         HTAB       *tabhash = NULL;
2429         FILE       *fpin;
2430         int                     maxbackends = 0;
2431         int                     havebackends = 0;
2432         bool            found;
2433         MemoryContext use_mcxt;
2434         int                     mcxt_flags;
2435
2436         /*
2437          * If running in the collector we use the DynaHashCxt memory context.
2438          * If running in a backend, we use the TopTransactionContext instead,
2439          * so the caller must only know the last XactId when this call
2440          * happened to know if his tables are still valid or already gone!
2441          */
2442         if (pgStatRunningInCollector)
2443         {
2444                 use_mcxt = NULL;
2445                 mcxt_flags = 0;
2446         }
2447         else
2448         {
2449                 use_mcxt = TopTransactionContext;
2450                 mcxt_flags = HASH_CONTEXT;
2451         }
2452
2453         /*
2454          * Create the DB hashtable
2455          */
2456         memset(&hash_ctl, 0, sizeof(hash_ctl));
2457         hash_ctl.keysize = sizeof(Oid);
2458         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2459         hash_ctl.hash = tag_hash;
2460         hash_ctl.hcxt = use_mcxt;
2461         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2462                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2463         if (*dbhash == NULL)
2464         {
2465                 /* assume the problem is out-of-memory */
2466                 if (pgStatRunningInCollector)
2467                 {
2468                         ereport(LOG,
2469                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2470                          errmsg("out of memory in statistics collector --- abort")));
2471                         exit(1);
2472                 }
2473                 /* in backend, can do normal error */
2474                 ereport(ERROR,
2475                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2476                                  errmsg("out of memory")));
2477         }
2478
2479         /*
2480          * Initialize the number of known backends to zero, just in case we do
2481          * a silent error return below.
2482          */
2483         if (numbackends != NULL)
2484                 *numbackends = 0;
2485         if (betab != NULL)
2486                 *betab = NULL;
2487
2488         /*
2489          * In EXEC_BACKEND case, we won't have inherited pgStat_fname from
2490          * postmaster, so compute it first time through.
2491          */
2492 #ifdef EXEC_BACKEND
2493         if (pgStat_fname[0] == '\0')
2494         {
2495                 Assert(DataDir != NULL);
2496                 snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
2497         }
2498 #endif
2499
2500         /*
2501          * Try to open the status file. If it doesn't exist, the backends
2502          * simply return zero for anything and the collector simply starts
2503          * from scratch with empty counters.
2504          */
2505         if ((fpin = fopen(pgStat_fname, PG_BINARY_R)) == NULL)
2506                 return;
2507
2508         /*
2509          * We found an existing collector stats file. Read it and put all the
2510          * hashtable entries into place.
2511          */
2512         for (;;)
2513         {
2514                 switch (fgetc(fpin))
2515                 {
2516                                 /*
2517                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2518                                  * follows. Subsequently, zero to many 'T' entries will
2519                                  * follow until a 'd' is encountered.
2520                                  */
2521                         case 'D':
2522                                 if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
2523                                 {
2524                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2525                                                         (errmsg("corrupted pgstat.stat file")));
2526                                         fclose(fpin);
2527                                         return;
2528                                 }
2529
2530                                 /*
2531                                  * Add to the DB hash
2532                                  */
2533                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2534                                                                                           (void *) &dbbuf.databaseid,
2535                                                                                                                          HASH_ENTER,
2536                                                                                                                          &found);
2537                                 if (dbentry == NULL)
2538                                 {
2539                                         if (pgStatRunningInCollector)
2540                                         {
2541                                                 ereport(LOG,
2542                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2543                                                                  errmsg("out of memory in statistics collector --- abort")));
2544                                                 exit(1);
2545                                         }
2546                                         else
2547                                         {
2548                                                 fclose(fpin);
2549                                                 ereport(ERROR,
2550                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2551                                                                  errmsg("out of memory")));
2552                                         }
2553                                 }
2554                                 if (found)
2555                                 {
2556                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2557                                                         (errmsg("corrupted pgstat.stat file")));
2558                                         fclose(fpin);
2559                                         return;
2560                                 }
2561
2562                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2563                                 dbentry->tables = NULL;
2564                                 dbentry->destroy = 0;
2565                                 dbentry->n_backends = 0;
2566
2567                                 /*
2568                                  * Don't collect tables if not the requested DB
2569                                  */
2570                                 if (onlydb != InvalidOid && onlydb != dbbuf.databaseid)
2571                                         break;
2572
2573                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2574                                 hash_ctl.keysize = sizeof(Oid);
2575                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2576                                 hash_ctl.hash = tag_hash;
2577                                 hash_ctl.hcxt = use_mcxt;
2578                                 dbentry->tables = hash_create("Per-database table",
2579                                                                                           PGSTAT_TAB_HASH_SIZE,
2580                                                                                           &hash_ctl,
2581                                                                  HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2582                                 if (dbentry->tables == NULL)
2583                                 {
2584                                         /* assume the problem is out-of-memory */
2585                                         if (pgStatRunningInCollector)
2586                                         {
2587                                                 ereport(LOG,
2588                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2589                                                                  errmsg("out of memory in statistics collector --- abort")));
2590                                                 exit(1);
2591                                         }
2592                                         /* in backend, can do normal error */
2593                                         fclose(fpin);
2594                                         ereport(ERROR,
2595                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2596                                                          errmsg("out of memory")));
2597                                 }
2598
2599                                 /*
2600                                  * Arrange that following 'T's add entries to this
2601                                  * databases tables hash table.
2602                                  */
2603                                 tabhash = dbentry->tables;
2604                                 break;
2605
2606                                 /*
2607                                  * 'd'  End of this database.
2608                                  */
2609                         case 'd':
2610                                 tabhash = NULL;
2611                                 break;
2612
2613                                 /*
2614                                  * 'T'  A PgStat_StatTabEntry follows.
2615                                  */
2616                         case 'T':
2617                                 if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
2618                                 {
2619                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2620                                                         (errmsg("corrupted pgstat.stat file")));
2621                                         fclose(fpin);
2622                                         return;
2623                                 }
2624
2625                                 /*
2626                                  * Skip if table belongs to a not requested database.
2627                                  */
2628                                 if (tabhash == NULL)
2629                                         break;
2630
2631                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2632                                                                                                 (void *) &tabbuf.tableid,
2633                                                                                                          HASH_ENTER, &found);
2634                                 if (tabentry == NULL)
2635                                 {
2636                                         if (pgStatRunningInCollector)
2637                                         {
2638                                                 ereport(LOG,
2639                                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2640                                                                  errmsg("out of memory in statistics collector --- abort")));
2641                                                 exit(1);
2642                                         }
2643                                         /* in backend, can do normal error */
2644                                         fclose(fpin);
2645                                         ereport(ERROR,
2646                                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2647                                                          errmsg("out of memory")));
2648                                 }
2649
2650                                 if (found)
2651                                 {
2652                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2653                                                         (errmsg("corrupted pgstat.stat file")));
2654                                         fclose(fpin);
2655                                         return;
2656                                 }
2657
2658                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2659                                 break;
2660
2661                                 /*
2662                                  * 'M'  The maximum number of backends to expect follows.
2663                                  */
2664                         case 'M':
2665                                 if (betab == NULL || numbackends == NULL)
2666                                 {
2667                                         fclose(fpin);
2668                                         return;
2669                                 }
2670                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2671                                         sizeof(maxbackends))
2672                                 {
2673                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2674                                                         (errmsg("corrupted pgstat.stat file")));
2675                                         fclose(fpin);
2676                                         return;
2677                                 }
2678                                 if (maxbackends == 0)
2679                                 {
2680                                         fclose(fpin);
2681                                         return;
2682                                 }
2683
2684                                 /*
2685                                  * Allocate space (in TopTransactionContext too) for the
2686                                  * backend table.
2687                                  */
2688                                 if (use_mcxt == NULL)
2689                                         *betab = (PgStat_StatBeEntry *) malloc(
2690                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2691                                 else
2692                                         *betab = (PgStat_StatBeEntry *) MemoryContextAlloc(
2693                                                                                                                                 use_mcxt,
2694                                                            sizeof(PgStat_StatBeEntry) * maxbackends);
2695                                 break;
2696
2697                                 /*
2698                                  * 'B'  A PgStat_StatBeEntry follows.
2699                                  */
2700                         case 'B':
2701                                 if (betab == NULL || numbackends == NULL)
2702                                 {
2703                                         fclose(fpin);
2704                                         return;
2705                                 }
2706                                 if (*betab == NULL)
2707                                 {
2708                                         fclose(fpin);
2709                                         return;
2710                                 }
2711
2712                                 /*
2713                                  * Read it directly into the table.
2714                                  */
2715                                 if (fread(&(*betab)[havebackends], 1,
2716                                                   sizeof(PgStat_StatBeEntry), fpin) !=
2717                                         sizeof(PgStat_StatBeEntry))
2718                                 {
2719                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2720                                                         (errmsg("corrupted pgstat.stat file")));
2721                                         fclose(fpin);
2722                                         return;
2723                                 }
2724
2725                                 /*
2726                                  * Count backends per database here.
2727                                  */
2728                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2729                                                    (void *) &((*betab)[havebackends].databaseid),
2730                                                                                                                 HASH_FIND, NULL);
2731                                 if (dbentry)
2732                                         dbentry->n_backends++;
2733
2734                                 havebackends++;
2735                                 if (numbackends != 0)
2736                                         *numbackends = havebackends;
2737                                 if (havebackends >= maxbackends)
2738                                 {
2739                                         fclose(fpin);
2740                                         return;
2741                                 }
2742                                 break;
2743
2744                                 /*
2745                                  * 'E'  The EOF marker of a complete stats file.
2746                                  */
2747                         case 'E':
2748                                 fclose(fpin);
2749                                 return;
2750
2751                         default:
2752                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2753                                                 (errmsg("corrupted pgstat.stat file")));
2754                                 fclose(fpin);
2755                                 return;
2756                 }
2757         }
2758
2759         fclose(fpin);
2760 }
2761
2762
2763 /* ----------
2764  * pgstat_recv_bestart() -
2765  *
2766  *      Process a backend starup message.
2767  * ----------
2768  */
2769 static void
2770 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2771 {
2772         pgstat_add_backend(&msg->m_hdr);
2773 }
2774
2775
2776 /* ----------
2777  * pgstat_recv_beterm() -
2778  *
2779  *      Process a backend termination message.
2780  * ----------
2781  */
2782 static void
2783 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2784 {
2785         pgstat_sub_backend(msg->m_hdr.m_procpid);
2786 }
2787
2788
2789 /* ----------
2790  * pgstat_recv_activity() -
2791  *
2792  *      Remember what the backend is doing.
2793  * ----------
2794  */
2795 static void
2796 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2797 {
2798         PgStat_StatBeEntry *entry;
2799
2800         /*
2801          * Here we check explicitly for 0 return, since we don't want to
2802          * mangle the activity of an active backend by a delayed packet from a
2803          * dead one.
2804          */
2805         if (pgstat_add_backend(&msg->m_hdr) != 0)
2806                 return;
2807
2808         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2809
2810         StrNCpy(entry->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE);
2811
2812         entry->activity_start_sec =
2813                 GetCurrentAbsoluteTimeUsec(&entry->activity_start_usec);
2814 }
2815
2816
2817 /* ----------
2818  * pgstat_recv_tabstat() -
2819  *
2820  *      Count what the backend has done.
2821  * ----------
2822  */
2823 static void
2824 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2825 {
2826         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2827         PgStat_StatDBEntry *dbentry;
2828         PgStat_StatTabEntry *tabentry;
2829         int                     i;
2830         bool            found;
2831
2832         /*
2833          * Make sure the backend is counted for.
2834          */
2835         if (pgstat_add_backend(&msg->m_hdr) < 0)
2836                 return;
2837
2838         /*
2839          * Lookup the database in the hashtable.
2840          */
2841         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2842                                                                          (void *) &(msg->m_hdr.m_databaseid),
2843                                                                                                  HASH_FIND, NULL);
2844         if (!dbentry)
2845                 return;
2846
2847         /*
2848          * If the database is marked for destroy, this is a delayed UDP packet
2849          * and not worth being counted.
2850          */
2851         if (dbentry->destroy > 0)
2852                 return;
2853
2854         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2855         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2856
2857         /*
2858          * Process all table entries in the message.
2859          */
2860         for (i = 0; i < msg->m_nentries; i++)
2861         {
2862                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2863                                                                                           (void *) &(tabmsg[i].t_id),
2864                                                                                                          HASH_ENTER, &found);
2865                 if (tabentry == NULL)
2866                 {
2867                         ereport(LOG,
2868                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2869                          errmsg("out of memory in statistics collector --- abort")));
2870                         exit(1);
2871                 }
2872
2873                 if (!found)
2874                 {
2875                         /*
2876                          * If it's a new table entry, initialize counters to the
2877                          * values we just got.
2878                          */
2879                         tabentry->numscans = tabmsg[i].t_numscans;
2880                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2881                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2882                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2883                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2884                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2885                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2886                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2887
2888                         tabentry->destroy = 0;
2889                 }
2890                 else
2891                 {
2892                         /*
2893                          * Otherwise add the values to the existing entry.
2894                          */
2895                         tabentry->numscans += tabmsg[i].t_numscans;
2896                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2897                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2898                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2899                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2900                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2901                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2902                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2903                 }
2904
2905                 /*
2906                  * And add the block IO to the database entry.
2907                  */
2908                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2909                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2910         }
2911 }
2912
2913
2914 /* ----------
2915  * pgstat_recv_tabpurge() -
2916  *
2917  *      Arrange for dead table removal.
2918  * ----------
2919  */
2920 static void
2921 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2922 {
2923         PgStat_StatDBEntry *dbentry;
2924         PgStat_StatTabEntry *tabentry;
2925         int                     i;
2926
2927         /*
2928          * Make sure the backend is counted for.
2929          */
2930         if (pgstat_add_backend(&msg->m_hdr) < 0)
2931                 return;
2932
2933         /*
2934          * Lookup the database in the hashtable.
2935          */
2936         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2937                                                                          (void *) &(msg->m_hdr.m_databaseid),
2938                                                                                                  HASH_FIND, NULL);
2939         if (!dbentry)
2940                 return;
2941
2942         /*
2943          * If the database is marked for destroy, this is a delayed UDP packet
2944          * and the tables will go away at DB destruction.
2945          */
2946         if (dbentry->destroy > 0)
2947                 return;
2948
2949         /*
2950          * Process all table entries in the message.
2951          */
2952         for (i = 0; i < msg->m_nentries; i++)
2953         {
2954                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2955                                                                                    (void *) &(msg->m_tableid[i]),
2956                                                                                                            HASH_FIND, NULL);
2957                 if (tabentry)
2958                         tabentry->destroy = PGSTAT_DESTROY_COUNT;
2959         }
2960 }
2961
2962
2963 /* ----------
2964  * pgstat_recv_dropdb() -
2965  *
2966  *      Arrange for dead database removal
2967  * ----------
2968  */
2969 static void
2970 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
2971 {
2972         PgStat_StatDBEntry *dbentry;
2973
2974         /*
2975          * Make sure the backend is counted for.
2976          */
2977         if (pgstat_add_backend(&msg->m_hdr) < 0)
2978                 return;
2979
2980         /*
2981          * Lookup the database in the hashtable.
2982          */
2983         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2984                                                                                    (void *) &(msg->m_databaseid),
2985                                                                                                  HASH_FIND, NULL);
2986         if (!dbentry)
2987                 return;
2988
2989         /*
2990          * Mark the database for destruction.
2991          */
2992         dbentry->destroy = PGSTAT_DESTROY_COUNT;
2993 }
2994
2995
2996 /* ----------
2997  * pgstat_recv_dropdb() -
2998  *
2999  *      Arrange for dead database removal
3000  * ----------
3001  */
3002 static void
3003 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
3004 {
3005         HASHCTL         hash_ctl;
3006         PgStat_StatDBEntry *dbentry;
3007
3008         /*
3009          * Make sure the backend is counted for.
3010          */
3011         if (pgstat_add_backend(&msg->m_hdr) < 0)
3012                 return;
3013
3014         /*
3015          * Lookup the database in the hashtable.
3016          */
3017         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3018                                                                          (void *) &(msg->m_hdr.m_databaseid),
3019                                                                                                  HASH_FIND, NULL);
3020         if (!dbentry)
3021                 return;
3022
3023         /*
3024          * We simply throw away all the databases table entries by recreating
3025          * a new hash table for them.
3026          */
3027         if (dbentry->tables != NULL)
3028                 hash_destroy(dbentry->tables);
3029
3030         dbentry->tables = NULL;
3031         dbentry->n_xact_commit = 0;
3032         dbentry->n_xact_rollback = 0;
3033         dbentry->n_blocks_fetched = 0;
3034         dbentry->n_blocks_hit = 0;
3035         dbentry->n_connects = 0;
3036         dbentry->destroy = 0;
3037
3038         memset(&hash_ctl, 0, sizeof(hash_ctl));
3039         hash_ctl.keysize = sizeof(Oid);
3040         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3041         hash_ctl.hash = tag_hash;
3042         dbentry->tables = hash_create("Per-database table",
3043                                                                   PGSTAT_TAB_HASH_SIZE,
3044                                                                   &hash_ctl,
3045                                                                   HASH_ELEM | HASH_FUNCTION);
3046         if (dbentry->tables == NULL)
3047         {
3048                 /* assume the problem is out-of-memory */
3049                 ereport(LOG,
3050                                 (errcode(ERRCODE_OUT_OF_MEMORY),
3051                          errmsg("out of memory in statistics collector --- abort")));
3052                 exit(1);
3053         }
3054 }