]> granicus.if.org Git - postgresql/blob - src/backend/postmaster/pgstat.c
Delay write of pg_stats file to once every five minutes, during
[postgresql] / src / backend / postmaster / pgstat.c
1 /* ----------
2  * pgstat.c
3  *
4  *      All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *      TODO:   - Separate collector, postmaster and backend stuff
7  *                        into different files.
8  *
9  *                      - Add some automatic call for pgstat vacuuming.
10  *
11  *                      - Add a pgstat config column to pg_database, so this
12  *                        entire thing can be enabled/disabled on a per db basis.
13  *
14  *      Copyright (c) 2001-2006, PostgreSQL Global Development Group
15  *
16  *      $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.124 2006/04/27 00:06:58 momjian Exp $
17  * ----------
18  */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31 #include <sys/stat.h>
32
33 #include "pgstat.h"
34
35 #include "access/heapam.h"
36 #include "access/xact.h"
37 #include "catalog/pg_database.h"
38 #include "libpq/libpq.h"
39 #include "libpq/pqsignal.h"
40 #include "mb/pg_wchar.h"
41 #include "miscadmin.h"
42 #include "postmaster/autovacuum.h"
43 #include "postmaster/fork_process.h"
44 #include "postmaster/postmaster.h"
45 #include "storage/backendid.h"
46 #include "storage/fd.h"
47 #include "storage/ipc.h"
48 #include "storage/pg_shmem.h"
49 #include "storage/pmsignal.h"
50 #include "storage/procarray.h"
51 #include "tcop/tcopprot.h"
52 #include "utils/hsearch.h"
53 #include "utils/memutils.h"
54 #include "utils/ps_status.h"
55 #include "utils/rel.h"
56 #include "utils/syscache.h"
57
58
59 /* ----------
60  * Paths for the statistics files (relative to installation's $PGDATA).
61  * ----------
62  */
63 #define PGSTAT_STAT_FILENAME    "global/pgstat.stat"
64 #define PGSTAT_STAT_TMPFILE             "global/pgstat.tmp"
65
66 /* ----------
67  * Timer definitions.
68  * ----------
69  */
70
71 /* How often to write the status file, in milliseconds. */
72 #define PGSTAT_STAT_INTERVAL    (5*60*1000)
73
74 /*
75  *      How often to attempt to restart a failed statistics collector; in ms.
76  *      Must be at least PGSTAT_STAT_INTERVAL.
77  */
78 #define PGSTAT_RESTART_INTERVAL (5*60*1000)
79
80 /* ----------
81  * Amount of space reserved in pgstat_recvbuffer().
82  * ----------
83  */
84 #define PGSTAT_RECVBUFFERSZ             ((int) (1024 * sizeof(PgStat_Msg)))
85
86 /* ----------
87  * The initial size hints for the hash tables used in the collector.
88  * ----------
89  */
90 #define PGSTAT_DB_HASH_SIZE             16
91 #define PGSTAT_BE_HASH_SIZE             512
92 #define PGSTAT_TAB_HASH_SIZE    512
93
94
95 /* ----------
96  * GUC parameters
97  * ----------
98  */
99 bool            pgstat_collect_startcollector = true;
100 bool            pgstat_collect_resetonpmstart = false;
101 bool            pgstat_collect_querystring = false;
102 bool            pgstat_collect_tuplelevel = false;
103 bool            pgstat_collect_blocklevel = false;
104
105 /* ----------
106  * Local data
107  * ----------
108  */
109 NON_EXEC_STATIC int pgStatSock = -1;
110 NON_EXEC_STATIC int pgStatPipe[2] = {-1, -1};
111 static struct sockaddr_storage pgStatAddr;
112 static pid_t pgStatCollectorPid = 0;
113
114 static time_t last_pgstat_start_time;
115
116 static long pgStatNumMessages = 0;
117
118 static bool pgStatRunningInCollector = false;
119
120 /*
121  * Place where backends store per-table info to be sent to the collector.
122  * We store shared relations separately from non-shared ones, to be able to
123  * send them in separate messages.
124  */
125 typedef struct TabStatArray
126 {
127         int                     tsa_alloc;              /* num allocated */
128         int                     tsa_used;               /* num actually used */
129         PgStat_MsgTabstat **tsa_messages;       /* the array itself */
130 } TabStatArray;
131
132 #define TABSTAT_QUANTUM         4       /* we alloc this many at a time */
133
134 static TabStatArray RegularTabStat = {0, 0, NULL};
135 static TabStatArray SharedTabStat = {0, 0, NULL};
136
137 static int      pgStatXactCommit = 0;
138 static int      pgStatXactRollback = 0;
139
140 static TransactionId pgStatDBHashXact = InvalidTransactionId;
141 static HTAB *pgStatDBHash = NULL;
142 static PgStat_StatBeEntry *pgStatBeTable = NULL;
143 static int      pgStatNumBackends = 0;
144
145 static volatile bool    need_statwrite;
146
147
148 /* ----------
149  * Local function forward declarations
150  * ----------
151  */
152 #ifdef EXEC_BACKEND
153
154 typedef enum STATS_PROCESS_TYPE
155 {
156         STAT_PROC_BUFFER,
157         STAT_PROC_COLLECTOR
158 }       STATS_PROCESS_TYPE;
159
160 static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
161 static void pgstat_parseArgs(int argc, char *argv[]);
162 #endif
163
164 NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
165 NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
166 static void force_statwrite(SIGNAL_ARGS);
167 static void pgstat_recvbuffer(void);
168 static void pgstat_exit(SIGNAL_ARGS);
169 static void pgstat_die(SIGNAL_ARGS);
170 static void pgstat_beshutdown_hook(int code, Datum arg);
171
172 static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
173 static int      pgstat_add_backend(PgStat_MsgHdr *msg);
174 static void pgstat_sub_backend(int procpid);
175 static void pgstat_drop_database(Oid databaseid);
176 static void pgstat_write_statsfile(void);
177 static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
178                                           PgStat_StatBeEntry **betab,
179                                           int *numbackends, bool rewrite);
180 static void backend_read_statsfile(void);
181
182 static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
183 static void pgstat_send(void *msg, int len);
184 static void pgstat_send_rewrite(void);
185
186 static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
187 static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
188 static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
189 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
190 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
191 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
192 static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
193 static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
194 static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
195 static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
196
197
198 /* ------------------------------------------------------------
199  * Public functions called from postmaster follow
200  * ------------------------------------------------------------
201  */
202
/*
 * Value of the one-byte probe bounced through a candidate statistics socket
 * to prove that it actually passes data.
 */
#define TESTBYTEVAL ((char) 199)

/* ----------
 * pgstat_try_stats_socket() -
 *
 *	Helper for pgstat_init(): try to set up pgStatSock for one address
 *	returned by pg_getaddrinfo_all().  The socket is created, bound to a
 *	kernel-assigned port, its address is fetched into pgStatAddr, it is
 *	connected to that same address, and a one-byte test message is sent
 *	and received on it.
 *
 *	Returns true with pgStatSock left open if every step works.  On any
 *	failure a LOG message is emitted, pgStatSock is closed (if it was
 *	opened) and reset to -1, and false is returned so that the caller can
 *	move on to the next address.
 * ----------
 */
static bool
pgstat_try_stats_socket(const struct addrinfo *addr)
{
	ACCEPT_TYPE_ARG3 alen;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

	/*
	 * Create the socket.  On failure socket() leaves pgStatSock at -1, so
	 * there is nothing to close.
	 */
	if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
			errmsg("could not create socket for statistics collector: %m")));
		return false;
	}

	/*
	 * Bind it to a kernel assigned port on localhost and get the assigned
	 * port via getsockname().
	 */
	if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
			  errmsg("could not bind socket for statistics collector: %m")));
		goto fail;
	}

	alen = sizeof(pgStatAddr);
	if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not get address of socket for statistics collector: %m")));
		goto fail;
	}

	/*
	 * Connect the socket to its own address.  This saves a few cycles by not
	 * having to respecify the target address on every send.  This also
	 * provides a kernel-level check that only packets from this same address
	 * will be received.
	 */
	if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
			errmsg("could not connect socket for statistics collector: %m")));
		goto fail;
	}

	/*
	 * Try to send and receive a one-byte test message on the socket.  This is
	 * to catch situations where the socket can be created but will not
	 * actually pass data (for instance, because kernel packet filtering rules
	 * prevent it).
	 */
	test_byte = TESTBYTEVAL;
	if (send(pgStatSock, &test_byte, 1, 0) != 1)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not send test message on socket for statistics collector: %m")));
		goto fail;
	}

	/*
	 * There could possibly be a little delay before the message can be
	 * received.  We arbitrarily allow up to half a second before deciding
	 * it's broken.
	 */
	for (;;)					/* need a loop to handle EINTR */
	{
		FD_ZERO(&rset);
		FD_SET(pgStatSock, &rset);
		tv.tv_sec = 0;
		tv.tv_usec = 500000;
		sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
		if (sel_res >= 0 || errno != EINTR)
			break;
	}
	if (sel_res < 0)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("select() failed in statistics collector: %m")));
		goto fail;
	}
	if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
	{
		/*
		 * This is the case we actually think is likely, so take pains to
		 * give a specific message for it.
		 *
		 * errno will not be set meaningfully here, so don't use it.
		 */
		ereport(LOG,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("test message did not get through on socket for statistics collector")));
		goto fail;
	}

	test_byte++;				/* just make sure variable is changed */

	if (recv(pgStatSock, &test_byte, 1, 0) != 1)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not receive test message on socket for statistics collector: %m")));
		goto fail;
	}

	if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
	{
		ereport(LOG,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("incorrect test message transmission on socket for statistics collector")));
		goto fail;
	}

	/* If we get here, we have a working socket */
	return true;

fail:
	closesocket(pgStatSock);
	pgStatSock = -1;
	return false;
}

/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success pgStatSock is a connected, non-blocking UDP socket.  On
 *	failure pgStatSock is -1 and the stats-collection GUC variables are
 *	switched off.
 * ----------
 */
void
pgstat_init(void)
{
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	int			tries = 0;

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the stats file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		pgstat_reset_all();

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		/*
		 * gai_strerror() is only meaningful for a nonzero result; a
		 * "successful" lookup that returned no addresses gets its own
		 * message instead of gai_strerror(0).
		 */
		if (ret)
			ereport(LOG,
					(errmsg("could not resolve \"localhost\": %s",
							gai_strerror(ret))));
		else
			ereport(LOG,
					(errmsg("could not resolve \"localhost\": no addresses returned")));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination. We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		if (++tries > 1)
			ereport(LOG,
			(errmsg("trying another address for the statistics collector")));

		if (pgstat_try_stats_socket(addr))
			break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind (despite the buffering process), statistics messages will
	 * be discarded; backends won't block waiting to send messages to the
	 * collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	pg_freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	ereport(LOG,
	  (errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}
463
464 /*
465  * pgstat_reset_all() -
466  *
467  * Remove the stats file.  This is used on server start if the
468  * stats_reset_on_server_start feature is enabled, or if WAL
469  * recovery is needed after a crash.
470  */
471 void
472 pgstat_reset_all(void)
473 {
474         unlink(PGSTAT_STAT_FILENAME);
475 }
476
477 #ifdef EXEC_BACKEND
478
479 /*
480  * pgstat_forkexec() -
481  *
482  * Format up the arglist for, then fork and exec, statistics
483  * (buffer and collector) processes
484  */
485 static pid_t
486 pgstat_forkexec(STATS_PROCESS_TYPE procType)
487 {
488         char       *av[10];
489         int                     ac = 0,
490                                 bufc = 0,
491                                 i;
492         char            pgstatBuf[2][32];
493
494         av[ac++] = "postgres";
495
496         switch (procType)
497         {
498                 case STAT_PROC_BUFFER:
499                         av[ac++] = "-forkbuf";
500                         break;
501
502                 case STAT_PROC_COLLECTOR:
503                         av[ac++] = "-forkcol";
504                         break;
505
506                 default:
507                         Assert(false);
508         }
509
510         av[ac++] = NULL;                        /* filled in by postmaster_forkexec */
511
512         /* postgres_exec_path is not passed by write_backend_variables */
513         av[ac++] = postgres_exec_path;
514
515         /* Add to the arg list */
516         Assert(bufc <= lengthof(pgstatBuf));
517         for (i = 0; i < bufc; i++)
518                 av[ac++] = pgstatBuf[i];
519
520         av[ac] = NULL;
521         Assert(ac < lengthof(av));
522
523         return postmaster_forkexec(ac, av);
524 }
525
526
527 /*
528  * pgstat_parseArgs() -
529  *
530  * Extract data from the arglist for exec'ed statistics
531  * (buffer and collector) processes
532  */
533 static void
534 pgstat_parseArgs(int argc, char *argv[])
535 {
536         Assert(argc == 4);
537
538         argc = 3;
539         StrNCpy(postgres_exec_path, argv[argc++], MAXPGPATH);
540 }
541 #endif   /* EXEC_BACKEND */
542
543
544 /* ----------
545  * pgstat_start() -
546  *
547  *      Called from postmaster at startup or after an existing collector
548  *      died.  Attempt to fire up a fresh statistics collector.
549  *
550  *      Returns PID of child process, or 0 if fail.
551  *
552  *      Note: if fail, we will be called again from the postmaster main loop.
553  * ----------
554  */
555 int
556 pgstat_start(void)
557 {
558         time_t          curtime;
559         pid_t           pgStatPid;
560
561         /*
562          * Do nothing if no collector needed
563          */
564         if (!pgstat_collect_startcollector)
565                 return 0;
566
567         /*
568          * Do nothing if too soon since last collector start.  This is a safety
569          * valve to protect against continuous respawn attempts if the collector
570          * is dying immediately at launch.      Note that since we will be re-called
571          * from the postmaster main loop, we will get another chance later.
572          */
573         curtime = time(NULL);
574         if ((unsigned int) (curtime - last_pgstat_start_time) <
575                 (unsigned int) PGSTAT_RESTART_INTERVAL)
576                 return 0;
577         last_pgstat_start_time = curtime;
578
579         /*
580          * Check that the socket is there, else pgstat_init failed.
581          */
582         if (pgStatSock < 0)
583         {
584                 ereport(LOG,
585                                 (errmsg("statistics collector startup skipped")));
586
587                 /*
588                  * We can only get here if someone tries to manually turn
589                  * pgstat_collect_startcollector on after it had been off.
590                  */
591                 pgstat_collect_startcollector = false;
592                 return 0;
593         }
594
595         /*
596          * Okay, fork off the collector.
597          */
598 #ifdef EXEC_BACKEND
599         switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
600 #else
601         switch ((pgStatPid = fork_process()))
602 #endif
603         {
604                 case -1:
605                         ereport(LOG,
606                                         (errmsg("could not fork statistics buffer: %m")));
607                         return 0;
608
609 #ifndef EXEC_BACKEND
610                 case 0:
611                         /* in postmaster child ... */
612                         /* Close the postmaster's sockets */
613                         ClosePostmasterPorts(false);
614
615                         /* Lose the postmaster's on-exit routines */
616                         on_exit_reset();
617
618                         /* Drop our connection to postmaster's shared memory, as well */
619                         PGSharedMemoryDetach();
620
621                         PgstatBufferMain(0, NULL);
622                         break;
623 #endif
624
625                 default:
626                         return (int) pgStatPid;
627         }
628
629         /* shouldn't get here */
630         return 0;
631 }
632
633
634 /* ----------
635  * pgstat_beterm() -
636  *
637  *      Called from postmaster to tell collector a backend terminated.
638  * ----------
639  */
640 void
641 pgstat_beterm(int pid)
642 {
643         PgStat_MsgBeterm msg;
644
645         if (pgStatSock < 0)
646                 return;
647
648         /* can't use pgstat_setheader() because it's not called in a backend */
649         MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
650         msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
651         msg.m_hdr.m_procpid = pid;
652
653         pgstat_send(&msg, sizeof(msg));
654 }
655
656
657 /* ----------
658  * pgstat_report_autovac() -
659  *
660  *      Called from autovacuum.c to report startup of an autovacuum process.
661  *      We are called before InitPostgres is done, so can't rely on MyDatabaseId;
662  *      the db OID must be passed in, instead.
663  * ----------
664  */
665 void
666 pgstat_report_autovac(Oid dboid)
667 {
668         PgStat_MsgAutovacStart msg;
669
670         if (pgStatSock < 0)
671                 return;
672
673         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
674         msg.m_databaseid = dboid;
675         msg.m_start_time = GetCurrentTimestamp();
676
677         pgstat_send(&msg, sizeof(msg));
678 }
679
680 /* ------------------------------------------------------------
681  * Public functions used by backends follow
682  *------------------------------------------------------------
683  */
684
685
686 /* ----------
687  * pgstat_bestart() -
688  *
689  *      Tell the collector that this new backend is soon ready to process
690  *      queries. Called from InitPostgres.
691  * ----------
692  */
693 void
694 pgstat_bestart(void)
695 {
696         PgStat_MsgBestart msg;
697
698         if (pgStatSock < 0)
699                 return;
700
701         /*
702          * We may not have a MyProcPort (eg, if this is the autovacuum process).
703          * For the moment, punt and don't send BESTART --- would be better to work
704          * out a clean way of handling "unknown clientaddr".
705          */
706         if (MyProcPort)
707         {
708                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
709                 msg.m_databaseid = MyDatabaseId;
710                 msg.m_userid = GetSessionUserId();
711                 memcpy(&msg.m_clientaddr, &MyProcPort->raddr, sizeof(msg.m_clientaddr));
712                 pgstat_send(&msg, sizeof(msg));
713         }
714
715         /*
716          * Set up a process-exit hook to ensure we flush the last batch of
717          * statistics to the collector.
718          */
719         on_shmem_exit(pgstat_beshutdown_hook, 0);
720 }
721
722 /* ---------
723  * pgstat_report_vacuum() -
724  *
725  *      Tell the collector about the table we just vacuumed.
726  * ---------
727  */
728 void
729 pgstat_report_vacuum(Oid tableoid, bool shared,
730                                          bool analyze, PgStat_Counter tuples)
731 {
732         PgStat_MsgVacuum msg;
733
734         if (pgStatSock < 0 ||
735                 !pgstat_collect_tuplelevel)
736                 return;
737
738         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
739         msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
740         msg.m_tableoid = tableoid;
741         msg.m_analyze = analyze;
742         msg.m_tuples = tuples;
743         pgstat_send(&msg, sizeof(msg));
744 }
745
746 /* --------
747  * pgstat_report_analyze() -
748  *
749  *      Tell the collector about the table we just analyzed.
750  * --------
751  */
752 void
753 pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
754                                           PgStat_Counter deadtuples)
755 {
756         PgStat_MsgAnalyze msg;
757
758         if (pgStatSock < 0 ||
759                 !pgstat_collect_tuplelevel)
760                 return;
761
762         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
763         msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
764         msg.m_tableoid = tableoid;
765         msg.m_live_tuples = livetuples;
766         msg.m_dead_tuples = deadtuples;
767         pgstat_send(&msg, sizeof(msg));
768 }
769
770 /*
771  * Flush any remaining statistics counts out to the collector at process
772  * exit.   Without this, operations triggered during backend exit (such as
773  * temp table deletions) won't be counted.
774  */
775 static void
776 pgstat_beshutdown_hook(int code, Datum arg)
777 {
778         pgstat_report_tabstat();
779 }
780
781
782 /* ----------
783  * pgstat_report_activity() -
784  *
785  *      Called from tcop/postgres.c to tell the collector what the backend
786  *      is actually doing (usually "<IDLE>" or the start of the query to
787  *      be executed).
788  * ----------
789  */
790 void
791 pgstat_report_activity(const char *cmd_str)
792 {
793         PgStat_MsgActivity msg;
794         int                     len;
795
796         if (!pgstat_collect_querystring || pgStatSock < 0)
797                 return;
798
799         len = strlen(cmd_str);
800         len = pg_mbcliplen(cmd_str, len, PGSTAT_ACTIVITY_SIZE - 1);
801
802         memcpy(msg.m_cmd_str, cmd_str, len);
803         msg.m_cmd_str[len] = '\0';
804         len += offsetof(PgStat_MsgActivity, m_cmd_str) + 1;
805
806         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
807         pgstat_send(&msg, len);
808 }
809
810
811 /* ----------
812  * pgstat_report_tabstat() -
813  *
814  *      Called from tcop/postgres.c to send the so far collected
815  *      per table access statistics to the collector.
816  * ----------
817  */
818 void
819 pgstat_report_tabstat(void)
820 {
821         int                     i;
822
823         if (pgStatSock < 0 ||
824                 (!pgstat_collect_querystring &&
825                  !pgstat_collect_tuplelevel &&
826                  !pgstat_collect_blocklevel))
827         {
828                 /* Not reporting stats, so just flush whatever we have */
829                 RegularTabStat.tsa_used = 0;
830                 SharedTabStat.tsa_used = 0;
831                 return;
832         }
833
834         /*
835          * For each message buffer used during the last query set the header
836          * fields and send it out.
837          */
838         for (i = 0; i < RegularTabStat.tsa_used; i++)
839         {
840                 PgStat_MsgTabstat *tsmsg = RegularTabStat.tsa_messages[i];
841                 int                     n;
842                 int                     len;
843
844                 n = tsmsg->m_nentries;
845                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
846                         n * sizeof(PgStat_TableEntry);
847
848                 tsmsg->m_xact_commit = pgStatXactCommit;
849                 tsmsg->m_xact_rollback = pgStatXactRollback;
850                 pgStatXactCommit = 0;
851                 pgStatXactRollback = 0;
852
853                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
854                 tsmsg->m_databaseid = MyDatabaseId;
855                 pgstat_send(tsmsg, len);
856         }
857         RegularTabStat.tsa_used = 0;
858
859         /* Ditto, for shared relations */
860         for (i = 0; i < SharedTabStat.tsa_used; i++)
861         {
862                 PgStat_MsgTabstat *tsmsg = SharedTabStat.tsa_messages[i];
863                 int                     n;
864                 int                     len;
865
866                 n = tsmsg->m_nentries;
867                 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
868                         n * sizeof(PgStat_TableEntry);
869
870                 /* We don't report transaction commit/abort here */
871                 tsmsg->m_xact_commit = 0;
872                 tsmsg->m_xact_rollback = 0;
873
874                 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
875                 tsmsg->m_databaseid = InvalidOid;
876                 pgstat_send(tsmsg, len);
877         }
878         SharedTabStat.tsa_used = 0;
879 }
880
881
882 /* ----------
883  * pgstat_vacuum_tabstat() -
884  *
885  *      Will tell the collector about objects he can get rid of.
886  * ----------
887  */
888 void
889 pgstat_vacuum_tabstat(void)
890 {
891         List       *oidlist;
892         Relation        rel;
893         HeapScanDesc scan;
894         HeapTuple       tup;
895         PgStat_MsgTabpurge msg;
896         HASH_SEQ_STATUS hstat;
897         PgStat_StatDBEntry *dbentry;
898         PgStat_StatTabEntry *tabentry;
899         int                     len;
900
901         if (pgStatSock < 0)
902                 return;
903
904         /*
905          * If not done for this transaction, read the statistics collector stats
906          * file into some hash tables.
907          */
908         backend_read_statsfile();
909
910         /*
911          * Read pg_database and make a list of OIDs of all existing databases
912          */
913         oidlist = NIL;
914         rel = heap_open(DatabaseRelationId, AccessShareLock);
915         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
916         while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
917         {
918                 oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
919         }
920         heap_endscan(scan);
921         heap_close(rel, AccessShareLock);
922
923         /*
924          * Search the database hash table for dead databases and tell the
925          * collector to drop them.
926          */
927         hash_seq_init(&hstat, pgStatDBHash);
928         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
929         {
930                 Oid                     dbid = dbentry->databaseid;
931
932                 if (!list_member_oid(oidlist, dbid))
933                         pgstat_drop_database(dbid);
934         }
935
936         /* Clean up */
937         list_free(oidlist);
938
939         /*
940          * Lookup our own database entry; if not found, nothing more to do.
941          */
942         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
943                                                                                                  (void *) &MyDatabaseId,
944                                                                                                  HASH_FIND, NULL);
945         if (dbentry == NULL || dbentry->tables == NULL)
946                 return;
947
948         /*
949          * Similarly to above, make a list of all known relations in this DB.
950          */
951         oidlist = NIL;
952         rel = heap_open(RelationRelationId, AccessShareLock);
953         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
954         while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
955         {
956                 oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
957         }
958         heap_endscan(scan);
959         heap_close(rel, AccessShareLock);
960
961         /*
962          * Initialize our messages table counter to zero
963          */
964         msg.m_nentries = 0;
965
966         /*
967          * Check for all tables listed in stats hashtable if they still exist.
968          */
969         hash_seq_init(&hstat, dbentry->tables);
970         while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
971         {
972                 if (list_member_oid(oidlist, tabentry->tableid))
973                         continue;
974
975                 /*
976                  * Not there, so add this table's Oid to the message
977                  */
978                 msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
979
980                 /*
981                  * If the message is full, send it out and reinitialize to empty
982                  */
983                 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
984                 {
985                         len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
986                                 +msg.m_nentries * sizeof(Oid);
987
988                         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
989                         msg.m_databaseid = MyDatabaseId;
990                         pgstat_send(&msg, len);
991
992                         msg.m_nentries = 0;
993                 }
994         }
995
996         /*
997          * Send the rest
998          */
999         if (msg.m_nentries > 0)
1000         {
1001                 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1002                         +msg.m_nentries * sizeof(Oid);
1003
1004                 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1005                 msg.m_databaseid = MyDatabaseId;
1006                 pgstat_send(&msg, len);
1007         }
1008
1009         /* Clean up */
1010         list_free(oidlist);
1011 }
1012
1013
1014 /* ----------
1015  * pgstat_drop_database() -
1016  *
1017  *      Tell the collector that we just dropped a database.
1018  *      (If the message gets lost, we will still clean the dead DB eventually
1019  *      via future invocations of pgstat_vacuum_tabstat().)
1020  * ----------
1021  */
1022 static void
1023 pgstat_drop_database(Oid databaseid)
1024 {
1025         PgStat_MsgDropdb msg;
1026
1027         if (pgStatSock < 0)
1028                 return;
1029
1030         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1031         msg.m_databaseid = databaseid;
1032         pgstat_send(&msg, sizeof(msg));
1033 }
1034
1035
1036 /* ----------
1037  * pgstat_drop_relation() -
1038  *
1039  *      Tell the collector that we just dropped a relation.
1040  *      (If the message gets lost, we will still clean the dead entry eventually
1041  *      via future invocations of pgstat_vacuum_tabstat().)
1042  * ----------
1043  */
1044 void
1045 pgstat_drop_relation(Oid relid)
1046 {
1047         PgStat_MsgTabpurge msg;
1048         int                     len;
1049
1050         if (pgStatSock < 0)
1051                 return;
1052
1053         msg.m_tableid[0] = relid;
1054         msg.m_nentries = 1;
1055
1056         len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
1057
1058         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1059         msg.m_databaseid = MyDatabaseId;
1060         pgstat_send(&msg, len);
1061 }
1062
1063
1064 /* ----------
1065  * pgstat_reset_counters() -
1066  *
1067  *      Tell the statistics collector to reset counters for our database.
1068  * ----------
1069  */
1070 void
1071 pgstat_reset_counters(void)
1072 {
1073         PgStat_MsgResetcounter msg;
1074
1075         if (pgStatSock < 0)
1076                 return;
1077
1078         if (!superuser())
1079                 ereport(ERROR,
1080                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1081                                  errmsg("must be superuser to reset statistics counters")));
1082
1083         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1084         msg.m_databaseid = MyDatabaseId;
1085         pgstat_send(&msg, sizeof(msg));
1086 }
1087
1088
1089 /* ----------
1090  * pgstat_ping() -
1091  *
1092  *      Send some junk data to the collector to increase traffic.
1093  * ----------
1094  */
1095 void
1096 pgstat_ping(void)
1097 {
1098         PgStat_MsgDummy msg;
1099
1100         if (pgStatSock < 0)
1101                 return;
1102
1103         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
1104         pgstat_send(&msg, sizeof(msg));
1105 }
1106
1107 /*
1108  * Enlarge a TabStatArray
1109  */
1110 static void
1111 more_tabstat_space(TabStatArray *tsarr)
1112 {
1113         PgStat_MsgTabstat *newMessages;
1114         PgStat_MsgTabstat **msgArray;
1115         int                     newAlloc;
1116         int                     i;
1117
1118         AssertArg(PointerIsValid(tsarr));
1119
1120         newAlloc = tsarr->tsa_alloc + TABSTAT_QUANTUM;
1121
1122         /* Create (another) quantum of message buffers */
1123         newMessages = (PgStat_MsgTabstat *)
1124                 MemoryContextAllocZero(TopMemoryContext,
1125                                                            sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1126
1127         /* Create or enlarge the pointer array */
1128         if (tsarr->tsa_messages == NULL)
1129                 msgArray = (PgStat_MsgTabstat **)
1130                         MemoryContextAlloc(TopMemoryContext,
1131                                                            sizeof(PgStat_MsgTabstat *) * newAlloc);
1132         else
1133                 msgArray = (PgStat_MsgTabstat **)
1134                         repalloc(tsarr->tsa_messages,
1135                                          sizeof(PgStat_MsgTabstat *) * newAlloc);
1136
1137         for (i = 0; i < TABSTAT_QUANTUM; i++)
1138                 msgArray[tsarr->tsa_alloc + i] = newMessages++;
1139         tsarr->tsa_messages = msgArray;
1140         tsarr->tsa_alloc = newAlloc;
1141
1142         Assert(tsarr->tsa_used < tsarr->tsa_alloc);
1143 }
1144
1145 /* ----------
1146  * pgstat_initstats() -
1147  *
1148  *      Called from various places usually dealing with initialization
1149  *      of Relation or Scan structures. The data placed into these
1150  *      structures from here tell where later to count for buffer reads,
1151  *      scans and tuples fetched.
1152  * ----------
1153  */
1154 void
1155 pgstat_initstats(PgStat_Info *stats, Relation rel)
1156 {
1157         Oid                     rel_id = rel->rd_id;
1158         PgStat_TableEntry *useent;
1159         TabStatArray *tsarr;
1160         PgStat_MsgTabstat *tsmsg;
1161         int                     mb;
1162         int                     i;
1163
1164         /*
1165          * Initialize data not to count at all.
1166          */
1167         stats->tabentry = NULL;
1168
1169         if (pgStatSock < 0 ||
1170                 !(pgstat_collect_tuplelevel ||
1171                   pgstat_collect_blocklevel))
1172                 return;
1173
1174         tsarr = rel->rd_rel->relisshared ? &SharedTabStat : &RegularTabStat;
1175
1176         /*
1177          * Search the already-used message slots for this relation.
1178          */
1179         for (mb = 0; mb < tsarr->tsa_used; mb++)
1180         {
1181                 tsmsg = tsarr->tsa_messages[mb];
1182
1183                 for (i = tsmsg->m_nentries; --i >= 0;)
1184                 {
1185                         if (tsmsg->m_entry[i].t_id == rel_id)
1186                         {
1187                                 stats->tabentry = (void *) &(tsmsg->m_entry[i]);
1188                                 return;
1189                         }
1190                 }
1191
1192                 if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
1193                         continue;
1194
1195                 /*
1196                  * Not found, but found a message buffer with an empty slot instead.
1197                  * Fine, let's use this one.
1198                  */
1199                 i = tsmsg->m_nentries++;
1200                 useent = &tsmsg->m_entry[i];
1201                 MemSet(useent, 0, sizeof(PgStat_TableEntry));
1202                 useent->t_id = rel_id;
1203                 stats->tabentry = (void *) useent;
1204                 return;
1205         }
1206
1207         /*
1208          * If we ran out of message buffers, we just allocate more.
1209          */
1210         if (tsarr->tsa_used >= tsarr->tsa_alloc)
1211                 more_tabstat_space(tsarr);
1212
1213         /*
1214          * Use the first entry of the next message buffer.
1215          */
1216         mb = tsarr->tsa_used++;
1217         tsmsg = tsarr->tsa_messages[mb];
1218         tsmsg->m_nentries = 1;
1219         useent = &tsmsg->m_entry[0];
1220         MemSet(useent, 0, sizeof(PgStat_TableEntry));
1221         useent->t_id = rel_id;
1222         stats->tabentry = (void *) useent;
1223 }
1224
1225
1226 /* ----------
1227  * pgstat_count_xact_commit() -
1228  *
1229  *      Called from access/transam/xact.c to count transaction commits.
1230  * ----------
1231  */
1232 void
1233 pgstat_count_xact_commit(void)
1234 {
1235         if      (!pgstat_collect_querystring &&
1236                  !pgstat_collect_tuplelevel &&
1237                  !pgstat_collect_blocklevel)
1238                 return;
1239
1240         pgStatXactCommit++;
1241
1242         /*
1243          * If there was no relation activity yet, just make one existing message
1244          * buffer used without slots, causing the next report to tell new
1245          * xact-counters.
1246          */
1247         if (RegularTabStat.tsa_alloc == 0)
1248                 more_tabstat_space(&RegularTabStat);
1249
1250         if (RegularTabStat.tsa_used == 0)
1251         {
1252                 RegularTabStat.tsa_used++;
1253                 RegularTabStat.tsa_messages[0]->m_nentries = 0;
1254         }
1255 }
1256
1257
1258 /* ----------
1259  * pgstat_count_xact_rollback() -
1260  *
1261  *      Called from access/transam/xact.c to count transaction rollbacks.
1262  * ----------
1263  */
1264 void
1265 pgstat_count_xact_rollback(void)
1266 {
1267         if      (!pgstat_collect_querystring &&
1268                  !pgstat_collect_tuplelevel &&
1269                  !pgstat_collect_blocklevel)
1270                 return;
1271
1272         pgStatXactRollback++;
1273
1274         /*
1275          * If there was no relation activity yet, just make one existing message
1276          * buffer used without slots, causing the next report to tell new
1277          * xact-counters.
1278          */
1279         if (RegularTabStat.tsa_alloc == 0)
1280                 more_tabstat_space(&RegularTabStat);
1281
1282         if (RegularTabStat.tsa_used == 0)
1283         {
1284                 RegularTabStat.tsa_used++;
1285                 RegularTabStat.tsa_messages[0]->m_nentries = 0;
1286         }
1287 }
1288
1289
1290 /* ----------
1291  * pgstat_fetch_stat_dbentry() -
1292  *
1293  *      Support function for the SQL-callable pgstat* functions. Returns
1294  *      the collected statistics for one database or NULL. NULL doesn't mean
1295  *      that the database doesn't exist, it is just not yet known by the
1296  *      collector, so the caller is better off to report ZERO instead.
1297  * ----------
1298  */
1299 PgStat_StatDBEntry *
1300 pgstat_fetch_stat_dbentry(Oid dbid)
1301 {
1302         /*
1303          * If not done for this transaction, read the statistics collector stats
1304          * file into some hash tables.
1305          */
1306         backend_read_statsfile();
1307
1308         /*
1309          * Lookup the requested database; return NULL if not found
1310          */
1311         return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1312                                                                                           (void *) &dbid,
1313                                                                                           HASH_FIND, NULL);
1314 }
1315
1316
1317 /* ----------
1318  * pgstat_fetch_stat_tabentry() -
1319  *
1320  *      Support function for the SQL-callable pgstat* functions. Returns
1321  *      the collected statistics for one table or NULL. NULL doesn't mean
1322  *      that the table doesn't exist, it is just not yet known by the
1323  *      collector, so the caller is better off to report ZERO instead.
1324  * ----------
1325  */
1326 PgStat_StatTabEntry *
1327 pgstat_fetch_stat_tabentry(Oid relid)
1328 {
1329         Oid                     dbid;
1330         PgStat_StatDBEntry *dbentry;
1331         PgStat_StatTabEntry *tabentry;
1332
1333         /*
1334          * If not done for this transaction, read the statistics collector stats
1335          * file into some hash tables.
1336          */
1337         backend_read_statsfile();
1338
1339         /*
1340          * Lookup our database, then look in its table hash table.
1341          */
1342         dbid = MyDatabaseId;
1343         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1344                                                                                                  (void *) &dbid,
1345                                                                                                  HASH_FIND, NULL);
1346         if (dbentry != NULL && dbentry->tables != NULL)
1347         {
1348                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1349                                                                                                            (void *) &relid,
1350                                                                                                            HASH_FIND, NULL);
1351                 if (tabentry)
1352                         return tabentry;
1353         }
1354
1355         /*
1356          * If we didn't find it, maybe it's a shared table.
1357          */
1358         dbid = InvalidOid;
1359         dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1360                                                                                                  (void *) &dbid,
1361                                                                                                  HASH_FIND, NULL);
1362         if (dbentry != NULL && dbentry->tables != NULL)
1363         {
1364                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
1365                                                                                                            (void *) &relid,
1366                                                                                                            HASH_FIND, NULL);
1367                 if (tabentry)
1368                         return tabentry;
1369         }
1370
1371         return NULL;
1372 }
1373
1374
1375 /* ----------
1376  * pgstat_fetch_stat_beentry() -
1377  *
1378  *      Support function for the SQL-callable pgstat* functions. Returns
1379  *      the actual activity slot of one active backend. The caller is
1380  *      responsible for a check if the actual user is permitted to see
1381  *      that info (especially the querystring).
1382  * ----------
1383  */
1384 PgStat_StatBeEntry *
1385 pgstat_fetch_stat_beentry(int beid)
1386 {
1387         backend_read_statsfile();
1388
1389         if (beid < 1 || beid > pgStatNumBackends)
1390                 return NULL;
1391
1392         return &pgStatBeTable[beid - 1];
1393 }
1394
1395
1396 /* ----------
1397  * pgstat_fetch_stat_numbackends() -
1398  *
1399  *      Support function for the SQL-callable pgstat* functions. Returns
1400  *      the maximum current backend id.
1401  * ----------
1402  */
1403 int
1404 pgstat_fetch_stat_numbackends(void)
1405 {
1406         backend_read_statsfile();
1407
1408         return pgStatNumBackends;
1409 }
1410
1411
1412
1413 /* ------------------------------------------------------------
1414  * Local support functions follow
1415  * ------------------------------------------------------------
1416  */
1417
1418
1419 /* ----------
1420  * pgstat_setheader() -
1421  *
1422  *              Set common header fields in a statistics message
1423  * ----------
1424  */
1425 static void
1426 pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
1427 {
1428         hdr->m_type = mtype;
1429         hdr->m_backendid = MyBackendId;
1430         hdr->m_procpid = MyProcPid;
1431 }
1432
1433
1434 /* ----------
1435  * pgstat_send() -
1436  *
1437  *              Send out one statistics message to the collector
1438  * ----------
1439  */
1440 static void
1441 pgstat_send(void *msg, int len)
1442 {
1443         if (pgStatSock < 0)
1444                 return;
1445
1446         ((PgStat_MsgHdr *) msg)->m_size = len;
1447
1448 #ifdef USE_ASSERT_CHECKING
1449         if (send(pgStatSock, msg, len, 0) < 0)
1450                 elog(LOG, "could not send to statistics collector: %m");
1451 #else
1452         send(pgStatSock, msg, len, 0);
1453         /* We deliberately ignore any error from send() */
1454 #endif
1455 }
1456
1457 /*
1458  * pgstat_send_rewrite() -
1459  *
1460  *      Send a command to the collector to rewrite the stats file.
1461  * ----------
1462  */
1463 static void
1464 pgstat_send_rewrite(void)
1465 {
1466     PgStat_MsgRewrite msg;
1467
1468         if (pgStatSock < 0)
1469                 return;
1470
1471         pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REWRITE);
1472         pgstat_send(&msg, sizeof(msg));
1473 }
1474
1475
1476 /* ----------
1477  * PgstatBufferMain() -
1478  *
1479  *      Start up the statistics buffer process.  This is the body of the
1480  *      postmaster child process.
1481  *
1482  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1483  * ----------
1484  */
1485 NON_EXEC_STATIC void
1486 PgstatBufferMain(int argc, char *argv[])
1487 {
1488         IsUnderPostmaster = true;       /* we are a postmaster subprocess now */
1489
1490         MyProcPid = getpid();           /* reset MyProcPid */
1491
1492         /*
1493          * Ignore all signals usually bound to some action in the postmaster,
1494          * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
1495          */
1496         pqsignal(SIGHUP, SIG_IGN);
1497         pqsignal(SIGINT, SIG_IGN);
1498         pqsignal(SIGTERM, SIG_IGN);
1499         pqsignal(SIGQUIT, pgstat_exit);
1500         pqsignal(SIGALRM, SIG_IGN);
1501         pqsignal(SIGPIPE, SIG_IGN);
1502         pqsignal(SIGUSR1, SIG_IGN);
1503         pqsignal(SIGUSR2, SIG_IGN);
1504         pqsignal(SIGCHLD, pgstat_die);
1505         pqsignal(SIGTTIN, SIG_DFL);
1506         pqsignal(SIGTTOU, SIG_DFL);
1507         pqsignal(SIGCONT, SIG_DFL);
1508         pqsignal(SIGWINCH, SIG_DFL);
1509         /* unblock will happen in pgstat_recvbuffer */
1510
1511 #ifdef EXEC_BACKEND
1512         pgstat_parseArgs(argc, argv);
1513 #endif
1514
1515         /*
1516          * Start a buffering process to read from the socket, so we have a little
1517          * more time to process incoming messages.
1518          *
1519          * NOTE: the process structure is: postmaster is parent of buffer process
1520          * is parent of collector process.      This way, the buffer can detect
1521          * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1522          * collector failure until it tried to write on the pipe.  That would mean
1523          * that after the postmaster started a new collector, we'd have two buffer
1524          * processes competing to read from the UDP socket --- not good.
1525          */
1526         if (pgpipe(pgStatPipe) < 0)
1527                 ereport(ERROR,
1528                                 (errcode_for_socket_access(),
1529                                  errmsg("could not create pipe for statistics buffer: %m")));
1530
1531         /* child becomes collector process */
1532 #ifdef EXEC_BACKEND
1533         pgStatCollectorPid = pgstat_forkexec(STAT_PROC_COLLECTOR);
1534 #else
1535         pgStatCollectorPid = fork();
1536 #endif
1537         switch (pgStatCollectorPid)
1538         {
1539                 case -1:
1540                         ereport(ERROR,
1541                                         (errmsg("could not fork statistics collector: %m")));
1542
1543 #ifndef EXEC_BACKEND
1544                 case 0:
1545                         /* child becomes collector process */
1546                         PgstatCollectorMain(0, NULL);
1547                         break;
1548 #endif
1549
1550                 default:
1551                         /* parent becomes buffer process */
1552                         closesocket(pgStatPipe[0]);
1553                         pgstat_recvbuffer();
1554         }
1555         exit(0);
1556 }
1557
1558
1559 /* ----------
1560  * PgstatCollectorMain() -
1561  *
1562  *      Start up the statistics collector itself.  This is the body of the
1563  *      postmaster grandchild process.
1564  *
1565  *      The argc/argv parameters are valid only in EXEC_BACKEND case.
1566  * ----------
1567  */
1568 NON_EXEC_STATIC void
1569 PgstatCollectorMain(int argc, char *argv[])
1570 {
1571         PgStat_Msg      msg;
1572         fd_set          rfds;
1573         int                     readPipe;
1574         int                     len = 0;
1575         struct itimerval timeout, canceltimeout;
1576         bool            need_timer = false;
1577
1578         MyProcPid = getpid();           /* reset MyProcPid */
1579
1580         /*
1581          * Reset signal handling.  With the exception of restoring default SIGCHLD
1582          * and SIGQUIT handling, this is a no-op in the non-EXEC_BACKEND case
1583          * because we'll have inherited these settings from the buffer process;
1584          * but it's not a no-op for EXEC_BACKEND.
1585          */
1586         pqsignal(SIGHUP, SIG_IGN);
1587         pqsignal(SIGINT, SIG_IGN);
1588         pqsignal(SIGTERM, SIG_IGN);
1589 #ifndef WIN32
1590         pqsignal(SIGQUIT, SIG_IGN);
1591 #else
1592         /* kluge to allow buffer process to kill collector; FIXME */
1593         pqsignal(SIGQUIT, pgstat_exit);
1594 #endif
1595         pqsignal(SIGALRM, force_statwrite);
1596         pqsignal(SIGPIPE, SIG_IGN);
1597         pqsignal(SIGUSR1, SIG_IGN);
1598         pqsignal(SIGUSR2, SIG_IGN);
1599         pqsignal(SIGCHLD, SIG_DFL);
1600         pqsignal(SIGTTIN, SIG_DFL);
1601         pqsignal(SIGTTOU, SIG_DFL);
1602         pqsignal(SIGCONT, SIG_DFL);
1603         pqsignal(SIGWINCH, SIG_DFL);
1604         PG_SETMASK(&UnBlockSig);
1605
1606 #ifdef EXEC_BACKEND
1607         pgstat_parseArgs(argc, argv);
1608 #endif
1609
1610         /* Close unwanted files */
1611         closesocket(pgStatPipe[1]);
1612         closesocket(pgStatSock);
1613
1614         /*
1615          * Identify myself via ps
1616          */
1617         init_ps_display("stats collector process", "", "");
1618         set_ps_display("");
1619
1620         /*
1621          * Arrange to write the initial status file right away
1622          */
1623         need_statwrite = true;
1624
1625         /* Preset the delay between status file writes */
1626         MemSet(&timeout, 0, sizeof(struct itimerval));
1627         timeout.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
1628         timeout.it_value.tv_usec = PGSTAT_STAT_INTERVAL % 1000;
1629
1630         /* Values set to zero will cancel the active timer */
1631         MemSet(&canceltimeout, 0, sizeof(struct itimerval));
1632
1633         /*
1634          * Read in an existing statistics stats file or initialize the stats to
1635          * zero.
1636          */
1637         pgStatRunningInCollector = true;
1638         pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL, false);
1639
1640         /*
1641          * Create the known backends table
1642          */
1643         pgStatBeTable = (PgStat_StatBeEntry *)
1644                 palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends);
1645
1646         readPipe = pgStatPipe[0];
1647
1648         /*
1649          * Process incoming messages and handle all the reporting stuff until
1650          * there are no more messages.
1651          */
1652         for (;;)
1653         {
1654                 /*
1655                  * If time to write the stats file, do so.  Note that the alarm
1656                  * interrupt isn't re-enabled immediately, but only after we next
1657                  * receive a stats message; so no cycles are wasted when there is
1658                  * nothing going on.
1659                  */
1660                 if (need_statwrite)
1661                 {
1662                         pgstat_write_statsfile();
1663                         need_statwrite = false;
1664                         need_timer = true;
1665                 }
1666
1667                 /*
1668                  * Setup the descriptor set for select(2)
1669                  */
1670                 FD_ZERO(&rfds);
1671                 FD_SET(readPipe, &rfds);
1672
1673                 /*
1674                  * Now wait for something to do.
1675                  */
1676                 if (select(readPipe + 1, &rfds, NULL, NULL, NULL) < 0)
1677                 {
1678                         if (errno == EINTR)
1679                                 continue;
1680                         ereport(ERROR,
1681                                         (errcode_for_socket_access(),
1682                                          errmsg("select() failed in statistics collector: %m")));
1683                 }
1684
1685                 /*
1686                  * Check if there is a new statistics message to collect.
1687                  */
1688                 if (FD_ISSET(readPipe, &rfds))
1689                 {
1690                         /*
1691                          * We may need to issue multiple read calls in case the buffer
1692                          * process didn't write the message in a single write, which is
1693                          * possible since it dumps its buffer bytewise. In any case, we'd
1694                          * need two reads since we don't know the message length
1695                          * initially.
1696                          */
1697                         int                     nread = 0;
1698                         int                     targetlen = sizeof(PgStat_MsgHdr);              /* initial */
1699                         bool            pipeEOF = false;
1700
1701                         while (nread < targetlen)
1702                         {
1703                                 len = piperead(readPipe, ((char *) &msg) + nread,
1704                                                            targetlen - nread);
1705                                 if (len < 0)
1706                                 {
1707                                         if (errno == EINTR)
1708                                                 continue;
1709                                         ereport(ERROR,
1710                                                         (errcode_for_socket_access(),
1711                                                          errmsg("could not read from statistics collector pipe: %m")));
1712                                 }
1713                                 if (len == 0)   /* EOF on the pipe! */
1714                                 {
1715                                         pipeEOF = true;
1716                                         break;
1717                                 }
1718                                 nread += len;
1719                                 if (nread == sizeof(PgStat_MsgHdr))
1720                                 {
1721                                         /* we have the header, compute actual msg length */
1722                                         targetlen = msg.msg_hdr.m_size;
1723                                         if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
1724                                                 targetlen > (int) sizeof(msg))
1725                                         {
1726                                                 /*
1727                                                  * Bogus message length implies that we got out of
1728                                                  * sync with the buffer process somehow. Abort so that
1729                                                  * we can restart both processes.
1730                                                  */
1731                                                 ereport(ERROR,
1732                                                           (errmsg("invalid statistics message length")));
1733                                         }
1734                                 }
1735                         }
1736
1737                         /*
1738                          * EOF on the pipe implies that the buffer process exited. Fall
1739                          * out of outer loop.
1740                          */
1741                         if (pipeEOF)
1742                                 break;
1743
1744                         /*
1745                          * Distribute the message to the specific function handling it.
1746                          */
1747                         switch (msg.msg_hdr.m_type)
1748                         {
1749                                 case PGSTAT_MTYPE_DUMMY:
1750                                         break;
1751
1752                                 case PGSTAT_MTYPE_BESTART:
1753                                         pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1754                                         break;
1755
1756                                 case PGSTAT_MTYPE_BETERM:
1757                                         pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1758                                         break;
1759
1760                                 case PGSTAT_MTYPE_TABSTAT:
1761                                         pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1762                                         break;
1763
1764                                 case PGSTAT_MTYPE_TABPURGE:
1765                                         pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1766                                         break;
1767
1768                                 case PGSTAT_MTYPE_ACTIVITY:
1769                                         pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1770                                         break;
1771
1772                                 case PGSTAT_MTYPE_DROPDB:
1773                                         pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1774                                         break;
1775
1776                                 case PGSTAT_MTYPE_RESETCOUNTER:
1777                                         pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1778                                                                                          nread);
1779                                         break;
1780
1781                                 case PGSTAT_MTYPE_AUTOVAC_START:
1782                                         pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread);
1783                                         break;
1784
1785                                 case PGSTAT_MTYPE_VACUUM:
1786                                         pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread);
1787                                         break;
1788
1789                                 case PGSTAT_MTYPE_ANALYZE:
1790                                         pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread);
1791                                         break;
1792
1793                                 case PGSTAT_MTYPE_REWRITE:
1794                                         need_statwrite = true;
1795                                         /* Disable the timer - it will be restarted on next data update */
1796                                         setitimer(ITIMER_REAL, &canceltimeout, NULL);
1797                                         break;
1798
1799                                 default:
1800                                         break;
1801                         }
1802
1803                         /*
1804                          * Globally count messages.
1805                          */
1806                         pgStatNumMessages++;
1807
1808                         /*
1809                          * If this is the first message after we wrote the stats file the
1810                          * last time, enable the alarm interrupt to make it be written
1811                          * again later.
1812                          */
1813                         if (need_timer)
1814                         {
1815                                 if (setitimer(ITIMER_REAL, &timeout, NULL))
1816                                         ereport(ERROR,
1817                                                   (errmsg("could not set statistics collector timer: %m")));
1818                                 need_timer = false;
1819                         }
1820                 }
1821
1822                 /*
1823                  * Note that we do NOT check for postmaster exit inside the loop; only
1824                  * EOF on the buffer pipe causes us to fall out.  This ensures we
1825                  * don't exit prematurely if there are still a few messages in the
1826                  * buffer or pipe at postmaster shutdown.
1827                  */
1828         }
1829
1830         /*
1831          * Okay, we saw EOF on the buffer pipe, so there are no more messages to
1832          * process.  If the buffer process quit because of postmaster shutdown, we
1833          * want to save the final stats to reuse at next startup. But if the
1834          * buffer process failed, it seems best not to (there may even now be a
1835          * new collector firing up, and we don't want it to read a
1836          * partially-rewritten stats file).
1837          */
1838         if (!PostmasterIsAlive(false))
1839                 pgstat_write_statsfile();
1840 }
1841
1842
1843 /* SIGALRM signal handler for collector process */
1844 static void
1845 force_statwrite(SIGNAL_ARGS)
1846 {
1847         need_statwrite = true;
1848 }
1849
1850
/* ----------
 * pgstat_recvbuffer() -
 *
 *	This is the body of the separate buffering process. Its only
 *	purpose is to receive messages from the UDP socket as fast as
 *	possible and forward them over a pipe into the collector itself.
 *	If the collector is slow to absorb messages, they are buffered here.
 *
 *	Pending bytes live in a circular buffer of PGSTAT_RECVBUFFERSZ bytes:
 *	msg_recv is where the next accepted datagram is copied in, msg_send is
 *	where the next pipe write starts, and msg_have is the number of bytes
 *	currently stored.  A single message may wrap around the end of the
 *	buffer, and a pipe write may transfer only part of a message.
 *
 *	The loop below has no break, so this function never returns: the
 *	process leaves via exit(0) once the postmaster is gone, via
 *	ereport(ERROR) on an unexpected socket/pipe failure, or by being killed
 *	by a signal (see the SIGPIPE/SIGCHLD remarks below).
 * ----------
 */
static void
pgstat_recvbuffer(void)
{
	fd_set		rfds;
	fd_set		wfds;
	struct timeval timeout;
	int			writePipe = pgStatPipe[1];	/* our end of the pipe to the collector */
	int			maxfd;
	int			len;
	int			xfr;			/* bytes moved in one copy/write step */
	int			frm;			/* read offset into input_buffer while copying */
	PgStat_Msg	input_buffer;	/* one datagram as received from the socket */
	char	   *msgbuffer;		/* circular buffer of not-yet-forwarded bytes */
	int			msg_send = 0;	/* next send index in buffer */
	int			msg_recv = 0;	/* next receive index */
	int			msg_have = 0;	/* number of bytes stored */
	bool		overflow = false;	/* "buffer is full" already logged? */

	/*
	 * Identify myself via ps
	 */
	init_ps_display("stats buffer process", "", "");
	set_ps_display("");

	/*
	 * We want to die if our child collector process does.  There are two ways
	 * we might notice that it has died: receive SIGCHLD, or get a write
	 * failure on the pipe leading to the child.  We can set SIGPIPE to kill
	 * us here.  Our SIGCHLD handler was already set up before we forked (must
	 * do it that way, else it's a race condition).
	 */
	pqsignal(SIGPIPE, SIG_DFL);
	PG_SETMASK(&UnBlockSig);

	/*
	 * Set the write pipe to nonblock mode, so that we cannot block when the
	 * collector falls behind.
	 */
	if (!pg_set_noblock(writePipe))
		ereport(ERROR,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector pipe to nonblocking mode: %m")));

	/*
	 * Allocate the message buffer
	 */
	msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);

	/*
	 * Loop forever
	 */
	for (;;)
	{
		FD_ZERO(&rfds);
		FD_ZERO(&wfds);
		maxfd = -1;

		/*
		 * As long as we have buffer space we add the socket to the read
		 * descriptor set.  "Space" means room for one maximum-size message,
		 * since recv() below may return up to sizeof(PgStat_Msg) bytes.  When
		 * the buffer is too full we stop reading, so further datagrams stay
		 * queued on (or are dropped by) the socket, and log that once per
		 * overflow episode.
		 */
		if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
		{
			FD_SET(pgStatSock, &rfds);
			maxfd = pgStatSock;
			overflow = false;
		}
		else
		{
			if (!overflow)
			{
				ereport(LOG,
						(errmsg("statistics buffer is full")));
				overflow = true;
			}
		}

		/*
		 * If we have messages to write out, we add the pipe to the write
		 * descriptor set.
		 */
		if (msg_have > 0)
		{
			FD_SET(writePipe, &wfds);
			if (writePipe > maxfd)
				maxfd = writePipe;
		}

		/*
		 * Wait for some work to do; but not for more than 10 seconds. (This
		 * determines how quickly we will shut down after an ungraceful
		 * postmaster termination; so it needn't be very fast.)
		 *
		 * struct timeout is modified by select() on some operating systems,
		 * so re-fill it each time.
		 */
		timeout.tv_sec = 10;
		timeout.tv_usec = 0;

		/* On timeout select() returns 0 with both sets cleared; fall through */
		if (select(maxfd + 1, &rfds, &wfds, NULL, &timeout) < 0)
		{
			if (errno == EINTR)
				continue;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics buffer: %m")));
		}

		/*
		 * If there is a message on the socket, read it and check for
		 * validity.
		 */
		if (FD_ISSET(pgStatSock, &rfds))
		{
			len = recv(pgStatSock, (char *) &input_buffer,
					   sizeof(PgStat_Msg), 0);
			if (len < 0)
				ereport(ERROR,
						(errcode_for_socket_access(),
						 errmsg("could not read statistics message: %m")));

			/*
			 * We ignore messages that are smaller than our common header.
			 * (ereport(ERROR) does not return, so len >= 0 here and the
			 * int-vs-size_t comparison below is safe.)
			 *
			 * Note that "continue" on a rejected message also skips the pipe
			 * write for this iteration; the next iteration catches up.
			 */
			if (len < sizeof(PgStat_MsgHdr))
				continue;

			/*
			 * The received length must match the length in the header
			 */
			if (input_buffer.msg_hdr.m_size != len)
				continue;

			/*
			 * O.K. - we accept this message.  Copy it to the circular
			 * msgbuffer.  At most two iterations: one up to the physical end
			 * of the buffer, one from its start after wrapping msg_recv.
			 */
			frm = 0;
			while (len > 0)
			{
				xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
				if (xfr > len)
					xfr = len;
				Assert(xfr > 0);
				memcpy(msgbuffer + msg_recv,
					   ((char *) &input_buffer) + frm,
					   xfr);
				msg_recv += xfr;
				if (msg_recv == PGSTAT_RECVBUFFERSZ)
					msg_recv = 0;
				msg_have += xfr;
				frm += xfr;
				len -= xfr;
			}
		}

		/*
		 * If the collector is ready to receive, write some data into his
		 * pipe.  We may or may not be able to write all that we have.
		 *
		 * NOTE: if what we have is less than PIPE_BUF bytes but more than the
		 * space available in the pipe buffer, most kernels will refuse to
		 * write any of it, and will return EAGAIN.  This means we will
		 * busy-loop until the situation changes (either because the collector
		 * caught up, or because more data arrives so that we have more than
		 * PIPE_BUF bytes buffered).  This is not good, but is there any way
		 * around it?  We have no way to tell when the collector has caught
		 * up...
		 *
		 * Each write covers only the contiguous run from msg_send up to the
		 * physical end of the buffer; wrapped data goes out on a later pass.
		 */
		if (FD_ISSET(writePipe, &wfds))
		{
			xfr = PGSTAT_RECVBUFFERSZ - msg_send;
			if (xfr > msg_have)
				xfr = msg_have;
			Assert(xfr > 0);
			len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
			if (len < 0)
			{
				if (errno == EINTR || errno == EAGAIN)
					continue;	/* not enough space in pipe */
				ereport(ERROR,
						(errcode_for_socket_access(),
				errmsg("could not write to statistics collector pipe: %m")));
			}
			/* NB: len < xfr is okay */
			msg_send += len;
			if (msg_send == PGSTAT_RECVBUFFERSZ)
				msg_send = 0;
			msg_have -= len;
		}

		/*
		 * Make sure we forwarded all messages before we check for postmaster
		 * termination.
		 */
		if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
			continue;

		/*
		 * If the postmaster has terminated, we die too.  (This is no longer
		 * the normal exit path, however.)
		 */
		if (!PostmasterIsAlive(true))
			exit(0);
	}
}
2066
/*
 * pgstat_exit() -
 *
 *	SIGQUIT handler for the buffer process.  We leave right away and make no
 *	attempt to forward messages still sitting in the buffer; draining them
 *	would be cleaner but would slow down exit.
 *
 *	On Windows the collector is sent SIGQUIT explicitly as well, because on
 *	some broken win32 systems it does not shut down on its own due to
 *	socket-inheritance problems.  XXX fixing the inheritance would be better.
 *
 *	NOTE(review): exit() is not on the POSIX async-signal-safe list; _exit()
 *	would be the strictly safe choice here.  Left unchanged to preserve the
 *	current exit behavior.
 */
static void
pgstat_exit(SIGNAL_ARGS)
{
#ifdef WIN32
	/* Take our collector child down with us, if we know its PID */
	if (pgStatCollectorPid > 0)
		kill(pgStatCollectorPid, SIGQUIT);
#endif

	exit(0);
}
2088
/*
 * pgstat_die() -
 *
 *	SIGCHLD handler for the buffer process.  We expect SIGCHLD here to mean
 *	that our child collector process has died, and the buffer process has
 *	no reason to outlive it.  Exit at once with a failure status, unlike the
 *	orderly SIGQUIT path.
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	exit(1);					/* nonzero: abnormal termination */
}
2095
2096
2097 /* ----------
2098  * pgstat_add_backend() -
2099  *
2100  *      Support function to keep our backend list up to date.
2101  * ----------
2102  */
2103 static int
2104 pgstat_add_backend(PgStat_MsgHdr *msg)
2105 {
2106         PgStat_StatBeEntry *beentry;
2107
2108         /*
2109          * Check that the backend ID is valid
2110          */
2111         if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
2112         {
2113                 ereport(LOG,
2114                                 (errmsg("invalid server process ID %d", msg->m_backendid)));
2115                 return -1;
2116         }
2117
2118         /*
2119          * Get the slot for this backendid.
2120          */
2121         beentry = &pgStatBeTable[msg->m_backendid - 1];
2122
2123         /*
2124          * If the slot contains the PID of this backend, everything is fine and we
2125          * have nothing to do. Note that all the slots are zero'd out when the
2126          * collector is started. We assume that a slot is "empty" iff procpid ==
2127          * 0.
2128          */
2129         if (beentry->procpid > 0 && beentry->procpid == msg->m_procpid)
2130                 return 0;
2131
2132         /* Must be able to distinguish between empty and non-empty slots */
2133         Assert(msg->m_procpid > 0);
2134
2135         /*
2136          * Put this new backend into the slot (possibly overwriting an old entry,
2137          * if we missed its BETERM or the BETERM hasn't arrived yet).
2138          */
2139         beentry->procpid = msg->m_procpid;
2140         beentry->start_timestamp = GetCurrentTimestamp();
2141         beentry->activity_start_timestamp = 0;
2142         beentry->activity[0] = '\0';
2143
2144         /*
2145          * We can't initialize the rest of the data in this slot until we see the
2146          * BESTART message. Therefore, we set the database and user to sentinel
2147          * values, to indicate "undefined". There is no easy way to do this for
2148          * the client address, so make sure to check that the database or user are
2149          * defined before accessing the client address.
2150          */
2151         beentry->userid = InvalidOid;
2152         beentry->databaseid = InvalidOid;
2153
2154         return 0;
2155 }
2156
2157 /*
2158  * Lookup the hash table entry for the specified database. If no hash
2159  * table entry exists, initialize it, if the create parameter is true.
2160  * Else, return NULL.
2161  */
2162 static PgStat_StatDBEntry *
2163 pgstat_get_db_entry(Oid databaseid, bool create)
2164 {
2165         PgStat_StatDBEntry *result;
2166         bool            found;
2167         HASHACTION      action = (create ? HASH_ENTER : HASH_FIND);
2168
2169         /* Lookup or create the hash table entry for this database */
2170         result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2171                                                                                                 &databaseid,
2172                                                                                                 action, &found);
2173
2174         if (!create && !found)
2175                 return NULL;
2176
2177         /* If not found, initialize the new one. */
2178         if (!found)
2179         {
2180                 HASHCTL         hash_ctl;
2181
2182                 result->tables = NULL;
2183                 result->n_xact_commit = 0;
2184                 result->n_xact_rollback = 0;
2185                 result->n_blocks_fetched = 0;
2186                 result->n_blocks_hit = 0;
2187                 result->last_autovac_time = 0;
2188
2189                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2190                 hash_ctl.keysize = sizeof(Oid);
2191                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2192                 hash_ctl.hash = oid_hash;
2193                 result->tables = hash_create("Per-database table",
2194                                                                          PGSTAT_TAB_HASH_SIZE,
2195                                                                          &hash_ctl,
2196                                                                          HASH_ELEM | HASH_FUNCTION);
2197         }
2198
2199         return result;
2200 }
2201
2202 /* ----------
2203  * pgstat_sub_backend() -
2204  *
2205  *      Remove a backend from the actual backends list.
2206  * ----------
2207  */
2208 static void
2209 pgstat_sub_backend(int procpid)
2210 {
2211         int                     i;
2212
2213         /*
2214          * Search in the known-backends table for the slot containing this PID.
2215          */
2216         for (i = 0; i < MaxBackends; i++)
2217         {
2218                 if (pgStatBeTable[i].procpid == procpid)
2219                 {
2220                         /*
2221                          * That's him.  Mark the backend slot empty.
2222                          */
2223                         pgStatBeTable[i].procpid = 0;
2224                         return;
2225                 }
2226         }
2227
2228         /*
2229          * No big problem if not found. This can happen if UDP messages arrive out
2230          * of order here.
2231          */
2232 }
2233
2234
2235 /* ----------
2236  * pgstat_write_statsfile() -
2237  *
2238  *      Tell the news.
2239  * ----------
2240  */
2241 static void
2242 pgstat_write_statsfile(void)
2243 {
2244         HASH_SEQ_STATUS hstat;
2245         HASH_SEQ_STATUS tstat;
2246         PgStat_StatDBEntry *dbentry;
2247         PgStat_StatTabEntry *tabentry;
2248         FILE       *fpout;
2249         int                     i;
2250         int32           format_id;
2251
2252         /*
2253          * Open the statistics temp file to write out the current values.
2254          */
2255         fpout = fopen(PGSTAT_STAT_TMPFILE, PG_BINARY_W);
2256         if (fpout == NULL)
2257         {
2258                 ereport(LOG,
2259                                 (errcode_for_file_access(),
2260                                  errmsg("could not open temporary statistics file \"%s\": %m",
2261                                                 PGSTAT_STAT_TMPFILE)));
2262                 return;
2263         }
2264
2265         /*
2266          * Write the file header --- currently just a format ID.
2267          */
2268         format_id = PGSTAT_FILE_FORMAT_ID;
2269         fwrite(&format_id, sizeof(format_id), 1, fpout);
2270
2271         /*
2272          * Walk through the database table.
2273          */
2274         hash_seq_init(&hstat, pgStatDBHash);
2275         while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2276         {
2277                 /*
2278                  * Write out the DB entry including the number of live backends.
2279                  * We don't write the tables pointer since it's of no use to any
2280                  * other process.
2281                  */
2282                 fputc('D', fpout);
2283                 fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
2284
2285                 /*
2286                  * Walk through the database's access stats per table.
2287                  */
2288                 hash_seq_init(&tstat, dbentry->tables);
2289                 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2290                 {
2291                         fputc('T', fpout);
2292                         fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
2293                 }
2294
2295                 /*
2296                  * Mark the end of this DB
2297                  */
2298                 fputc('d', fpout);
2299         }
2300
2301         /*
2302          * Write out the known running backends to the stats file.
2303          */
2304         i = MaxBackends;
2305         fputc('M', fpout);
2306         fwrite(&i, sizeof(i), 1, fpout);
2307
2308         for (i = 0; i < MaxBackends; i++)
2309         {
2310                 PgStat_StatBeEntry *beentry = &pgStatBeTable[i];
2311
2312                 if (beentry->procpid > 0)
2313                 {
2314                         int             len;
2315
2316                         len = offsetof(PgStat_StatBeEntry, activity) +
2317                                 strlen(beentry->activity) + 1;
2318                         fputc('B', fpout);
2319                         fwrite(&len, sizeof(len), 1, fpout);
2320                         fwrite(beentry, len, 1, fpout);
2321                 }
2322         }
2323
2324         /*
2325          * No more output to be done. Close the temp file and replace the old
2326          * pgstat.stat with it.  The ferror() check replaces testing for error
2327          * after each individual fputc or fwrite above.
2328          */
2329         fputc('E', fpout);
2330
2331         if (ferror(fpout))
2332         {
2333                 ereport(LOG,
2334                                 (errcode_for_file_access(),
2335                                  errmsg("could not write temporary statistics file \"%s\": %m",
2336                                                 PGSTAT_STAT_TMPFILE)));
2337                 fclose(fpout);
2338                 unlink(PGSTAT_STAT_TMPFILE);
2339         }
2340         else if (fclose(fpout) < 0)
2341         {
2342                 ereport(LOG,
2343                                 (errcode_for_file_access(),
2344                            errmsg("could not close temporary statistics file \"%s\": %m",
2345                                           PGSTAT_STAT_TMPFILE)));
2346                 unlink(PGSTAT_STAT_TMPFILE);
2347         }
2348         else if (rename(PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME) < 0)
2349         {
2350                 ereport(LOG,
2351                                 (errcode_for_file_access(),
2352                                  errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2353                                                 PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME)));
2354                 unlink(PGSTAT_STAT_TMPFILE);
2355         }
2356 }
2357
/*
 * qsort/bsearch comparison routine for PIDs
 *
 * Returns -1, 0 or 1 according to whether *v1 is less than, equal to, or
 * greater than *v2.  The result comes from comparisons rather than from
 * subtracting the two values.  Subtraction is only safe when both PIDs are
 * nonnegative, and signed overflow is undefined behavior, so this form works
 * for any pair of ints.
 */
static int
comparePids(const void *v1, const void *v2)
{
	int			p1 = *((const int *) v1);
	int			p2 = *((const int *) v2);

	if (p1 < p2)
		return -1;
	if (p1 > p2)
		return 1;
	return 0;
}
2368
2369 /* ----------
2370  * pgstat_read_statsfile() -
2371  *
2372  *      Reads in an existing statistics collector and initializes the
2373  *      databases' hash table (whose entries point to the tables' hash tables)
2374  *      and the current backend table.
2375  * ----------
2376  */
2377 static void
2378 pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
2379                                           PgStat_StatBeEntry **betab, int *numbackends, bool rewrite)
2380 {
2381         PgStat_StatDBEntry *dbentry;
2382         PgStat_StatDBEntry dbbuf;
2383         PgStat_StatTabEntry *tabentry;
2384         PgStat_StatTabEntry tabbuf;
2385         PgStat_StatBeEntry *beentry;
2386         HASHCTL         hash_ctl;
2387         HTAB       *tabhash = NULL;
2388         FILE       *fpin;
2389         int32           format_id;
2390         int                     len;
2391         int                     maxbackends = 0;
2392         int                     havebackends = 0;
2393         bool            found;
2394         int                *live_pids;
2395         MemoryContext use_mcxt;
2396         int                     mcxt_flags;
2397
2398
2399         if (rewrite)
2400         {
2401                 /*
2402                  * To force a rewrite of the stats file from the collector, send
2403                  * a REWRITE message to the stats collector. Then wait for the file
2404                  * to change. On Unix, we wait for the inode to change (as the file
2405                  * is renamed into place from a different file). Win32 has no concept
2406                  * of inodes, so we wait for the date on the file to change instead.
2407                  * We can do this on win32 because we have high-res timing on the 
2408                  * file dates, but we can't on unix, because it has 1sec resolution
2409                  * on the fields in struct stat.
2410                  */
2411                 int i;
2412 #ifndef WIN32
2413                 struct stat st1, st2;
2414
2415                 if (stat(PGSTAT_STAT_FILENAME, &st1))
2416                 {
2417                         /* Assume no file there yet */
2418                         st1.st_ino = 0;
2419                 }
2420                 st2.st_ino = 0;
2421 #else
2422                 WIN32_FILE_ATTRIBUTE_DATA fd1, fd2;
2423
2424                 if (!GetFileAttributesEx(PGSTAT_STAT_FILENAME, GetFileExInfoStandard, &fd1))
2425                 {
2426                         fd1.ftLastWriteTime.dwLowDateTime = 0;
2427                         fd1.ftLastWriteTime.dwHighDateTime = 0;
2428                 }
2429                 fd2.ftLastWriteTime.dwLowDateTime = 0;
2430                 fd2.ftLastWriteTime.dwHighDateTime = 0;
2431 #endif
2432
2433
2434                 /* Send rewrite message */
2435                 pgstat_send_rewrite();
2436
2437                 /* Now wait for the file to change */
2438                 for (i=0; i < 50; i++)
2439                 {
2440 #ifndef WIN32
2441                         if (!stat(PGSTAT_STAT_FILENAME, &st2))
2442                         {
2443                                 if (st2.st_ino != st1.st_ino)
2444                                         break;
2445                         }
2446 #else
2447                         if (GetFileAttributesEx(PGSTAT_STAT_FILENAME, GetFileExInfoStandard, &fd2))
2448                         {
2449                                 if (fd1.ftLastWriteTime.dwLowDateTime != fd2.ftLastWriteTime.dwLowDateTime ||
2450                                         fd1.ftLastWriteTime.dwHighDateTime != fd2.ftLastWriteTime.dwHighDateTime)
2451                                         break;
2452                         }
2453 #endif
2454
2455                         pg_usleep(50000); 
2456                 }
2457                 if (i >= 50)
2458                         ereport(WARNING,
2459                                 (errmsg("pgstat update timeout")));
2460                 /* Fallthrough and read the old file anyway - old data better than no data */
2461         }
2462
2463         /*
2464          * If running in the collector or the autovacuum process, we use the
2465          * DynaHashCxt memory context.  If running in a backend, we use the
2466          * TopTransactionContext instead, so the caller must only know the last
2467          * XactId when this call happened to know if his tables are still valid or
2468          * already gone!
2469          *
2470          * Also, if running in a regular backend, we check backend entries against
2471          * the PGPROC array so that we can detect stale entries.  This lets us
2472          * discard entries whose BETERM message got lost for some reason.
2473          */
2474         if (pgStatRunningInCollector || IsAutoVacuumProcess())
2475         {
2476                 use_mcxt = NULL;
2477                 mcxt_flags = 0;
2478                 live_pids = NULL;
2479         }
2480         else
2481         {
2482                 use_mcxt = TopTransactionContext;
2483                 mcxt_flags = HASH_CONTEXT;
2484                 live_pids = GetAllBackendPids();
2485                 /* Sort the PID array so we can use bsearch */
2486                 if (live_pids[0] > 1)
2487                         qsort((void *) &live_pids[1], live_pids[0], sizeof(int),
2488                                   comparePids);
2489         }
2490
2491         /*
2492          * Create the DB hashtable
2493          */
2494         memset(&hash_ctl, 0, sizeof(hash_ctl));
2495         hash_ctl.keysize = sizeof(Oid);
2496         hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2497         hash_ctl.hash = oid_hash;
2498         hash_ctl.hcxt = use_mcxt;
2499         *dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
2500                                                   HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2501
2502         /*
2503          * Initialize the number of known backends to zero, just in case we do a
2504          * silent error return below.
2505          */
2506         if (numbackends != NULL)
2507                 *numbackends = 0;
2508         if (betab != NULL)
2509                 *betab = NULL;
2510
2511         /*
2512          * Try to open the status file. If it doesn't exist, the backends simply
2513          * return zero for anything and the collector simply starts from scratch
2514          * with empty counters.
2515          */
2516         if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL)
2517                 return;
2518
2519         /*
2520          * Verify it's of the expected format.
2521          */
2522         if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
2523                 || format_id != PGSTAT_FILE_FORMAT_ID)
2524         {
2525                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2526                                 (errmsg("corrupted pgstat.stat file")));
2527                 goto done;
2528         }
2529
2530         /*
2531          * We found an existing collector stats file. Read it and put all the
2532          * hashtable entries into place.
2533          */
2534         for (;;)
2535         {
2536                 switch (fgetc(fpin))
2537                 {
2538                                 /*
2539                                  * 'D'  A PgStat_StatDBEntry struct describing a database
2540                                  * follows. Subsequently, zero to many 'T' entries will follow
2541                                  * until a 'd' is encountered.
2542                                  */
2543                         case 'D':
2544                                 if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
2545                                                   fpin) != offsetof(PgStat_StatDBEntry, tables))
2546                                 {
2547                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2548                                                         (errmsg("corrupted pgstat.stat file")));
2549                                         goto done;
2550                                 }
2551
2552                                 /*
2553                                  * Add to the DB hash
2554                                  */
2555                                 dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2556                                                                                                   (void *) &dbbuf.databaseid,
2557                                                                                                                          HASH_ENTER,
2558                                                                                                                          &found);
2559                                 if (found)
2560                                 {
2561                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2562                                                         (errmsg("corrupted pgstat.stat file")));
2563                                         goto done;
2564                                 }
2565
2566                                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2567                                 dbentry->tables = NULL;
2568                                 dbentry->n_backends = 0;
2569
2570                                 /*
2571                                  * Don't collect tables if not the requested DB (or the
2572                                  * shared-table info)
2573                                  */
2574                                 if (onlydb != InvalidOid)
2575                                 {
2576                                         if (dbbuf.databaseid != onlydb &&
2577                                                 dbbuf.databaseid != InvalidOid)
2578                                                 break;
2579                                 }
2580
2581                                 memset(&hash_ctl, 0, sizeof(hash_ctl));
2582                                 hash_ctl.keysize = sizeof(Oid);
2583                                 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2584                                 hash_ctl.hash = oid_hash;
2585                                 hash_ctl.hcxt = use_mcxt;
2586                                 dbentry->tables = hash_create("Per-database table",
2587                                                                                           PGSTAT_TAB_HASH_SIZE,
2588                                                                                           &hash_ctl,
2589                                                                          HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2590
2591                                 /*
2592                                  * Arrange that following 'T's add entries to this database's
2593                                  * tables hash table.
2594                                  */
2595                                 tabhash = dbentry->tables;
2596                                 break;
2597
2598                                 /*
2599                                  * 'd'  End of this database.
2600                                  */
2601                         case 'd':
2602                                 tabhash = NULL;
2603                                 break;
2604
2605                                 /*
2606                                  * 'T'  A PgStat_StatTabEntry follows.
2607                                  */
2608                         case 'T':
2609                                 if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
2610                                                   fpin) != sizeof(PgStat_StatTabEntry))
2611                                 {
2612                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2613                                                         (errmsg("corrupted pgstat.stat file")));
2614                                         goto done;
2615                                 }
2616
2617                                 /*
2618                                  * Skip if table belongs to a not requested database.
2619                                  */
2620                                 if (tabhash == NULL)
2621                                         break;
2622
2623                                 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2624                                                                                                         (void *) &tabbuf.tableid,
2625                                                                                                                  HASH_ENTER, &found);
2626
2627                                 if (found)
2628                                 {
2629                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2630                                                         (errmsg("corrupted pgstat.stat file")));
2631                                         goto done;
2632                                 }
2633
2634                                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
2635                                 break;
2636
2637                                 /*
2638                                  * 'M'  The maximum number of backends to expect follows.
2639                                  */
2640                         case 'M':
2641                                 if (betab == NULL || numbackends == NULL)
2642                                         goto done;
2643                                 if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2644                                         sizeof(maxbackends))
2645                                 {
2646                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2647                                                         (errmsg("corrupted pgstat.stat file")));
2648                                         goto done;
2649                                 }
2650                                 if (maxbackends == 0)
2651                                         goto done;
2652
2653                                 /*
2654                                  * Allocate space (in TopTransactionContext too) for the
2655                                  * backend table.
2656                                  */
2657                                 if (use_mcxt == NULL)
2658                                         *betab = (PgStat_StatBeEntry *)
2659                                                 palloc(sizeof(PgStat_StatBeEntry) * maxbackends);
2660                                 else
2661                                         *betab = (PgStat_StatBeEntry *)
2662                                                 MemoryContextAlloc(use_mcxt,
2663                                                                    sizeof(PgStat_StatBeEntry) * maxbackends);
2664                                 break;
2665
2666                                 /*
2667                                  * 'B'  A PgStat_StatBeEntry follows.
2668                                  */
2669                         case 'B':
2670                                 if (betab == NULL || numbackends == NULL || *betab == NULL)
2671                                         goto done;
2672
2673                                 if (havebackends >= maxbackends)
2674                                         goto done;
2675
2676                                 /* Read and validate the entry length */
2677                                 if (fread(&len, 1, sizeof(len), fpin) != sizeof(len))
2678                                 {
2679                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2680                                                         (errmsg("corrupted pgstat.stat file")));
2681                                         goto done;
2682                                 }
2683                                 if (len <= offsetof(PgStat_StatBeEntry, activity) ||
2684                                         len > sizeof(PgStat_StatBeEntry))
2685                                 {
2686                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2687                                                         (errmsg("corrupted pgstat.stat file")));
2688                                         goto done;
2689                                 }
2690
2691                                 /*
2692                                  * Read it directly into the table.
2693                                  */
2694                                 beentry = &(*betab)[havebackends];
2695
2696                                 if (fread(beentry, 1, len, fpin) != len)
2697                                 {
2698                                         ereport(pgStatRunningInCollector ? LOG : WARNING,
2699                                                         (errmsg("corrupted pgstat.stat file")));
2700                                         goto done;
2701                                 }
2702
2703                                 /*
2704                                  * If possible, check PID to verify still running
2705                                  */
2706                                 if (live_pids &&
2707                                         (live_pids[0] == 0 ||
2708                                          bsearch((void *) &beentry->procpid,
2709                                                          (void *) &live_pids[1],
2710                                                          live_pids[0],
2711                                                          sizeof(int),
2712                                                          comparePids) == NULL))
2713                                 {
2714                                         /*
2715                                          * Note: we could send a BETERM message to tell the
2716                                          * collector to drop the entry, but I'm a bit worried
2717                                          * about race conditions.  For now, just silently ignore
2718                                          * dead entries; they'll get recycled eventually anyway.
2719                                          */
2720
2721                                         /* Don't accept the entry */
2722                                         memset(beentry, 0, sizeof(PgStat_StatBeEntry));
2723                                         break;
2724                                 }
2725
2726                                 /*
2727                                  * Count backends per database here.
2728                                  */
2729                                 dbentry = (PgStat_StatDBEntry *)
2730                                         hash_search(*dbhash,
2731                                                                 &(beentry->databaseid),
2732                                                                 HASH_FIND,
2733                                                                 NULL);
2734                                 if (dbentry)
2735                                         dbentry->n_backends++;
2736
2737                                 havebackends++;
2738                                 *numbackends = havebackends;
2739
2740                                 break;
2741
2742                                 /*
2743                                  * 'E'  The EOF marker of a complete stats file.
2744                                  */
2745                         case 'E':
2746                                 goto done;
2747
2748                         default:
2749                                 ereport(pgStatRunningInCollector ? LOG : WARNING,
2750                                                 (errmsg("corrupted pgstat.stat file")));
2751                                 goto done;
2752                 }
2753         }
2754
2755 done:
2756         FreeFile(fpin);
2757 }
2758
2759 /*
2760  * If not done for this transaction, read the statistics collector
2761  * stats file into some hash tables.
2762  *
2763  * Because we store the hash tables in TopTransactionContext, the result
2764  * is good for the entire current main transaction.
2765  *
2766  * Inside the autovacuum process, the statfile is assumed to be valid
2767  * "forever", that is one iteration, within one database.  This means
2768  * we only consider the statistics as they were when the autovacuum
2769  * iteration started.
2770  */
2771 static void
2772 backend_read_statsfile(void)
2773 {
2774         if (IsAutoVacuumProcess())
2775         {
2776                 /* already read it? */
2777                 if (pgStatDBHash)
2778                         return;
2779                 Assert(!pgStatRunningInCollector);
2780                 pgstat_read_statsfile(&pgStatDBHash, InvalidOid,
2781                                                           &pgStatBeTable, &pgStatNumBackends, true);
2782         }
2783         else
2784         {
2785                 TransactionId topXid = GetTopTransactionId();
2786
2787                 if (!TransactionIdEquals(pgStatDBHashXact, topXid))
2788                 {
2789                         Assert(!pgStatRunningInCollector);
2790                         pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
2791                                                                   &pgStatBeTable, &pgStatNumBackends, true);
2792                         pgStatDBHashXact = topXid;
2793                 }
2794         }
2795 }
2796
2797
2798 /* ----------
2799  * pgstat_recv_bestart() -
2800  *
2801  *      Process a backend startup message.
2802  * ----------
2803  */
2804 static void
2805 pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
2806 {
2807         PgStat_StatBeEntry *entry;
2808
2809         /*
2810          * If the backend is known dead, we ignore the message -- we don't want to
2811          * update the backend entry's state since this BESTART message refers to
2812          * an old, dead backend
2813          */
2814         if (pgstat_add_backend(&msg->m_hdr) != 0)
2815                 return;
2816
2817         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2818         entry->userid = msg->m_userid;
2819         memcpy(&entry->clientaddr, &msg->m_clientaddr, sizeof(entry->clientaddr));
2820         entry->databaseid = msg->m_databaseid;
2821 }
2822
2823
2824 /* ----------
2825  * pgstat_recv_beterm() -
2826  *
2827  *      Process a backend termination message.
2828  * ----------
2829  */
2830 static void
2831 pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
2832 {
2833         pgstat_sub_backend(msg->m_hdr.m_procpid);
2834 }
2835
2836 /* ----------
2837  * pgstat_recv_autovac() -
2838  *
2839  *      Process an autovacuum signalling message.
2840  * ----------
2841  */
2842 static void
2843 pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
2844 {
2845         PgStat_StatDBEntry *dbentry;
2846
2847         /*
2848          * Lookup the database in the hashtable.  Don't create the entry if it
2849          * doesn't exist, because autovacuum may be processing a template
2850          * database.  If this isn't the case, the database is most likely to have
2851          * an entry already.  (If it doesn't, not much harm is done anyway --
2852          * it'll get created as soon as somebody actually uses the database.)
2853          */
2854         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2855         if (dbentry == NULL)
2856                 return;
2857
2858         /*
2859          * Store the last autovacuum time in the database entry.
2860          */
2861         dbentry->last_autovac_time = msg->m_start_time;
2862 }
2863
2864 /* ----------
2865  * pgstat_recv_vacuum() -
2866  *
2867  *      Process a VACUUM message.
2868  * ----------
2869  */
2870 static void
2871 pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
2872 {
2873         PgStat_StatDBEntry *dbentry;
2874         PgStat_StatTabEntry *tabentry;
2875
2876         /*
2877          * Don't create either the database or table entry if it doesn't already
2878          * exist.  This avoids bloating the stats with entries for stuff that is
2879          * only touched by vacuum and not by live operations.
2880          */
2881         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2882         if (dbentry == NULL)
2883                 return;
2884
2885         tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
2886                                                    HASH_FIND, NULL);
2887         if (tabentry == NULL)
2888                 return;
2889
2890         tabentry->n_live_tuples = msg->m_tuples;
2891         tabentry->n_dead_tuples = 0;
2892         if (msg->m_analyze)
2893                 tabentry->last_anl_tuples = msg->m_tuples;
2894 }
2895
2896 /* ----------
2897  * pgstat_recv_analyze() -
2898  *
2899  *      Process an ANALYZE message.
2900  * ----------
2901  */
2902 static void
2903 pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
2904 {
2905         PgStat_StatDBEntry *dbentry;
2906         PgStat_StatTabEntry *tabentry;
2907
2908         /*
2909          * Don't create either the database or table entry if it doesn't already
2910          * exist.  This avoids bloating the stats with entries for stuff that is
2911          * only touched by analyze and not by live operations.
2912          */
2913         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
2914         if (dbentry == NULL)
2915                 return;
2916
2917         tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
2918                                                    HASH_FIND, NULL);
2919         if (tabentry == NULL)
2920                 return;
2921
2922         tabentry->n_live_tuples = msg->m_live_tuples;
2923         tabentry->n_dead_tuples = msg->m_dead_tuples;
2924         tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
2925 }
2926
2927 /* ----------
2928  * pgstat_recv_activity() -
2929  *
2930  *      Remember what the backend is doing.
2931  * ----------
2932  */
2933 static void
2934 pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
2935 {
2936         PgStat_StatBeEntry *entry;
2937
2938         /*
2939          * Here we check explicitly for 0 return, since we don't want to mangle
2940          * the activity of an active backend by a delayed packet from a dead one.
2941          */
2942         if (pgstat_add_backend(&msg->m_hdr) != 0)
2943                 return;
2944
2945         entry = &(pgStatBeTable[msg->m_hdr.m_backendid - 1]);
2946
2947         StrNCpy(entry->activity, msg->m_cmd_str, PGSTAT_ACTIVITY_SIZE);
2948
2949         entry->activity_start_timestamp = GetCurrentTimestamp();
2950 }
2951
2952
2953 /* ----------
2954  * pgstat_recv_tabstat() -
2955  *
2956  *      Count what the backend has done.
2957  * ----------
2958  */
2959 static void
2960 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
2961 {
2962         PgStat_TableEntry *tabmsg = &(msg->m_entry[0]);
2963         PgStat_StatDBEntry *dbentry;
2964         PgStat_StatTabEntry *tabentry;
2965         int                     i;
2966         bool            found;
2967
2968         /*
2969          * Make sure the backend is counted for.
2970          */
2971         if (pgstat_add_backend(&msg->m_hdr) < 0)
2972                 return;
2973
2974         dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
2975
2976         /*
2977          * Update database-wide stats.
2978          */
2979         dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
2980         dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
2981
2982         /*
2983          * Process all table entries in the message.
2984          */
2985         for (i = 0; i < msg->m_nentries; i++)
2986         {
2987                 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2988                                                                                                   (void *) &(tabmsg[i].t_id),
2989                                                                                                            HASH_ENTER, &found);
2990
2991                 if (!found)
2992                 {
2993                         /*
2994                          * If it's a new table entry, initialize counters to the values we
2995                          * just got.
2996                          */
2997                         tabentry->numscans = tabmsg[i].t_numscans;
2998                         tabentry->tuples_returned = tabmsg[i].t_tuples_returned;
2999                         tabentry->tuples_fetched = tabmsg[i].t_tuples_fetched;
3000                         tabentry->tuples_inserted = tabmsg[i].t_tuples_inserted;
3001                         tabentry->tuples_updated = tabmsg[i].t_tuples_updated;
3002                         tabentry->tuples_deleted = tabmsg[i].t_tuples_deleted;
3003
3004                         tabentry->n_live_tuples = tabmsg[i].t_tuples_inserted;
3005                         tabentry->n_dead_tuples = tabmsg[i].t_tuples_updated +
3006                                 tabmsg[i].t_tuples_deleted;
3007                         tabentry->last_anl_tuples = 0;
3008
3009                         tabentry->blocks_fetched = tabmsg[i].t_blocks_fetched;
3010                         tabentry->blocks_hit = tabmsg[i].t_blocks_hit;
3011                 }
3012                 else
3013                 {
3014                         /*
3015                          * Otherwise add the values to the existing entry.
3016                          */
3017                         tabentry->numscans += tabmsg[i].t_numscans;
3018                         tabentry->tuples_returned += tabmsg[i].t_tuples_returned;
3019                         tabentry->tuples_fetched += tabmsg[i].t_tuples_fetched;
3020                         tabentry->tuples_inserted += tabmsg[i].t_tuples_inserted;
3021                         tabentry->tuples_updated += tabmsg[i].t_tuples_updated;
3022                         tabentry->tuples_deleted += tabmsg[i].t_tuples_deleted;
3023
3024                         tabentry->n_live_tuples += tabmsg[i].t_tuples_inserted;
3025                         tabentry->n_dead_tuples += tabmsg[i].t_tuples_updated +
3026                                 tabmsg[i].t_tuples_deleted;
3027
3028                         tabentry->blocks_fetched += tabmsg[i].t_blocks_fetched;
3029                         tabentry->blocks_hit += tabmsg[i].t_blocks_hit;
3030                 }
3031
3032                 /*
3033                  * And add the block IO to the database entry.
3034                  */
3035                 dbentry->n_blocks_fetched += tabmsg[i].t_blocks_fetched;
3036                 dbentry->n_blocks_hit += tabmsg[i].t_blocks_hit;
3037         }
3038 }
3039
3040
3041 /* ----------
3042  * pgstat_recv_tabpurge() -
3043  *
3044  *      Arrange for dead table removal.
3045  * ----------
3046  */
3047 static void
3048 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
3049 {
3050         PgStat_StatDBEntry *dbentry;
3051         int                     i;
3052
3053         /*
3054          * Make sure the backend is counted for.
3055          */
3056         if (pgstat_add_backend(&msg->m_hdr) < 0)
3057                 return;
3058
3059         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
3060
3061         /*
3062          * No need to purge if we don't even know the database.
3063          */
3064         if (!dbentry || !dbentry->tables)
3065                 return;
3066
3067         /*
3068          * Process all table entries in the message.
3069          */
3070         for (i = 0; i < msg->m_nentries; i++)
3071         {
3072                 /* Remove from hashtable if present; we don't care if it's not. */
3073                 (void) hash_search(dbentry->tables,
3074                                                    (void *) &(msg->m_tableid[i]),
3075                                                    HASH_REMOVE, NULL);
3076         }
3077 }
3078
3079
3080 /* ----------
3081  * pgstat_recv_dropdb() -
3082  *
3083  *      Arrange for dead database removal
3084  * ----------
3085  */
3086 static void
3087 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
3088 {
3089         PgStat_StatDBEntry *dbentry;
3090
3091         /*
3092          * Make sure the backend is counted for.
3093          */
3094         if (pgstat_add_backend(&msg->m_hdr) < 0)
3095                 return;
3096
3097         /*
3098          * Lookup the database in the hashtable.
3099          */
3100         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
3101
3102         /*
3103          * If found, remove it.
3104          */
3105         if (dbentry)
3106         {
3107                 if (dbentry->tables != NULL)
3108                         hash_destroy(dbentry->tables);
3109
3110                 if (hash_search(pgStatDBHash,
3111                                                 (void *) &(dbentry->databaseid),
3112                                                 HASH_REMOVE, NULL) == NULL)
3113                         ereport(ERROR,
3114                                         (errmsg("database hash table corrupted "
3115                                                         "during cleanup --- abort")));
3116         }
3117 }
3118
3119
3120 /* ----------
3121  * pgstat_recv_resetcounter() -
3122  *
3123  *      Reset the statistics for the specified database.
3124  * ----------
3125  */
3126 static void
3127 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
3128 {
3129         HASHCTL         hash_ctl;
3130         PgStat_StatDBEntry *dbentry;
3131
3132         /*
3133          * Make sure the backend is counted for.
3134          */
3135         if (pgstat_add_backend(&msg->m_hdr) < 0)
3136                 return;
3137
3138         /*
3139          * Lookup the database in the hashtable.  Nothing to do if not there.
3140          */
3141         dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
3142
3143         if (!dbentry)
3144                 return;
3145
3146         /*
3147          * We simply throw away all the database's table entries by recreating a
3148          * new hash table for them.
3149          */
3150         if (dbentry->tables != NULL)
3151                 hash_destroy(dbentry->tables);
3152
3153         dbentry->tables = NULL;
3154         dbentry->n_xact_commit = 0;
3155         dbentry->n_xact_rollback = 0;
3156         dbentry->n_blocks_fetched = 0;
3157         dbentry->n_blocks_hit = 0;
3158
3159         memset(&hash_ctl, 0, sizeof(hash_ctl));
3160         hash_ctl.keysize = sizeof(Oid);
3161         hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3162         hash_ctl.hash = oid_hash;
3163         dbentry->tables = hash_create("Per-database table",
3164                                                                   PGSTAT_TAB_HASH_SIZE,
3165                                                                   &hash_ctl,
3166                                                                   HASH_ELEM | HASH_FUNCTION);
3167 }