]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Remove the pgstats logic for delaying destruction of stats table entries.
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2006, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.122 2006/04/06 20:38:00 tgl Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31
32 #include "pgstat.h"
33
34 #include "access/heapam.h"
35 #include "access/xact.h"
36 #include "catalog/pg_database.h"
37 #include "libpq/libpq.h"
38 #include "libpq/pqsignal.h"
39 #include "mb/pg_wchar.h"
40 #include "miscadmin.h"
41 #include "postmaster/autovacuum.h"
42 #include "postmaster/fork_process.h"
43 #include "postmaster/postmaster.h"
44 #include "storage/backendid.h"
45 #include "storage/fd.h"
46 #include "storage/ipc.h"
47 #include "storage/pg_shmem.h"
48 #include "storage/pmsignal.h"
49 #include "storage/procarray.h"
50 #include "tcop/tcopprot.h"
51 #include "utils/hsearch.h"
52 #include "utils/memutils.h"
53 #include "utils/ps_status.h"
54 #include "utils/rel.h"
55 #include "utils/syscache.h"
56
57
/* ----------
 * Paths for the statistics files (relative to installation's $PGDATA).
 *
 * PGSTAT_STAT_FILENAME is the file removed by pgstat_reset_all().
 * PGSTAT_STAT_TMPFILE is presumably written first and then renamed over
 * the real file by the collector -- TODO confirm in pgstat_write_statsfile().
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"global/pgstat.tmp"

/* ----------
 * Timer definitions.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500		/* How often to write the status file;
										 * in milliseconds. */

#define PGSTAT_RESTART_INTERVAL 60		/* How often to attempt to restart a
										 * failed statistics collector; in
										 * seconds.  Enforced by the
										 * throttle in pgstat_start(). */

/* ----------
 * Amount of space reserved in pgstat_recvbuffer(): room for 1024
 * maximum-size messages.
 * ----------
 */
#define PGSTAT_RECVBUFFERSZ		((int) (1024 * sizeof(PgStat_Msg)))

/* ----------
 * The initial size hints for the hash tables used in the collector.
 * These are only hints; they are not hard limits.
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_BE_HASH_SIZE		512
#define PGSTAT_TAB_HASH_SIZE	512
89
90
/* ----------
 * GUC parameters
 *
 * pgstat_init() forces pgstat_collect_startcollector on whenever any of
 * the querystring/tuplelevel/blocklevel switches is on, and turns all of
 * them off again if no working collector socket can be created.
 * ----------
 */
bool		pgstat_collect_startcollector = true;
bool		pgstat_collect_resetonpmstart = false;	/* remove stats file at
													 * postmaster start */
bool		pgstat_collect_querystring = false;		/* gates
													 * pgstat_report_activity() */
bool		pgstat_collect_tuplelevel = false;		/* gates vacuum/analyze
													 * reports */
bool		pgstat_collect_blocklevel = false;

/* ----------
 * Local data
 * ----------
 */
/* UDP socket to the collector; -1 means stats collection is unavailable. */
NON_EXEC_STATIC int pgStatSock = -1;
/* NOTE(review): not used in the visible part of the file; presumably the
 * buffer-process -> collector pipe.  Verify against PgstatBufferMain(). */
NON_EXEC_STATIC int pgStatPipe[2] = {-1, -1};
/* Address pgStatSock is bound (and connected) to; set in pgstat_init(). */
static struct sockaddr_storage pgStatAddr;
static pid_t pgStatCollectorPid = 0;

/* Time of the last collector launch attempt, for restart throttling. */
static time_t last_pgstat_start_time;

static long pgStatNumMessages = 0;

static bool pgStatRunningInCollector = false;

/*
 * Place where backends store per-table info to be sent to the collector.
 * We store shared relations separately from non-shared ones, to be able to
 * send them in separate messages.
 */
typedef struct TabStatArray
{
	int			tsa_alloc;		/* num allocated */
	int			tsa_used;		/* num actually used */
	PgStat_MsgTabstat **tsa_messages;	/* the array itself */
} TabStatArray;

#define TABSTAT_QUANTUM		4	/* we alloc this many at a time */

static TabStatArray RegularTabStat = {0, 0, NULL};
static TabStatArray SharedTabStat = {0, 0, NULL};

/*
 * Pending transaction commit/abort counts.  pgstat_report_tabstat() attaches
 * them to the first regular tabstat message it sends and then zeroes them.
 */
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;

/*
 * Backend-local snapshot of the collector's stats file.  pgStatDBHashXact
 * presumably records the transaction for which the snapshot was loaded, so
 * it is re-read at most once per transaction -- TODO confirm in
 * backend_read_statsfile().
 */
static TransactionId pgStatDBHashXact = InvalidTransactionId;
static HTAB *pgStatDBHash = NULL;
static PgStat_StatBeEntry *pgStatBeTable = NULL;
static int	pgStatNumBackends = 0;

/*
 * Presumably set by the force_statwrite() signal handler to request a stats
 * file write; hence volatile.  NOTE(review): volatile sig_atomic_t is the
 * portable type for signal-handler flags.
 */
static volatile bool need_statwrite;
142
143
/* ----------
 * Local function forward declarations
 * ----------
 */
#ifdef EXEC_BACKEND

/* Which statistics child process pgstat_forkexec() should launch. */
typedef enum STATS_PROCESS_TYPE
{
	STAT_PROC_BUFFER,
	STAT_PROC_COLLECTOR
}	STATS_PROCESS_TYPE;

static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
static void pgstat_parseArgs(int argc, char *argv[]);
#endif

/* Entry points and signal handling for the buffer and collector processes */
NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
static void force_statwrite(SIGNAL_ARGS);
static void pgstat_recvbuffer(void);
static void pgstat_exit(SIGNAL_ARGS);
static void pgstat_die(SIGNAL_ARGS);
static void pgstat_beshutdown_hook(int code, Datum arg);

/* Collector-side bookkeeping and stats file I/O */
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static int	pgstat_add_backend(PgStat_MsgHdr *msg);
static void pgstat_sub_backend(int procpid);
static void pgstat_drop_database(Oid databaseid);
static void pgstat_write_statsfile(void);
static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
					  PgStat_StatBeEntry **betab,
					  int *numbackends);
static void backend_read_statsfile(void);

/* Message construction and transmission */
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);

/* Collector-side handlers, one per message type */
static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
191
192
193 /* ------------------------------------------------------------
194  * Public functions called from postmaster follow
195  * ------------------------------------------------------------
196  */
197
/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success, pgStatSock is a nonblocking UDP socket bound to a
 *	kernel-assigned localhost port and connected to itself (pgStatAddr).
 *	It has passed a one-byte loopback round-trip test.
 *	On failure, pgStatSock is -1 and all pgstat_collect_* GUCs are
 *	switched off.  Every failure is reported at LOG level only.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

/* Arbitrary byte value used for the loopback round-trip test below. */
#define TESTBYTEVAL ((char) 199)

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the stats file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		pgstat_reset_all();

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		/*
		 * NOTE(review): if ret == 0 but no addresses came back,
		 * gai_strerror(0) yields a non-error text in this message.
		 */
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination. We will generate LOG
	 * messages, but no error, for bogus combinations.
	 *
	 * Every failure branch below closes the socket, resets pgStatSock to -1
	 * and moves on to the next candidate.  Each ereport immediately follows
	 * the failing call, so %m still reports that call's errno.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			  errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket. This
		 * is to catch situations where the socket can be created but will not
		 * actually pass data (for instance, because kernel packet filtering
		 * rules prevent it).
		 */
		test_byte = TESTBYTEVAL;
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 *
		 * rset and tv are rebuilt on every pass because select() may modify
		 * both.  Note that an EINTR retry restarts the full half second.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address?  (addr == NULL: candidates exhausted) */
	if (!addr || pgStatSock < 0)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind (despite the buffering process), statistics messages will
	 * be discarded; backends won't block waiting to send messages to the
	 * collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	pg_freeaddrinfo_all(hints.ai_family, addrs);

	return;

	/*
	 * Common failure exit: release whatever was acquired and disable every
	 * collection switch so the rest of the system skips stats work.
	 */
startup_failed:
	ereport(LOG,
	  (errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
453
454 /*
455  * pgstat_reset_all() -
456  *
457  * Remove the stats file.  This is used on server start if the
458  * stats_reset_on_server_start feature is enabled, or if WAL
459  * recovery is needed after a crash.
460  */
461 void
462 pgstat_reset_all(void)
463 {
464         unlink(PGSTAT_STAT_FILENAME);
465 }
466
#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Format up the arglist for, then fork and exec, statistics
 * (buffer and collector) processes.
 *
 * The resulting argv is:
 *		[0] "postgres"
 *		[1] "-forkbuf" or "-forkcol", selecting the child's role
 *		[2] NULL, filled in by postmaster_forkexec
 *		[3] postgres_exec_path (not passed by write_backend_variables)
 * pgstat_parseArgs() relies on this layout.
 *
 * Returns whatever postmaster_forkexec() returns for the child.
 */
static pid_t
pgstat_forkexec(STATS_PROCESS_TYPE procType)
{
	char	   *av[10];
	int			ac = 0;

	av[ac++] = "postgres";

	switch (procType)
	{
		case STAT_PROC_BUFFER:
			av[ac++] = "-forkbuf";
			break;

		case STAT_PROC_COLLECTOR:
			av[ac++] = "-forkcol";
			break;

		default:
			Assert(false);
	}

	av[ac++] = NULL;			/* filled in by postmaster_forkexec */

	/* postgres_exec_path is not passed by write_backend_variables */
	av[ac++] = postgres_exec_path;

	av[ac] = NULL;
	Assert(ac < lengthof(av));

	return postmaster_forkexec(ac, av);
}


/*
 * pgstat_parseArgs() -
 *
 * Extract data from the arglist for exec'ed statistics
 * (buffer and collector) processes.  The only payload is
 * postgres_exec_path, which pgstat_forkexec() placed in argv[3].
 */
static void
pgstat_parseArgs(int argc, char *argv[])
{
	Assert(argc == 4);

	StrNCpy(postgres_exec_path, argv[3], MAXPGPATH);
}
#endif   /* EXEC_BACKEND */
532
533
534 /* ----------
535  * pgstat_start() -
536  *
537  *      Called from postmaster at startup or after an existing collector
538  *      died.  Attempt to fire up a fresh statistics collector.
539  *
540  *      Returns PID of child process, or 0 if fail.
541  *
542  *      Note: if fail, we will be called again from the postmaster main loop.
543  * ----------
544  */
545 int
546 pgstat_start(void)
547 {
548         time_t          curtime;
549         pid_t           pgStatPid;
550
551         /*
552          * Do nothing if no collector needed
553          */
554         if (!pgstat_collect_startcollector)
555                 return 0;
556
557         /*
558          * Do nothing if too soon since last collector start.  This is a safety
559          * valve to protect against continuous respawn attempts if the collector
560          * is dying immediately at launch.      Note that since we will be re-called
561          * from the postmaster main loop, we will get another chance later.
562          */
563         curtime = time(NULL);
564         if ((unsigned int) (curtime - last_pgstat_start_time) <
565                 (unsigned int) PGSTAT_RESTART_INTERVAL)
566                 return 0;
567         last_pgstat_start_time = curtime;
568
569         /*
570          * Check that the socket is there, else pgstat_init failed.
571          */
572         if (pgStatSock < 0)
573         {
574                 ereport(LOG,
575                                 (errmsg("statistics collector startup skipped")));
576
577                 /*
578                  * We can only get here if someone tries to manually turn
579                  * pgstat_collect_startcollector on after it had been off.
580                  */
581                 pgstat_collect_startcollector = false;
582                 return 0;
583         }
584
585         /*
586          * Okay, fork off the collector.
587          */
588 #ifdef EXEC_BACKEND
589         switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
590 #else
591         switch ((pgStatPid = fork_process()))
592 #endif
593         {
594                 case -1:
595                         ereport(LOG,
596                                         (errmsg("could not fork statistics buffer: %m")));
597                         return 0;
598
599 #ifndef EXEC_BACKEND
600                 case 0:
601                         /* in postmaster child ... */
602                         /* Close the postmaster's sockets */
603                         ClosePostmasterPorts(false);
604
605                         /* Lose the postmaster's on-exit routines */
606                         on_exit_reset();
607
608                         /* Drop our connection to postmaster's shared memory, as well */
609                         PGSharedMemoryDetach();
610
611                         PgstatBufferMain(0, NULL);
612                         break;
613 #endif
614
615                 default:
616                         return (int) pgStatPid;
617         }
618
619         /* shouldn't get here */
620         return 0;
621 }
622
623
624 /* ----------
625  * pgstat_beterm() -
626  *
627  *      Called from postmaster to tell collector a backend terminated.
628  * ----------
629  */
630 void
631 pgstat_beterm(int pid)
632 {
633         PgStat_MsgBeterm msg;
634
635         if (pgStatSock < 0)
636                 return;
637
638         /* can't use pgstat_setheader() because it's not called in a backend */
639         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
640         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
641         msg.m_hdr.m_procpid = pid;
642
643         pgstat_send(&msg, sizeof(msg));
644 }
645
646
647 /* ----------
648  * pgstat_report_autovac() -
649  *
650  *      Called from autovacuum.c to report startup of an autovacuum process.
651  *      We are called before InitPostgres is done, so can't rely on MyDatabaseId;
652  *      the db OID must be passed in, instead.
653  * ----------
654  */
655 void
656 pgstat_report_autovac(Oid dboid)
657 {
658         PgStat_MsgAutovacStart msg;
659
660         if (pgStatSock < 0)
661                 return;
662
663         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
664         msg.m_databaseid = dboid;
665         msg.m_start_time = GetCurrentTimestamp();
666
667         pgstat_send(&msg, sizeof(msg));
668 }
669
670 /* ------------------------------------------------------------
671  * Public functions used by backends follow
672  *------------------------------------------------------------
673  */
674
675
676 /* ----------
677  * pgstat_bestart() -
678  *
679  *      Tell the collector that this new backend is soon ready to process
680  *      queries. Called from InitPostgres.
681  * ----------
682  */
683 void
684 pgstat_bestart(void)
685 {
686         PgStat_MsgBestart msg;
687
688         if (pgStatSock < 0)
689                 return;
690
691         /*
692          * We may not have a MyProcPort (eg, if this is the autovacuum process).
693          * For the moment, punt and don't send BESTART --- would be better to work
694          * out a clean way of handling "unknown clientaddr".
695          */
696         if (MyProcPort)
697         {
698                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
699                 msg.m_databaseid = MyDatabaseId;
700                 msg.m_userid = GetSessionUserId();
701                 memcpy(&msg.m_clientaddr, &MyProcPort->raddr, sizeof(msg.m_clientaddr));
702                 pgstat_send(&msg, sizeof(msg));
703         }
704
705         /*
706          * Set up a process-exit hook to ensure we flush the last batch of
707          * statistics to the collector.
708          */
709         on_shmem_exit(pgstat_beshutdown_hook, 0);
710 }
711
712 /* ---------
713  * pgstat_report_vacuum() -
714  *
715  *      Tell the collector about the table we just vacuumed.
716  * ---------
717  */
718 void
719 pgstat_report_vacuum(Oid tableoid, bool shared,
720                                          bool analyze, PgStat_Counter tuples)
721 {
722         PgStat_MsgVacuum msg;
723
724         if (pgStatSock < 0 ||
725                 !pgstat_collect_tuplelevel)
726                 return;
727
728         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
729         msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
730         msg.m_tableoid = tableoid;
731         msg.m_analyze = analyze;
732         msg.m_tuples = tuples;
733         pgstat_send(&msg, sizeof(msg));
734 }
735
736 /* --------
737  * pgstat_report_analyze() -
738  *
739  *      Tell the collector about the table we just analyzed.
740  * --------
741  */
742 void
743 pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
744                                           PgStat_Counter deadtuples)
745 {
746         PgStat_MsgAnalyze msg;
747
748         if (pgStatSock < 0 ||
749                 !pgstat_collect_tuplelevel)
750                 return;
751
752         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
753         msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
754         msg.m_tableoid = tableoid;
755         msg.m_live_tuples = livetuples;
756         msg.m_dead_tuples = deadtuples;
757         pgstat_send(&msg, sizeof(msg));
758 }
759
760 /*
761  * Flush any remaining statistics counts out to the collector at process
762  * exit.   Without this, operations triggered during backend exit (such as
763  * temp table deletions) won't be counted.
764  */
765 static void
766 pgstat_beshutdown_hook(int code, Datum arg)
767 {
768         pgstat_report_tabstat();
769 }
770
771
772 /* ----------
773  * pgstat_report_activity() -
774  *
775  *      Called from tcop/postgres.c to tell the collector what the backend
776  *      is actually doing (usually "<IDLE>" or the start of the query to
777  *      be executed).
778  * ----------
779  */
780 void
781 pgstat_report_activity(const char *cmd_str)
782 {
783         PgStat_MsgActivity msg;
784         int                     len;
785
786         if (!pgstat_collect_querystring || pgStatSock < 0)
787                 return;
788
789         len = strlen(cmd_str);
790         len = pg_mbcliplen(cmd_str, len, PGSTAT_ACTIVITY_SIZE - 1);
791
792         memcpy(msg.m_cmd_str, cmd_str, len);
793         msg.m_cmd_str[len] = '\0';
794         len += offsetof(PgStat_MsgActivity, m_cmd_str) + 1;
795
796         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
797         pgstat_send(&msg, len);
798 }
799
800
801 /* ----------
802  * pgstat_report_tabstat() -
803  *
804  *      Called from tcop/postgres.c to send the so far collected
805  *      per table access statistics to the collector.
806  * ----------
807  */
808 void
809 pgstat_report_tabstat(void)
810 {
811         int                     i;
812
813         if (pgStatSock < 0 ||
814                 (!pgstat_collect_querystring &&
815                  !pgstat_collect_tuplelevel &&
816                  !pgstat_collect_blocklevel))
817         {
818                 /* Not reporting stats, so just flush whatever we have */
819                 RegularTabStat.tsa_used = 0;
820                 SharedTabStat.tsa_used = 0;
821                 return;
822         }
823
824         /*
825          * For each message buffer used during the last query set the header
826          * fields and send it out.
827          */
828         for (i = 0; i < RegularTabStat.tsa_used; i++)
829         {
830                 PgStat_MsgTabstat *tsmsg = RegularTabStat.tsa_messages[i];
831                 int                     n;
832                 int                     len;
833
834                 n = tsmsg->m_nentries;
835                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
836                         n * sizeof(PgStat_TableEntry);
837
838                 tsmsg->m_xact_commit = pgStatXactCommit;
839                 tsmsg->m_xact_rollback = pgStatXactRollback;
840                 pgStatXactCommit = 0;
841                 pgStatXactRollback = 0;
842
843                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
844                 tsmsg->m_databaseid = MyDatabaseId;
845                 pgstat_send(tsmsg, len);
846         }
847         RegularTabStat.tsa_used = 0;
848
849         /* Ditto, for shared relations */
850         for (i = 0; i < SharedTabStat.tsa_used; i++)
851         {
852                 PgStat_MsgTabstat *tsmsg = SharedTabStat.tsa_messages[i];
853                 int                     n;
854                 int                     len;
855
856                 n = tsmsg->m_nentries;
857                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
858                         n * sizeof(PgStat_TableEntry);
859
860                 /* We don't report transaction commit/abort here */
861                 tsmsg->m_xact_commit = 0;
862                 tsmsg->m_xact_rollback = 0;
863
864                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
865                 tsmsg->m_databaseid = InvalidOid;
866                 pgstat_send(tsmsg, len);
867         }
868         SharedTabStat.tsa_used = 0;
869 }
870
871
872 /* ----------
873  * pgstat_vacuum_tabstat() -
874  *
875  *      Will tell the collector about objects he can get rid of.
876  * ----------
877  */
878 void
879 pgstat_vacuum_tabstat(void)
880 {
881         List       *oidlist;
882         Relation        rel;
883         HeapScanDesc scan;
884         HeapTuple       tup;
885         PgStat_MsgTabpurge msg;
886         HASH_SEQ_STATUS hstat;
887         PgStat_StatDBEntry *dbentry;
888         PgStat_StatTabEntry *tabentry;
889         int                     len;
890
891         if (pgStatSock < 0)
892                 return;
893
894         /*
895          * If not done for this transaction, read the statistics collector stats
896          * file into some hash tables.
897          */
898         backend_read_statsfile();
899
900         /*
901          * Read pg_database and make a list of OIDs of all existing databases
902          */
903         oidlist = NIL;
904         rel = heap_open(DatabaseRelationId, AccessShareLock);
905         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
906         while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
907         {
908                 oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
909         }
910         heap_endscan(scan);
911         heap_close(rel, AccessShareLock);
912
913         /*
914          * Search the database hash table for dead databases and tell the
915          * collector to drop them.
916          */
917         hash_seq_init(&hstat, pgStatDBHash);
918         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
919         {
920                 Oid                     dbid = dbentry->databaseid;
921
922                 if (!list_member_oid(oidlist, dbid))
923                         pgstat_drop_database(dbid);
924         }
925
926         /* Clean up */
927         list_free(oidlist);
928
929         /*
930          * Lookup our own database entry; if not found, nothing more to do.
931          */
932         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
933                                                                                                  (void *) &MyDatabaseId,
934                                                                                                  HASH_FIND, NULL);
935         if (dbentry == NULL || dbentry->tables == NULL)
936                 return;
937
938         /*
939          * Similarly to above, make a list of all known relations in this DB.
940          */
941         oidlist = NIL;
942         rel = heap_open(RelationRelationId, AccessShareLock);
943         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
944         while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
945         {
946                 oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
947         }
948         heap_endscan(scan);
949         heap_close(rel, AccessShareLock);
950
951         /*
952          * Initialize our messages table counter to zero
953          */
954         msg.m_nentries = 0;
955
956         /*
957          * Check for all tables listed in stats hashtable if they still exist.
958          */
959         hash_seq_init(&hstat, dbentry->tables);
960         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
961         {
962                 if (list_member_oid(oidlist, tabentry->tableid))
963                         continue;
964
965                 /*
966                  * Not there, so add this table's Oid to the message
967                  */
968                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
969
970                 /*
971                  * If the message is full, send it out and reinitialize to empty
972                  */
973                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
974                 {
975                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
976                                 +msg.m_nentries * sizeof(Oid);
977
978                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
979                         msg.m_databaseid = MyDatabaseId;
980                         pgstat_send(&msg, len);
981
982                         msg.m_nentries = 0;
983                 }
984         }
985
986         /*
987          * Send the rest
988          */
989         if (msg.m_nentries > 0)
990         {
991                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
992                         +msg.m_nentries * sizeof(Oid);
993
994                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
995                 msg.m_databaseid = MyDatabaseId;
996                 pgstat_send(&msg, len);
997         }
998
999         /* Clean up */
1000         list_free(oidlist);
1001 }
1002
1003
1004 /* ----------
1005  * pgstat_drop_database() -
1006  *
1007  *      Tell the collector that we just dropped a database.
1008  *      (If the message gets lost, we will still clean the dead DB eventually
1009  *      via future invocations of pgstat_vacuum_tabstat().)
1010  * ----------
1011  */
1012 static void
1013 pgstat_drop_database(Oid databaseid)
1014 {
1015         PgStat_MsgDropdb msg;
1016
1017         if (pgStatSock < 0)
1018                 return;
1019
1020         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1021         msg.m_databaseid = databaseid;
1022         pgstat_send(&msg, sizeof(msg));
1023 }
1024
1025
1026 /* ----------
1027  * pgstat_drop_relation() -
1028  *
1029  *      Tell the collector that we just dropped a relation.
1030  *      (If the message gets lost, we will still clean the dead entry eventually
1031  *      via future invocations of pgstat_vacuum_tabstat().)
1032  * ----------
1033  */
1034 void
1035 pgstat_drop_relation(Oid relid)
1036 {
1037         PgStat_MsgTabpurge msg;
1038         int                     len;
1039
1040         if (pgStatSock < 0)
1041                 return;
1042
1043         msg.m_tableid[0] = relid;
1044         msg.m_nentries = 1;
1045
1046         len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
1047
1048         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1049         msg.m_databaseid = MyDatabaseId;
1050         pgstat_send(&msg, len);
1051 }
1052
1053
1054 /* ----------
1055  * pgstat_reset_counters() -
1056  *
1057  *      Tell the statistics collector to reset counters for our database.
1058  * ----------
1059  */
1060 void
1061 pgstat_reset_counters(void)
1062 {
1063         PgStat_MsgResetcounter msg;
1064
1065         if (pgStatSock < 0)
1066                 return;
1067
1068         if (!superuser())
1069                 ereport(ERROR,
1070                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1071                                  errmsg("must be superuser to reset statistics counters")));
1072
1073         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1074         msg.m_databaseid = MyDatabaseId;
1075         pgstat_send(&msg, sizeof(msg));
1076 }
1077
1078
1079 /* ----------
1080  * pgstat_ping() -
1081  *
1082  *      Send some junk data to the collector to increase traffic.
1083  * ----------
1084  */
1085 void
1086 pgstat_ping(void)
1087 {
1088         PgStat_MsgDummy msg;
1089
1090         if (pgStatSock < 0)
1091                 return;
1092
1093         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
1094         pgstat_send(&msg, sizeof(msg));
1095 }
1096
1097 /*
1098  * Enlarge a TabStatArray
1099  */
1100 static void
1101 more_tabstat_space(TabStatArray *tsarr)
1102 {
1103         PgStat_MsgTabstat *newMessages;
1104         PgStat_MsgTabstat **msgArray;
1105         int                     newAlloc;
1106         int                     i;
1107
1108         AssertArg(PointerIsValid(tsarr));
1109
1110         newAlloc = tsarr->tsa_alloc + TABSTAT_QUANTUM;
1111
1112         /* Create (another) quantum of message buffers */
1113         newMessages = (PgStat_MsgTabstat *)
1114                 MemoryContextAllocZero(TopMemoryContext,
1115                                                            sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1116
1117         /* Create or enlarge the pointer array */
1118         if (tsarr->tsa_messages == NULL)
1119                 msgArray = (PgStat_MsgTabstat **)
1120                         MemoryContextAlloc(TopMemoryContext,
1121                                                            sizeof(PgStat_MsgTabstat *) * newAlloc);
1122         else
1123                 msgArray = (PgStat_MsgTabstat **)
1124                         repalloc(tsarr->tsa_messages,
1125                                          sizeof(PgStat_MsgTabstat *) * newAlloc);
1126
1127         for (i = 0; i < TABSTAT_QUANTUM; i++)
1128                 msgArray[tsarr->tsa_alloc + i] = newMessages++;
1129         tsarr->tsa_messages = msgArray;
1130         tsarr->tsa_alloc = newAlloc;
1131
1132         Assert(tsarr->tsa_used < tsarr->tsa_alloc);
1133 }
1134
1135 /* ----------
1136  * pgstat_initstats() -
1137  *
1138  *      Called from various places usually dealing with initialization
1139  *      of Relation or Scan structures. The data placed into these
1140  *      structures from here tell where later to count for buffer reads,
1141  *      scans and tuples fetched.
1142  * ----------
1143  */
1144 void
1145 pgstat_initstats(PgStat_Info *stats, Relation rel)
1146 {
1147         Oid                     rel_id = rel->rd_id;
1148         PgStat_TableEntry *useent;
1149         TabStatArray *tsarr;
1150         PgStat_MsgTabstat *tsmsg;
1151         int                     mb;
1152         int                     i;
1153
1154         /*
1155          * Initialize data not to count at all.
1156          */
1157         stats->tabentry = NULL;
1158
1159         if (pgStatSock < 0 ||
1160                 !(pgstat_collect_tuplelevel ||
1161                   pgstat_collect_blocklevel))
1162                 return;
1163
1164         tsarr = rel->rd_rel->relisshared ? &SharedTabStat : &RegularTabStat;
1165
1166         /*
1167          * Search the already-used message slots for this relation.
1168          */
1169         for (mb = 0; mb < tsarr->tsa_used; mb++)
1170         {
1171                 tsmsg = tsarr->tsa_messages[mb];
1172
1173                 for (i = tsmsg->m_nentries; --i >= 0;)
1174                 {
1175                         if (tsmsg->m_entry[i].t_id == rel_id)
1176                         {
1177                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1178                                 return;
1179                         }
1180                 }
1181
1182                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1183                         continue;
1184
1185                 /*
1186                  * Not found, but found a message buffer with an empty slot instead.
1187                  * Fine, let's use this one.
1188                  */
1189                 i = tsmsg->m_nentries++;
1190                 useent = &tsmsg->m_entry[i];
1191                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1192                 useent->t_id = rel_id;
1193                 stats->tabentry = (void *) useent;
1194                 return;
1195         }
1196
1197         /*
1198          * If we ran out of message buffers, we just allocate more.
1199          */
1200         if (tsarr->tsa_used >= tsarr->tsa_alloc)
1201                 more_tabstat_space(tsarr);
1202
1203         /*
1204          * Use the first entry of the next message buffer.
1205          */
1206         mb = tsarr->tsa_used++;
1207         tsmsg = tsarr->tsa_messages[mb];
1208         tsmsg->m_nentries = 1;
1209         useent = &tsmsg->m_entry[0];
1210         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1211         useent->t_id = rel_id;
1212         stats->tabentry = (void *) useent;
1213 }
1214
1215
1216 /* ----------
1217  * pgstat_count_xact_commit() -
1218  *
1219  *      Called from access/transam/xact.c to count transaction commits.
1220  * ----------
1221  */
1222 void
1223 pgstat_count_xact_commit(void)
1224 {
1225         if      (!pgstat_collect_querystring &&
1226                  !pgstat_collect_tuplelevel &&
1227                  !pgstat_collect_blocklevel)
1228                 return;
1229
1230         pgStatXactCommit++;
1231
1232         /*
1233          * If there was no relation activity yet, just make one existing message
1234          * buffer used without slots, causing the next report to tell new
1235          * xact-counters.
1236          */
1237         if (RegularTabStat.tsa_alloc == 0)
1238                 more_tabstat_space(&RegularTabStat);
1239
1240         if (RegularTabStat.tsa_used == 0)
1241         {
1242                 RegularTabStat.tsa_used++;
1243                 RegularTabStat.tsa_messages[0]->m_nentries = 0;
1244         }
1245 }
1246
1247
1248 /* ----------
1249  * pgstat_count_xact_rollback() -
1250  *
1251  *      Called from access/transam/xact.c to count transaction rollbacks.
1252  * ----------
1253  */
1254 void
1255 pgstat_count_xact_rollback(void)
1256 {
1257         if      (!pgstat_collect_querystring &&
1258                  !pgstat_collect_tuplelevel &&
1259                  !pgstat_collect_blocklevel)
1260                 return;
1261
1262         pgStatXactRollback++;
1263
1264         /*
1265          * If there was no relation activity yet, just make one existing message
1266          * buffer used without slots, causing the next report to tell new
1267          * xact-counters.
1268          */
1269         if (RegularTabStat.tsa_alloc == 0)
1270                 more_tabstat_space(&RegularTabStat);
1271
1272         if (RegularTabStat.tsa_used == 0)
1273         {
1274                 RegularTabStat.tsa_used++;
1275                 RegularTabStat.tsa_messages[0]->m_nentries = 0;
1276         }
1277 }
1278
1279
1280 /* ----------
1281  * pgstat_fetch_stat_dbentry() -
1282  *
1283  *      Support function for the SQL-callable pgstat* functions. Returns
1284  *      the collected statistics for one database or NULL. NULL doesn't mean
1285  *      that the database doesn't exist, it is just not yet known by the
1286  *      collector, so the caller is better off to report ZERO instead.
1287  * ----------
1288  */
1289 PgStat_StatDBEntry *
1290 pgstat_fetch_stat_dbentry(Oid dbid)
1291 {
1292         /*
1293          * If not done for this transaction, read the statistics collector stats
1294          * file into some hash tables.
1295          */
1296         backend_read_statsfile();
1297
1298         /*
1299          * Lookup the requested database; return NULL if not found
1300          */
1301         return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1302                                                                                           (void *) &dbid,
1303                                                                                           HASH_FIND, NULL);
1304 }
1305
1306
1307 /* ----------
1308  * pgstat_fetch_stat_tabentry() -
1309  *
1310  *      Support function for the SQL-callable pgstat* functions. Returns
1311  *      the collected statistics for one table or NULL. NULL doesn't mean
1312  *      that the table doesn't exist, it is just not yet known by the
1313  *      collector, so the caller is better off to report ZERO instead.
1314  * ----------
1315  */
1316 PgStat_StatTabEntry *
1317 pgstat_fetch_stat_tabentry(Oid relid)
1318 {
1319         Oid                     dbid;
1320         PgStat_StatDBEntry *dbentry;
1321         PgStat_StatTabEntry *tabentry;
1322
1323         /*
1324          * If not done for this transaction, read the statistics collector stats
1325          * file into some hash tables.
1326          */
1327         backend_read_statsfile();
1328
1329         /*
1330          * Lookup our database, then look in its table hash table.
1331          */
1332         dbid = MyDatabaseId;
1333         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1334                                                                                                  (void *) &dbid,
1335                                                                                                  HASH_FIND, NULL);
1336         if (dbentry != NULL && dbentry->tables != NULL)
1337         {
1338                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1339                                                                                                            (void *) &relid,
1340                                                                                                            HASH_FIND, NULL);
1341                 if (tabentry)
1342                         return tabentry;
1343         }
1344
1345         /*
1346          * If we didn't find it, maybe it's a shared table.
1347          */
1348         dbid = InvalidOid;
1349         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1350                                                                                                  (void *) &dbid,
1351                                                                                                  HASH_FIND, NULL);
1352         if (dbentry != NULL && dbentry->tables != NULL)
1353         {
1354                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1355                                                                                                            (void *) &relid,
1356                                                                                                            HASH_FIND, NULL);
1357                 if (tabentry)
1358                         return tabentry;
1359         }
1360
1361         return NULL;
1362 }
1363
1364
1365 /* ----------
1366  * pgstat_fetch_stat_beentry() -
1367  *
1368  *      Support function for the SQL-callable pgstat* functions. Returns
1369  *      the actual activity slot of one active backend. The caller is
1370  *      responsible for a check if the actual user is permitted to see
1371  *      that info (especially the querystring).
1372  * ----------
1373  */
1374 PgStat_StatBeEntry *
1375 pgstat_fetch_stat_beentry(int beid)
1376 {
1377         backend_read_statsfile();
1378
1379         if (beid < 1 || beid > pgStatNumBackends)
1380                 return NULL;
1381
1382         return &pgStatBeTable[beid - 1];
1383 }
1384
1385
1386 /* ----------
1387  * pgstat_fetch_stat_numbackends() -
1388  *
1389  *      Support function for the SQL-callable pgstat* functions. Returns
1390  *      the maximum current backend id.
1391  * ----------
1392  */
1393 int
1394 pgstat_fetch_stat_numbackends(void)
1395 {
1396         backend_read_statsfile();
1397
1398         return pgStatNumBackends;
1399 }
1400
1401
1402
1403 /* ------------------------------------------------------------
1404  * Local support functions follow
1405  * ------------------------------------------------------------
1406  */
1407
1408
1409 /* ----------
1410  * pgstat_setheader() -
1411  *
1412  *              Set common header fields in a statistics message
1413  * ----------
1414  */
1415 static void
1416 pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
1417 {
1418         hdr->m_type = mtype;
1419         hdr->m_backendid = MyBackendId;
1420         hdr->m_procpid = MyProcPid;
1421 }
1422
1423
1424 /* ----------
1425  * pgstat_send() -
1426  *
1427  *              Send out one statistics message to the collector
1428  * ----------
1429  */
1430 static void
1431 pgstat_send(void *msg, int len)
1432 {
1433         if (pgStatSock < 0)
1434                 return;
1435
1436         ((PgStat_MsgHdr *) msg)->m_size = len;
1437
1438 #ifdef USE_ASSERT_CHECKING
1439         if (send(pgStatSock, msg, len, 0) < 0)
1440                 elog(LOG, "could not send to statistics collector: %m");
1441 #else
1442         send(pgStatSock, msg, len, 0);
1443         /* We deliberately ignore any error from send() */
1444 #endif
1445 }
1446
1447
1448 /* ----------
1449  * PgstatBufferMain() -
1450  *
1451  *      Start up the statistics buffer process.  This is the body of the
1452  *      postmaster child process.
1453  *
1454  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1455  * ----------
1456  */
1457 NON_EXEC_STATIC void
1458 PgstatBufferMain(int argc, char *argv[])
1459 {
1460         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1461
1462         MyProcPid = getpid();           /* reset MyProcPid */
1463
1464         /*
1465          * Ignore all signals usually bound to some action in the postmaster,
1466          * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
1467          */
1468         pqsignal(SIGHUP, SIG_IGN);
1469         pqsignal(SIGINT, SIG_IGN);
1470         pqsignal(SIGTERM, SIG_IGN);
1471         pqsignal(SIGQUIT, pgstat_exit);
1472         pqsignal(SIGALRM, SIG_IGN);
1473         pqsignal(SIGPIPE, SIG_IGN);
1474         pqsignal(SIGUSR1, SIG_IGN);
1475         pqsignal(SIGUSR2, SIG_IGN);
1476         pqsignal(SIGCHLD, pgstat_die);
1477         pqsignal(SIGTTIN, SIG_DFL);
1478         pqsignal(SIGTTOU, SIG_DFL);
1479         pqsignal(SIGCONT, SIG_DFL);
1480         pqsignal(SIGWINCH, SIG_DFL);
1481         /* unblock will happen in pgstat_recvbuffer */
1482
1483 #ifdef EXEC_BACKEND
1484         pgstat_parseArgs(argc, argv);
1485 #endif
1486
1487         /*
1488          * Start a buffering process to read from the socket, so we have a little
1489          * more time to process incoming messages.
1490          *
1491          * NOTE: the process structure is: postmaster is parent of buffer process
1492          * is parent of collector process.      This way, the buffer can detect
1493          * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1494          * collector failure until it tried to write on the pipe.  That would mean
1495          * that after the postmaster started a new collector, we'd have two buffer
1496          * processes competing to read from the UDP socket --- not good.
1497          */
1498         if (pgpipe(pgStatPipe) < 0)
1499                 ereport(ERROR,
1500                                 (errcode_for_socket_access(),
1501                                  errmsg("could not create pipe for statistics buffer: %m")));
1502
1503         /* child becomes collector process */
1504 #ifdef EXEC_BACKEND
1505         pgStatCollectorPid = pgstat_forkexec(STAT_PROC_COLLECTOR);
1506 #else
1507         pgStatCollectorPid = fork();
1508 #endif
1509         switch (pgStatCollectorPid)
1510         {
1511                 case -1:
1512                         ereport(ERROR,
1513                                         (errmsg("could not fork statistics collector: %m")));
1514
1515 #ifndef EXEC_BACKEND
1516                 case 0:
1517                         /* child becomes collector process */
1518                         PgstatCollectorMain(0, NULL);
1519                         break;
1520 #endif
1521
1522                 default:
1523                         /* parent becomes buffer process */
1524                         closesocket(pgStatPipe[0]);
1525                         pgstat_recvbuffer();
1526         }
1527         exit(0);
1528 }
1529
1530
1531 /* ----------
1532  * PgstatCollectorMain() -
1533  *
1534  *      Start up the statistics collector itself.  This is the body of the
1535  *      postmaster grandchild process.
1536  *
1537  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1538  * ----------
1539  */
1540 NON_EXEC_STATIC void
1541 PgstatCollectorMain(int argc, char *argv[])
1542 {
1543         PgStat_Msg      msg;
1544         fd_set          rfds;
1545         int                     readPipe;
1546         int                     len = 0;
1547         struct itimerval timeout;
1548         bool            need_timer = false;
1549
1550         MyProcPid = getpid();           /* reset MyProcPid */
1551
1552         /*
1553          * Reset signal handling.  With the exception of restoring default SIGCHLD
1554          * and SIGQUIT handling, this is a no-op in the non-EXEC_BACKEND case
1555          * because we'll have inherited these settings from the buffer process;
1556          * but it's not a no-op for EXEC_BACKEND.
1557          */
1558         pqsignal(SIGHUP, SIG_IGN);
1559         pqsignal(SIGINT, SIG_IGN);
1560         pqsignal(SIGTERM, SIG_IGN);
1561 #ifndef WIN32
1562         pqsignal(SIGQUIT, SIG_IGN);
1563 #else
1564         /* kluge to allow buffer process to kill collector; FIXME */
1565         pqsignal(SIGQUIT, pgstat_exit);
1566 #endif
1567         pqsignal(SIGALRM, force_statwrite);
1568         pqsignal(SIGPIPE, SIG_IGN);
1569         pqsignal(SIGUSR1, SIG_IGN);
1570         pqsignal(SIGUSR2, SIG_IGN);
1571         pqsignal(SIGCHLD, SIG_DFL);
1572         pqsignal(SIGTTIN, SIG_DFL);
1573         pqsignal(SIGTTOU, SIG_DFL);
1574         pqsignal(SIGCONT, SIG_DFL);
1575         pqsignal(SIGWINCH, SIG_DFL);
1576         PG_SETMASK(&UnBlockSig);
1577
1578 #ifdef EXEC_BACKEND
1579         pgstat_parseArgs(argc, argv);
1580 #endif
1581
1582         /* Close unwanted files */
1583         closesocket(pgStatPipe[1]);
1584         closesocket(pgStatSock);
1585
1586         /*
1587          * Identify myself via ps
1588          */
1589         init_ps_display("stats collector process", "", "");
1590         set_ps_display("");
1591
1592         /*
1593          * Arrange to write the initial status file right away
1594          */
1595         need_statwrite = true;
1596
1597         /* Preset the delay between status file writes */
1598         MemSet(&timeout, 0, sizeof(struct itimerval));
1599         timeout.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
1600         timeout.it_value.tv_usec = PGSTAT_STAT_INTERVAL % 1000;
1601
1602         /*
1603          * Read in an existing statistics stats file or initialize the stats to
1604          * zero.
1605          */
1606         pgStatRunningInCollector = true;
1607         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);
1608
1609         /*
1610          * Create the known backends table
1611          */
1612         pgStatBeTable = (PgStat_StatBeEntry *)
1613                 palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends);
1614
1615         readPipe = pgStatPipe[0];
1616
1617         /*
1618          * Process incoming messages and handle all the reporting stuff until
1619          * there are no more messages.
1620          */
1621         for (;;)
1622         {
1623                 /*
1624                  * If time to write the stats file, do so.  Note that the alarm
1625                  * interrupt isn't re-enabled immediately, but only after we next
1626                  * receive a stats message; so no cycles are wasted when there is
1627                  * nothing going on.
1628                  */
1629                 if (need_statwrite)
1630                 {
1631                         pgstat_write_statsfile();
1632                         need_statwrite = false;
1633                         need_timer = true;
1634                 }
1635
1636                 /*
1637                  * Setup the descriptor set for select(2)
1638                  */
1639                 FD_ZERO(&rfds);
1640                 FD_SET(readPipe, &rfds);
1641
1642                 /*
1643                  * Now wait for something to do.
1644                  */
1645                 if (select(readPipe + 1, &rfds, NULL, NULL, NULL) < 0)
1646                 {
1647                         if (errno == EINTR)
1648                                 continue;
1649                         ereport(ERROR,
1650                                         (errcode_for_socket_access(),
1651                                          errmsg("select() failed in statistics collector: %m")));
1652                 }
1653
1654                 /*
1655                  * Check if there is a new statistics message to collect.
1656                  */
1657                 if (FD_ISSET(readPipe, &rfds))
1658                 {
1659                         /*
1660                          * We may need to issue multiple read calls in case the buffer
1661                          * process didn't write the message in a single write, which is
1662                          * possible since it dumps its buffer bytewise. In any case, we'd
1663                          * need two reads since we don't know the message length
1664                          * initially.
1665                          */
1666                         int                     nread = 0;
1667                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1668                         bool            pipeEOF = false;
1669
1670                         while (nread < targetlen)
1671                         {
1672                                 len = piperead(readPipe, ((char *) &msg) + nread,
1673                                                            targetlen - nread);
1674                                 if (len < 0)
1675                                 {
1676                                         if (errno == EINTR)
1677                                                 continue;
1678                                         ereport(ERROR,
1679                                                         (errcode_for_socket_access(),
1680                                                          errmsg("could not read from statistics collector pipe: %m")));
1681                                 }
1682                                 if (len == 0)   /* EOF on the pipe! */
1683                                 {
1684                                         pipeEOF = true;
1685                                         break;
1686                                 }
1687                                 nread += len;
1688                                 if (nread == sizeof(PgStat_MsgHdr))
1689                                 {
1690                                         /* we have the header, compute actual msg length */
1691                                         targetlen = msg.msg_hdr.m_size;
1692                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1693                                                 targetlen > (int) sizeof(msg))
1694                                         {
1695                                                 /*
1696                                                  * Bogus message length implies that we got out of
1697                                                  * sync with the buffer process somehow. Abort so that
1698                                                  * we can restart both processes.
1699                                                  */
1700                                                 ereport(ERROR,
1701                                                           (errmsg("invalid statistics message length")));
1702                                         }
1703                                 }
1704                         }
1705
1706                         /*
1707                          * EOF on the pipe implies that the buffer process exited. Fall
1708                          * out of outer loop.
1709                          */
1710                         if (pipeEOF)
1711                                 break;
1712
1713                         /*
1714                          * Distribute the message to the specific function handling it.
1715                          */
1716                         switch (msg.msg_hdr.m_type)
1717                         {
1718                                 case PGSTAT_MTYPE_DUMMY:
1719                                         break;
1720
1721                                 case PGSTAT_MTYPE_BESTART:
1722                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1723                                         break;
1724
1725                                 case PGSTAT_MTYPE_BETERM:
1726                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1727                                         break;
1728
1729                                 case PGSTAT_MTYPE_TABSTAT:
1730                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1731                                         break;
1732
1733                                 case PGSTAT_MTYPE_TABPURGE:
1734                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1735                                         break;
1736
1737                                 case PGSTAT_MTYPE_ACTIVITY:
1738                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1739                                         break;
1740
1741                                 case PGSTAT_MTYPE_DROPDB:
1742                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1743                                         break;
1744
1745                                 case PGSTAT_MTYPE_RESETCOUNTER:
1746                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1747                                                                                          nread);
1748                                         break;
1749
1750                                 case PGSTAT_MTYPE_AUTOVAC_START:
1751                                         pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread);
1752                                         break;
1753
1754                                 case PGSTAT_MTYPE_VACUUM:
1755                                         pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread);
1756                                         break;
1757
1758                                 case PGSTAT_MTYPE_ANALYZE:
1759                                         pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread);
1760                                         break;
1761
1762                                 default:
1763                                         break;
1764                         }
1765
1766                         /*
1767                          * Globally count messages.
1768                          */
1769                         pgStatNumMessages++;
1770
1771                         /*
1772                          * If this is the first message after we wrote the stats file the
1773                          * last time, enable the alarm interrupt to make it be written
1774                          * again later.
1775                          */
1776                         if (need_timer)
1777                         {
1778                                 if (setitimer(ITIMER_REAL, &timeout, NULL))
1779                                         ereport(ERROR,
1780                                                   (errmsg("could not set statistics collector timer: %m")));
1781                                 need_timer = false;
1782                         }
1783                 }
1784
1785                 /*
1786                  * Note that we do NOT check for postmaster exit inside the loop; only
1787                  * EOF on the buffer pipe causes us to fall out.  This ensures we
1788                  * don't exit prematurely if there are still a few messages in the
1789                  * buffer or pipe at postmaster shutdown.
1790                  */
1791         }
1792
1793         /*
1794          * Okay, we saw EOF on the buffer pipe, so there are no more messages to
1795          * process.  If the buffer process quit because of postmaster shutdown, we
1796          * want to save the final stats to reuse at next startup. But if the
1797          * buffer process failed, it seems best not to (there may even now be a
1798          * new collector firing up, and we don't want it to read a
1799          * partially-rewritten stats file).
1800          */
1801         if (!PostmasterIsAlive(false))
1802                 pgstat_write_statsfile();
1803 }
1804
1805
1806 /* SIGALRM signal handler for collector process */
1807 static void
1808 force_statwrite(SIGNAL_ARGS)
1809 {
1810         need_statwrite = true;
1811 }
1812
1813
/* ----------
 * pgstat_recvbuffer() -
 *
 *	This is the body of the separate buffering process. Its only
 *	purpose is to receive messages from the UDP socket as fast as
 *	possible and forward them over a pipe into the collector itself.
 *	If the collector is slow to absorb messages, they are buffered here.
 *
 *	Accepted messages are appended back-to-back to a circular byte buffer
 *	of PGSTAT_RECVBUFFERSZ bytes.  msg_recv is the next position to fill,
 *	msg_send is the next position to forward, and msg_have is the number
 *	of bytes currently held.  A message may wrap around the end of the
 *	buffer, and a single pipe write may forward only part of a message.
 *	The pipe therefore carries a plain byte stream, which the collector
 *	re-splits using each header's m_size.
 *
 *	The loop below never ends normally.  The process leaves it only via
 *	exit(0), once the postmaster is gone and the buffer is drained, or via
 *	ereport(ERROR) on a socket or pipe failure.  The signal handlers
 *	pgstat_exit (SIGQUIT) and pgstat_die (SIGCHLD) can also end it.
 * ----------
 */
static void
pgstat_recvbuffer(void)
{
	fd_set		rfds;
	fd_set		wfds;
	struct timeval timeout;
	int			writePipe = pgStatPipe[1];
	int			maxfd;
	int			len;
	int			xfr;
	int			frm;
	PgStat_Msg	input_buffer;
	char	   *msgbuffer;
	int			msg_send = 0;	/* next send index in buffer */
	int			msg_recv = 0;	/* next receive index */
	int			msg_have = 0;	/* number of bytes stored */
	bool		overflow = false;	/* true once "buffer is full" has been
									 * logged, until space frees up again */

	/*
	 * Identify myself via ps
	 */
	init_ps_display("stats buffer process", "", "");
	set_ps_display("");

	/*
	 * We want to die if our child collector process does.  There are two ways
	 * we might notice that it has died: receive SIGCHLD, or get a write
	 * failure on the pipe leading to the child.  We can set SIGPIPE to kill
	 * us here.  Our SIGCHLD handler was already set up before we forked (must
	 * do it that way, else it's a race condition).
	 */
	pqsignal(SIGPIPE, SIG_DFL);
	PG_SETMASK(&UnBlockSig);

	/*
	 * Set the write pipe to nonblock mode, so that we cannot block when the
	 * collector falls behind.
	 */
	if (!pg_set_noblock(writePipe))
		ereport(ERROR,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector pipe to nonblocking mode: %m")));

	/*
	 * Allocate the message buffer.  It is used for the rest of the process's
	 * life and is never freed.
	 */
	msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);

	/*
	 * Loop forever
	 */
	for (;;)
	{
		FD_ZERO(&rfds);
		FD_ZERO(&wfds);
		maxfd = -1;

		/*
		 * As long as we have buffer space we add the socket to the read
		 * descriptor set.  "Space" means room for a maximum-size message,
		 * because recv() below may deliver up to sizeof(PgStat_Msg) bytes.
		 * This test is what guarantees that the copy into the circular
		 * buffer never overwrites bytes that have not been forwarded yet.
		 * While full, we stop reading the socket and log the condition only
		 * once per episode.
		 */
		if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
		{
			FD_SET(pgStatSock, &rfds);
			maxfd = pgStatSock;
			overflow = false;
		}
		else
		{
			if (!overflow)
			{
				ereport(LOG,
						(errmsg("statistics buffer is full")));
				overflow = true;
			}
		}

		/*
		 * If we have messages to write out, we add the pipe to the write
		 * descriptor set.
		 */
		if (msg_have > 0)
		{
			FD_SET(writePipe, &wfds);
			if (writePipe > maxfd)
				maxfd = writePipe;
		}

		/*
		 * Wait for some work to do; but not for more than 10 seconds. (This
		 * determines how quickly we will shut down after an ungraceful
		 * postmaster termination; so it needn't be very fast.)
		 *
		 * struct timeout is modified by select() on some operating systems,
		 * so re-fill it each time.
		 *
		 * On timeout select() returns 0 with the fd sets cleared, so neither
		 * branch below fires and we fall through to the postmaster checks at
		 * the bottom of the loop.
		 */
		timeout.tv_sec = 10;
		timeout.tv_usec = 0;

		if (select(maxfd + 1, &rfds, &wfds, NULL, &timeout) < 0)
		{
			if (errno == EINTR)
				continue;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics buffer: %m")));
		}

		/*
		 * If there is a message on the socket, read it and check for
		 * validity.
		 */
		if (FD_ISSET(pgStatSock, &rfds))
		{
			len = recv(pgStatSock, (char *) &input_buffer,
					   sizeof(PgStat_Msg), 0);
			if (len < 0)
				ereport(ERROR,
						(errcode_for_socket_access(),
						 errmsg("could not read statistics message: %m")));

			/*
			 * We ignore messages that are smaller than our common header.
			 * (ereport(ERROR) does not return, so len >= 0 here and the
			 * comparison against the unsigned sizeof is safe.)
			 */
			if (len < sizeof(PgStat_MsgHdr))
				continue;

			/*
			 * The received length must match the length in the header.
			 * Malformed datagrams are dropped silently.  The "continue" also
			 * skips forwarding on this iteration, but select() will report
			 * the pipe writable again on the next pass.
			 */
			if (input_buffer.msg_hdr.m_size != len)
				continue;

			/*
			 * O.K. - we accept this message.  Copy it to the circular
			 * msgbuffer.  Each pass copies as much as fits before the
			 * physical end of the buffer, then wraps msg_recv back to 0.
			 */
			frm = 0;
			while (len > 0)
			{
				xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
				if (xfr > len)
					xfr = len;
				Assert(xfr > 0);
				memcpy(msgbuffer + msg_recv,
					   ((char *) &input_buffer) + frm,
					   xfr);
				msg_recv += xfr;
				if (msg_recv == PGSTAT_RECVBUFFERSZ)
					msg_recv = 0;
				msg_have += xfr;
				frm += xfr;
				len -= xfr;
			}
		}

		/*
		 * If the collector is ready to receive, write some data into his
		 * pipe.  We may or may not be able to write all that we have.
		 *
		 * NOTE: if what we have is less than PIPE_BUF bytes but more than the
		 * space available in the pipe buffer, most kernels will refuse to
		 * write any of it, and will return EAGAIN.  This means we will
		 * busy-loop until the situation changes (either because the collector
		 * caught up, or because more data arrives so that we have more than
		 * PIPE_BUF bytes buffered).  This is not good, but is there any way
		 * around it?  We have no way to tell when the collector has caught
		 * up...
		 */
		if (FD_ISSET(writePipe, &wfds))
		{
			/*
			 * Send only the contiguous run up to the physical end of the
			 * buffer.  Any wrapped remainder goes out on a later iteration.
			 */
			xfr = PGSTAT_RECVBUFFERSZ - msg_send;
			if (xfr > msg_have)
				xfr = msg_have;
			Assert(xfr > 0);
			len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
			if (len < 0)
			{
				if (errno == EINTR || errno == EAGAIN)
					continue;	/* not enough space in pipe */
				ereport(ERROR,
						(errcode_for_socket_access(),
				errmsg("could not write to statistics collector pipe: %m")));
			}
			/*
			 * NB: len < xfr is okay.  The unsent tail stays buffered and is
			 * forwarded later; the pipe is a byte stream, so a message split
			 * across writes is reassembled by the collector.
			 */
			msg_send += len;
			if (msg_send == PGSTAT_RECVBUFFERSZ)
				msg_send = 0;
			msg_have -= len;
		}

		/*
		 * Make sure we forwarded all messages before we check for postmaster
		 * termination.
		 */
		if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
			continue;

		/*
		 * If the postmaster has terminated, we die too.  (This is no longer
		 * the normal exit path, however.)
		 */
		if (!PostmasterIsAlive(true))
			exit(0);
	}
}
2029
/*
 * pgstat_exit() -
 *
 * SIGQUIT handler for the buffer process.  We shut down at once with
 * status 0 and abandon any messages still sitting in the buffer.  Letting
 * them drain first would be cleaner, but it would slow down exit.
 *
 * Under WIN32 the collector child is sent SIGQUIT first.  On some broken
 * win32 systems it does not shut down automatically because of issues with
 * socket inheritance.  XXX so why not fix the socket inheritance...
 *
 * NOTE(review): exit() is not on the POSIX async-signal-safe list (it runs
 * atexit handlers and flushes stdio); confirm this is acceptable here.
 */
static void
pgstat_exit(SIGNAL_ARGS)
{
#ifdef WIN32
	/* Take the collector down with us. */
	if (pgStatCollectorPid > 0)
	{
		kill(pgStatCollectorPid, SIGQUIT);
	}
#endif
	exit(0);
}
2051
/*
 * pgstat_die() -
 *
 * SIGCHLD handler for the buffer process.  The buffer process's only
 * purpose is feeding its child collector, so it must not outlive it.
 * Exit immediately with a nonzero status (1).
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	exit(1);
}
2058
2059
2060 /* ----------
2061  * pgstat_add_backend() -
2062  *
2063  *      Support function to keep our backend list up to date.
2064  * ----------
2065  */
2066 static int
2067 pgstat_add_backend(PgStat_MsgHdr *msg)
2068 {
2069         PgStat_StatBeEntry *beentry;
2070
2071         /*
2072          * Check that the backend ID is valid
2073          */
2074         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2075         {
2076                 ereport(LOG,
2077                                 (errmsg("invalid server process ID %d", msg->m_backendid)));
2078                 return -1;
2079         }
2080
2081         /*
2082          * Get the slot for this backendid.
2083          */
2084         beentry = &pgStatBeTable[msg->m_backendid - 1];
2085
2086         /*
2087          * If the slot contains the PID of this backend, everything is fine and we
2088          * have nothing to do. Note that all the slots are zero'd out when the
2089          * collector is started. We assume that a slot is "empty" iff procpid ==
2090          * 0.
2091          */
2092         if (beentry->procpid > 0 && beentry->procpid == msg->m_procpid)
2093                 return 0;
2094
2095         /* Must be able to distinguish between empty and non-empty slots */
2096         Assert(msg->m_procpid > 0);
2097
2098         /*
2099          * Put this new backend into the slot (possibly overwriting an old entry,
2100          * if we missed its BETERM or the BETERM hasn't arrived yet).
2101          */
2102         beentry->procpid = msg->m_procpid;
2103         beentry->start_timestamp = GetCurrentTimestamp();
2104         beentry->activity_start_timestamp = 0;
2105         beentry->activity[0] = '\0';
2106
2107         /*
2108          * We can't initialize the rest of the data in this slot until we see the
2109          * BESTART message. Therefore, we set the database and user to sentinel
2110          * values, to indicate "undefined". There is no easy way to do this for
2111          * the client address, so make sure to check that the database or user are
2112          * defined before accessing the client address.
2113          */
2114         beentry->userid = InvalidOid;
2115         beentry->databaseid = InvalidOid;
2116
2117         return 0;
2118 }
2119
2120 /*
2121  * Lookup the hash table entry for the specified database. If no hash
2122  * table entry exists, initialize it, if the create parameter is true.
2123  * Else, return NULL.
2124  */
2125 static PgStat_StatDBEntry *
2126 pgstat_get_db_entry(Oid databaseid, bool create)
2127 {
2128         PgStat_StatDBEntry *result;
2129         bool            found;
2130         HASHACTION      action = (create ? HASH_ENTER : HASH_FIND);
2131
2132         /* Lookup or create the hash table entry for this database */
2133         result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2134                                                                                                 &databaseid,
2135                                                                                                 action, &found);
2136
2137         if (!create && !found)
2138                 return NULL;
2139
2140         /* If not found, initialize the new one. */
2141         if (!found)
2142         {
2143                 HASHCTL         hash_ctl;
2144
2145                 result->tables = NULL;
2146                 result->n_xact_commit = 0;
2147                 result->n_xact_rollback = 0;
2148                 result->n_blocks_fetched = 0;
2149                 result->n_blocks_hit = 0;
2150                 result->last_autovac_time = 0;
2151
2152                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2153                 hash_ctl.keysize = sizeof(Oid);
2154                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2155                 hash_ctl.hash = oid_hash;
2156                 result->tables = hash_create("Per-database table",
2157                                                                          PGSTAT_TAB_HASH_SIZE,
2158                                                                          &hash_ctl,
2159                                                                          HASH_ELEM | HASH_FUNCTION);
2160         }
2161
2162         return result;
2163 }
2164
2165 /* ----------
2166  * pgstat_sub_backend() -
2167  *
2168  *      Remove a backend from the actual backends list.
2169  * ----------
2170  */
2171 static void
2172 pgstat_sub_backend(int procpid)
2173 {
2174         int                     i;
2175
2176         /*
2177          * Search in the known-backends table for the slot containing this PID.
2178          */
2179         for (i = 0; i < MaxBackends; i++)
2180         {
2181                 if (pgStatBeTable[i].procpid == procpid)
2182                 {
2183                         /*
2184                          * That's him.  Mark the backend slot empty.
2185                          */
2186                         pgStatBeTable[i].procpid = 0;
2187                         return;
2188                 }
2189         }
2190
2191         /*
2192          * No big problem if not found. This can happen if UDP messages arrive out
2193          * of order here.
2194          */
2195 }
2196
2197
2198 /* ----------
2199  * pgstat_write_statsfile() -
2200  *
2201  *      Tell the news.
2202  * ----------
2203  */
2204 static void
2205 pgstat_write_statsfile(void)
2206 {
2207         HASH_SEQ_STATUS hstat;
2208         HASH_SEQ_STATUS tstat;
2209         PgStat_StatDBEntry *dbentry;
2210         PgStat_StatTabEntry *tabentry;
2211         FILE       *fpout;
2212         int                     i;
2213         int32           format_id;
2214
2215         /*
2216          * Open the statistics temp file to write out the current values.
2217          */
2218         fpout = fopen(PGSTAT_STAT_TMPFILE, PG_BINARY_W);
2219         if (fpout == NULL)
2220         {
2221                 ereport(LOG,
2222                                 (errcode_for_file_access(),
2223                                  errmsg("could not open temporary statistics file \"%s\": %m",
2224                                                 PGSTAT_STAT_TMPFILE)));
2225                 return;
2226         }
2227
2228         /*
2229          * Write the file header --- currently just a format ID.
2230          */
2231         format_id = PGSTAT_FILE_FORMAT_ID;
2232         fwrite(&format_id, sizeof(format_id), 1, fpout);
2233
2234         /*
2235          * Walk through the database table.
2236          */
2237         hash_seq_init(&hstat, pgStatDBHash);
2238         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2239         {
2240                 /*
2241                  * Write out the DB entry including the number of live backends.
2242                  * We don't write the tables pointer since it's of no use to any
2243                  * other process.
2244                  */
2245                 fputc('D', fpout);
2246                 fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
2247
2248                 /*
2249                  * Walk through the database's access stats per table.
2250                  */
2251                 hash_seq_init(&tstat, dbentry->tables);
2252                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2253                 {
2254                         fputc('T', fpout);
2255                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2256                 }
2257
2258                 /*
2259                  * Mark the end of this DB
2260                  */
2261                 fputc('d', fpout);
2262         }
2263
2264         /*
2265          * Write out the known running backends to the stats file.
2266          */
2267         i = MaxBackends;
2268         fputc('M', fpout);
2269         fwrite(&i, sizeof(i), 1, fpout);
2270
2271         for (i = 0; i < MaxBackends; i++)
2272         {
2273                 PgStat_StatBeEntry *beentry = &pgStatBeTable[i];
2274
2275                 if (beentry->procpid > 0)
2276                 {
2277                         int             len;
2278
2279                         len = offsetof(PgStat_StatBeEntry, activity) +
2280                                 strlen(beentry->activity) + 1;
2281                         fputc('B', fpout);
2282                         fwrite(&len, sizeof(len), 1, fpout);
2283                         fwrite(beentry, len, 1, fpout);
2284                 }
2285         }
2286
2287         /*
2288          * No more output to be done. Close the temp file and replace the old
2289          * pgstat.stat with it.  The ferror() check replaces testing for error
2290          * after each individual fputc or fwrite above.
2291          */
2292         fputc('E', fpout);
2293
2294         if (ferror(fpout))
2295         {
2296                 ereport(LOG,
2297                                 (errcode_for_file_access(),
2298                                  errmsg("could not write temporary statistics file \"%s\": %m",
2299                                                 PGSTAT_STAT_TMPFILE)));
2300                 fclose(fpout);
2301                 unlink(PGSTAT_STAT_TMPFILE);
2302         }
2303         else if (fclose(fpout) < 0)
2304         {
2305                 ereport(LOG,
2306                                 (errcode_for_file_access(),
2307                            errmsg("could not close temporary statistics file \"%s\": %m",
2308                                           PGSTAT_STAT_TMPFILE)));
2309                 unlink(PGSTAT_STAT_TMPFILE);
2310         }
2311         else if (rename(PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME) < 0)
2312         {
2313                 ereport(LOG,
2314                                 (errcode_for_file_access(),
2315                                  errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2316                                                 PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME)));
2317                 unlink(PGSTAT_STAT_TMPFILE);
2318         }
2319 }
2320
/*
 * comparePids() -
 *
 * qsort/bsearch comparison routine for arrays of int PIDs.
 *
 * Returns a negative value, zero, or a positive value when the first PID is
 * less than, equal to, or greater than the second.  The values are compared
 * rather than subtracted.  Subtraction is only safe for nonnegative PIDs
 * and would overflow (undefined behavior for signed int) otherwise, so this
 * form is correct for any int input.
 */
static int
comparePids(const void *v1, const void *v2)
{
	int			pid1 = *((const int *) v1);
	int			pid2 = *((const int *) v2);

	if (pid1 < pid2)
		return -1;
	if (pid1 > pid2)
		return 1;
	return 0;
}
2331
2332 /* ----------
2333  * pgstat_read_statsfile() -
2334  *
2335  *      Reads in an existing statistics collector and initializes the
2336  *      databases' hash table (whose entries point to the tables' hash tables)
2337  *      and the current backend table.
2338  * ----------
2339  */
2340 static void
2341 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2342                                           PgStat_StatBeEntry **betab, int *numbackends)
2343 {
2344         PgStat_StatDBEntry *dbentry;
2345         PgStat_StatDBEntry dbbuf;
2346         PgStat_StatTabEntry *tabentry;
2347         PgStat_StatTabEntry tabbuf;
2348         PgStat_StatBeEntry *beentry;
2349         HASHCTL         hash_ctl;
2350         HTAB       *tabhash = NULL;
2351         FILE       *fpin;
2352         int32           format_id;
2353         int                     len;
2354         int                     maxbackends = 0;
2355         int                     havebackends = 0;
2356         bool            found;
2357         int                *live_pids;
2358         MemoryContext use_mcxt;
2359         int                     mcxt_flags;
2360
2361         /*
2362          * If running in the collector or the autovacuum process, we use the
2363          * DynaHashCxt memory context.  If running in a backend, we use the
2364          * TopTransactionContext instead, so the caller must only know the last
2365          * XactId when this call happened to know if his tables are still valid or
2366          * already gone!
2367          *
2368          * Also, if running in a regular backend, we check backend entries against
2369          * the PGPROC array so that we can detect stale entries.  This lets us
2370          * discard entries whose BETERM message got lost for some reason.
2371          */
2372         if (pgStatRunningInCollector || IsAutoVacuumProcess())
2373         {
2374                 use_mcxt = NULL;
2375                 mcxt_flags = 0;
2376                 live_pids = NULL;
2377         }
2378         else
2379         {
2380                 use_mcxt = TopTransactionContext;
2381                 mcxt_flags = HASH_CONTEXT;
2382                 live_pids = GetAllBackendPids();
2383                 /* Sort the PID array so we can use bsearch */
2384                 if (live_pids[0] > 1)
2385                         qsort((void *) &live_pids[1], live_pids[0], sizeof(int),
2386                                   comparePids);
2387         }
2388
2389         /*
2390          * Create the DB hashtable
2391          */
2392         memset(&hash_ctl, 0, sizeof(hash_ctl));
2393         hash_ctl.keysize = sizeof(Oid);
2394         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2395         hash_ctl.hash = oid_hash;
2396         hash_ctl.hcxt = use_mcxt;
2397         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2398                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2399
2400         /*
2401          * Initialize the number of known backends to zero, just in case we do a
2402          * silent error return below.
2403          */
2404         if (numbackends != NULL)
2405                 *numbackends = 0;
2406         if (betab != NULL)
2407                 *betab = NULL;
2408
2409         /*
2410          * Try to open the status file. If it doesn't exist, the backends simply
2411          * return zero for anything and the collector simply starts from scratch
2412          * with empty counters.
2413          */
2414         if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL)
2415                 return;
2416
2417         /*
2418          * Verify it's of the expected format.
2419          */
2420         if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
2421                 || format_id != PGSTAT_FILE_FORMAT_ID)
2422         {
2423                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2424                                 (errmsg("corrupted pgstat.stat file")));
2425                 goto done;
2426         }
2427
2428         /*
2429          * We found an existing collector stats file. Read it and put all the
2430          * hashtable entries into place.
2431          */
2432         for (;;)
2433         {
2434                 switch (fgetc(fpin))
2435                 {
2436                                 /*
2437                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2438                                  * follows. Subsequently, zero to many 'T' entries will follow
2439                                  * until a 'd' is encountered.
2440                                  */
2441                         case 'D':
2442                                 if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
2443                                                   fpin) != offsetof(PgStat_StatDBEntry, tables))
2444                                 {
2445                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2446                                                         (errmsg("corrupted pgstat.stat file")));
2447                                         goto done;
2448                                 }
2449
2450                                 /*
2451                                  * Add to the DB hash
2452                                  */
2453                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2454                                                                                                   (void *) &dbbuf.databaseid,
2455                                                                                                                          HASH_ENTER,
2456                                                                                                                          &found);
2457                                 if (found)
2458                                 {
2459                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2460                                                         (errmsg("corrupted pgstat.stat file")));
2461                                         goto done;
2462                                 }
2463
2464                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2465                                 dbentry->tables = NULL;
2466                                 dbentry->n_backends = 0;
2467
2468                                 /*
2469                                  * Don't collect tables if not the requested DB (or the
2470                                  * shared-table info)
2471                                  */
2472                                 if (onlydb != InvalidOid)
2473                                 {
2474                                         if (dbbuf.databaseid != onlydb &&
2475                                                 dbbuf.databaseid != InvalidOid)
2476                                                 break;
2477                                 }
2478
2479                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2480                                 hash_ctl.keysize = sizeof(Oid);
2481                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2482                                 hash_ctl.hash = oid_hash;
2483                                 hash_ctl.hcxt = use_mcxt;
2484                                 dbentry->tables = hash_create("Per-database table",
2485                                                                                           PGSTAT_TAB_HASH_SIZE,
2486                                                                                           &hash_ctl,
2487                                                                          HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2488
2489                                 /*
2490                                  * Arrange that following 'T's add entries to this database's
2491                                  * tables hash table.
2492                                  */
2493                                 tabhash = dbentry->tables;
2494                                 break;
2495
2496                                 /*
2497                                  * 'd'  End of this database.
2498                                  */
2499                         case 'd':
2500                                 tabhash = NULL;
2501                                 break;
2502
2503                                 /*
2504                                  * 'T'  A PgStat_StatTabEntry follows.
2505                                  */
2506                         case 'T':
2507                                 if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
2508                                                   fpin) != sizeof(PgStat_StatTabEntry))
2509                                 {
2510                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2511                                                         (errmsg("corrupted pgstat.stat file")));
2512                                         goto done;
2513                                 }
2514
2515                                 /*
2516                                  * Skip if table belongs to a not requested database.
2517                                  */
2518                                 if (tabhash == NULL)
2519                                         break;
2520
2521                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2522                                                                                                         (void *) &tabbuf.tableid,
2523                                                                                                                  HASH_ENTER, &found);
2524
2525                                 if (found)
2526                                 {
2527                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2528                                                         (errmsg("corrupted pgstat.stat file")));
2529                                         goto done;
2530                                 }
2531
2532                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2533                                 break;
2534
2535                                 /*
2536                                  * 'M'  The maximum number of backends to expect follows.
2537                                  */
2538                         case 'M':
2539                                 if (betab == NULL || numbackends == NULL)
2540                                         goto done;
2541                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2542                                         sizeof(maxbackends))
2543                                 {
2544                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2545                                                         (errmsg("corrupted pgstat.stat file")));
2546                                         goto done;
2547                                 }
2548                                 if (maxbackends == 0)
2549                                         goto done;
2550
2551                                 /*
2552                                  * Allocate space (in TopTransactionContext too) for the
2553                                  * backend table.
2554                                  */
2555                                 if (use_mcxt == NULL)
2556                                         *betab = (PgStat_StatBeEntry *)
2557                                                 palloc(sizeof(PgStat_StatBeEntry) * maxbackends);
2558                                 else
2559                                         *betab = (PgStat_StatBeEntry *)
2560                                                 MemoryContextAlloc(use_mcxt,
2561                                                                    sizeof(PgStat_StatBeEntry) * maxbackends);
2562                                 break;
2563
2564                                 /*
2565                                  * 'B'  A PgStat_StatBeEntry follows.
2566                                  */
2567                         case 'B':
2568                                 if (betab == NULL || numbackends == NULL || *betab == NULL)
2569                                         goto done;
2570
2571                                 if (havebackends >= maxbackends)
2572                                         goto done;
2573
2574                                 /* Read and validate the entry length */
2575                                 if (fread(&len, 1, sizeof(len), fpin) != sizeof(len))
2576                                 {
2577                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2578                                                         (errmsg("corrupted pgstat.stat file")));
2579                                         goto done;
2580                                 }
2581                                 if (len <= offsetof(PgStat_StatBeEntry, activity) ||
2582                                         len > sizeof(PgStat_StatBeEntry))
2583                                 {
2584                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2585                                                         (errmsg("corrupted pgstat.stat file")));
2586                                         goto done;
2587                                 }
2588
2589                                 /*
2590                                  * Read it directly into the table.
2591                                  */
2592                                 beentry = &(*betab)[havebackends];
2593
2594                                 if (fread(beentry, 1, len, fpin) != len)
2595                                 {
2596                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2597                                                         (errmsg("corrupted pgstat.stat file")));
2598                                         goto done;
2599                                 }
2600
2601                                 /*
2602                                  * If possible, check PID to verify still running
2603                                  */
2604                                 if (live_pids &&
2605                                         (live_pids[0] == 0 ||
2606                                          bsearch((void *) &beentry->procpid,
2607                                                          (void *) &live_pids[1],
2608                                                          live_pids[0],
2609                                                          sizeof(int),
2610                                                          comparePids) == NULL))
2611                                 {
2612                                         /*
2613                                          * Note: we could send a BETERM message to tell the
2614                                          * collector to drop the entry, but I'm a bit worried
2615                                          * about race conditions.  For now, just silently ignore
2616                                          * dead entries; they'll get recycled eventually anyway.
2617                                          */
2618
2619                                         /* Don't accept the entry */
2620                                         memset(beentry, 0, sizeof(PgStat_StatBeEntry));
2621                                         break;
2622                                 }
2623
2624                                 /*
2625                                  * Count backends per database here.
2626                                  */
2627                                 dbentry = (PgStat_StatDBEntry *)
2628                                         hash_search(*dbhash,
2629                                                                 &(beentry->databaseid),
2630                                                                 HASH_FIND,
2631                                                                 NULL);
2632                                 if (dbentry)
2633                                         dbentry->n_backends++;
2634
2635                                 havebackends++;
2636                                 *numbackends = havebackends;
2637
2638                                 break;
2639
2640                                 /*
2641                                  * 'E'  The EOF marker of a complete stats file.
2642                                  */
2643                         case 'E':
2644                                 goto done;
2645
2646                         default:
2647                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2648                                                 (errmsg("corrupted pgstat.stat file")));
2649                                 goto done;
2650                 }
2651         }
2652
2653 done:
2654         FreeFile(fpin);
2655 }
2656
2657 /*
2658  * If not done for this transaction, read the statistics collector
2659  * stats file into some hash tables.
2660  *
2661  * Because we store the hash tables in TopTransactionContext, the result
2662  * is good for the entire current main transaction.
2663  *
2664  * Inside the autovacuum process, the statfile is assumed to be valid
2665  * "forever", that is one iteration, within one database.  This means
2666  * we only consider the statistics as they were when the autovacuum
2667  * iteration started.
2668  */
2669 static void
2670 backend_read_statsfile(void)
2671 {
2672         if (IsAutoVacuumProcess())
2673         {
2674                 /* already read it? */
2675                 if (pgStatDBHash)
2676                         return;
2677                 Assert(!pgStatRunningInCollector);
2678                 pgstat_read_statsfile(&pgStatDBHash, InvalidOid,
2679                                                           &pgStatBeTable, &pgStatNumBackends);
2680         }
2681         else
2682         {
2683                 TransactionId topXid = GetTopTransactionId();
2684
2685                 if (!TransactionIdEquals(pgStatDBHashXact, topXid))
2686                 {
2687                         Assert(!pgStatRunningInCollector);
2688                         pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
2689                                                                   &pgStatBeTable, &pgStatNumBackends);
2690                         pgStatDBHashXact = topXid;
2691                 }
2692         }
2693 }
2694
2695
2696 /* ----------
2697  * pgstat_recv_bestart() -
2698  *
2699  *      Process a backend startup message.
2700  * ----------
2701  */
2702 static void
2703 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2704 {
2705         PgStat_StatBeEntry *entry;
2706
2707         /*
2708          * If the backend is known dead, we ignore the message -- we don't want to
2709          * update the backend entry's state since this BESTART message refers to
2710          * an old, dead backend
2711          */
2712         if (pgstat_add_backend(&msg->m_hdr) != 0)
2713                 return;
2714
2715         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2716         entry->userid = msg->m_userid;
2717         memcpy(&entry->clientaddr, &msg->m_clientaddr, sizeof(entry->clientaddr));
2718         entry->databaseid = msg->m_databaseid;
2719 }
2720
2721
2722 /* ----------
2723  * pgstat_recv_beterm() -
2724  *
2725  *      Process a backend termination message.
2726  * ----------
2727  */
2728 static void
2729 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2730 {
2731         pgstat_sub_backend(msg->m_hdr.m_procpid);
2732 }
2733
2734 /* ----------
2735  * pgstat_recv_autovac() -
2736  *
2737  *      Process an autovacuum signalling message.
2738  * ----------
2739  */
2740 static void
2741 pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
2742 {
2743         PgStat_StatDBEntry *dbentry;
2744
2745         /*
2746          * Lookup the database in the hashtable.  Don't create the entry if it
2747          * doesn't exist, because autovacuum may be processing a template
2748          * database.  If this isn't the case, the database is most likely to have
2749          * an entry already.  (If it doesn't, not much harm is done anyway --
2750          * it'll get created as soon as somebody actually uses the database.)
2751          */
2752         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2753         if (dbentry == NULL)
2754                 return;
2755
2756         /*
2757          * Store the last autovacuum time in the database entry.
2758          */
2759         dbentry->last_autovac_time = msg->m_start_time;
2760 }
2761
2762 /* ----------
2763  * pgstat_recv_vacuum() -
2764  *
2765  *      Process a VACUUM message.
2766  * ----------
2767  */
2768 static void
2769 pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
2770 {
2771         PgStat_StatDBEntry *dbentry;
2772         PgStat_StatTabEntry *tabentry;
2773
2774         /*
2775          * Don't create either the database or table entry if it doesn't already
2776          * exist.  This avoids bloating the stats with entries for stuff that is
2777          * only touched by vacuum and not by live operations.
2778          */
2779         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2780         if (dbentry == NULL)
2781                 return;
2782
2783         tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
2784                                                    HASH_FIND, NULL);
2785         if (tabentry == NULL)
2786                 return;
2787
2788         tabentry->n_live_tuples = msg->m_tuples;
2789         tabentry->n_dead_tuples = 0;
2790         if (msg->m_analyze)
2791                 tabentry->last_anl_tuples = msg->m_tuples;
2792 }
2793
2794 /* ----------
2795  * pgstat_recv_analyze() -
2796  *
2797  *      Process an ANALYZE message.
2798  * ----------
2799  */
2800 static void
2801 pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
2802 {
2803         PgStat_StatDBEntry *dbentry;
2804         PgStat_StatTabEntry *tabentry;
2805
2806         /*
2807          * Don't create either the database or table entry if it doesn't already
2808          * exist.  This avoids bloating the stats with entries for stuff that is
2809          * only touched by analyze and not by live operations.
2810          */
2811         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2812         if (dbentry == NULL)
2813                 return;
2814
2815         tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
2816                                                    HASH_FIND, NULL);
2817         if (tabentry == NULL)
2818                 return;
2819
2820         tabentry->n_live_tuples = msg->m_live_tuples;
2821         tabentry->n_dead_tuples = msg->m_dead_tuples;
2822         tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
2823 }
2824
2825 /* ----------
2826  * pgstat_recv_activity() -
2827  *
2828  *      Remember what the backend is doing.
2829  * ----------
2830  */
2831 static void
2832 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2833 {
2834         PgStat_StatBeEntry *entry;
2835
2836         /*
2837          * Here we check explicitly for 0 return, since we don't want to mangle
2838          * the activity of an active backend by a delayed packet from a dead one.
2839          */
2840         if (pgstat_add_backend(&msg->m_hdr) != 0)
2841                 return;
2842
2843         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2844
2845         StrNCpy(entry->activity, msg->m_cmd_str, PGSTAT_ACTIVITY_SIZE);
2846
2847         entry->activity_start_timestamp = GetCurrentTimestamp();
2848 }
2849
2850
2851 /* ----------
2852  * pgstat_recv_tabstat() -
2853  *
2854  *      Count what the backend has done.
2855  * ----------
2856  */
2857 static void
2858 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2859 {
2860         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2861         PgStat_StatDBEntry *dbentry;
2862         PgStat_StatTabEntry *tabentry;
2863         int                     i;
2864         bool            found;
2865
2866         /*
2867          * Make sure the backend is counted for.
2868          */
2869         if (pgstat_add_backend(&msg->m_hdr) < 0)
2870                 return;
2871
2872         dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
2873
2874         /*
2875          * Update database-wide stats.
2876          */
2877         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2878         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2879
2880         /*
2881          * Process all table entries in the message.
2882          */
2883         for (i = 0; i < msg->m_nentries; i++)
2884         {
2885                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2886                                                                                                   (void *) &(tabmsg[i].t_id),
2887                                                                                                            HASH_ENTER, &found);
2888
2889                 if (!found)
2890                 {
2891                         /*
2892                          * If it's a new table entry, initialize counters to the values we
2893                          * just got.
2894                          */
2895                         tabentry->numscans = tabmsg[i].t_numscans;
2896                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2897                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
2898                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
2899                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
2900                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
2901
2902                         tabentry->n_live_tuples = tabmsg[i].t_tuples_inserted;
2903                         tabentry->n_dead_tuples = tabmsg[i].t_tuples_updated +
2904                                 tabmsg[i].t_tuples_deleted;
2905                         tabentry->last_anl_tuples = 0;
2906
2907                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
2908                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
2909                 }
2910                 else
2911                 {
2912                         /*
2913                          * Otherwise add the values to the existing entry.
2914                          */
2915                         tabentry->numscans += tabmsg[i].t_numscans;
2916                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
2917                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
2918                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
2919                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
2920                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
2921
2922                         tabentry->n_live_tuples += tabmsg[i].t_tuples_inserted;
2923                         tabentry->n_dead_tuples += tabmsg[i].t_tuples_updated +
2924                                 tabmsg[i].t_tuples_deleted;
2925
2926                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
2927                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
2928                 }
2929
2930                 /*
2931                  * And add the block IO to the database entry.
2932                  */
2933                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
2934                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
2935         }
2936 }
2937
2938
2939 /* ----------
2940  * pgstat_recv_tabpurge() -
2941  *
2942  *      Arrange for dead table removal.
2943  * ----------
2944  */
2945 static void
2946 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
2947 {
2948         PgStat_StatDBEntry *dbentry;
2949         int                     i;
2950
2951         /*
2952          * Make sure the backend is counted for.
2953          */
2954         if (pgstat_add_backend(&msg->m_hdr) < 0)
2955                 return;
2956
2957         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2958
2959         /*
2960          * No need to purge if we don't even know the database.
2961          */
2962         if (!dbentry || !dbentry->tables)
2963                 return;
2964
2965         /*
2966          * Process all table entries in the message.
2967          */
2968         for (i = 0; i < msg->m_nentries; i++)
2969         {
2970                 /* Remove from hashtable if present; we don't care if it's not. */
2971                 (void) hash_search(dbentry->tables,
2972                                                    (void *) &(msg->m_tableid[i]),
2973                                                    HASH_REMOVE, NULL);
2974         }
2975 }
2976
2977
2978 /* ----------
2979  * pgstat_recv_dropdb() -
2980  *
2981  *      Arrange for dead database removal
2982  * ----------
2983  */
2984 static void
2985 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
2986 {
2987         PgStat_StatDBEntry *dbentry;
2988
2989         /*
2990          * Make sure the backend is counted for.
2991          */
2992         if (pgstat_add_backend(&msg->m_hdr) < 0)
2993                 return;
2994
2995         /*
2996          * Lookup the database in the hashtable.
2997          */
2998         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2999
3000         /*
3001          * If found, remove it.
3002          */
3003         if (dbentry)
3004         {
3005                 if (dbentry->tables != NULL)
3006                         hash_destroy(dbentry->tables);
3007
3008                 if (hash_search(pgStatDBHash,
3009                                                 (void *) &(dbentry->databaseid),
3010                                                 HASH_REMOVE, NULL) == NULL)
3011                         ereport(ERROR,
3012                                         (errmsg("database hash table corrupted "
3013                                                         "during cleanup --- abort")));
3014         }
3015 }
3016
3017
3018 /* ----------
3019  * pgstat_recv_resetcounter() -
3020  *
3021  *      Reset the statistics for the specified database.
3022  * ----------
3023  */
3024 static void
3025 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
3026 {
3027         HASHCTL         hash_ctl;
3028         PgStat_StatDBEntry *dbentry;
3029
3030         /*
3031          * Make sure the backend is counted for.
3032          */
3033         if (pgstat_add_backend(&msg->m_hdr) < 0)
3034                 return;
3035
3036         /*
3037          * Lookup the database in the hashtable.  Nothing to do if not there.
3038          */
3039         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
3040
3041         if (!dbentry)
3042                 return;
3043
3044         /*
3045          * We simply throw away all the database's table entries by recreating a
3046          * new hash table for them.
3047          */
3048         if (dbentry->tables != NULL)
3049                 hash_destroy(dbentry->tables);
3050
3051         dbentry->tables = NULL;
3052         dbentry->n_xact_commit = 0;
3053         dbentry->n_xact_rollback = 0;
3054         dbentry->n_blocks_fetched = 0;
3055         dbentry->n_blocks_hit = 0;
3056
3057         memset(&hash_ctl, 0, sizeof(hash_ctl));
3058         hash_ctl.keysize = sizeof(Oid);
3059         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3060         hash_ctl.hash = oid_hash;
3061         dbentry->tables = hash_create("Per-database table",
3062                                                                   PGSTAT_TAB_HASH_SIZE,
3063                                                                   &hash_ctl,
3064                                                                   HASH_ELEM | HASH_FUNCTION);
3065 }