]> granicus.if.org Git - postgresql/blob - src/backend/commands/analyze.c
Adjust HeapTupleSatisfies* routines to take a HeapTuple.
[postgresql] / src / backend / commands / analyze.c
1 /*-------------------------------------------------------------------------
2  *
3  * analyze.c
4  *        the Postgres statistics generator
5  *
6  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/commands/analyze.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <math.h>
18
19 #include "access/multixact.h"
20 #include "access/transam.h"
21 #include "access/tupconvert.h"
22 #include "access/tuptoaster.h"
23 #include "access/visibilitymap.h"
24 #include "access/xact.h"
25 #include "catalog/index.h"
26 #include "catalog/indexing.h"
27 #include "catalog/pg_collation.h"
28 #include "catalog/pg_inherits_fn.h"
29 #include "catalog/pg_namespace.h"
30 #include "commands/dbcommands.h"
31 #include "commands/tablecmds.h"
32 #include "commands/vacuum.h"
33 #include "executor/executor.h"
34 #include "foreign/fdwapi.h"
35 #include "miscadmin.h"
36 #include "nodes/nodeFuncs.h"
37 #include "parser/parse_oper.h"
38 #include "parser/parse_relation.h"
39 #include "pgstat.h"
40 #include "postmaster/autovacuum.h"
41 #include "storage/bufmgr.h"
42 #include "storage/lmgr.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "utils/acl.h"
46 #include "utils/attoptcache.h"
47 #include "utils/datum.h"
48 #include "utils/guc.h"
49 #include "utils/lsyscache.h"
50 #include "utils/memutils.h"
51 #include "utils/pg_rusage.h"
52 #include "utils/sortsupport.h"
53 #include "utils/syscache.h"
54 #include "utils/timestamp.h"
55 #include "utils/tqual.h"
56
57
58 /* Data structure for Algorithm S from Knuth 3.4.2 */
59 typedef struct
60 {
61         BlockNumber N;                          /* number of blocks, known in advance */
62         int                     n;                              /* desired sample size */
63         BlockNumber t;                          /* current block number */
64         int                     m;                              /* blocks selected so far */
65 } BlockSamplerData;
66
67 typedef BlockSamplerData *BlockSampler;
68
69 /* Per-index data for ANALYZE */
70 typedef struct AnlIndexData
71 {
72         IndexInfo  *indexInfo;          /* BuildIndexInfo result */
73         double          tupleFract;             /* fraction of rows for partial index */
74         VacAttrStats **vacattrstats;    /* index attrs to analyze */
75         int                     attr_cnt;
76 } AnlIndexData;
77
78
79 /* Default statistics target (GUC parameter) */
80 int                     default_statistics_target = 100;
81
82 /* A few variables that don't seem worth passing around as parameters */
83 static MemoryContext anl_context = NULL;
84 static BufferAccessStrategy vac_strategy;
85
86
87 static void do_analyze_rel(Relation onerel, VacuumStmt *vacstmt,
88                            AcquireSampleRowsFunc acquirefunc, BlockNumber relpages,
89                            bool inh, int elevel);
90 static void BlockSampler_Init(BlockSampler bs, BlockNumber nblocks,
91                                   int samplesize);
92 static bool BlockSampler_HasMore(BlockSampler bs);
93 static BlockNumber BlockSampler_Next(BlockSampler bs);
94 static void compute_index_stats(Relation onerel, double totalrows,
95                                         AnlIndexData *indexdata, int nindexes,
96                                         HeapTuple *rows, int numrows,
97                                         MemoryContext col_context);
98 static VacAttrStats *examine_attribute(Relation onerel, int attnum,
99                                   Node *index_expr);
100 static int acquire_sample_rows(Relation onerel, int elevel,
101                                         HeapTuple *rows, int targrows,
102                                         double *totalrows, double *totaldeadrows);
103 static int      compare_rows(const void *a, const void *b);
104 static int acquire_inherited_sample_rows(Relation onerel, int elevel,
105                                                           HeapTuple *rows, int targrows,
106                                                           double *totalrows, double *totaldeadrows);
107 static void update_attstats(Oid relid, bool inh,
108                                 int natts, VacAttrStats **vacattrstats);
109 static Datum std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);
110 static Datum ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);
111
112
113 /*
114  *      analyze_rel() -- analyze one relation
115  */
116 void
117 analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy)
118 {
119         Relation        onerel;
120         int                     elevel;
121         AcquireSampleRowsFunc acquirefunc = NULL;
122         BlockNumber relpages = 0;
123
124         /* Select logging level */
125         if (vacstmt->options & VACOPT_VERBOSE)
126                 elevel = INFO;
127         else
128                 elevel = DEBUG2;
129
130         /* Set up static variables */
131         vac_strategy = bstrategy;
132
133         /*
134          * Check for user-requested abort.
135          */
136         CHECK_FOR_INTERRUPTS();
137
138         /*
139          * Open the relation, getting ShareUpdateExclusiveLock to ensure that two
140          * ANALYZEs don't run on it concurrently.  (This also locks out a
141          * concurrent VACUUM, which doesn't matter much at the moment but might
142          * matter if we ever try to accumulate stats on dead tuples.) If the rel
143          * has been dropped since we last saw it, we don't need to process it.
144          */
145         if (!(vacstmt->options & VACOPT_NOWAIT))
146                 onerel = try_relation_open(relid, ShareUpdateExclusiveLock);
147         else if (ConditionalLockRelationOid(relid, ShareUpdateExclusiveLock))
148                 onerel = try_relation_open(relid, NoLock);
149         else
150         {
151                 onerel = NULL;
152                 if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
153                         ereport(LOG,
154                                         (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
155                                   errmsg("skipping analyze of \"%s\" --- lock not available",
156                                                  vacstmt->relation->relname)));
157         }
158         if (!onerel)
159                 return;
160
161         /*
162          * Check permissions --- this should match vacuum's check!
163          */
164         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
165                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
166         {
167                 /* No need for a WARNING if we already complained during VACUUM */
168                 if (!(vacstmt->options & VACOPT_VACUUM))
169                 {
170                         if (onerel->rd_rel->relisshared)
171                                 ereport(WARNING,
172                                  (errmsg("skipping \"%s\" --- only superuser can analyze it",
173                                                  RelationGetRelationName(onerel))));
174                         else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
175                                 ereport(WARNING,
176                                                 (errmsg("skipping \"%s\" --- only superuser or database owner can analyze it",
177                                                                 RelationGetRelationName(onerel))));
178                         else
179                                 ereport(WARNING,
180                                                 (errmsg("skipping \"%s\" --- only table or database owner can analyze it",
181                                                                 RelationGetRelationName(onerel))));
182                 }
183                 relation_close(onerel, ShareUpdateExclusiveLock);
184                 return;
185         }
186
187         /*
188          * Silently ignore tables that are temp tables of other backends ---
189          * trying to analyze these is rather pointless, since their contents are
190          * probably not up-to-date on disk.  (We don't throw a warning here; it
191          * would just lead to chatter during a database-wide ANALYZE.)
192          */
193         if (RELATION_IS_OTHER_TEMP(onerel))
194         {
195                 relation_close(onerel, ShareUpdateExclusiveLock);
196                 return;
197         }
198
199         /*
200          * We can ANALYZE any table except pg_statistic. See update_attstats
201          */
202         if (RelationGetRelid(onerel) == StatisticRelationId)
203         {
204                 relation_close(onerel, ShareUpdateExclusiveLock);
205                 return;
206         }
207
208         /*
209          * Check that it's a plain table, materialized view, or foreign table; we
210          * used to do this in get_rel_oids() but seems safer to check after we've
211          * locked the relation.
212          */
213         if (onerel->rd_rel->relkind == RELKIND_RELATION ||
214                 onerel->rd_rel->relkind == RELKIND_MATVIEW)
215         {
216                 /* Regular table, so we'll use the regular row acquisition function */
217                 acquirefunc = acquire_sample_rows;
218                 /* Also get regular table's size */
219                 relpages = RelationGetNumberOfBlocks(onerel);
220         }
221         else if (onerel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
222         {
223                 /*
224                  * For a foreign table, call the FDW's hook function to see whether it
225                  * supports analysis.
226                  */
227                 FdwRoutine *fdwroutine;
228                 bool            ok = false;
229
230                 fdwroutine = GetFdwRoutineForRelation(onerel, false);
231
232                 if (fdwroutine->AnalyzeForeignTable != NULL)
233                         ok = fdwroutine->AnalyzeForeignTable(onerel,
234                                                                                                  &acquirefunc,
235                                                                                                  &relpages);
236
237                 if (!ok)
238                 {
239                         ereport(WARNING,
240                          (errmsg("skipping \"%s\" --- cannot analyze this foreign table",
241                                          RelationGetRelationName(onerel))));
242                         relation_close(onerel, ShareUpdateExclusiveLock);
243                         return;
244                 }
245         }
246         else
247         {
248                 /* No need for a WARNING if we already complained during VACUUM */
249                 if (!(vacstmt->options & VACOPT_VACUUM))
250                         ereport(WARNING,
251                                         (errmsg("skipping \"%s\" --- cannot analyze non-tables or special system tables",
252                                                         RelationGetRelationName(onerel))));
253                 relation_close(onerel, ShareUpdateExclusiveLock);
254                 return;
255         }
256
257         /*
258          * OK, let's do it.  First let other backends know I'm in ANALYZE.
259          */
260         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
261         MyPgXact->vacuumFlags |= PROC_IN_ANALYZE;
262         LWLockRelease(ProcArrayLock);
263
264         /*
265          * Do the normal non-recursive ANALYZE.
266          */
267         do_analyze_rel(onerel, vacstmt, acquirefunc, relpages, false, elevel);
268
269         /*
270          * If there are child tables, do recursive ANALYZE.
271          */
272         if (onerel->rd_rel->relhassubclass)
273                 do_analyze_rel(onerel, vacstmt, acquirefunc, relpages, true, elevel);
274
275         /*
276          * Close source relation now, but keep lock so that no one deletes it
277          * before we commit.  (If someone did, they'd fail to clean up the entries
278          * we made in pg_statistic.  Also, releasing the lock before commit would
279          * expose us to concurrent-update failures in update_attstats.)
280          */
281         relation_close(onerel, NoLock);
282
283         /*
284          * Reset my PGXACT flag.  Note: we need this here, and not in vacuum_rel,
285          * because the vacuum flag is cleared by the end-of-xact code.
286          */
287         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
288         MyPgXact->vacuumFlags &= ~PROC_IN_ANALYZE;
289         LWLockRelease(ProcArrayLock);
290 }
291
292 /*
293  *      do_analyze_rel() -- analyze one relation, recursively or not
294  *
295  * Note that "acquirefunc" is only relevant for the non-inherited case.
296  * If we supported foreign tables in inheritance trees,
297  * acquire_inherited_sample_rows would need to determine the appropriate
298  * acquirefunc for each child table.
299  */
300 static void
301 do_analyze_rel(Relation onerel, VacuumStmt *vacstmt,
302                            AcquireSampleRowsFunc acquirefunc, BlockNumber relpages,
303                            bool inh, int elevel)
304 {
305         int                     attr_cnt,
306                                 tcnt,
307                                 i,
308                                 ind;
309         Relation   *Irel;
310         int                     nindexes;
311         bool            hasindex;
312         VacAttrStats **vacattrstats;
313         AnlIndexData *indexdata;
314         int                     targrows,
315                                 numrows;
316         double          totalrows,
317                                 totaldeadrows;
318         HeapTuple  *rows;
319         PGRUsage        ru0;
320         TimestampTz starttime = 0;
321         MemoryContext caller_context;
322         Oid                     save_userid;
323         int                     save_sec_context;
324         int                     save_nestlevel;
325
326         if (inh)
327                 ereport(elevel,
328                                 (errmsg("analyzing \"%s.%s\" inheritance tree",
329                                                 get_namespace_name(RelationGetNamespace(onerel)),
330                                                 RelationGetRelationName(onerel))));
331         else
332                 ereport(elevel,
333                                 (errmsg("analyzing \"%s.%s\"",
334                                                 get_namespace_name(RelationGetNamespace(onerel)),
335                                                 RelationGetRelationName(onerel))));
336
337         /*
338          * Set up a working context so that we can easily free whatever junk gets
339          * created.
340          */
341         anl_context = AllocSetContextCreate(CurrentMemoryContext,
342                                                                                 "Analyze",
343                                                                                 ALLOCSET_DEFAULT_MINSIZE,
344                                                                                 ALLOCSET_DEFAULT_INITSIZE,
345                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
346         caller_context = MemoryContextSwitchTo(anl_context);
347
348         /*
349          * Switch to the table owner's userid, so that any index functions are run
350          * as that user.  Also lock down security-restricted operations and
351          * arrange to make GUC variable changes local to this command.
352          */
353         GetUserIdAndSecContext(&save_userid, &save_sec_context);
354         SetUserIdAndSecContext(onerel->rd_rel->relowner,
355                                                    save_sec_context | SECURITY_RESTRICTED_OPERATION);
356         save_nestlevel = NewGUCNestLevel();
357
358         /* measure elapsed time iff autovacuum logging requires it */
359         if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
360         {
361                 pg_rusage_init(&ru0);
362                 if (Log_autovacuum_min_duration > 0)
363                         starttime = GetCurrentTimestamp();
364         }
365
366         /*
367          * Determine which columns to analyze
368          *
369          * Note that system attributes are never analyzed.
370          */
371         if (vacstmt->va_cols != NIL)
372         {
373                 ListCell   *le;
374
375                 vacattrstats = (VacAttrStats **) palloc(list_length(vacstmt->va_cols) *
376                                                                                                 sizeof(VacAttrStats *));
377                 tcnt = 0;
378                 foreach(le, vacstmt->va_cols)
379                 {
380                         char       *col = strVal(lfirst(le));
381
382                         i = attnameAttNum(onerel, col, false);
383                         if (i == InvalidAttrNumber)
384                                 ereport(ERROR,
385                                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
386                                         errmsg("column \"%s\" of relation \"%s\" does not exist",
387                                                    col, RelationGetRelationName(onerel))));
388                         vacattrstats[tcnt] = examine_attribute(onerel, i, NULL);
389                         if (vacattrstats[tcnt] != NULL)
390                                 tcnt++;
391                 }
392                 attr_cnt = tcnt;
393         }
394         else
395         {
396                 attr_cnt = onerel->rd_att->natts;
397                 vacattrstats = (VacAttrStats **)
398                         palloc(attr_cnt * sizeof(VacAttrStats *));
399                 tcnt = 0;
400                 for (i = 1; i <= attr_cnt; i++)
401                 {
402                         vacattrstats[tcnt] = examine_attribute(onerel, i, NULL);
403                         if (vacattrstats[tcnt] != NULL)
404                                 tcnt++;
405                 }
406                 attr_cnt = tcnt;
407         }
408
409         /*
410          * Open all indexes of the relation, and see if there are any analyzable
411          * columns in the indexes.      We do not analyze index columns if there was
412          * an explicit column list in the ANALYZE command, however.  If we are
413          * doing a recursive scan, we don't want to touch the parent's indexes at
414          * all.
415          */
416         if (!inh)
417                 vac_open_indexes(onerel, AccessShareLock, &nindexes, &Irel);
418         else
419         {
420                 Irel = NULL;
421                 nindexes = 0;
422         }
423         hasindex = (nindexes > 0);
424         indexdata = NULL;
425         if (hasindex)
426         {
427                 indexdata = (AnlIndexData *) palloc0(nindexes * sizeof(AnlIndexData));
428                 for (ind = 0; ind < nindexes; ind++)
429                 {
430                         AnlIndexData *thisdata = &indexdata[ind];
431                         IndexInfo  *indexInfo;
432
433                         thisdata->indexInfo = indexInfo = BuildIndexInfo(Irel[ind]);
434                         thisdata->tupleFract = 1.0; /* fix later if partial */
435                         if (indexInfo->ii_Expressions != NIL && vacstmt->va_cols == NIL)
436                         {
437                                 ListCell   *indexpr_item = list_head(indexInfo->ii_Expressions);
438
439                                 thisdata->vacattrstats = (VacAttrStats **)
440                                         palloc(indexInfo->ii_NumIndexAttrs * sizeof(VacAttrStats *));
441                                 tcnt = 0;
442                                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
443                                 {
444                                         int                     keycol = indexInfo->ii_KeyAttrNumbers[i];
445
446                                         if (keycol == 0)
447                                         {
448                                                 /* Found an index expression */
449                                                 Node       *indexkey;
450
451                                                 if (indexpr_item == NULL)               /* shouldn't happen */
452                                                         elog(ERROR, "too few entries in indexprs list");
453                                                 indexkey = (Node *) lfirst(indexpr_item);
454                                                 indexpr_item = lnext(indexpr_item);
455                                                 thisdata->vacattrstats[tcnt] =
456                                                         examine_attribute(Irel[ind], i + 1, indexkey);
457                                                 if (thisdata->vacattrstats[tcnt] != NULL)
458                                                         tcnt++;
459                                         }
460                                 }
461                                 thisdata->attr_cnt = tcnt;
462                         }
463                 }
464         }
465
466         /*
467          * Determine how many rows we need to sample, using the worst case from
468          * all analyzable columns.      We use a lower bound of 100 rows to avoid
469          * possible overflow in Vitter's algorithm.  (Note: that will also be the
470          * target in the corner case where there are no analyzable columns.)
471          */
472         targrows = 100;
473         for (i = 0; i < attr_cnt; i++)
474         {
475                 if (targrows < vacattrstats[i]->minrows)
476                         targrows = vacattrstats[i]->minrows;
477         }
478         for (ind = 0; ind < nindexes; ind++)
479         {
480                 AnlIndexData *thisdata = &indexdata[ind];
481
482                 for (i = 0; i < thisdata->attr_cnt; i++)
483                 {
484                         if (targrows < thisdata->vacattrstats[i]->minrows)
485                                 targrows = thisdata->vacattrstats[i]->minrows;
486                 }
487         }
488
489         /*
490          * Acquire the sample rows
491          */
492         rows = (HeapTuple *) palloc(targrows * sizeof(HeapTuple));
493         if (inh)
494                 numrows = acquire_inherited_sample_rows(onerel, elevel,
495                                                                                                 rows, targrows,
496                                                                                                 &totalrows, &totaldeadrows);
497         else
498                 numrows = (*acquirefunc) (onerel, elevel,
499                                                                   rows, targrows,
500                                                                   &totalrows, &totaldeadrows);
501
502         /*
503          * Compute the statistics.      Temporary results during the calculations for
504          * each column are stored in a child context.  The calc routines are
505          * responsible to make sure that whatever they store into the VacAttrStats
506          * structure is allocated in anl_context.
507          */
508         if (numrows > 0)
509         {
510                 MemoryContext col_context,
511                                         old_context;
512
513                 col_context = AllocSetContextCreate(anl_context,
514                                                                                         "Analyze Column",
515                                                                                         ALLOCSET_DEFAULT_MINSIZE,
516                                                                                         ALLOCSET_DEFAULT_INITSIZE,
517                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
518                 old_context = MemoryContextSwitchTo(col_context);
519
520                 for (i = 0; i < attr_cnt; i++)
521                 {
522                         VacAttrStats *stats = vacattrstats[i];
523                         AttributeOpts *aopt;
524
525                         stats->rows = rows;
526                         stats->tupDesc = onerel->rd_att;
527                         (*stats->compute_stats) (stats,
528                                                                          std_fetch_func,
529                                                                          numrows,
530                                                                          totalrows);
531
532                         /*
533                          * If the appropriate flavor of the n_distinct option is
534                          * specified, override with the corresponding value.
535                          */
536                         aopt = get_attribute_options(onerel->rd_id, stats->attr->attnum);
537                         if (aopt != NULL)
538                         {
539                                 float8          n_distinct;
540
541                                 n_distinct = inh ? aopt->n_distinct_inherited : aopt->n_distinct;
542                                 if (n_distinct != 0.0)
543                                         stats->stadistinct = n_distinct;
544                         }
545
546                         MemoryContextResetAndDeleteChildren(col_context);
547                 }
548
549                 if (hasindex)
550                         compute_index_stats(onerel, totalrows,
551                                                                 indexdata, nindexes,
552                                                                 rows, numrows,
553                                                                 col_context);
554
555                 MemoryContextSwitchTo(old_context);
556                 MemoryContextDelete(col_context);
557
558                 /*
559                  * Emit the completed stats rows into pg_statistic, replacing any
560                  * previous statistics for the target columns.  (If there are stats in
561                  * pg_statistic for columns we didn't process, we leave them alone.)
562                  */
563                 update_attstats(RelationGetRelid(onerel), inh,
564                                                 attr_cnt, vacattrstats);
565
566                 for (ind = 0; ind < nindexes; ind++)
567                 {
568                         AnlIndexData *thisdata = &indexdata[ind];
569
570                         update_attstats(RelationGetRelid(Irel[ind]), false,
571                                                         thisdata->attr_cnt, thisdata->vacattrstats);
572                 }
573         }
574
575         /*
576          * Update pages/tuples stats in pg_class ... but not if we're doing
577          * inherited stats.
578          */
579         if (!inh)
580                 vac_update_relstats(onerel,
581                                                         relpages,
582                                                         totalrows,
583                                                         visibilitymap_count(onerel),
584                                                         hasindex,
585                                                         InvalidTransactionId,
586                                                         InvalidMultiXactId);
587
588         /*
589          * Same for indexes. Vacuum always scans all indexes, so if we're part of
590          * VACUUM ANALYZE, don't overwrite the accurate count already inserted by
591          * VACUUM.
592          */
593         if (!inh && !(vacstmt->options & VACOPT_VACUUM))
594         {
595                 for (ind = 0; ind < nindexes; ind++)
596                 {
597                         AnlIndexData *thisdata = &indexdata[ind];
598                         double          totalindexrows;
599
600                         totalindexrows = ceil(thisdata->tupleFract * totalrows);
601                         vac_update_relstats(Irel[ind],
602                                                                 RelationGetNumberOfBlocks(Irel[ind]),
603                                                                 totalindexrows,
604                                                                 0,
605                                                                 false,
606                                                                 InvalidTransactionId,
607                                                                 InvalidMultiXactId);
608                 }
609         }
610
611         /*
612          * Report ANALYZE to the stats collector, too.  However, if doing
613          * inherited stats we shouldn't report, because the stats collector only
614          * tracks per-table stats.
615          */
616         if (!inh)
617                 pgstat_report_analyze(onerel, totalrows, totaldeadrows);
618
619         /* If this isn't part of VACUUM ANALYZE, let index AMs do cleanup */
620         if (!(vacstmt->options & VACOPT_VACUUM))
621         {
622                 for (ind = 0; ind < nindexes; ind++)
623                 {
624                         IndexBulkDeleteResult *stats;
625                         IndexVacuumInfo ivinfo;
626
627                         ivinfo.index = Irel[ind];
628                         ivinfo.analyze_only = true;
629                         ivinfo.estimated_count = true;
630                         ivinfo.message_level = elevel;
631                         ivinfo.num_heap_tuples = onerel->rd_rel->reltuples;
632                         ivinfo.strategy = vac_strategy;
633
634                         stats = index_vacuum_cleanup(&ivinfo, NULL);
635
636                         if (stats)
637                                 pfree(stats);
638                 }
639         }
640
641         /* Done with indexes */
642         vac_close_indexes(nindexes, Irel, NoLock);
643
644         /* Log the action if appropriate */
645         if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
646         {
647                 if (Log_autovacuum_min_duration == 0 ||
648                         TimestampDifferenceExceeds(starttime, GetCurrentTimestamp(),
649                                                                            Log_autovacuum_min_duration))
650                         ereport(LOG,
651                                         (errmsg("automatic analyze of table \"%s.%s.%s\" system usage: %s",
652                                                         get_database_name(MyDatabaseId),
653                                                         get_namespace_name(RelationGetNamespace(onerel)),
654                                                         RelationGetRelationName(onerel),
655                                                         pg_rusage_show(&ru0))));
656         }
657
658         /* Roll back any GUC changes executed by index functions */
659         AtEOXact_GUC(false, save_nestlevel);
660
661         /* Restore userid and security context */
662         SetUserIdAndSecContext(save_userid, save_sec_context);
663
664         /* Restore current context and release memory */
665         MemoryContextSwitchTo(caller_context);
666         MemoryContextDelete(anl_context);
667         anl_context = NULL;
668 }
669
670 /*
671  * Compute statistics about indexes of a relation
672  */
673 static void
674 compute_index_stats(Relation onerel, double totalrows,
675                                         AnlIndexData *indexdata, int nindexes,
676                                         HeapTuple *rows, int numrows,
677                                         MemoryContext col_context)
678 {
679         MemoryContext ind_context,
680                                 old_context;
681         Datum           values[INDEX_MAX_KEYS];
682         bool            isnull[INDEX_MAX_KEYS];
683         int                     ind,
684                                 i;
685
686         ind_context = AllocSetContextCreate(anl_context,
687                                                                                 "Analyze Index",
688                                                                                 ALLOCSET_DEFAULT_MINSIZE,
689                                                                                 ALLOCSET_DEFAULT_INITSIZE,
690                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
691         old_context = MemoryContextSwitchTo(ind_context);
692
693         for (ind = 0; ind < nindexes; ind++)
694         {
695                 AnlIndexData *thisdata = &indexdata[ind];
696                 IndexInfo  *indexInfo = thisdata->indexInfo;
697                 int                     attr_cnt = thisdata->attr_cnt;
698                 TupleTableSlot *slot;
699                 EState     *estate;
700                 ExprContext *econtext;
701                 List       *predicate;
702                 Datum      *exprvals;
703                 bool       *exprnulls;
704                 int                     numindexrows,
705                                         tcnt,
706                                         rowno;
707                 double          totalindexrows;
708
709                 /* Ignore index if no columns to analyze and not partial */
710                 if (attr_cnt == 0 && indexInfo->ii_Predicate == NIL)
711                         continue;
712
713                 /*
714                  * Need an EState for evaluation of index expressions and
715                  * partial-index predicates.  Create it in the per-index context to be
716                  * sure it gets cleaned up at the bottom of the loop.
717                  */
718                 estate = CreateExecutorState();
719                 econtext = GetPerTupleExprContext(estate);
720                 /* Need a slot to hold the current heap tuple, too */
721                 slot = MakeSingleTupleTableSlot(RelationGetDescr(onerel));
722
723                 /* Arrange for econtext's scan tuple to be the tuple under test */
724                 econtext->ecxt_scantuple = slot;
725
726                 /* Set up execution state for predicate. */
727                 predicate = (List *)
728                         ExecPrepareExpr((Expr *) indexInfo->ii_Predicate,
729                                                         estate);
730
731                 /* Compute and save index expression values */
732                 exprvals = (Datum *) palloc(numrows * attr_cnt * sizeof(Datum));
733                 exprnulls = (bool *) palloc(numrows * attr_cnt * sizeof(bool));
734                 numindexrows = 0;
735                 tcnt = 0;
736                 for (rowno = 0; rowno < numrows; rowno++)
737                 {
738                         HeapTuple       heapTuple = rows[rowno];
739
740                         /*
741                          * Reset the per-tuple context each time, to reclaim any cruft
742                          * left behind by evaluating the predicate or index expressions.
743                          */
744                         ResetExprContext(econtext);
745
746                         /* Set up for predicate or expression evaluation */
747                         ExecStoreTuple(heapTuple, slot, InvalidBuffer, false);
748
749                         /* If index is partial, check predicate */
750                         if (predicate != NIL)
751                         {
752                                 if (!ExecQual(predicate, econtext, false))
753                                         continue;
754                         }
755                         numindexrows++;
756
757                         if (attr_cnt > 0)
758                         {
759                                 /*
760                                  * Evaluate the index row to compute expression values. We
761                                  * could do this by hand, but FormIndexDatum is convenient.
762                                  */
763                                 FormIndexDatum(indexInfo,
764                                                            slot,
765                                                            estate,
766                                                            values,
767                                                            isnull);
768
769                                 /*
770                                  * Save just the columns we care about.  We copy the values
771                                  * into ind_context from the estate's per-tuple context.
772                                  */
773                                 for (i = 0; i < attr_cnt; i++)
774                                 {
775                                         VacAttrStats *stats = thisdata->vacattrstats[i];
776                                         int                     attnum = stats->attr->attnum;
777
778                                         if (isnull[attnum - 1])
779                                         {
780                                                 exprvals[tcnt] = (Datum) 0;
781                                                 exprnulls[tcnt] = true;
782                                         }
783                                         else
784                                         {
785                                                 exprvals[tcnt] = datumCopy(values[attnum - 1],
786                                                                                                    stats->attrtype->typbyval,
787                                                                                                    stats->attrtype->typlen);
788                                                 exprnulls[tcnt] = false;
789                                         }
790                                         tcnt++;
791                                 }
792                         }
793                 }
794
795                 /*
796                  * Having counted the number of rows that pass the predicate in the
797                  * sample, we can estimate the total number of rows in the index.
798                  */
799                 thisdata->tupleFract = (double) numindexrows / (double) numrows;
800                 totalindexrows = ceil(thisdata->tupleFract * totalrows);
801
802                 /*
803                  * Now we can compute the statistics for the expression columns.
804                  */
805                 if (numindexrows > 0)
806                 {
807                         MemoryContextSwitchTo(col_context);
808                         for (i = 0; i < attr_cnt; i++)
809                         {
810                                 VacAttrStats *stats = thisdata->vacattrstats[i];
811                                 AttributeOpts *aopt =
812                                 get_attribute_options(stats->attr->attrelid,
813                                                                           stats->attr->attnum);
814
815                                 stats->exprvals = exprvals + i;
816                                 stats->exprnulls = exprnulls + i;
817                                 stats->rowstride = attr_cnt;
818                                 (*stats->compute_stats) (stats,
819                                                                                  ind_fetch_func,
820                                                                                  numindexrows,
821                                                                                  totalindexrows);
822
823                                 /*
824                                  * If the n_distinct option is specified, it overrides the
825                                  * above computation.  For indices, we always use just
826                                  * n_distinct, not n_distinct_inherited.
827                                  */
828                                 if (aopt != NULL && aopt->n_distinct != 0.0)
829                                         stats->stadistinct = aopt->n_distinct;
830
831                                 MemoryContextResetAndDeleteChildren(col_context);
832                         }
833                 }
834
835                 /* And clean up */
836                 MemoryContextSwitchTo(ind_context);
837
838                 ExecDropSingleTupleTableSlot(slot);
839                 FreeExecutorState(estate);
840                 MemoryContextResetAndDeleteChildren(ind_context);
841         }
842
843         MemoryContextSwitchTo(old_context);
844         MemoryContextDelete(ind_context);
845 }
846
847 /*
848  * examine_attribute -- pre-analysis of a single column
849  *
850  * Determine whether the column is analyzable; if so, create and initialize
851  * a VacAttrStats struct for it.  If not, return NULL.
852  *
853  * If index_expr isn't NULL, then we're trying to analyze an expression index,
854  * and index_expr is the expression tree representing the column's data.
855  */
856 static VacAttrStats *
857 examine_attribute(Relation onerel, int attnum, Node *index_expr)
858 {
859         Form_pg_attribute attr = onerel->rd_att->attrs[attnum - 1];
860         HeapTuple       typtuple;
861         VacAttrStats *stats;
862         int                     i;
863         bool            ok;
864
865         /* Never analyze dropped columns */
866         if (attr->attisdropped)
867                 return NULL;
868
869         /* Don't analyze column if user has specified not to */
870         if (attr->attstattarget == 0)
871                 return NULL;
872
873         /*
874          * Create the VacAttrStats struct.      Note that we only have a copy of the
875          * fixed fields of the pg_attribute tuple.
876          */
877         stats = (VacAttrStats *) palloc0(sizeof(VacAttrStats));
878         stats->attr = (Form_pg_attribute) palloc(ATTRIBUTE_FIXED_PART_SIZE);
879         memcpy(stats->attr, attr, ATTRIBUTE_FIXED_PART_SIZE);
880
881         /*
882          * When analyzing an expression index, believe the expression tree's type
883          * not the column datatype --- the latter might be the opckeytype storage
884          * type of the opclass, which is not interesting for our purposes.      (Note:
885          * if we did anything with non-expression index columns, we'd need to
886          * figure out where to get the correct type info from, but for now that's
887          * not a problem.)      It's not clear whether anyone will care about the
888          * typmod, but we store that too just in case.
889          */
890         if (index_expr)
891         {
892                 stats->attrtypid = exprType(index_expr);
893                 stats->attrtypmod = exprTypmod(index_expr);
894         }
895         else
896         {
897                 stats->attrtypid = attr->atttypid;
898                 stats->attrtypmod = attr->atttypmod;
899         }
900
901         typtuple = SearchSysCacheCopy1(TYPEOID,
902                                                                    ObjectIdGetDatum(stats->attrtypid));
903         if (!HeapTupleIsValid(typtuple))
904                 elog(ERROR, "cache lookup failed for type %u", stats->attrtypid);
905         stats->attrtype = (Form_pg_type) GETSTRUCT(typtuple);
906         stats->anl_context = anl_context;
907         stats->tupattnum = attnum;
908
909         /*
910          * The fields describing the stats->stavalues[n] element types default to
911          * the type of the data being analyzed, but the type-specific typanalyze
912          * function can change them if it wants to store something else.
913          */
914         for (i = 0; i < STATISTIC_NUM_SLOTS; i++)
915         {
916                 stats->statypid[i] = stats->attrtypid;
917                 stats->statyplen[i] = stats->attrtype->typlen;
918                 stats->statypbyval[i] = stats->attrtype->typbyval;
919                 stats->statypalign[i] = stats->attrtype->typalign;
920         }
921
922         /*
923          * Call the type-specific typanalyze function.  If none is specified, use
924          * std_typanalyze().
925          */
926         if (OidIsValid(stats->attrtype->typanalyze))
927                 ok = DatumGetBool(OidFunctionCall1(stats->attrtype->typanalyze,
928                                                                                    PointerGetDatum(stats)));
929         else
930                 ok = std_typanalyze(stats);
931
932         if (!ok || stats->compute_stats == NULL || stats->minrows <= 0)
933         {
934                 heap_freetuple(typtuple);
935                 pfree(stats->attr);
936                 pfree(stats);
937                 return NULL;
938         }
939
940         return stats;
941 }
942
943 /*
944  * BlockSampler_Init -- prepare for random sampling of blocknumbers
945  *
946  * BlockSampler is used for stage one of our new two-stage tuple
947  * sampling mechanism as discussed on pgsql-hackers 2004-04-02 (subject
948  * "Large DB").  It selects a random sample of samplesize blocks out of
949  * the nblocks blocks in the table.  If the table has less than
950  * samplesize blocks, all blocks are selected.
951  *
952  * Since we know the total number of blocks in advance, we can use the
953  * straightforward Algorithm S from Knuth 3.4.2, rather than Vitter's
954  * algorithm.
955  */
956 static void
957 BlockSampler_Init(BlockSampler bs, BlockNumber nblocks, int samplesize)
958 {
959         bs->N = nblocks;                        /* measured table size */
960
961         /*
962          * If we decide to reduce samplesize for tables that have less or not much
963          * more than samplesize blocks, here is the place to do it.
964          */
965         bs->n = samplesize;
966         bs->t = 0;                                      /* blocks scanned so far */
967         bs->m = 0;                                      /* blocks selected so far */
968 }
969
970 static bool
971 BlockSampler_HasMore(BlockSampler bs)
972 {
973         return (bs->t < bs->N) && (bs->m < bs->n);
974 }
975
976 static BlockNumber
977 BlockSampler_Next(BlockSampler bs)
978 {
979         BlockNumber K = bs->N - bs->t;          /* remaining blocks */
980         int                     k = bs->n - bs->m;              /* blocks still to sample */
981         double          p;                              /* probability to skip block */
982         double          V;                              /* random */
983
984         Assert(BlockSampler_HasMore(bs));       /* hence K > 0 and k > 0 */
985
986         if ((BlockNumber) k >= K)
987         {
988                 /* need all the rest */
989                 bs->m++;
990                 return bs->t++;
991         }
992
993         /*----------
994          * It is not obvious that this code matches Knuth's Algorithm S.
995          * Knuth says to skip the current block with probability 1 - k/K.
996          * If we are to skip, we should advance t (hence decrease K), and
997          * repeat the same probabilistic test for the next block.  The naive
998          * implementation thus requires an anl_random_fract() call for each block
999          * number.      But we can reduce this to one anl_random_fract() call per
1000          * selected block, by noting that each time the while-test succeeds,
1001          * we can reinterpret V as a uniform random number in the range 0 to p.
1002          * Therefore, instead of choosing a new V, we just adjust p to be
1003          * the appropriate fraction of its former value, and our next loop
1004          * makes the appropriate probabilistic test.
1005          *
1006          * We have initially K > k > 0.  If the loop reduces K to equal k,
1007          * the next while-test must fail since p will become exactly zero
1008          * (we assume there will not be roundoff error in the division).
1009          * (Note: Knuth suggests a "<=" loop condition, but we use "<" just
1010          * to be doubly sure about roundoff error.)  Therefore K cannot become
1011          * less than k, which means that we cannot fail to select enough blocks.
1012          *----------
1013          */
1014         V = anl_random_fract();
1015         p = 1.0 - (double) k / (double) K;
1016         while (V < p)
1017         {
1018                 /* skip */
1019                 bs->t++;
1020                 K--;                                    /* keep K == N - t */
1021
1022                 /* adjust p to be new cutoff point in reduced range */
1023                 p *= 1.0 - (double) k / (double) K;
1024         }
1025
1026         /* select */
1027         bs->m++;
1028         return bs->t++;
1029 }
1030
1031 /*
1032  * acquire_sample_rows -- acquire a random sample of rows from the table
1033  *
1034  * Selected rows are returned in the caller-allocated array rows[], which
1035  * must have at least targrows entries.
1036  * The actual number of rows selected is returned as the function result.
1037  * We also estimate the total numbers of live and dead rows in the table,
1038  * and return them into *totalrows and *totaldeadrows, respectively.
1039  *
1040  * The returned list of tuples is in order by physical position in the table.
1041  * (We will rely on this later to derive correlation estimates.)
1042  *
1043  * As of May 2004 we use a new two-stage method:  Stage one selects up
1044  * to targrows random blocks (or all blocks, if there aren't so many).
1045  * Stage two scans these blocks and uses the Vitter algorithm to create
1046  * a random sample of targrows rows (or less, if there are less in the
1047  * sample of blocks).  The two stages are executed simultaneously: each
1048  * block is processed as soon as stage one returns its number and while
1049  * the rows are read stage two controls which ones are to be inserted
1050  * into the sample.
1051  *
1052  * Although every row has an equal chance of ending up in the final
1053  * sample, this sampling method is not perfect: not every possible
1054  * sample has an equal chance of being selected.  For large relations
1055  * the number of different blocks represented by the sample tends to be
1056  * too small.  We can live with that for now.  Improvements are welcome.
1057  *
1058  * An important property of this sampling method is that because we do
1059  * look at a statistically unbiased set of blocks, we should get
1060  * unbiased estimates of the average numbers of live and dead rows per
1061  * block.  The previous sampling method put too much credence in the row
1062  * density near the start of the table.
1063  */
1064 static int
1065 acquire_sample_rows(Relation onerel, int elevel,
1066                                         HeapTuple *rows, int targrows,
1067                                         double *totalrows, double *totaldeadrows)
1068 {
1069         int                     numrows = 0;    /* # rows now in reservoir */
1070         double          samplerows = 0; /* total # rows collected */
1071         double          liverows = 0;   /* # live rows seen */
1072         double          deadrows = 0;   /* # dead rows seen */
1073         double          rowstoskip = -1;        /* -1 means not set yet */
1074         BlockNumber totalblocks;
1075         TransactionId OldestXmin;
1076         BlockSamplerData bs;
1077         double          rstate;
1078
1079         Assert(targrows > 0);
1080
1081         totalblocks = RelationGetNumberOfBlocks(onerel);
1082
1083         /* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
1084         OldestXmin = GetOldestXmin(onerel->rd_rel->relisshared, true);
1085
1086         /* Prepare for sampling block numbers */
1087         BlockSampler_Init(&bs, totalblocks, targrows);
1088         /* Prepare for sampling rows */
1089         rstate = anl_init_selection_state(targrows);
1090
1091         /* Outer loop over blocks to sample */
1092         while (BlockSampler_HasMore(&bs))
1093         {
1094                 BlockNumber targblock = BlockSampler_Next(&bs);
1095                 Buffer          targbuffer;
1096                 Page            targpage;
1097                 OffsetNumber targoffset,
1098                                         maxoffset;
1099
1100                 vacuum_delay_point();
1101
1102                 /*
1103                  * We must maintain a pin on the target page's buffer to ensure that
1104                  * the maxoffset value stays good (else concurrent VACUUM might delete
1105                  * tuples out from under us).  Hence, pin the page until we are done
1106                  * looking at it.  We also choose to hold sharelock on the buffer
1107                  * throughout --- we could release and re-acquire sharelock for each
1108                  * tuple, but since we aren't doing much work per tuple, the extra
1109                  * lock traffic is probably better avoided.
1110                  */
1111                 targbuffer = ReadBufferExtended(onerel, MAIN_FORKNUM, targblock,
1112                                                                                 RBM_NORMAL, vac_strategy);
1113                 LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
1114                 targpage = BufferGetPage(targbuffer);
1115                 maxoffset = PageGetMaxOffsetNumber(targpage);
1116
1117                 /* Inner loop over all tuples on the selected page */
1118                 for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
1119                 {
1120                         ItemId          itemid;
1121                         HeapTupleData targtuple;
1122                         bool            sample_it = false;
1123
1124                         itemid = PageGetItemId(targpage, targoffset);
1125
1126                         /*
1127                          * We ignore unused and redirect line pointers.  DEAD line
1128                          * pointers should be counted as dead, because we need vacuum to
1129                          * run to get rid of them.      Note that this rule agrees with the
1130                          * way that heap_page_prune() counts things.
1131                          */
1132                         if (!ItemIdIsNormal(itemid))
1133                         {
1134                                 if (ItemIdIsDead(itemid))
1135                                         deadrows += 1;
1136                                 continue;
1137                         }
1138
1139                         ItemPointerSet(&targtuple.t_self, targblock, targoffset);
1140
1141                         targtuple.t_tableOid = RelationGetRelid(onerel);
1142                         targtuple.t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
1143                         targtuple.t_len = ItemIdGetLength(itemid);
1144
1145                         switch (HeapTupleSatisfiesVacuum(&targtuple,
1146                                                                                          OldestXmin,
1147                                                                                          targbuffer))
1148                         {
1149                                 case HEAPTUPLE_LIVE:
1150                                         sample_it = true;
1151                                         liverows += 1;
1152                                         break;
1153
1154                                 case HEAPTUPLE_DEAD:
1155                                 case HEAPTUPLE_RECENTLY_DEAD:
1156                                         /* Count dead and recently-dead rows */
1157                                         deadrows += 1;
1158                                         break;
1159
1160                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1161
1162                                         /*
1163                                          * Insert-in-progress rows are not counted.  We assume
1164                                          * that when the inserting transaction commits or aborts,
1165                                          * it will send a stats message to increment the proper
1166                                          * count.  This works right only if that transaction ends
1167                                          * after we finish analyzing the table; if things happen
1168                                          * in the other order, its stats update will be
1169                                          * overwritten by ours.  However, the error will be large
1170                                          * only if the other transaction runs long enough to
1171                                          * insert many tuples, so assuming it will finish after us
1172                                          * is the safer option.
1173                                          *
1174                                          * A special case is that the inserting transaction might
1175                                          * be our own.  In this case we should count and sample
1176                                          * the row, to accommodate users who load a table and
1177                                          * analyze it in one transaction.  (pgstat_report_analyze
1178                                          * has to adjust the numbers we send to the stats
1179                                          * collector to make this come out right.)
1180                                          */
1181                                         if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple.t_data)))
1182                                         {
1183                                                 sample_it = true;
1184                                                 liverows += 1;
1185                                         }
1186                                         break;
1187
1188                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1189
1190                                         /*
1191                                          * We count delete-in-progress rows as still live, using
1192                                          * the same reasoning given above; but we don't bother to
1193                                          * include them in the sample.
1194                                          *
1195                                          * If the delete was done by our own transaction, however,
1196                                          * we must count the row as dead to make
1197                                          * pgstat_report_analyze's stats adjustments come out
1198                                          * right.  (Note: this works out properly when the row was
1199                                          * both inserted and deleted in our xact.)
1200                                          */
1201                                         if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(targtuple.t_data)))
1202                                                 deadrows += 1;
1203                                         else
1204                                                 liverows += 1;
1205                                         break;
1206
1207                                 default:
1208                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1209                                         break;
1210                         }
1211
1212                         if (sample_it)
1213                         {
1214                                 /*
1215                                  * The first targrows sample rows are simply copied into the
1216                                  * reservoir. Then we start replacing tuples in the sample
1217                                  * until we reach the end of the relation.      This algorithm is
1218                                  * from Jeff Vitter's paper (see full citation below). It
1219                                  * works by repeatedly computing the number of tuples to skip
1220                                  * before selecting a tuple, which replaces a randomly chosen
1221                                  * element of the reservoir (current set of tuples).  At all
1222                                  * times the reservoir is a true random sample of the tuples
1223                                  * we've passed over so far, so when we fall off the end of
1224                                  * the relation we're done.
1225                                  */
1226                                 if (numrows < targrows)
1227                                         rows[numrows++] = heap_copytuple(&targtuple);
1228                                 else
1229                                 {
1230                                         /*
1231                                          * t in Vitter's paper is the number of records already
1232                                          * processed.  If we need to compute a new S value, we
1233                                          * must use the not-yet-incremented value of samplerows as
1234                                          * t.
1235                                          */
1236                                         if (rowstoskip < 0)
1237                                                 rowstoskip = anl_get_next_S(samplerows, targrows,
1238                                                                                                         &rstate);
1239
1240                                         if (rowstoskip <= 0)
1241                                         {
1242                                                 /*
1243                                                  * Found a suitable tuple, so save it, replacing one
1244                                                  * old tuple at random
1245                                                  */
1246                                                 int                     k = (int) (targrows * anl_random_fract());
1247
1248                                                 Assert(k >= 0 && k < targrows);
1249                                                 heap_freetuple(rows[k]);
1250                                                 rows[k] = heap_copytuple(&targtuple);
1251                                         }
1252
1253                                         rowstoskip -= 1;
1254                                 }
1255
1256                                 samplerows += 1;
1257                         }
1258                 }
1259
1260                 /* Now release the lock and pin on the page */
1261                 UnlockReleaseBuffer(targbuffer);
1262         }
1263
1264         /*
1265          * If we didn't find as many tuples as we wanted then we're done. No sort
1266          * is needed, since they're already in order.
1267          *
1268          * Otherwise we need to sort the collected tuples by position
1269          * (itempointer). It's not worth worrying about corner cases where the
1270          * tuples are already sorted.
1271          */
1272         if (numrows == targrows)
1273                 qsort((void *) rows, numrows, sizeof(HeapTuple), compare_rows);
1274
1275         /*
1276          * Estimate total numbers of rows in relation.  For live rows, use
1277          * vac_estimate_reltuples; for dead rows, we have no source of old
1278          * information, so we have to assume the density is the same in unseen
1279          * pages as in the pages we scanned.
1280          */
1281         *totalrows = vac_estimate_reltuples(onerel, true,
1282                                                                                 totalblocks,
1283                                                                                 bs.m,
1284                                                                                 liverows);
1285         if (bs.m > 0)
1286                 *totaldeadrows = floor((deadrows / bs.m) * totalblocks + 0.5);
1287         else
1288                 *totaldeadrows = 0.0;
1289
1290         /*
1291          * Emit some interesting relation info
1292          */
1293         ereport(elevel,
1294                         (errmsg("\"%s\": scanned %d of %u pages, "
1295                                         "containing %.0f live rows and %.0f dead rows; "
1296                                         "%d rows in sample, %.0f estimated total rows",
1297                                         RelationGetRelationName(onerel),
1298                                         bs.m, totalblocks,
1299                                         liverows, deadrows,
1300                                         numrows, *totalrows)));
1301
1302         return numrows;
1303 }
1304
1305 /* Select a random value R uniformly distributed in (0 - 1) */
1306 double
1307 anl_random_fract(void)
1308 {
1309         return ((double) random() + 1) / ((double) MAX_RANDOM_VALUE + 2);
1310 }
1311
1312 /*
1313  * These two routines embody Algorithm Z from "Random sampling with a
1314  * reservoir" by Jeffrey S. Vitter, in ACM Trans. Math. Softw. 11, 1
1315  * (Mar. 1985), Pages 37-57.  Vitter describes his algorithm in terms
1316  * of the count S of records to skip before processing another record.
1317  * It is computed primarily based on t, the number of records already read.
1318  * The only extra state needed between calls is W, a random state variable.
1319  *
1320  * anl_init_selection_state computes the initial W value.
1321  *
1322  * Given that we've already read t records (t >= n), anl_get_next_S
1323  * determines the number of records to skip before the next record is
1324  * processed.
1325  */
/*
 * anl_init_selection_state -- compute the starting value of W
 *
 * n is the reservoir (target sample) size.  The result is the W that
 * Algorithm Z uses the first time it is applied.
 */
double
anl_init_selection_state(int n)
{
        double          r = anl_random_fract();
        double          initial_W = exp(-log(r) / n);

        return initial_W;
}
1332
/*
 * anl_get_next_S -- number of records to skip before the next sampled one
 *
 *      t               count of records already read
 *      n               reservoir (target sample) size
 *      stateptr        in/out: Vitter's W; it is read and rewritten only
 *                      when the Algorithm Z branch is taken
 *
 * The skip count S is returned as a double.
 */
double
anl_get_next_S(double t, int n, double *stateptr)
{
        double          skip;
        double          W;
        double          term;

        /* 22 is the constant T of Vitter's paper; up to T*n use Algorithm X */
        if (t <= (22.0 * n))
        {
                double          V = anl_random_fract();
                double          quot;

                skip = 0;
                t += 1;
                /* Vitter's "num" always equals t - n, so it is not tracked */
                quot = (t - (double) n) / t;
                /* Step forward to the smallest S satisfying (4.1) */
                for (; quot > V; quot *= (t - (double) n) / t)
                {
                        skip += 1;
                        t += 1;
                }
                return skip;
        }

        /* Beyond the threshold: Algorithm Z, looping until an S is accepted */
        W = *stateptr;
        term = t - (double) n + 1;

        for (;;)
        {
                double          U = anl_random_fract();
                double          X = t * (W - 1.0);
                double          tmp = (t + 1) / term;
                double          lhs;
                double          rhs;
                double          y;
                double          numer;
                double          numer_lim;
                double          denom;

                /* The tentative answer is floor(X) */
                skip = floor(X);

                /* First test: U <= h(S)/cg(X), carried out as in (6.3) */
                lhs = exp(log(((U * tmp * tmp) * (term + skip)) / (t + X)) / n);
                rhs = (((t + X) / (term + skip)) * term) / t;
                if (lhs <= rhs)
                {
                        W = rhs / lhs;
                        break;
                }

                /* Second test: U <= f(S)/cg(X) */
                y = (((U * (t + 1)) / term) * (t + skip + 1)) / (t + X);
                if ((double) n < skip)
                {
                        denom = t;
                        numer_lim = term + skip;
                }
                else
                {
                        denom = t - (double) n + skip;
                        numer_lim = t + 1;
                }
                numer = t + skip;
                while (numer >= numer_lim)
                {
                        y *= numer / denom;
                        denom -= 1;
                        numer -= 1;
                }

                /* The next W is drawn before we know whether S is accepted */
                W = exp(-log(anl_random_fract()) / n);
                if (exp(log(y) / n) <= (t + X) / t)
                        break;
        }

        *stateptr = W;
        return skip;
}
1414
1415 /*
1416  * qsort comparator for sorting rows[] array
1417  */
1418 static int
1419 compare_rows(const void *a, const void *b)
1420 {
1421         HeapTuple       ha = *(const HeapTuple *) a;
1422         HeapTuple       hb = *(const HeapTuple *) b;
1423         BlockNumber ba = ItemPointerGetBlockNumber(&ha->t_self);
1424         OffsetNumber oa = ItemPointerGetOffsetNumber(&ha->t_self);
1425         BlockNumber bb = ItemPointerGetBlockNumber(&hb->t_self);
1426         OffsetNumber ob = ItemPointerGetOffsetNumber(&hb->t_self);
1427
1428         if (ba < bb)
1429                 return -1;
1430         if (ba > bb)
1431                 return 1;
1432         if (oa < ob)
1433                 return -1;
1434         if (oa > ob)
1435                 return 1;
1436         return 0;
1437 }
1438
1439
1440 /*
1441  * acquire_inherited_sample_rows -- acquire sample rows from inheritance tree
1442  *
1443  * This has the same API as acquire_sample_rows, except that rows are
1444  * collected from all inheritance children as well as the specified table.
1445  * We fail and return zero if there are no inheritance children.
1446  */
1447 static int
1448 acquire_inherited_sample_rows(Relation onerel, int elevel,
1449                                                           HeapTuple *rows, int targrows,
1450                                                           double *totalrows, double *totaldeadrows)
1451 {
1452         List       *tableOIDs;
1453         Relation   *rels;
1454         double     *relblocks;
1455         double          totalblocks;
1456         int                     numrows,
1457                                 nrels,
1458                                 i;
1459         ListCell   *lc;
1460
1461         /*
1462          * Find all members of inheritance set.  We only need AccessShareLock on
1463          * the children.
1464          */
1465         tableOIDs =
1466                 find_all_inheritors(RelationGetRelid(onerel), AccessShareLock, NULL);
1467
1468         /*
1469          * Check that there's at least one descendant, else fail.  This could
1470          * happen despite analyze_rel's relhassubclass check, if table once had a
1471          * child but no longer does.  In that case, we can clear the
1472          * relhassubclass field so as not to make the same mistake again later.
1473          * (This is safe because we hold ShareUpdateExclusiveLock.)
1474          */
1475         if (list_length(tableOIDs) < 2)
1476         {
1477                 /* CCI because we already updated the pg_class row in this command */
1478                 CommandCounterIncrement();
1479                 SetRelationHasSubclass(RelationGetRelid(onerel), false);
1480                 return 0;
1481         }
1482
1483         /*
1484          * Count the blocks in all the relations.  The result could overflow
1485          * BlockNumber, so we use double arithmetic.
1486          */
1487         rels = (Relation *) palloc(list_length(tableOIDs) * sizeof(Relation));
1488         relblocks = (double *) palloc(list_length(tableOIDs) * sizeof(double));
1489         totalblocks = 0;
1490         nrels = 0;
1491         foreach(lc, tableOIDs)
1492         {
1493                 Oid                     childOID = lfirst_oid(lc);
1494                 Relation        childrel;
1495
1496                 /* We already got the needed lock */
1497                 childrel = heap_open(childOID, NoLock);
1498
1499                 /* Ignore if temp table of another backend */
1500                 if (RELATION_IS_OTHER_TEMP(childrel))
1501                 {
1502                         /* ... but release the lock on it */
1503                         Assert(childrel != onerel);
1504                         heap_close(childrel, AccessShareLock);
1505                         continue;
1506                 }
1507
1508                 rels[nrels] = childrel;
1509                 relblocks[nrels] = (double) RelationGetNumberOfBlocks(childrel);
1510                 totalblocks += relblocks[nrels];
1511                 nrels++;
1512         }
1513
1514         /*
1515          * Now sample rows from each relation, proportionally to its fraction of
1516          * the total block count.  (This might be less than desirable if the child
1517          * rels have radically different free-space percentages, but it's not
1518          * clear that it's worth working harder.)
1519          */
1520         numrows = 0;
1521         *totalrows = 0;
1522         *totaldeadrows = 0;
1523         for (i = 0; i < nrels; i++)
1524         {
1525                 Relation        childrel = rels[i];
1526                 double          childblocks = relblocks[i];
1527
1528                 if (childblocks > 0)
1529                 {
1530                         int                     childtargrows;
1531
1532                         childtargrows = (int) rint(targrows * childblocks / totalblocks);
1533                         /* Make sure we don't overrun due to roundoff error */
1534                         childtargrows = Min(childtargrows, targrows - numrows);
1535                         if (childtargrows > 0)
1536                         {
1537                                 int                     childrows;
1538                                 double          trows,
1539                                                         tdrows;
1540
1541                                 /* Fetch a random sample of the child's rows */
1542                                 childrows = acquire_sample_rows(childrel,
1543                                                                                                 elevel,
1544                                                                                                 rows + numrows,
1545                                                                                                 childtargrows,
1546                                                                                                 &trows,
1547                                                                                                 &tdrows);
1548
1549                                 /* We may need to convert from child's rowtype to parent's */
1550                                 if (childrows > 0 &&
1551                                         !equalTupleDescs(RelationGetDescr(childrel),
1552                                                                          RelationGetDescr(onerel)))
1553                                 {
1554                                         TupleConversionMap *map;
1555
1556                                         map = convert_tuples_by_name(RelationGetDescr(childrel),
1557                                                                                                  RelationGetDescr(onerel),
1558                                                                  gettext_noop("could not convert row type"));
1559                                         if (map != NULL)
1560                                         {
1561                                                 int                     j;
1562
1563                                                 for (j = 0; j < childrows; j++)
1564                                                 {
1565                                                         HeapTuple       newtup;
1566
1567                                                         newtup = do_convert_tuple(rows[numrows + j], map);
1568                                                         heap_freetuple(rows[numrows + j]);
1569                                                         rows[numrows + j] = newtup;
1570                                                 }
1571                                                 free_conversion_map(map);
1572                                         }
1573                                 }
1574
1575                                 /* And add to counts */
1576                                 numrows += childrows;
1577                                 *totalrows += trows;
1578                                 *totaldeadrows += tdrows;
1579                         }
1580                 }
1581
1582                 /*
1583                  * Note: we cannot release the child-table locks, since we may have
1584                  * pointers to their TOAST tables in the sampled rows.
1585                  */
1586                 heap_close(childrel, NoLock);
1587         }
1588
1589         return numrows;
1590 }
1591
1592
1593 /*
1594  *      update_attstats() -- update attribute statistics for one relation
1595  *
1596  *              Statistics are stored in several places: the pg_class row for the
1597  *              relation has stats about the whole relation, and there is a
1598  *              pg_statistic row for each (non-system) attribute that has ever
1599  *              been analyzed.  The pg_class values are updated by VACUUM, not here.
1600  *
1601  *              pg_statistic rows are just added or updated normally.  This means
1602  *              that pg_statistic will probably contain some deleted rows at the
1603  *              completion of a vacuum cycle, unless it happens to get vacuumed last.
1604  *
1605  *              To keep things simple, we punt for pg_statistic, and don't try
1606  *              to compute or store rows for pg_statistic itself in pg_statistic.
1607  *              This could possibly be made to work, but it's not worth the trouble.
1608  *              Note analyze_rel() has seen to it that we won't come here when
1609  *              vacuuming pg_statistic itself.
1610  *
1611  *              Note: there would be a race condition here if two backends could
1612  *              ANALYZE the same table concurrently.  Presently, we lock that out
1613  *              by taking a self-exclusive lock on the relation in analyze_rel().
1614  */
1615 static void
1616 update_attstats(Oid relid, bool inh, int natts, VacAttrStats **vacattrstats)
1617 {
1618         Relation        sd;
1619         int                     attno;
1620
1621         if (natts <= 0)
1622                 return;                                 /* nothing to do */
1623
1624         sd = heap_open(StatisticRelationId, RowExclusiveLock);
1625
1626         for (attno = 0; attno < natts; attno++)
1627         {
1628                 VacAttrStats *stats = vacattrstats[attno];
1629                 HeapTuple       stup,
1630                                         oldtup;
1631                 int                     i,
1632                                         k,
1633                                         n;
1634                 Datum           values[Natts_pg_statistic];
1635                 bool            nulls[Natts_pg_statistic];
1636                 bool            replaces[Natts_pg_statistic];
1637
1638                 /* Ignore attr if we weren't able to collect stats */
1639                 if (!stats->stats_valid)
1640                         continue;
1641
1642                 /*
1643                  * Construct a new pg_statistic tuple
1644                  */
1645                 for (i = 0; i < Natts_pg_statistic; ++i)
1646                 {
1647                         nulls[i] = false;
1648                         replaces[i] = true;
1649                 }
1650
1651                 values[Anum_pg_statistic_starelid - 1] = ObjectIdGetDatum(relid);
1652                 values[Anum_pg_statistic_staattnum - 1] = Int16GetDatum(stats->attr->attnum);
1653                 values[Anum_pg_statistic_stainherit - 1] = BoolGetDatum(inh);
1654                 values[Anum_pg_statistic_stanullfrac - 1] = Float4GetDatum(stats->stanullfrac);
1655                 values[Anum_pg_statistic_stawidth - 1] = Int32GetDatum(stats->stawidth);
1656                 values[Anum_pg_statistic_stadistinct - 1] = Float4GetDatum(stats->stadistinct);
1657                 i = Anum_pg_statistic_stakind1 - 1;
1658                 for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
1659                 {
1660                         values[i++] = Int16GetDatum(stats->stakind[k]);         /* stakindN */
1661                 }
1662                 i = Anum_pg_statistic_staop1 - 1;
1663                 for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
1664                 {
1665                         values[i++] = ObjectIdGetDatum(stats->staop[k]);        /* staopN */
1666                 }
1667                 i = Anum_pg_statistic_stanumbers1 - 1;
1668                 for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
1669                 {
1670                         int                     nnum = stats->numnumbers[k];
1671
1672                         if (nnum > 0)
1673                         {
1674                                 Datum      *numdatums = (Datum *) palloc(nnum * sizeof(Datum));
1675                                 ArrayType  *arry;
1676
1677                                 for (n = 0; n < nnum; n++)
1678                                         numdatums[n] = Float4GetDatum(stats->stanumbers[k][n]);
1679                                 /* XXX knows more than it should about type float4: */
1680                                 arry = construct_array(numdatums, nnum,
1681                                                                            FLOAT4OID,
1682                                                                            sizeof(float4), FLOAT4PASSBYVAL, 'i');
1683                                 values[i++] = PointerGetDatum(arry);    /* stanumbersN */
1684                         }
1685                         else
1686                         {
1687                                 nulls[i] = true;
1688                                 values[i++] = (Datum) 0;
1689                         }
1690                 }
1691                 i = Anum_pg_statistic_stavalues1 - 1;
1692                 for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
1693                 {
1694                         if (stats->numvalues[k] > 0)
1695                         {
1696                                 ArrayType  *arry;
1697
1698                                 arry = construct_array(stats->stavalues[k],
1699                                                                            stats->numvalues[k],
1700                                                                            stats->statypid[k],
1701                                                                            stats->statyplen[k],
1702                                                                            stats->statypbyval[k],
1703                                                                            stats->statypalign[k]);
1704                                 values[i++] = PointerGetDatum(arry);    /* stavaluesN */
1705                         }
1706                         else
1707                         {
1708                                 nulls[i] = true;
1709                                 values[i++] = (Datum) 0;
1710                         }
1711                 }
1712
1713                 /* Is there already a pg_statistic tuple for this attribute? */
1714                 oldtup = SearchSysCache3(STATRELATTINH,
1715                                                                  ObjectIdGetDatum(relid),
1716                                                                  Int16GetDatum(stats->attr->attnum),
1717                                                                  BoolGetDatum(inh));
1718
1719                 if (HeapTupleIsValid(oldtup))
1720                 {
1721                         /* Yes, replace it */
1722                         stup = heap_modify_tuple(oldtup,
1723                                                                          RelationGetDescr(sd),
1724                                                                          values,
1725                                                                          nulls,
1726                                                                          replaces);
1727                         ReleaseSysCache(oldtup);
1728                         simple_heap_update(sd, &stup->t_self, stup);
1729                 }
1730                 else
1731                 {
1732                         /* No, insert new tuple */
1733                         stup = heap_form_tuple(RelationGetDescr(sd), values, nulls);
1734                         simple_heap_insert(sd, stup);
1735                 }
1736
1737                 /* update indexes too */
1738                 CatalogUpdateIndexes(sd, stup);
1739
1740                 heap_freetuple(stup);
1741         }
1742
1743         heap_close(sd, RowExclusiveLock);
1744 }
1745
1746 /*
1747  * Standard fetch function for use by compute_stats subroutines.
1748  *
1749  * This exists to provide some insulation between compute_stats routines
1750  * and the actual storage of the sample data.
1751  */
1752 static Datum
1753 std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull)
1754 {
1755         int                     attnum = stats->tupattnum;
1756         HeapTuple       tuple = stats->rows[rownum];
1757         TupleDesc       tupDesc = stats->tupDesc;
1758
1759         return heap_getattr(tuple, attnum, tupDesc, isNull);
1760 }
1761
1762 /*
1763  * Fetch function for analyzing index expressions.
1764  *
1765  * We have not bothered to construct index tuples, instead the data is
1766  * just in Datum arrays.
1767  */
1768 static Datum
1769 ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull)
1770 {
1771         int                     i;
1772
1773         /* exprvals and exprnulls are already offset for proper column */
1774         i = rownum * stats->rowstride;
1775         *isNull = stats->exprnulls[i];
1776         return stats->exprvals[i];
1777 }
1778
1779
1780 /*==========================================================================
1781  *
1782  * Code below this point represents the "standard" type-specific statistics
1783  * analysis algorithms.  This code can be replaced on a per-data-type basis
1784  * by setting a nonzero value in pg_type.typanalyze.
1785  *
1786  *==========================================================================
1787  */
1788
1789
1790 /*
1791  * To avoid consuming too much memory during analysis and/or too much space
1792  * in the resulting pg_statistic rows, we ignore varlena datums that are wider
1793  * than WIDTH_THRESHOLD (after detoasting!).  This is legitimate for MCV
1794  * and distinct-value calculations since a wide value is unlikely to be
1795  * duplicated at all, much less be a most-common value.  For the same reason,
1796  * ignoring wide values will not affect our estimates of histogram bin
1797  * boundaries very much.
1798  */
1799 #define WIDTH_THRESHOLD  1024
1800
/* Exchange two int lvalues / two Datum lvalues in place */
#define swapInt(a,b) \
        do { \
                int swap_tmp = (a); \
                (a) = (b); \
                (b) = swap_tmp; \
        } while (0)
#define swapDatum(a,b) \
        do { \
                Datum swap_tmp = (a); \
                (a) = (b); \
                (b) = swap_tmp; \
        } while (0)
1803
1804 /*
1805  * Extra information used by the default analysis routines
1806  */
1807 typedef struct
1808 {
1809         Oid                     eqopr;                  /* '=' operator for datatype, if any */
1810         Oid                     eqfunc;                 /* and associated function */
1811         Oid                     ltopr;                  /* '<' operator for datatype, if any */
1812 } StdAnalyzeData;
1813
1814 typedef struct
1815 {
1816         Datum           value;                  /* a data value */
1817         int                     tupno;                  /* position index for tuple it came from */
1818 } ScalarItem;
1819
/* Bookkeeping for one group of duplicate values */
typedef struct
{
        /* number of duplicates */
        int                     count;
        /* values[] index of the first occurrence */
        int                     first;
} ScalarMCVItem;
1825
1826 typedef struct
1827 {
1828         SortSupport ssup;
1829         int                *tupnoLink;
1830 } CompareScalarsContext;
1831
1832
1833 static void compute_minimal_stats(VacAttrStatsP stats,
1834                                           AnalyzeAttrFetchFunc fetchfunc,
1835                                           int samplerows,
1836                                           double totalrows);
1837 static void compute_scalar_stats(VacAttrStatsP stats,
1838                                          AnalyzeAttrFetchFunc fetchfunc,
1839                                          int samplerows,
1840                                          double totalrows);
1841 static int      compare_scalars(const void *a, const void *b, void *arg);
1842 static int      compare_mcvs(const void *a, const void *b);
1843
1844
1845 /*
1846  * std_typanalyze -- the default type-specific typanalyze function
1847  */
1848 bool
1849 std_typanalyze(VacAttrStats *stats)
1850 {
1851         Form_pg_attribute attr = stats->attr;
1852         Oid                     ltopr;
1853         Oid                     eqopr;
1854         StdAnalyzeData *mystats;
1855
1856         /* If the attstattarget column is negative, use the default value */
1857         /* NB: it is okay to scribble on stats->attr since it's a copy */
1858         if (attr->attstattarget < 0)
1859                 attr->attstattarget = default_statistics_target;
1860
1861         /* Look for default "<" and "=" operators for column's type */
1862         get_sort_group_operators(stats->attrtypid,
1863                                                          false, false, false,
1864                                                          &ltopr, &eqopr, NULL,
1865                                                          NULL);
1866
1867         /* If column has no "=" operator, we can't do much of anything */
1868         if (!OidIsValid(eqopr))
1869                 return false;
1870
1871         /* Save the operator info for compute_stats routines */
1872         mystats = (StdAnalyzeData *) palloc(sizeof(StdAnalyzeData));
1873         mystats->eqopr = eqopr;
1874         mystats->eqfunc = get_opcode(eqopr);
1875         mystats->ltopr = ltopr;
1876         stats->extra_data = mystats;
1877
1878         /*
1879          * Determine which standard statistics algorithm to use
1880          */
1881         if (OidIsValid(ltopr))
1882         {
1883                 /* Seems to be a scalar datatype */
1884                 stats->compute_stats = compute_scalar_stats;
1885                 /*--------------------
1886                  * The following choice of minrows is based on the paper
1887                  * "Random sampling for histogram construction: how much is enough?"
1888                  * by Surajit Chaudhuri, Rajeev Motwani and Vivek Narasayya, in
1889                  * Proceedings of ACM SIGMOD International Conference on Management
1890                  * of Data, 1998, Pages 436-447.  Their Corollary 1 to Theorem 5
1891                  * says that for table size n, histogram size k, maximum relative
1892                  * error in bin size f, and error probability gamma, the minimum
1893                  * random sample size is
1894                  *              r = 4 * k * ln(2*n/gamma) / f^2
1895                  * Taking f = 0.5, gamma = 0.01, n = 10^6 rows, we obtain
1896                  *              r = 305.82 * k
1897                  * Note that because of the log function, the dependence on n is
1898                  * quite weak; even at n = 10^12, a 300*k sample gives <= 0.66
1899                  * bin size error with probability 0.99.  So there's no real need to
1900                  * scale for n, which is a good thing because we don't necessarily
1901                  * know it at this point.
1902                  *--------------------
1903                  */
1904                 stats->minrows = 300 * attr->attstattarget;
1905         }
1906         else
1907         {
1908                 /* Can't do much but the minimal stuff */
1909                 stats->compute_stats = compute_minimal_stats;
1910                 /* Might as well use the same minrows as above */
1911                 stats->minrows = 300 * attr->attstattarget;
1912         }
1913
1914         return true;
1915 }
1916
1917 /*
1918  *      compute_minimal_stats() -- compute minimal column statistics
1919  *
1920  *      We use this when we can find only an "=" operator for the datatype.
1921  *
1922  *      We determine the fraction of non-null rows, the average width, the
1923  *      most common values, and the (estimated) number of distinct values.
1924  *
1925  *      The most common values are determined by brute force: we keep a list
1926  *      of previously seen values, ordered by number of times seen, as we scan
1927  *      the samples.  A newly seen value is inserted just after the last
1928  *      multiply-seen value, causing the bottommost (oldest) singly-seen value
1929  *      to drop off the list.  The accuracy of this method, and also its cost,
1930  *      depend mainly on the length of the list we are willing to keep.
1931  */
1932 static void
1933 compute_minimal_stats(VacAttrStatsP stats,
1934                                           AnalyzeAttrFetchFunc fetchfunc,
1935                                           int samplerows,
1936                                           double totalrows)
1937 {
1938         int                     i;
1939         int                     null_cnt = 0;
1940         int                     nonnull_cnt = 0;
1941         int                     toowide_cnt = 0;
1942         double          total_width = 0;
1943         bool            is_varlena = (!stats->attrtype->typbyval &&
1944                                                           stats->attrtype->typlen == -1);
1945         bool            is_varwidth = (!stats->attrtype->typbyval &&
1946                                                            stats->attrtype->typlen < 0);
1947         FmgrInfo        f_cmpeq;
1948         typedef struct
1949         {
1950                 Datum           value;
1951                 int                     count;
1952         } TrackItem;
1953         TrackItem  *track;
1954         int                     track_cnt,
1955                                 track_max;
1956         int                     num_mcv = stats->attr->attstattarget;
1957         StdAnalyzeData *mystats = (StdAnalyzeData *) stats->extra_data;
1958
1959         /*
1960          * We track up to 2*n values for an n-element MCV list; but at least 10
1961          */
1962         track_max = 2 * num_mcv;
1963         if (track_max < 10)
1964                 track_max = 10;
1965         track = (TrackItem *) palloc(track_max * sizeof(TrackItem));
1966         track_cnt = 0;
1967
1968         fmgr_info(mystats->eqfunc, &f_cmpeq);
1969
1970         for (i = 0; i < samplerows; i++)
1971         {
1972                 Datum           value;
1973                 bool            isnull;
1974                 bool            match;
1975                 int                     firstcount1,
1976                                         j;
1977
1978                 vacuum_delay_point();
1979
1980                 value = fetchfunc(stats, i, &isnull);
1981
1982                 /* Check for null/nonnull */
1983                 if (isnull)
1984                 {
1985                         null_cnt++;
1986                         continue;
1987                 }
1988                 nonnull_cnt++;
1989
1990                 /*
1991                  * If it's a variable-width field, add up widths for average width
1992                  * calculation.  Note that if the value is toasted, we use the toasted
1993                  * width.  We don't bother with this calculation if it's a fixed-width
1994                  * type.
1995                  */
1996                 if (is_varlena)
1997                 {
1998                         total_width += VARSIZE_ANY(DatumGetPointer(value));
1999
2000                         /*
2001                          * If the value is toasted, we want to detoast it just once to
2002                          * avoid repeated detoastings and resultant excess memory usage
2003                          * during the comparisons.      Also, check to see if the value is
2004                          * excessively wide, and if so don't detoast at all --- just
2005                          * ignore the value.
2006                          */
2007                         if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
2008                         {
2009                                 toowide_cnt++;
2010                                 continue;
2011                         }
2012                         value = PointerGetDatum(PG_DETOAST_DATUM(value));
2013                 }
2014                 else if (is_varwidth)
2015                 {
2016                         /* must be cstring */
2017                         total_width += strlen(DatumGetCString(value)) + 1;
2018                 }
2019
2020                 /*
2021                  * See if the value matches anything we're already tracking.
2022                  */
2023                 match = false;
2024                 firstcount1 = track_cnt;
2025                 for (j = 0; j < track_cnt; j++)
2026                 {
2027                         /* We always use the default collation for statistics */
2028                         if (DatumGetBool(FunctionCall2Coll(&f_cmpeq,
2029                                                                                            DEFAULT_COLLATION_OID,
2030                                                                                            value, track[j].value)))
2031                         {
2032                                 match = true;
2033                                 break;
2034                         }
2035                         if (j < firstcount1 && track[j].count == 1)
2036                                 firstcount1 = j;
2037                 }
2038
2039                 if (match)
2040                 {
2041                         /* Found a match */
2042                         track[j].count++;
2043                         /* This value may now need to "bubble up" in the track list */
2044                         while (j > 0 && track[j].count > track[j - 1].count)
2045                         {
2046                                 swapDatum(track[j].value, track[j - 1].value);
2047                                 swapInt(track[j].count, track[j - 1].count);
2048                                 j--;
2049                         }
2050                 }
2051                 else
2052                 {
2053                         /* No match.  Insert at head of count-1 list */
2054                         if (track_cnt < track_max)
2055                                 track_cnt++;
2056                         for (j = track_cnt - 1; j > firstcount1; j--)
2057                         {
2058                                 track[j].value = track[j - 1].value;
2059                                 track[j].count = track[j - 1].count;
2060                         }
2061                         if (firstcount1 < track_cnt)
2062                         {
2063                                 track[firstcount1].value = value;
2064                                 track[firstcount1].count = 1;
2065                         }
2066                 }
2067         }
2068
2069         /* We can only compute real stats if we found some non-null values. */
2070         if (nonnull_cnt > 0)
2071         {
2072                 int                     nmultiple,
2073                                         summultiple;
2074
2075                 stats->stats_valid = true;
2076                 /* Do the simple null-frac and width stats */
2077                 stats->stanullfrac = (double) null_cnt / (double) samplerows;
2078                 if (is_varwidth)
2079                         stats->stawidth = total_width / (double) nonnull_cnt;
2080                 else
2081                         stats->stawidth = stats->attrtype->typlen;
2082
2083                 /* Count the number of values we found multiple times */
2084                 summultiple = 0;
2085                 for (nmultiple = 0; nmultiple < track_cnt; nmultiple++)
2086                 {
2087                         if (track[nmultiple].count == 1)
2088                                 break;
2089                         summultiple += track[nmultiple].count;
2090                 }
2091
2092                 if (nmultiple == 0)
2093                 {
2094                         /* If we found no repeated values, assume it's a unique column */
2095                         stats->stadistinct = -1.0;
2096                 }
2097                 else if (track_cnt < track_max && toowide_cnt == 0 &&
2098                                  nmultiple == track_cnt)
2099                 {
2100                         /*
2101                          * Our track list includes every value in the sample, and every
2102                          * value appeared more than once.  Assume the column has just
2103                          * these values.
2104                          */
2105                         stats->stadistinct = track_cnt;
2106                 }
2107                 else
2108                 {
2109                         /*----------
2110                          * Estimate the number of distinct values using the estimator
2111                          * proposed by Haas and Stokes in IBM Research Report RJ 10025:
2112                          *              n*d / (n - f1 + f1*n/N)
2113                          * where f1 is the number of distinct values that occurred
2114                          * exactly once in our sample of n rows (from a total of N),
2115                          * and d is the total number of distinct values in the sample.
2116                          * This is their Duj1 estimator; the other estimators they
2117                          * recommend are considerably more complex, and are numerically
2118                          * very unstable when n is much smaller than N.
2119                          *
2120                          * We assume (not very reliably!) that all the multiply-occurring
2121                          * values are reflected in the final track[] list, and the other
2122                          * nonnull values all appeared but once.  (XXX this usually
2123                          * results in a drastic overestimate of ndistinct.      Can we do
2124                          * any better?)
2125                          *----------
2126                          */
2127                         int                     f1 = nonnull_cnt - summultiple;
2128                         int                     d = f1 + nmultiple;
2129                         double          numer,
2130                                                 denom,
2131                                                 stadistinct;
2132
2133                         numer = (double) samplerows *(double) d;
2134
2135                         denom = (double) (samplerows - f1) +
2136                                 (double) f1 *(double) samplerows / totalrows;
2137
2138                         stadistinct = numer / denom;
2139                         /* Clamp to sane range in case of roundoff error */
2140                         if (stadistinct < (double) d)
2141                                 stadistinct = (double) d;
2142                         if (stadistinct > totalrows)
2143                                 stadistinct = totalrows;
2144                         stats->stadistinct = floor(stadistinct + 0.5);
2145                 }
2146
2147                 /*
2148                  * If we estimated the number of distinct values at more than 10% of
2149                  * the total row count (a very arbitrary limit), then assume that
2150                  * stadistinct should scale with the row count rather than be a fixed
2151                  * value.
2152                  */
2153                 if (stats->stadistinct > 0.1 * totalrows)
2154                         stats->stadistinct = -(stats->stadistinct / totalrows);
2155
2156                 /*
2157                  * Decide how many values are worth storing as most-common values. If
2158                  * we are able to generate a complete MCV list (all the values in the
2159                  * sample will fit, and we think these are all the ones in the table),
2160                  * then do so.  Otherwise, store only those values that are
2161                  * significantly more common than the (estimated) average. We set the
2162                  * threshold rather arbitrarily at 25% more than average, with at
2163                  * least 2 instances in the sample.
2164                  */
2165                 if (track_cnt < track_max && toowide_cnt == 0 &&
2166                         stats->stadistinct > 0 &&
2167                         track_cnt <= num_mcv)
2168                 {
2169                         /* Track list includes all values seen, and all will fit */
2170                         num_mcv = track_cnt;
2171                 }
2172                 else
2173                 {
2174                         double          ndistinct = stats->stadistinct;
2175                         double          avgcount,
2176                                                 mincount;
2177
2178                         if (ndistinct < 0)
2179                                 ndistinct = -ndistinct * totalrows;
2180                         /* estimate # of occurrences in sample of a typical value */
2181                         avgcount = (double) samplerows / ndistinct;
2182                         /* set minimum threshold count to store a value */
2183                         mincount = avgcount * 1.25;
2184                         if (mincount < 2)
2185                                 mincount = 2;
2186                         if (num_mcv > track_cnt)
2187                                 num_mcv = track_cnt;
2188                         for (i = 0; i < num_mcv; i++)
2189                         {
2190                                 if (track[i].count < mincount)
2191                                 {
2192                                         num_mcv = i;
2193                                         break;
2194                                 }
2195                         }
2196                 }
2197
2198                 /* Generate MCV slot entry */
2199                 if (num_mcv > 0)
2200                 {
2201                         MemoryContext old_context;
2202                         Datum      *mcv_values;
2203                         float4     *mcv_freqs;
2204
2205                         /* Must copy the target values into anl_context */
2206                         old_context = MemoryContextSwitchTo(stats->anl_context);
2207                         mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
2208                         mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
2209                         for (i = 0; i < num_mcv; i++)
2210                         {
2211                                 mcv_values[i] = datumCopy(track[i].value,
2212                                                                                   stats->attrtype->typbyval,
2213                                                                                   stats->attrtype->typlen);
2214                                 mcv_freqs[i] = (double) track[i].count / (double) samplerows;
2215                         }
2216                         MemoryContextSwitchTo(old_context);
2217
2218                         stats->stakind[0] = STATISTIC_KIND_MCV;
2219                         stats->staop[0] = mystats->eqopr;
2220                         stats->stanumbers[0] = mcv_freqs;
2221                         stats->numnumbers[0] = num_mcv;
2222                         stats->stavalues[0] = mcv_values;
2223                         stats->numvalues[0] = num_mcv;
2224
2225                         /*
2226                          * Accept the defaults for stats->statypid and others. They have
2227                          * been set before we were called (see vacuum.h)
2228                          */
2229                 }
2230         }
2231         else if (null_cnt > 0)
2232         {
2233                 /* We found only nulls; assume the column is entirely null */
2234                 stats->stats_valid = true;
2235                 stats->stanullfrac = 1.0;
2236                 if (is_varwidth)
2237                         stats->stawidth = 0;    /* "unknown" */
2238                 else
2239                         stats->stawidth = stats->attrtype->typlen;
2240                 stats->stadistinct = 0.0;               /* "unknown" */
2241         }
2242
2243         /* We don't need to bother cleaning up any of our temporary palloc's */
2244 }
2245
2246
2247 /*
2248  *      compute_scalar_stats() -- compute column statistics
2249  *
2250  *      We use this when we can find "=" and "<" operators for the datatype.
2251  *
2252  *      We determine the fraction of non-null rows, the average width, the
2253  *      most common values, the (estimated) number of distinct values, the
2254  *      distribution histogram, and the correlation of physical to logical order.
2255  *
2256  *      The desired stats can be determined fairly easily after sorting the
2257  *      data values into order.
2258  */
2259 static void
2260 compute_scalar_stats(VacAttrStatsP stats,
2261                                          AnalyzeAttrFetchFunc fetchfunc,
2262                                          int samplerows,
2263                                          double totalrows)
2264 {
2265         int                     i;
2266         int                     null_cnt = 0;
2267         int                     nonnull_cnt = 0;
2268         int                     toowide_cnt = 0;
2269         double          total_width = 0;
2270         bool            is_varlena = (!stats->attrtype->typbyval &&
2271                                                           stats->attrtype->typlen == -1);
2272         bool            is_varwidth = (!stats->attrtype->typbyval &&
2273                                                            stats->attrtype->typlen < 0);
2274         double          corr_xysum;
2275         SortSupportData ssup;
2276         ScalarItem *values;
2277         int                     values_cnt = 0;
2278         int                *tupnoLink;
2279         ScalarMCVItem *track;
2280         int                     track_cnt = 0;
2281         int                     num_mcv = stats->attr->attstattarget;
2282         int                     num_bins = stats->attr->attstattarget;
2283         StdAnalyzeData *mystats = (StdAnalyzeData *) stats->extra_data;
2284
2285         values = (ScalarItem *) palloc(samplerows * sizeof(ScalarItem));
2286         tupnoLink = (int *) palloc(samplerows * sizeof(int));
2287         track = (ScalarMCVItem *) palloc(num_mcv * sizeof(ScalarMCVItem));
2288
2289         memset(&ssup, 0, sizeof(ssup));
2290         ssup.ssup_cxt = CurrentMemoryContext;
2291         /* We always use the default collation for statistics */
2292         ssup.ssup_collation = DEFAULT_COLLATION_OID;
2293         ssup.ssup_nulls_first = false;
2294
2295         PrepareSortSupportFromOrderingOp(mystats->ltopr, &ssup);
2296
2297         /* Initial scan to find sortable values */
2298         for (i = 0; i < samplerows; i++)
2299         {
2300                 Datum           value;
2301                 bool            isnull;
2302
2303                 vacuum_delay_point();
2304
2305                 value = fetchfunc(stats, i, &isnull);
2306
2307                 /* Check for null/nonnull */
2308                 if (isnull)
2309                 {
2310                         null_cnt++;
2311                         continue;
2312                 }
2313                 nonnull_cnt++;
2314
2315                 /*
2316                  * If it's a variable-width field, add up widths for average width
2317                  * calculation.  Note that if the value is toasted, we use the toasted
2318                  * width.  We don't bother with this calculation if it's a fixed-width
2319                  * type.
2320                  */
2321                 if (is_varlena)
2322                 {
2323                         total_width += VARSIZE_ANY(DatumGetPointer(value));
2324
2325                         /*
2326                          * If the value is toasted, we want to detoast it just once to
2327                          * avoid repeated detoastings and resultant excess memory usage
2328                          * during the comparisons.      Also, check to see if the value is
2329                          * excessively wide, and if so don't detoast at all --- just
2330                          * ignore the value.
2331                          */
2332                         if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
2333                         {
2334                                 toowide_cnt++;
2335                                 continue;
2336                         }
2337                         value = PointerGetDatum(PG_DETOAST_DATUM(value));
2338                 }
2339                 else if (is_varwidth)
2340                 {
2341                         /* must be cstring */
2342                         total_width += strlen(DatumGetCString(value)) + 1;
2343                 }
2344
2345                 /* Add it to the list to be sorted */
2346                 values[values_cnt].value = value;
2347                 values[values_cnt].tupno = values_cnt;
2348                 tupnoLink[values_cnt] = values_cnt;
2349                 values_cnt++;
2350         }
2351
2352         /* We can only compute real stats if we found some sortable values. */
2353         if (values_cnt > 0)
2354         {
2355                 int                     ndistinct,      /* # distinct values in sample */
2356                                         nmultiple,      /* # that appear multiple times */
2357                                         num_hist,
2358                                         dups_cnt;
2359                 int                     slot_idx = 0;
2360                 CompareScalarsContext cxt;
2361
2362                 /* Sort the collected values */
2363                 cxt.ssup = &ssup;
2364                 cxt.tupnoLink = tupnoLink;
2365                 qsort_arg((void *) values, values_cnt, sizeof(ScalarItem),
2366                                   compare_scalars, (void *) &cxt);
2367
2368                 /*
2369                  * Now scan the values in order, find the most common ones, and also
2370                  * accumulate ordering-correlation statistics.
2371                  *
2372                  * To determine which are most common, we first have to count the
2373                  * number of duplicates of each value.  The duplicates are adjacent in
2374                  * the sorted list, so a brute-force approach is to compare successive
2375                  * datum values until we find two that are not equal. However, that
2376                  * requires N-1 invocations of the datum comparison routine, which are
2377                  * completely redundant with work that was done during the sort.  (The
2378                  * sort algorithm must at some point have compared each pair of items
2379                  * that are adjacent in the sorted order; otherwise it could not know
2380                  * that it's ordered the pair correctly.) We exploit this by having
2381                  * compare_scalars remember the highest tupno index that each
2382                  * ScalarItem has been found equal to.  At the end of the sort, a
2383                  * ScalarItem's tupnoLink will still point to itself if and only if it
2384                  * is the last item of its group of duplicates (since the group will
2385                  * be ordered by tupno).
2386                  */
2387                 corr_xysum = 0;
2388                 ndistinct = 0;
2389                 nmultiple = 0;
2390                 dups_cnt = 0;
2391                 for (i = 0; i < values_cnt; i++)
2392                 {
2393                         int                     tupno = values[i].tupno;
2394
2395                         corr_xysum += ((double) i) * ((double) tupno);
2396                         dups_cnt++;
2397                         if (tupnoLink[tupno] == tupno)
2398                         {
2399                                 /* Reached end of duplicates of this value */
2400                                 ndistinct++;
2401                                 if (dups_cnt > 1)
2402                                 {
2403                                         nmultiple++;
2404                                         if (track_cnt < num_mcv ||
2405                                                 dups_cnt > track[track_cnt - 1].count)
2406                                         {
2407                                                 /*
2408                                                  * Found a new item for the mcv list; find its
2409                                                  * position, bubbling down old items if needed. Loop
2410                                                  * invariant is that j points at an empty/ replaceable
2411                                                  * slot.
2412                                                  */
2413                                                 int                     j;
2414
2415                                                 if (track_cnt < num_mcv)
2416                                                         track_cnt++;
2417                                                 for (j = track_cnt - 1; j > 0; j--)
2418                                                 {
2419                                                         if (dups_cnt <= track[j - 1].count)
2420                                                                 break;
2421                                                         track[j].count = track[j - 1].count;
2422                                                         track[j].first = track[j - 1].first;
2423                                                 }
2424                                                 track[j].count = dups_cnt;
2425                                                 track[j].first = i + 1 - dups_cnt;
2426                                         }
2427                                 }
2428                                 dups_cnt = 0;
2429                         }
2430                 }
2431
2432                 stats->stats_valid = true;
2433                 /* Do the simple null-frac and width stats */
2434                 stats->stanullfrac = (double) null_cnt / (double) samplerows;
2435                 if (is_varwidth)
2436                         stats->stawidth = total_width / (double) nonnull_cnt;
2437                 else
2438                         stats->stawidth = stats->attrtype->typlen;
2439
2440                 if (nmultiple == 0)
2441                 {
2442                         /* If we found no repeated values, assume it's a unique column */
2443                         stats->stadistinct = -1.0;
2444                 }
2445                 else if (toowide_cnt == 0 && nmultiple == ndistinct)
2446                 {
2447                         /*
2448                          * Every value in the sample appeared more than once.  Assume the
2449                          * column has just these values.
2450                          */
2451                         stats->stadistinct = ndistinct;
2452                 }
2453                 else
2454                 {
2455                         /*----------
2456                          * Estimate the number of distinct values using the estimator
2457                          * proposed by Haas and Stokes in IBM Research Report RJ 10025:
2458                          *              n*d / (n - f1 + f1*n/N)
2459                          * where f1 is the number of distinct values that occurred
2460                          * exactly once in our sample of n rows (from a total of N),
2461                          * and d is the total number of distinct values in the sample.
2462                          * This is their Duj1 estimator; the other estimators they
2463                          * recommend are considerably more complex, and are numerically
2464                          * very unstable when n is much smaller than N.
2465                          *
2466                          * Overwidth values are assumed to have been distinct.
2467                          *----------
2468                          */
2469                         int                     f1 = ndistinct - nmultiple + toowide_cnt;
2470                         int                     d = f1 + nmultiple;
2471                         double          numer,
2472                                                 denom,
2473                                                 stadistinct;
2474
2475                         numer = (double) samplerows *(double) d;
2476
2477                         denom = (double) (samplerows - f1) +
2478                                 (double) f1 *(double) samplerows / totalrows;
2479
2480                         stadistinct = numer / denom;
2481                         /* Clamp to sane range in case of roundoff error */
2482                         if (stadistinct < (double) d)
2483                                 stadistinct = (double) d;
2484                         if (stadistinct > totalrows)
2485                                 stadistinct = totalrows;
2486                         stats->stadistinct = floor(stadistinct + 0.5);
2487                 }
2488
2489                 /*
2490                  * If we estimated the number of distinct values at more than 10% of
2491                  * the total row count (a very arbitrary limit), then assume that
2492                  * stadistinct should scale with the row count rather than be a fixed
2493                  * value.
2494                  */
2495                 if (stats->stadistinct > 0.1 * totalrows)
2496                         stats->stadistinct = -(stats->stadistinct / totalrows);
2497
2498                 /*
2499                  * Decide how many values are worth storing as most-common values. If
2500                  * we are able to generate a complete MCV list (all the values in the
2501                  * sample will fit, and we think these are all the ones in the table),
2502                  * then do so.  Otherwise, store only those values that are
2503                  * significantly more common than the (estimated) average. We set the
2504                  * threshold rather arbitrarily at 25% more than average, with at
2505                  * least 2 instances in the sample.  Also, we won't suppress values
2506                  * that have a frequency of at least 1/K where K is the intended
2507                  * number of histogram bins; such values might otherwise cause us to
2508                  * emit duplicate histogram bin boundaries.  (We might end up with
2509                  * duplicate histogram entries anyway, if the distribution is skewed;
2510                  * but we prefer to treat such values as MCVs if at all possible.)
2511                  */
2512                 if (track_cnt == ndistinct && toowide_cnt == 0 &&
2513                         stats->stadistinct > 0 &&
2514                         track_cnt <= num_mcv)
2515                 {
2516                         /* Track list includes all values seen, and all will fit */
2517                         num_mcv = track_cnt;
2518                 }
2519                 else
2520                 {
2521                         double          ndistinct = stats->stadistinct;
2522                         double          avgcount,
2523                                                 mincount,
2524                                                 maxmincount;
2525
2526                         if (ndistinct < 0)
2527                                 ndistinct = -ndistinct * totalrows;
2528                         /* estimate # of occurrences in sample of a typical value */
2529                         avgcount = (double) samplerows / ndistinct;
2530                         /* set minimum threshold count to store a value */
2531                         mincount = avgcount * 1.25;
2532                         if (mincount < 2)
2533                                 mincount = 2;
2534                         /* don't let threshold exceed 1/K, however */
2535                         maxmincount = (double) samplerows / (double) num_bins;
2536                         if (mincount > maxmincount)
2537                                 mincount = maxmincount;
2538                         if (num_mcv > track_cnt)
2539                                 num_mcv = track_cnt;
2540                         for (i = 0; i < num_mcv; i++)
2541                         {
2542                                 if (track[i].count < mincount)
2543                                 {
2544                                         num_mcv = i;
2545                                         break;
2546                                 }
2547                         }
2548                 }
2549
2550                 /* Generate MCV slot entry */
2551                 if (num_mcv > 0)
2552                 {
2553                         MemoryContext old_context;
2554                         Datum      *mcv_values;
2555                         float4     *mcv_freqs;
2556
2557                         /* Must copy the target values into anl_context */
2558                         old_context = MemoryContextSwitchTo(stats->anl_context);
2559                         mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
2560                         mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
2561                         for (i = 0; i < num_mcv; i++)
2562                         {
2563                                 mcv_values[i] = datumCopy(values[track[i].first].value,
2564                                                                                   stats->attrtype->typbyval,
2565                                                                                   stats->attrtype->typlen);
2566                                 mcv_freqs[i] = (double) track[i].count / (double) samplerows;
2567                         }
2568                         MemoryContextSwitchTo(old_context);
2569
2570                         stats->stakind[slot_idx] = STATISTIC_KIND_MCV;
2571                         stats->staop[slot_idx] = mystats->eqopr;
2572                         stats->stanumbers[slot_idx] = mcv_freqs;
2573                         stats->numnumbers[slot_idx] = num_mcv;
2574                         stats->stavalues[slot_idx] = mcv_values;
2575                         stats->numvalues[slot_idx] = num_mcv;
2576
2577                         /*
2578                          * Accept the defaults for stats->statypid and others. They have
2579                          * been set before we were called (see vacuum.h)
2580                          */
2581                         slot_idx++;
2582                 }
2583
2584                 /*
2585                  * Generate a histogram slot entry if there are at least two distinct
2586                  * values not accounted for in the MCV list.  (This ensures the
2587                  * histogram won't collapse to empty or a singleton.)
2588                  */
2589                 num_hist = ndistinct - num_mcv;
2590                 if (num_hist > num_bins)
2591                         num_hist = num_bins + 1;
2592                 if (num_hist >= 2)
2593                 {
2594                         MemoryContext old_context;
2595                         Datum      *hist_values;
2596                         int                     nvals;
2597                         int                     pos,
2598                                                 posfrac,
2599                                                 delta,
2600                                                 deltafrac;
2601
2602                         /* Sort the MCV items into position order to speed next loop */
2603                         qsort((void *) track, num_mcv,
2604                                   sizeof(ScalarMCVItem), compare_mcvs);
2605
2606                         /*
2607                          * Collapse out the MCV items from the values[] array.
2608                          *
2609                          * Note we destroy the values[] array here... but we don't need it
2610                          * for anything more.  We do, however, still need values_cnt.
2611                          * nvals will be the number of remaining entries in values[].
2612                          */
2613                         if (num_mcv > 0)
2614                         {
2615                                 int                     src,
2616                                                         dest;
2617                                 int                     j;
2618
2619                                 src = dest = 0;
2620                                 j = 0;                  /* index of next interesting MCV item */
2621                                 while (src < values_cnt)
2622                                 {
2623                                         int                     ncopy;
2624
2625                                         if (j < num_mcv)
2626                                         {
2627                                                 int                     first = track[j].first;
2628
2629                                                 if (src >= first)
2630                                                 {
2631                                                         /* advance past this MCV item */
2632                                                         src = first + track[j].count;
2633                                                         j++;
2634                                                         continue;
2635                                                 }
2636                                                 ncopy = first - src;
2637                                         }
2638                                         else
2639                                                 ncopy = values_cnt - src;
2640                                         memmove(&values[dest], &values[src],
2641                                                         ncopy * sizeof(ScalarItem));
2642                                         src += ncopy;
2643                                         dest += ncopy;
2644                                 }
2645                                 nvals = dest;
2646                         }
2647                         else
2648                                 nvals = values_cnt;
2649                         Assert(nvals >= num_hist);
2650
2651                         /* Must copy the target values into anl_context */
2652                         old_context = MemoryContextSwitchTo(stats->anl_context);
2653                         hist_values = (Datum *) palloc(num_hist * sizeof(Datum));
2654
2655                         /*
2656                          * The object of this loop is to copy the first and last values[]
2657                          * entries along with evenly-spaced values in between.  So the
2658                          * i'th value is values[(i * (nvals - 1)) / (num_hist - 1)].  But
2659                          * computing that subscript directly risks integer overflow when
2660                          * the stats target is more than a couple thousand.  Instead we
2661                          * add (nvals - 1) / (num_hist - 1) to pos at each step, tracking
2662                          * the integral and fractional parts of the sum separately.
2663                          */
2664                         delta = (nvals - 1) / (num_hist - 1);
2665                         deltafrac = (nvals - 1) % (num_hist - 1);
2666                         pos = posfrac = 0;
2667
2668                         for (i = 0; i < num_hist; i++)
2669                         {
2670                                 hist_values[i] = datumCopy(values[pos].value,
2671                                                                                    stats->attrtype->typbyval,
2672                                                                                    stats->attrtype->typlen);
2673                                 pos += delta;
2674                                 posfrac += deltafrac;
2675                                 if (posfrac >= (num_hist - 1))
2676                                 {
2677                                         /* fractional part exceeds 1, carry to integer part */
2678                                         pos++;
2679                                         posfrac -= (num_hist - 1);
2680                                 }
2681                         }
2682
2683                         MemoryContextSwitchTo(old_context);
2684
2685                         stats->stakind[slot_idx] = STATISTIC_KIND_HISTOGRAM;
2686                         stats->staop[slot_idx] = mystats->ltopr;
2687                         stats->stavalues[slot_idx] = hist_values;
2688                         stats->numvalues[slot_idx] = num_hist;
2689
2690                         /*
2691                          * Accept the defaults for stats->statypid and others. They have
2692                          * been set before we were called (see vacuum.h)
2693                          */
2694                         slot_idx++;
2695                 }
2696
2697                 /* Generate a correlation entry if there are multiple values */
2698                 if (values_cnt > 1)
2699                 {
2700                         MemoryContext old_context;
2701                         float4     *corrs;
2702                         double          corr_xsum,
2703                                                 corr_x2sum;
2704
2705                         /* Must copy the target values into anl_context */
2706                         old_context = MemoryContextSwitchTo(stats->anl_context);
2707                         corrs = (float4 *) palloc(sizeof(float4));
2708                         MemoryContextSwitchTo(old_context);
2709
2710                         /*----------
2711                          * Since we know the x and y value sets are both
2712                          *              0, 1, ..., values_cnt-1
2713                          * we have sum(x) = sum(y) =
2714                          *              (values_cnt-1)*values_cnt / 2
2715                          * and sum(x^2) = sum(y^2) =
2716                          *              (values_cnt-1)*values_cnt*(2*values_cnt-1) / 6.
2717                          *----------
2718                          */
2719                         corr_xsum = ((double) (values_cnt - 1)) *
2720                                 ((double) values_cnt) / 2.0;
2721                         corr_x2sum = ((double) (values_cnt - 1)) *
2722                                 ((double) values_cnt) * (double) (2 * values_cnt - 1) / 6.0;
2723
2724                         /* And the correlation coefficient reduces to */
2725                         corrs[0] = (values_cnt * corr_xysum - corr_xsum * corr_xsum) /
2726                                 (values_cnt * corr_x2sum - corr_xsum * corr_xsum);
2727
2728                         stats->stakind[slot_idx] = STATISTIC_KIND_CORRELATION;
2729                         stats->staop[slot_idx] = mystats->ltopr;
2730                         stats->stanumbers[slot_idx] = corrs;
2731                         stats->numnumbers[slot_idx] = 1;
2732                         slot_idx++;
2733                 }
2734         }
2735         else if (nonnull_cnt == 0 && null_cnt > 0)
2736         {
2737                 /* We found only nulls; assume the column is entirely null */
2738                 stats->stats_valid = true;
2739                 stats->stanullfrac = 1.0;
2740                 if (is_varwidth)
2741                         stats->stawidth = 0;    /* "unknown" */
2742                 else
2743                         stats->stawidth = stats->attrtype->typlen;
2744                 stats->stadistinct = 0.0;               /* "unknown" */
2745         }
2746
2747         /* We don't need to bother cleaning up any of our temporary palloc's */
2748 }
2749
2750 /*
2751  * qsort_arg comparator for sorting ScalarItems
2752  *
2753  * Aside from sorting the items, we update the tupnoLink[] array
2754  * whenever two ScalarItems are found to contain equal datums.  The array
2755  * is indexed by tupno; for each ScalarItem, it contains the highest
2756  * tupno that that item's datum has been found to be equal to.  This allows
2757  * us to avoid additional comparisons in compute_scalar_stats().
2758  */
2759 static int
2760 compare_scalars(const void *a, const void *b, void *arg)
2761 {
2762         Datum           da = ((const ScalarItem *) a)->value;
2763         int                     ta = ((const ScalarItem *) a)->tupno;
2764         Datum           db = ((const ScalarItem *) b)->value;
2765         int                     tb = ((const ScalarItem *) b)->tupno;
2766         CompareScalarsContext *cxt = (CompareScalarsContext *) arg;
2767         int                     compare;
2768
2769         compare = ApplySortComparator(da, false, db, false, cxt->ssup);
2770         if (compare != 0)
2771                 return compare;
2772
2773         /*
2774          * The two datums are equal, so update cxt->tupnoLink[].
2775          */
2776         if (cxt->tupnoLink[ta] < tb)
2777                 cxt->tupnoLink[ta] = tb;
2778         if (cxt->tupnoLink[tb] < ta)
2779                 cxt->tupnoLink[tb] = ta;
2780
2781         /*
2782          * For equal datums, sort by tupno
2783          */
2784         return ta - tb;
2785 }
2786
2787 /*
2788  * qsort comparator for sorting ScalarMCVItems by position
2789  */
2790 static int
2791 compare_mcvs(const void *a, const void *b)
2792 {
2793         int                     da = ((const ScalarMCVItem *) a)->first;
2794         int                     db = ((const ScalarMCVItem *) b)->first;
2795
2796         return da - db;
2797 }