granicus.if.org Git - postgresql/blob - src/backend/commands/analyze.c
Minor correction: cause ALTER ROLE role ROLE rolenames to behave
[postgresql] / src / backend / commands / analyze.c
1 /*-------------------------------------------------------------------------
2  *
3  * analyze.c
4  *        the Postgres statistics generator
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.87 2005/07/14 05:13:39 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <math.h>
18
19 #include "access/heapam.h"
20 #include "access/tuptoaster.h"
21 #include "catalog/catalog.h"
22 #include "catalog/index.h"
23 #include "catalog/indexing.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_operator.h"
26 #include "commands/vacuum.h"
27 #include "executor/executor.h"
28 #include "miscadmin.h"
29 #include "parser/parse_expr.h"
30 #include "parser/parse_oper.h"
31 #include "parser/parse_relation.h"
32 #include "pgstat.h"
33 #include "utils/acl.h"
34 #include "utils/builtins.h"
35 #include "utils/datum.h"
36 #include "utils/fmgroids.h"
37 #include "utils/lsyscache.h"
38 #include "utils/memutils.h"
39 #include "utils/syscache.h"
40 #include "utils/tuplesort.h"
41
42
/*
 * Data structure for Algorithm S from Knuth 3.4.2.
 *
 * Holds the state for selecting a random sample of n blocks out of the
 * N blocks of a table, scanning block numbers in ascending order.
 * Invariants maintained by the BlockSampler_* functions: t <= N, m <= n.
 */
typedef struct
{
	BlockNumber N;				/* number of blocks, known in advance */
	int			n;				/* desired sample size */
	BlockNumber t;				/* current block number (blocks scanned so far) */
	int			m;				/* blocks selected so far */
} BlockSamplerData;
typedef BlockSamplerData *BlockSampler;
52
/*
 * Per-index data for ANALYZE.
 *
 * One of these is built for each index of the target relation; it carries
 * the index's metadata plus the stats slots for any expression columns
 * that are analyzable.
 */
typedef struct AnlIndexData
{
	IndexInfo  *indexInfo;		/* BuildIndexInfo result */
	double		tupleFract;		/* fraction of rows satisfying a partial
								 * index's predicate (1.0 if not partial) */
	VacAttrStats **vacattrstats;	/* index attrs to analyze */
	int			attr_cnt;		/* number of entries in vacattrstats */
} AnlIndexData;
61
62
/* Default statistics target (GUC parameter) */
int			default_statistics_target = 10;

/* Message level for progress reports; set per-call by analyze_rel() */
static int	elevel = -1;

/*
 * Memory context in which analysis results must be kept; set per-call by
 * analyze_rel() to the caller's current context, which vacuum.c clears
 * after each relation.
 */
static MemoryContext anl_context = NULL;


static void BlockSampler_Init(BlockSampler bs, BlockNumber nblocks,
				  int samplesize);
static bool BlockSampler_HasMore(BlockSampler bs);
static BlockNumber BlockSampler_Next(BlockSampler bs);
static void compute_index_stats(Relation onerel, double totalrows,
					AnlIndexData *indexdata, int nindexes,
					HeapTuple *rows, int numrows,
					MemoryContext col_context);
static VacAttrStats *examine_attribute(Relation onerel, int attnum);
static int acquire_sample_rows(Relation onerel, HeapTuple *rows,
					int targrows, double *totalrows, double *totaldeadrows);
static double random_fract(void);
static double init_selection_state(int n);
static double get_next_S(double t, int n, double *stateptr);
static int	compare_rows(const void *a, const void *b);
static void update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats);
static Datum std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);
static Datum ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);

static bool std_typanalyze(VacAttrStats *stats);
92
/*
 *	analyze_rel() -- analyze one relation
 *
 * Collects a random sample of rows from the relation identified by
 * relid, computes per-column (and per-index-expression) statistics, and
 * stores the results in pg_statistic.  Also updates pg_class stats and
 * reports to the stats collector when running standalone (not as part
 * of VACUUM ANALYZE).  Silently skips relations that have vanished, are
 * not plain tables, belong to other backends' temp namespaces, or that
 * the user lacks permission on.
 */
void
analyze_rel(Oid relid, VacuumStmt *vacstmt)
{
	Relation	onerel;
	int			attr_cnt,
				tcnt,
				i,
				ind;
	Relation   *Irel;
	int			nindexes;
	bool		hasindex;
	bool		analyzableindex;
	VacAttrStats **vacattrstats;
	AnlIndexData *indexdata;
	int			targrows,
				numrows;
	double		totalrows,
				totaldeadrows;
	HeapTuple  *rows;

	if (vacstmt->verbose)
		elevel = INFO;
	else
		elevel = DEBUG2;

	/*
	 * Use the current context for storing analysis info.  vacuum.c
	 * ensures that this context will be cleared when I return, thus
	 * releasing the memory allocated here.
	 */
	anl_context = CurrentMemoryContext;

	/*
	 * Check for user-requested abort.  Note we want this to be inside a
	 * transaction, so xact.c doesn't issue useless WARNING.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Race condition -- if the pg_class tuple has gone away since the
	 * last time we saw it, we don't need to process it.
	 */
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(relid),
							  0, 0, 0))
		return;

	/*
	 * Open the class, getting only a read lock on it, and check
	 * permissions. Permissions check should match vacuum's check!
	 */
	onerel = relation_open(relid, AccessShareLock);

	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
	{
		/* No need for a WARNING if we already complained during VACUUM */
		if (!vacstmt->vacuum)
			ereport(WARNING,
					(errmsg("skipping \"%s\" --- only table or database owner can analyze it",
							RelationGetRelationName(onerel))));
		relation_close(onerel, AccessShareLock);
		return;
	}

	/*
	 * Check that it's a plain table; we used to do this in get_rel_oids()
	 * but seems safer to check after we've locked the relation.
	 */
	if (onerel->rd_rel->relkind != RELKIND_RELATION)
	{
		/* No need for a WARNING if we already complained during VACUUM */
		if (!vacstmt->vacuum)
			ereport(WARNING,
					(errmsg("skipping \"%s\" --- cannot analyze indexes, views, or special system tables",
							RelationGetRelationName(onerel))));
		relation_close(onerel, AccessShareLock);
		return;
	}

	/*
	 * Silently ignore tables that are temp tables of other backends ---
	 * trying to analyze these is rather pointless, since their contents
	 * are probably not up-to-date on disk.  (We don't throw a warning
	 * here; it would just lead to chatter during a database-wide
	 * ANALYZE.)
	 */
	if (isOtherTempNamespace(RelationGetNamespace(onerel)))
	{
		relation_close(onerel, AccessShareLock);
		return;
	}

	/*
	 * We can ANALYZE any table except pg_statistic. See update_attstats
	 */
	if (RelationGetRelid(onerel) == StatisticRelationId)
	{
		relation_close(onerel, AccessShareLock);
		return;
	}

	ereport(elevel,
			(errmsg("analyzing \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					RelationGetRelationName(onerel))));

	/*
	 * Determine which columns to analyze
	 *
	 * Note that system attributes are never analyzed.
	 */
	if (vacstmt->va_cols != NIL)
	{
		/* Explicit column list given in the ANALYZE command */
		ListCell   *le;

		vacattrstats = (VacAttrStats **) palloc(list_length(vacstmt->va_cols) *
												sizeof(VacAttrStats *));
		tcnt = 0;
		foreach(le, vacstmt->va_cols)
		{
			char	   *col = strVal(lfirst(le));

			/* errors out (missing_ok = false) if the column doesn't exist */
			i = attnameAttNum(onerel, col, false);
			vacattrstats[tcnt] = examine_attribute(onerel, i);
			if (vacattrstats[tcnt] != NULL)
				tcnt++;
		}
		attr_cnt = tcnt;
	}
	else
	{
		/* No column list: consider every user attribute */
		attr_cnt = onerel->rd_att->natts;
		vacattrstats = (VacAttrStats **)
			palloc(attr_cnt * sizeof(VacAttrStats *));
		tcnt = 0;
		for (i = 1; i <= attr_cnt; i++)
		{
			vacattrstats[tcnt] = examine_attribute(onerel, i);
			if (vacattrstats[tcnt] != NULL)
				tcnt++;
		}
		attr_cnt = tcnt;
	}

	/*
	 * Open all indexes of the relation, and see if there are any
	 * analyzable columns in the indexes.  We do not analyze index columns
	 * if there was an explicit column list in the ANALYZE command,
	 * however.
	 */
	vac_open_indexes(onerel, AccessShareLock, &nindexes, &Irel);
	hasindex = (nindexes > 0);
	indexdata = NULL;
	analyzableindex = false;
	if (hasindex)
	{
		indexdata = (AnlIndexData *) palloc0(nindexes * sizeof(AnlIndexData));
		for (ind = 0; ind < nindexes; ind++)
		{
			AnlIndexData *thisdata = &indexdata[ind];
			IndexInfo  *indexInfo;

			thisdata->indexInfo = indexInfo = BuildIndexInfo(Irel[ind]);
			thisdata->tupleFract = 1.0; /* fix later if partial */
			if (indexInfo->ii_Expressions != NIL && vacstmt->va_cols == NIL)
			{
				ListCell   *indexpr_item = list_head(indexInfo->ii_Expressions);

				thisdata->vacattrstats = (VacAttrStats **)
					palloc(indexInfo->ii_NumIndexAttrs * sizeof(VacAttrStats *));
				tcnt = 0;
				for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
				{
					int			keycol = indexInfo->ii_KeyAttrNumbers[i];

					if (keycol == 0)
					{
						/* Found an index expression */
						Node	   *indexkey;

						if (indexpr_item == NULL)		/* shouldn't happen */
							elog(ERROR, "too few entries in indexprs list");
						indexkey = (Node *) lfirst(indexpr_item);
						indexpr_item = lnext(indexpr_item);

						/*
						 * Can't analyze if the opclass uses a storage
						 * type different from the expression result type.
						 * We'd get confused because the type shown in
						 * pg_attribute for the index column doesn't match
						 * what we are getting from the expression.
						 * Perhaps this can be fixed someday, but for now,
						 * punt.
						 */
						if (exprType(indexkey) !=
							Irel[ind]->rd_att->attrs[i]->atttypid)
							continue;

						thisdata->vacattrstats[tcnt] =
							examine_attribute(Irel[ind], i + 1);
						if (thisdata->vacattrstats[tcnt] != NULL)
						{
							tcnt++;
							analyzableindex = true;
						}
					}
				}
				thisdata->attr_cnt = tcnt;
			}
		}
	}

	/*
	 * Quit if no analyzable columns
	 */
	if (attr_cnt <= 0 && !analyzableindex)
	{
		/*
		 * We report that the table is empty; this is just so that the
		 * autovacuum code doesn't go nuts trying to get stats about
		 * a zero-column table.
		 */
		if (!vacstmt->vacuum)
			pgstat_report_analyze(RelationGetRelid(onerel), 0, 0);

		vac_close_indexes(nindexes, Irel, AccessShareLock);
		relation_close(onerel, AccessShareLock);
		return;
	}

	/*
	 * Determine how many rows we need to sample, using the worst case
	 * from all analyzable columns.  We use a lower bound of 100 rows to
	 * avoid possible overflow in Vitter's algorithm.
	 */
	targrows = 100;
	for (i = 0; i < attr_cnt; i++)
	{
		if (targrows < vacattrstats[i]->minrows)
			targrows = vacattrstats[i]->minrows;
	}
	for (ind = 0; ind < nindexes; ind++)
	{
		AnlIndexData *thisdata = &indexdata[ind];

		for (i = 0; i < thisdata->attr_cnt; i++)
		{
			if (targrows < thisdata->vacattrstats[i]->minrows)
				targrows = thisdata->vacattrstats[i]->minrows;
		}
	}

	/*
	 * Acquire the sample rows
	 */
	rows = (HeapTuple *) palloc(targrows * sizeof(HeapTuple));
	numrows = acquire_sample_rows(onerel, rows, targrows,
								  &totalrows, &totaldeadrows);

	/*
	 * Compute the statistics.  Temporary results during the calculations
	 * for each column are stored in a child context.  The calc routines
	 * are responsible to make sure that whatever they store into the
	 * VacAttrStats structure is allocated in anl_context.
	 */
	if (numrows > 0)
	{
		MemoryContext col_context,
					old_context;

		col_context = AllocSetContextCreate(anl_context,
											"Analyze Column",
											ALLOCSET_DEFAULT_MINSIZE,
											ALLOCSET_DEFAULT_INITSIZE,
											ALLOCSET_DEFAULT_MAXSIZE);
		old_context = MemoryContextSwitchTo(col_context);

		for (i = 0; i < attr_cnt; i++)
		{
			VacAttrStats *stats = vacattrstats[i];

			stats->rows = rows;
			stats->tupDesc = onerel->rd_att;
			(*stats->compute_stats) (stats,
									 std_fetch_func,
									 numrows,
									 totalrows);
			/* wipe the per-column scratch space between columns */
			MemoryContextResetAndDeleteChildren(col_context);
		}

		if (hasindex)
			compute_index_stats(onerel, totalrows,
								indexdata, nindexes,
								rows, numrows,
								col_context);

		MemoryContextSwitchTo(old_context);
		MemoryContextDelete(col_context);

		/*
		 * Emit the completed stats rows into pg_statistic, replacing any
		 * previous statistics for the target columns.  (If there are
		 * stats in pg_statistic for columns we didn't process, we leave
		 * them alone.)
		 */
		update_attstats(relid, attr_cnt, vacattrstats);

		for (ind = 0; ind < nindexes; ind++)
		{
			AnlIndexData *thisdata = &indexdata[ind];

			update_attstats(RelationGetRelid(Irel[ind]),
							thisdata->attr_cnt, thisdata->vacattrstats);
		}
	}

	/*
	 * If we are running a standalone ANALYZE, update pages/tuples stats
	 * in pg_class.  We know the accurate page count from the smgr, but
	 * only an approximate number of tuples; therefore, if we are part of
	 * VACUUM ANALYZE do *not* overwrite the accurate count already
	 * inserted by VACUUM.  The same consideration applies to indexes.
	 */
	if (!vacstmt->vacuum)
	{
		vac_update_relstats(RelationGetRelid(onerel),
							RelationGetNumberOfBlocks(onerel),
							totalrows,
							hasindex);
		for (ind = 0; ind < nindexes; ind++)
		{
			AnlIndexData *thisdata = &indexdata[ind];
			double		totalindexrows;

			/* scale index row estimate by partial-index selectivity */
			totalindexrows = ceil(thisdata->tupleFract * totalrows);
			vac_update_relstats(RelationGetRelid(Irel[ind]),
								RelationGetNumberOfBlocks(Irel[ind]),
								totalindexrows,
								false);
		}

		/* report results to the stats collector, too */
		pgstat_report_analyze(RelationGetRelid(onerel), totalrows,
							  totaldeadrows);
	}

	/* Done with indexes */
	vac_close_indexes(nindexes, Irel, NoLock);

	/*
	 * Close source relation now, but keep lock so that no one deletes it
	 * before we commit.  (If someone did, they'd fail to clean up the
	 * entries we made in pg_statistic.)
	 */
	relation_close(onerel, NoLock);
}
453
/*
 * Compute statistics about indexes of a relation.
 *
 * For each index, evaluates the partial-index predicate (if any) against
 * the sampled rows to estimate tupleFract / total index rows, and computes
 * statistics for analyzable expression columns by evaluating the index
 * expressions on the qualifying sample rows.  Results are stored into the
 * per-index VacAttrStats structs in indexdata.
 *
 * col_context is the caller's per-column scratch context; per-index
 * scratch data lives in a local "Analyze Index" context that is reset
 * after each index.
 */
static void
compute_index_stats(Relation onerel, double totalrows,
					AnlIndexData *indexdata, int nindexes,
					HeapTuple *rows, int numrows,
					MemoryContext col_context)
{
	MemoryContext ind_context,
				old_context;
	Datum		values[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			ind,
				i;

	ind_context = AllocSetContextCreate(anl_context,
										"Analyze Index",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);
	old_context = MemoryContextSwitchTo(ind_context);

	for (ind = 0; ind < nindexes; ind++)
	{
		AnlIndexData *thisdata = &indexdata[ind];
		IndexInfo  *indexInfo = thisdata->indexInfo;
		int			attr_cnt = thisdata->attr_cnt;
		TupleTableSlot *slot;
		EState	   *estate;
		ExprContext *econtext;
		List	   *predicate;
		Datum	   *exprvals;
		bool	   *exprnulls;
		int			numindexrows,
					tcnt,
					rowno;
		double		totalindexrows;

		/* Ignore index if no columns to analyze and not partial */
		if (attr_cnt == 0 && indexInfo->ii_Predicate == NIL)
			continue;

		/*
		 * Need an EState for evaluation of index expressions and
		 * partial-index predicates.  Create it in the per-index context
		 * to be sure it gets cleaned up at the bottom of the loop.
		 */
		estate = CreateExecutorState();
		econtext = GetPerTupleExprContext(estate);
		/* Need a slot to hold the current heap tuple, too */
		slot = MakeSingleTupleTableSlot(RelationGetDescr(onerel));

		/* Arrange for econtext's scan tuple to be the tuple under test */
		econtext->ecxt_scantuple = slot;

		/* Set up execution state for predicate. */
		predicate = (List *)
			ExecPrepareExpr((Expr *) indexInfo->ii_Predicate,
							estate);

		/*
		 * Compute and save index expression values; exprvals/exprnulls
		 * are laid out row-major, attr_cnt entries per qualifying row.
		 */
		exprvals = (Datum *) palloc(numrows * attr_cnt * sizeof(Datum));
		exprnulls = (bool *) palloc(numrows * attr_cnt * sizeof(bool));
		numindexrows = 0;
		tcnt = 0;
		for (rowno = 0; rowno < numrows; rowno++)
		{
			HeapTuple	heapTuple = rows[rowno];

			/* Set up for predicate or expression evaluation */
			ExecStoreTuple(heapTuple, slot, InvalidBuffer, false);

			/* If index is partial, check predicate */
			if (predicate != NIL)
			{
				if (!ExecQual(predicate, econtext, false))
					continue;
			}
			numindexrows++;

			if (attr_cnt > 0)
			{
				/*
				 * Evaluate the index row to compute expression values. We
				 * could do this by hand, but FormIndexDatum is
				 * convenient.
				 */
				FormIndexDatum(indexInfo,
							   slot,
							   estate,
							   values,
							   isnull);

				/*
				 * Save just the columns we care about.
				 */
				for (i = 0; i < attr_cnt; i++)
				{
					VacAttrStats *stats = thisdata->vacattrstats[i];
					int			attnum = stats->attr->attnum;

					exprvals[tcnt] = values[attnum - 1];
					exprnulls[tcnt] = isnull[attnum - 1];
					tcnt++;
				}
			}
		}

		/*
		 * Having counted the number of rows that pass the predicate in
		 * the sample, we can estimate the total number of rows in the
		 * index.
		 */
		thisdata->tupleFract = (double) numindexrows / (double) numrows;
		totalindexrows = ceil(thisdata->tupleFract * totalrows);

		/*
		 * Now we can compute the statistics for the expression columns.
		 */
		if (numindexrows > 0)
		{
			MemoryContextSwitchTo(col_context);
			for (i = 0; i < attr_cnt; i++)
			{
				VacAttrStats *stats = thisdata->vacattrstats[i];

				/* ind_fetch_func reads exprvals/exprnulls with this stride */
				stats->exprvals = exprvals + i;
				stats->exprnulls = exprnulls + i;
				stats->rowstride = attr_cnt;
				(*stats->compute_stats) (stats,
										 ind_fetch_func,
										 numindexrows,
										 totalindexrows);
				MemoryContextResetAndDeleteChildren(col_context);
			}
		}

		/* And clean up */
		MemoryContextSwitchTo(ind_context);

		ExecDropSingleTupleTableSlot(slot);
		FreeExecutorState(estate);
		MemoryContextResetAndDeleteChildren(ind_context);
	}

	MemoryContextSwitchTo(old_context);
	MemoryContextDelete(ind_context);
}
603
604 /*
605  * examine_attribute -- pre-analysis of a single column
606  *
607  * Determine whether the column is analyzable; if so, create and initialize
608  * a VacAttrStats struct for it.  If not, return NULL.
609  */
610 static VacAttrStats *
611 examine_attribute(Relation onerel, int attnum)
612 {
613         Form_pg_attribute attr = onerel->rd_att->attrs[attnum - 1];
614         HeapTuple       typtuple;
615         VacAttrStats *stats;
616         bool            ok;
617
618         /* Never analyze dropped columns */
619         if (attr->attisdropped)
620                 return NULL;
621
622         /* Don't analyze column if user has specified not to */
623         if (attr->attstattarget == 0)
624                 return NULL;
625
626         /*
627          * Create the VacAttrStats struct.
628          */
629         stats = (VacAttrStats *) palloc0(sizeof(VacAttrStats));
630         stats->attr = (Form_pg_attribute) palloc(ATTRIBUTE_TUPLE_SIZE);
631         memcpy(stats->attr, attr, ATTRIBUTE_TUPLE_SIZE);
632         typtuple = SearchSysCache(TYPEOID,
633                                                           ObjectIdGetDatum(attr->atttypid),
634                                                           0, 0, 0);
635         if (!HeapTupleIsValid(typtuple))
636                 elog(ERROR, "cache lookup failed for type %u", attr->atttypid);
637         stats->attrtype = (Form_pg_type) palloc(sizeof(FormData_pg_type));
638         memcpy(stats->attrtype, GETSTRUCT(typtuple), sizeof(FormData_pg_type));
639         ReleaseSysCache(typtuple);
640         stats->anl_context = anl_context;
641         stats->tupattnum = attnum;
642
643         /*
644          * Call the type-specific typanalyze function.  If none is specified,
645          * use std_typanalyze().
646          */
647         if (OidIsValid(stats->attrtype->typanalyze))
648                 ok = DatumGetBool(OidFunctionCall1(stats->attrtype->typanalyze,
649                                                                                    PointerGetDatum(stats)));
650         else
651                 ok = std_typanalyze(stats);
652
653         if (!ok || stats->compute_stats == NULL || stats->minrows <= 0)
654         {
655                 pfree(stats->attrtype);
656                 pfree(stats->attr);
657                 pfree(stats);
658                 return NULL;
659         }
660
661         return stats;
662 }
663
664 /*
665  * BlockSampler_Init -- prepare for random sampling of blocknumbers
666  *
667  * BlockSampler is used for stage one of our new two-stage tuple
668  * sampling mechanism as discussed on pgsql-hackers 2004-04-02 (subject
669  * "Large DB").  It selects a random sample of samplesize blocks out of
670  * the nblocks blocks in the table.  If the table has less than
671  * samplesize blocks, all blocks are selected.
672  *
673  * Since we know the total number of blocks in advance, we can use the
674  * straightforward Algorithm S from Knuth 3.4.2, rather than Vitter's
675  * algorithm.
676  */
677 static void
678 BlockSampler_Init(BlockSampler bs, BlockNumber nblocks, int samplesize)
679 {
680         bs->N = nblocks;                        /* measured table size */
681
682         /*
683          * If we decide to reduce samplesize for tables that have less or not
684          * much more than samplesize blocks, here is the place to do it.
685          */
686         bs->n = samplesize;
687         bs->t = 0;                                      /* blocks scanned so far */
688         bs->m = 0;                                      /* blocks selected so far */
689 }
690
691 static bool
692 BlockSampler_HasMore(BlockSampler bs)
693 {
694         return (bs->t < bs->N) && (bs->m < bs->n);
695 }
696
/*
 * BlockSampler_Next -- return the next randomly-selected block number.
 *
 * Implements the acceptance step of Knuth's Algorithm S: with k blocks
 * still to choose out of K unexamined blocks, the current block is
 * selected with probability k/K.  Must only be called while
 * BlockSampler_HasMore() returns true.
 */
static BlockNumber
BlockSampler_Next(BlockSampler bs)
{
	BlockNumber K = bs->N - bs->t;		/* remaining blocks */
	int			k = bs->n - bs->m;		/* blocks still to sample */
	double		p;						/* probability to skip block */
	double		V;						/* random */

	Assert(BlockSampler_HasMore(bs));	/* hence K > 0 and k > 0 */

	if ((BlockNumber) k >= K)
	{
		/* need all the rest */
		bs->m++;
		return bs->t++;
	}

	/*----------
	 * It is not obvious that this code matches Knuth's Algorithm S.
	 * Knuth says to skip the current block with probability 1 - k/K.
	 * If we are to skip, we should advance t (hence decrease K), and
	 * repeat the same probabilistic test for the next block.  The naive
	 * implementation thus requires a random_fract() call for each block
	 * number.  But we can reduce this to one random_fract() call per
	 * selected block, by noting that each time the while-test succeeds,
	 * we can reinterpret V as a uniform random number in the range 0 to p.
	 * Therefore, instead of choosing a new V, we just adjust p to be
	 * the appropriate fraction of its former value, and our next loop
	 * makes the appropriate probabilistic test.
	 *
	 * We have initially K > k > 0.  If the loop reduces K to equal k,
	 * the next while-test must fail since p will become exactly zero
	 * (we assume there will not be roundoff error in the division).
	 * (Note: Knuth suggests a "<=" loop condition, but we use "<" just
	 * to be doubly sure about roundoff error.)  Therefore K cannot become
	 * less than k, which means that we cannot fail to select enough blocks.
	 *----------
	 */
	V = random_fract();
	p = 1.0 - (double) k / (double) K;
	while (V < p)
	{
		/* skip */
		bs->t++;
		K--;					/* keep K == N - t */

		/* adjust p to be new cutoff point in reduced range */
		p *= 1.0 - (double) k / (double) K;
	}

	/* select */
	bs->m++;
	return bs->t++;
}
751
/*
 * acquire_sample_rows -- acquire a random sample of rows from the table
 *
 * As of May 2004 we use a new two-stage method:  Stage one selects up
 * to targrows random blocks (or all blocks, if there aren't so many).
 * Stage two scans these blocks and uses the Vitter algorithm to create
 * a random sample of targrows rows (or less, if there are less in the
 * sample of blocks).  The two stages are executed simultaneously: each
 * block is processed as soon as stage one returns its number and while
 * the rows are read stage two controls which ones are to be inserted
 * into the sample.
 *
 * Although every row has an equal chance of ending up in the final
 * sample, this sampling method is not perfect: not every possible
 * sample has an equal chance of being selected.  For large relations
 * the number of different blocks represented by the sample tends to be
 * too small.  We can live with that for now.  Improvements are welcome.
 *
 * We also estimate the total numbers of live and dead rows in the table,
 * and return them into *totalrows and *totaldeadrows, respectively.
 *
 * An important property of this sampling method is that because we do
 * look at a statistically unbiased set of blocks, we should get
 * unbiased estimates of the average numbers of live and dead rows per
 * block.  The previous sampling method put too much credence in the row
 * density near the start of the table.
 *
 * The returned list of tuples is in order by physical position in the table.
 * (We will rely on this later to derive correlation estimates.)
 *
 * Returns the number of rows actually stored in rows[] (<= targrows).
 */
static int
acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
					double *totalrows, double *totaldeadrows)
{
	int			numrows = 0;	/* # rows collected */
	double		liverows = 0;	/* # rows seen */
	double		deadrows = 0;	/* # dead rows seen */
	double		rowstoskip = -1;	/* -1 means not set yet */
	BlockNumber totalblocks;
	BlockSamplerData bs;
	double		rstate;			/* random state for Vitter's Algorithm Z */

	Assert(targrows > 1);

	totalblocks = RelationGetNumberOfBlocks(onerel);

	/* Prepare for sampling block numbers */
	BlockSampler_Init(&bs, totalblocks, targrows);
	/* Prepare for sampling rows */
	rstate = init_selection_state(targrows);

	/* Outer loop over blocks to sample */
	while (BlockSampler_HasMore(&bs))
	{
		BlockNumber targblock = BlockSampler_Next(&bs);
		Buffer		targbuffer;
		Page		targpage;
		OffsetNumber targoffset,
					maxoffset;

		/* honor vacuum cost delay / cancel between page visits */
		vacuum_delay_point();

		/*
		 * We must maintain a pin on the target page's buffer to ensure
		 * that the maxoffset value stays good (else concurrent VACUUM
		 * might delete tuples out from under us).	Hence, pin the page
		 * until we are done looking at it.  We don't maintain a lock on
		 * the page, so tuples could get added to it, but we ignore such
		 * tuples.
		 */
		targbuffer = ReadBuffer(onerel, targblock);
		LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
		targpage = BufferGetPage(targbuffer);
		maxoffset = PageGetMaxOffsetNumber(targpage);
		LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);

		/* Inner loop over all tuples on the selected page */
		for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
		{
			HeapTupleData targtuple;

			ItemPointerSet(&targtuple.t_self, targblock, targoffset);
			/* We use heap_release_fetch to avoid useless bufmgr traffic */
			if (heap_release_fetch(onerel, SnapshotNow,
								   &targtuple, &targbuffer,
								   true, NULL))
			{
				/*
				 * The first targrows live rows are simply copied into the
				 * reservoir. Then we start replacing tuples in the sample
				 * until we reach the end of the relation.	This algorithm
				 * is from Jeff Vitter's paper (see full citation below).
				 * It works by repeatedly computing the number of tuples
				 * to skip before selecting a tuple, which replaces a
				 * randomly chosen element of the reservoir (current set
				 * of tuples).  At all times the reservoir is a true
				 * random sample of the tuples we've passed over so far,
				 * so when we fall off the end of the relation we're done.
				 */
				if (numrows < targrows)
					rows[numrows++] = heap_copytuple(&targtuple);
				else
				{
					/*
					 * t in Vitter's paper is the number of records
					 * already processed.  If we need to compute a new S
					 * value, we must use the not-yet-incremented value of
					 * liverows as t.
					 */
					if (rowstoskip < 0)
						rowstoskip = get_next_S(liverows, targrows, &rstate);

					if (rowstoskip <= 0)
					{
						/*
						 * Found a suitable tuple, so save it, replacing
						 * one old tuple at random
						 */
						int			k = (int) (targrows * random_fract());

						Assert(k >= 0 && k < targrows);
						heap_freetuple(rows[k]);
						rows[k] = heap_copytuple(&targtuple);
					}

					rowstoskip -= 1;
				}

				liverows += 1;
			}
			else
			{
				/* Count dead rows, but not empty slots */
				if (targtuple.t_data != NULL)
					deadrows += 1;
			}
		}

		/* Now release the pin on the page */
		ReleaseBuffer(targbuffer);
	}

	/*
	 * If we didn't find as many tuples as we wanted then we're done. No
	 * sort is needed, since they're already in order.
	 *
	 * Otherwise we need to sort the collected tuples by position
	 * (itempointer).  It's not worth worrying about corner cases where
	 * the tuples are already sorted.
	 */
	if (numrows == targrows)
		qsort((void *) rows, numrows, sizeof(HeapTuple), compare_rows);

	/*
	 * Estimate total numbers of rows in relation.  Extrapolate the
	 * per-block live/dead densities observed in the bs.m sampled blocks
	 * to all totalblocks blocks, rounding to the nearest integer.
	 */
	if (bs.m > 0)
	{
		*totalrows = floor((liverows * totalblocks) / bs.m + 0.5);
		*totaldeadrows = floor((deadrows * totalblocks) / bs.m + 0.5);
	}
	else
	{
		/* empty table: no blocks were sampled */
		*totalrows = 0.0;
		*totaldeadrows = 0.0;
	}

	/*
	 * Emit some interesting relation info
	 */
	ereport(elevel,
			(errmsg("\"%s\": scanned %d of %u pages, "
					"containing %.0f live rows and %.0f dead rows; "
					"%d rows in sample, %.0f estimated total rows",
					RelationGetRelationName(onerel),
					bs.m, totalblocks,
					liverows, deadrows,
					numrows, *totalrows)));

	return numrows;
}
933
934 /* Select a random value R uniformly distributed in 0 < R < 1 */
935 static double
936 random_fract(void)
937 {
938         long            z;
939
940         /* random() can produce endpoint values, try again if so */
941         do
942         {
943                 z = random();
944         } while (z <= 0 || z >= MAX_RANDOM_VALUE);
945         return (double) z / (double) MAX_RANDOM_VALUE;
946 }
947
948 /*
949  * These two routines embody Algorithm Z from "Random sampling with a
950  * reservoir" by Jeffrey S. Vitter, in ACM Trans. Math. Softw. 11, 1
951  * (Mar. 1985), Pages 37-57.  Vitter describes his algorithm in terms
952  * of the count S of records to skip before processing another record.
953  * It is computed primarily based on t, the number of records already read.
954  * The only extra state needed between calls is W, a random state variable.
955  *
956  * init_selection_state computes the initial W value.
957  *
958  * Given that we've already read t records (t >= n), get_next_S
959  * determines the number of records to skip before the next record is
960  * processed.
961  */
static double
init_selection_state(int n)
{
	/*
	 * Compute the initial value of W for Algorithm Z.  exp(-log(u)/n)
	 * is Vitter's u^(-1/n) for a uniform random u in (0,1).
	 */
	double		u = random_fract();

	return exp(-log(u) / n);
}
968
/*
 * get_next_S -- number of records to skip before the next sample selection.
 *
 * t is the number of records already processed (t >= n), n is the sample
 * size, and *stateptr holds W, the persistent random state updated here
 * whenever Algorithm Z is used.
 */
static double
get_next_S(double t, int n, double *stateptr)
{
	double		S;

	/* The magic constant here is T from Vitter's paper */
	if (t <= (22.0 * n))
	{
		/* Process records using Algorithm X until t is large enough */
		double		V,
					quot;

		V = random_fract();		/* Generate V */
		S = 0;
		t += 1;
		/* Note: "num" in Vitter's code is always equal to t - n */
		quot = (t - (double) n) / t;
		/* Find min S satisfying (4.1) */
		while (quot > V)
		{
			S += 1;
			t += 1;
			quot *= (t - (double) n) / t;
		}
	}
	else
	{
		/* Now apply Algorithm Z */
		double		W = *stateptr;
		double		term = t - (double) n + 1;

		/* rejection loop: iterate until a candidate S is accepted */
		for (;;)
		{
			double		numer,
						numer_lim,
						denom;
			double		U,
						X,
						lhs,
						rhs,
						y,
						tmp;

			/* Generate U and X */
			U = random_fract();
			X = t * (W - 1.0);
			S = floor(X);		/* S is tentatively set to floor(X) */
			/* Test if U <= h(S)/cg(X) in the manner of (6.3) */
			tmp = (t + 1) / term;
			lhs = exp(log(((U * tmp * tmp) * (term + S)) / (t + X)) / n);
			rhs = (((t + X) / (term + S)) * term) / t;
			if (lhs <= rhs)
			{
				/* quick acceptance: reuse rhs/lhs as the next W */
				W = rhs / lhs;
				break;
			}
			/* Test if U <= f(S)/cg(X) */
			y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X);
			if ((double) n < S)
			{
				denom = t;
				numer_lim = term + S;
			}
			else
			{
				denom = t - (double) n + S;
				numer_lim = t + 1;
			}
			/* accumulate the product of (6.1) term by term */
			for (numer = t + S; numer >= numer_lim; numer -= 1)
			{
				y *= numer / denom;
				denom -= 1;
			}
			W = exp(-log(random_fract()) / n);	/* Generate W in advance */
			if (exp(log(y) / n) <= (t + X) / t)
				break;
		}
		*stateptr = W;
	}
	return S;
}
1050
1051 /*
1052  * qsort comparator for sorting rows[] array
1053  */
1054 static int
1055 compare_rows(const void *a, const void *b)
1056 {
1057         HeapTuple       ha = *(HeapTuple *) a;
1058         HeapTuple       hb = *(HeapTuple *) b;
1059         BlockNumber ba = ItemPointerGetBlockNumber(&ha->t_self);
1060         OffsetNumber oa = ItemPointerGetOffsetNumber(&ha->t_self);
1061         BlockNumber bb = ItemPointerGetBlockNumber(&hb->t_self);
1062         OffsetNumber ob = ItemPointerGetOffsetNumber(&hb->t_self);
1063
1064         if (ba < bb)
1065                 return -1;
1066         if (ba > bb)
1067                 return 1;
1068         if (oa < ob)
1069                 return -1;
1070         if (oa > ob)
1071                 return 1;
1072         return 0;
1073 }
1074
1075
/*
 *	update_attstats() -- update attribute statistics for one relation
 *
 *		Statistics are stored in several places: the pg_class row for the
 *		relation has stats about the whole relation, and there is a
 *		pg_statistic row for each (non-system) attribute that has ever
 *		been analyzed.  The pg_class values are updated by VACUUM, not here.
 *
 *		pg_statistic rows are just added or updated normally.  This means
 *		that pg_statistic will probably contain some deleted rows at the
 *		completion of a vacuum cycle, unless it happens to get vacuumed last.
 *
 *		To keep things simple, we punt for pg_statistic, and don't try
 *		to compute or store rows for pg_statistic itself in pg_statistic.
 *		This could possibly be made to work, but it's not worth the trouble.
 *		Note analyze_rel() has seen to it that we won't come here when
 *		vacuuming pg_statistic itself.
 *
 *		Note: if two backends concurrently try to analyze the same relation,
 *		the second one is likely to fail here with a "tuple concurrently
 *		updated" error.  This is slightly annoying, but no real harm is done.
 *		We could prevent the problem by using a stronger lock on the
 *		relation for ANALYZE (ie, ShareUpdateExclusiveLock instead
 *		of AccessShareLock); but that cure seems worse than the disease,
 *		especially now that ANALYZE doesn't start a new transaction
 *		for each relation.	The lock could be held for a long time...
 */
static void
update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats)
{
	Relation	sd;
	int			attno;

	if (natts <= 0)
		return;					/* nothing to do */

	sd = heap_open(StatisticRelationId, RowExclusiveLock);

	for (attno = 0; attno < natts; attno++)
	{
		VacAttrStats *stats = vacattrstats[attno];
		HeapTuple	stup,
					oldtup;
		int			i,
					k,
					n;
		Datum		values[Natts_pg_statistic];
		char		nulls[Natts_pg_statistic];
		char		replaces[Natts_pg_statistic];

		/* Ignore attr if we weren't able to collect stats */
		if (!stats->stats_valid)
			continue;

		/*
		 * Construct a new pg_statistic tuple.  Start with every column
		 * non-null (' ') and marked for replacement ('r'); individual
		 * array columns are switched to null ('n') below when a slot
		 * is empty.
		 */
		for (i = 0; i < Natts_pg_statistic; ++i)
		{
			nulls[i] = ' ';
			replaces[i] = 'r';
		}

		/* fill values[] in pg_statistic column order */
		i = 0;
		values[i++] = ObjectIdGetDatum(relid);	/* starelid */
		values[i++] = Int16GetDatum(stats->attr->attnum);		/* staattnum */
		values[i++] = Float4GetDatum(stats->stanullfrac);		/* stanullfrac */
		values[i++] = Int32GetDatum(stats->stawidth);	/* stawidth */
		values[i++] = Float4GetDatum(stats->stadistinct);		/* stadistinct */
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			values[i++] = Int16GetDatum(stats->stakind[k]);		/* stakindN */
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			values[i++] = ObjectIdGetDatum(stats->staop[k]);	/* staopN */
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			int			nnum = stats->numnumbers[k];

			if (nnum > 0)
			{
				Datum	   *numdatums = (Datum *) palloc(nnum * sizeof(Datum));
				ArrayType  *arry;

				for (n = 0; n < nnum; n++)
					numdatums[n] = Float4GetDatum(stats->stanumbers[k][n]);
				/* XXX knows more than it should about type float4: */
				arry = construct_array(numdatums, nnum,
									   FLOAT4OID,
									   sizeof(float4), false, 'i');
				values[i++] = PointerGetDatum(arry);	/* stanumbersN */
			}
			else
			{
				/* empty slot: store a NULL rather than an empty array */
				nulls[i] = 'n';
				values[i++] = (Datum) 0;
			}
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			if (stats->numvalues[k] > 0)
			{
				ArrayType  *arry;

				arry = construct_array(stats->stavalues[k],
									   stats->numvalues[k],
									   stats->attr->atttypid,
									   stats->attrtype->typlen,
									   stats->attrtype->typbyval,
									   stats->attrtype->typalign);
				values[i++] = PointerGetDatum(arry);	/* stavaluesN */
			}
			else
			{
				/* empty slot: store a NULL rather than an empty array */
				nulls[i] = 'n';
				values[i++] = (Datum) 0;
			}
		}

		/* Is there already a pg_statistic tuple for this attribute? */
		oldtup = SearchSysCache(STATRELATT,
								ObjectIdGetDatum(relid),
								Int16GetDatum(stats->attr->attnum),
								0, 0);

		if (HeapTupleIsValid(oldtup))
		{
			/* Yes, replace it */
			stup = heap_modifytuple(oldtup,
									RelationGetDescr(sd),
									values,
									nulls,
									replaces);
			ReleaseSysCache(oldtup);
			simple_heap_update(sd, &stup->t_self, stup);
		}
		else
		{
			/* No, insert new tuple */
			stup = heap_formtuple(sd->rd_att, values, nulls);
			simple_heap_insert(sd, stup);
		}

		/* update indexes too */
		CatalogUpdateIndexes(sd, stup);

		heap_freetuple(stup);
	}

	heap_close(sd, RowExclusiveLock);
}
1229
1230 /*
1231  * Standard fetch function for use by compute_stats subroutines.
1232  *
1233  * This exists to provide some insulation between compute_stats routines
1234  * and the actual storage of the sample data.
1235  */
1236 static Datum
1237 std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull)
1238 {
1239         int                     attnum = stats->tupattnum;
1240         HeapTuple       tuple = stats->rows[rownum];
1241         TupleDesc       tupDesc = stats->tupDesc;
1242
1243         return heap_getattr(tuple, attnum, tupDesc, isNull);
1244 }
1245
1246 /*
1247  * Fetch function for analyzing index expressions.
1248  *
1249  * We have not bothered to construct index tuples, instead the data is
1250  * just in Datum arrays.
1251  */
1252 static Datum
1253 ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull)
1254 {
1255         int                     i;
1256
1257         /* exprvals and exprnulls are already offset for proper column */
1258         i = rownum * stats->rowstride;
1259         *isNull = stats->exprnulls[i];
1260         return stats->exprvals[i];
1261 }
1262
1263
1264 /*==========================================================================
1265  *
1266  * Code below this point represents the "standard" type-specific statistics
1267  * analysis algorithms.  This code can be replaced on a per-data-type basis
1268  * by setting a nonzero value in pg_type.typanalyze.
1269  *
1270  *==========================================================================
1271  */
1272
1273
/*
 * To avoid consuming too much memory during analysis and/or too much space
 * in the resulting pg_statistic rows, we ignore varlena datums that are wider
 * than WIDTH_THRESHOLD (after detoasting!).  This is legitimate for MCV
 * and distinct-value calculations since a wide value is unlikely to be
 * duplicated at all, much less be a most-common value.  For the same reason,
 * ignoring wide values will not affect our estimates of histogram bin
 * boundaries very much.
 */
#define WIDTH_THRESHOLD  1024

/*
 * Swap two ints / Datums in place.  NB: arguments are evaluated more than
 * once, so pass only simple lvalues.
 */
#define swapInt(a,b)	do {int _tmp; _tmp=a; a=b; b=_tmp;} while(0)
#define swapDatum(a,b)	do {Datum _tmp; _tmp=a; a=b; b=_tmp;} while(0)

/*
 * Extra information used by the default analysis routines
 */
typedef struct
{
	Oid			eqopr;			/* '=' operator for datatype, if any */
	Oid			eqfunc;			/* and associated function */
	Oid			ltopr;			/* '<' operator for datatype, if any */
} StdAnalyzeData;

/* one sampled value, tagged with the sample row it came from */
typedef struct
{
	Datum		value;			/* a data value */
	int			tupno;			/* position index for tuple it came from */
} ScalarItem;

/* bookkeeping for a most-common-value candidate */
typedef struct
{
	int			count;			/* # of duplicates */
	int			first;			/* values[] index of first occurrence */
} ScalarMCVItem;


/*
 * Context information for compare_scalars().  These are file-scope
 * statics because qsort's comparator API provides no pass-through
 * argument; they must be set up before sorting.
 */
static FmgrInfo *datumCmpFn;
static SortFunctionKind datumCmpFnKind;
static int *datumCmpTupnoLink;


/* forward declarations for the standard per-type analysis routines */
static void compute_minimal_stats(VacAttrStatsP stats,
					  AnalyzeAttrFetchFunc fetchfunc,
					  int samplerows,
					  double totalrows);
static void compute_scalar_stats(VacAttrStatsP stats,
					 AnalyzeAttrFetchFunc fetchfunc,
					 int samplerows,
					 double totalrows);
static int	compare_scalars(const void *a, const void *b);
static int	compare_mcvs(const void *a, const void *b);
1327
1328
1329 /*
1330  * std_typanalyze -- the default type-specific typanalyze function
1331  */
1332 static bool
1333 std_typanalyze(VacAttrStats *stats)
1334 {
1335         Form_pg_attribute attr = stats->attr;
1336         Operator        func_operator;
1337         Oid                     eqopr = InvalidOid;
1338         Oid                     eqfunc = InvalidOid;
1339         Oid                     ltopr = InvalidOid;
1340         StdAnalyzeData *mystats;
1341
1342         /* If the attstattarget column is negative, use the default value */
1343         /* NB: it is okay to scribble on stats->attr since it's a copy */
1344         if (attr->attstattarget < 0)
1345                 attr->attstattarget = default_statistics_target;
1346
1347         /* If column has no "=" operator, we can't do much of anything */
1348         func_operator = equality_oper(attr->atttypid, true);
1349         if (func_operator != NULL)
1350         {
1351                 eqopr = oprid(func_operator);
1352                 eqfunc = oprfuncid(func_operator);
1353                 ReleaseSysCache(func_operator);
1354         }
1355         if (!OidIsValid(eqfunc))
1356                 return false;
1357
1358         /* Is there a "<" operator with suitable semantics? */
1359         func_operator = ordering_oper(attr->atttypid, true);
1360         if (func_operator != NULL)
1361         {
1362                 ltopr = oprid(func_operator);
1363                 ReleaseSysCache(func_operator);
1364         }
1365
1366         /* Save the operator info for compute_stats routines */
1367         mystats = (StdAnalyzeData *) palloc(sizeof(StdAnalyzeData));
1368         mystats->eqopr = eqopr;
1369         mystats->eqfunc = eqfunc;
1370         mystats->ltopr = ltopr;
1371         stats->extra_data = mystats;
1372
1373         /*
1374          * Determine which standard statistics algorithm to use
1375          */
1376         if (OidIsValid(ltopr))
1377         {
1378                 /* Seems to be a scalar datatype */
1379                 stats->compute_stats = compute_scalar_stats;
1380                 /*--------------------
1381                  * The following choice of minrows is based on the paper
1382                  * "Random sampling for histogram construction: how much is enough?"
1383                  * by Surajit Chaudhuri, Rajeev Motwani and Vivek Narasayya, in
1384                  * Proceedings of ACM SIGMOD International Conference on Management
1385                  * of Data, 1998, Pages 436-447.  Their Corollary 1 to Theorem 5
1386                  * says that for table size n, histogram size k, maximum relative
1387                  * error in bin size f, and error probability gamma, the minimum
1388                  * random sample size is
1389                  *              r = 4 * k * ln(2*n/gamma) / f^2
1390                  * Taking f = 0.5, gamma = 0.01, n = 1 million rows, we obtain
1391                  *              r = 305.82 * k
1392                  * Note that because of the log function, the dependence on n is
1393                  * quite weak; even at n = 1 billion, a 300*k sample gives <= 0.59
1394                  * bin size error with probability 0.99.  So there's no real need to
1395                  * scale for n, which is a good thing because we don't necessarily
1396                  * know it at this point.
1397                  *--------------------
1398                  */
1399                 stats->minrows = 300 * attr->attstattarget;
1400         }
1401         else
1402         {
1403                 /* Can't do much but the minimal stuff */
1404                 stats->compute_stats = compute_minimal_stats;
1405                 /* Might as well use the same minrows as above */
1406                 stats->minrows = 300 * attr->attstattarget;
1407         }
1408
1409         return true;
1410 }
1411
/*
 *	compute_minimal_stats() -- compute minimal column statistics
 *
 *	We use this when we can find only an "=" operator for the datatype.
 *
 *	We determine the fraction of non-null rows, the average width, the
 *	most common values, and the (estimated) number of distinct values.
 *
 *	The most common values are determined by brute force: we keep a list
 *	of previously seen values, ordered by number of times seen, as we scan
 *	the samples.  A newly seen value is inserted just after the last
 *	multiply-seen value, causing the bottommost (oldest) singly-seen value
 *	to drop off the list.  The accuracy of this method, and also its cost,
 *	depend mainly on the length of the list we are willing to keep.
 *
 *	Inputs: fetchfunc yields the i'th sample value (and null flag) for
 *	0 <= i < samplerows; totalrows is the estimated total table size.
 *	Results are stored into *stats; stats_valid remains false if we
 *	could not compute anything (all sample values were null-free zero,
 *	i.e., samplerows == 0).
 */
static void
compute_minimal_stats(VacAttrStatsP stats,
					  AnalyzeAttrFetchFunc fetchfunc,
					  int samplerows,
					  double totalrows)
{
	int			i;
	int			null_cnt = 0;
	int			nonnull_cnt = 0;
	int			toowide_cnt = 0;	/* values skipped as too wide to detoast */
	double		total_width = 0;
	bool		is_varlena = (!stats->attr->attbyval &&
							  stats->attr->attlen == -1);
	bool		is_varwidth = (!stats->attr->attbyval &&
							   stats->attr->attlen < 0);
	FmgrInfo	f_cmpeq;
	/* One tracked candidate MCV: the datum and how often we've seen it */
	typedef struct
	{
		Datum		value;
		int			count;
	} TrackItem;
	TrackItem  *track;			/* kept sorted by descending count */
	int			track_cnt,
				track_max;
	int			num_mcv = stats->attr->attstattarget;
	StdAnalyzeData *mystats = (StdAnalyzeData *) stats->extra_data;

	/*
	 * We track up to 2*n values for an n-element MCV list; but at least
	 * 10
	 */
	track_max = 2 * num_mcv;
	if (track_max < 10)
		track_max = 10;
	track = (TrackItem *) palloc(track_max * sizeof(TrackItem));
	track_cnt = 0;

	/* Set up to invoke the datatype's "=" function via FunctionCall2 */
	fmgr_info(mystats->eqfunc, &f_cmpeq);

	for (i = 0; i < samplerows; i++)
	{
		Datum		value;
		bool		isnull;
		bool		match;
		int			firstcount1,
					j;

		vacuum_delay_point();

		value = fetchfunc(stats, i, &isnull);

		/* Check for null/nonnull */
		if (isnull)
		{
			null_cnt++;
			continue;
		}
		nonnull_cnt++;

		/*
		 * If it's a variable-width field, add up widths for average width
		 * calculation.  Note that if the value is toasted, we use the
		 * toasted width.  We don't bother with this calculation if it's a
		 * fixed-width type.
		 */
		if (is_varlena)
		{
			total_width += VARSIZE(DatumGetPointer(value));

			/*
			 * If the value is toasted, we want to detoast it just once to
			 * avoid repeated detoastings and resultant excess memory
			 * usage during the comparisons.  Also, check to see if the
			 * value is excessively wide, and if so don't detoast at all
			 * --- just ignore the value.
			 */
			if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
			{
				toowide_cnt++;
				continue;
			}
			value = PointerGetDatum(PG_DETOAST_DATUM(value));
		}
		else if (is_varwidth)
		{
			/* must be cstring */
			total_width += strlen(DatumGetCString(value)) + 1;
		}

		/*
		 * See if the value matches anything we're already tracking.
		 * While scanning, remember the index of the first entry with
		 * count == 1 (firstcount1): that's where a new value would be
		 * inserted, displacing the oldest singly-seen entry.
		 */
		match = false;
		firstcount1 = track_cnt;
		for (j = 0; j < track_cnt; j++)
		{
			if (DatumGetBool(FunctionCall2(&f_cmpeq, value, track[j].value)))
			{
				match = true;
				break;
			}
			if (j < firstcount1 && track[j].count == 1)
				firstcount1 = j;
		}

		if (match)
		{
			/* Found a match */
			track[j].count++;
			/* This value may now need to "bubble up" in the track list */
			while (j > 0 && track[j].count > track[j - 1].count)
			{
				swapDatum(track[j].value, track[j - 1].value);
				swapInt(track[j].count, track[j - 1].count);
				j--;
			}
		}
		else
		{
			/* No match.  Insert at head of count-1 list */
			if (track_cnt < track_max)
				track_cnt++;
			/* shift the count-1 entries down to make room */
			for (j = track_cnt - 1; j > firstcount1; j--)
			{
				track[j].value = track[j - 1].value;
				track[j].count = track[j - 1].count;
			}
			if (firstcount1 < track_cnt)
			{
				track[firstcount1].value = value;
				track[firstcount1].count = 1;
			}
		}
	}

	/* We can only compute real stats if we found some non-null values. */
	if (nonnull_cnt > 0)
	{
		int			nmultiple,
					summultiple;

		stats->stats_valid = true;
		/* Do the simple null-frac and width stats */
		stats->stanullfrac = (double) null_cnt / (double) samplerows;
		if (is_varwidth)
			stats->stawidth = total_width / (double) nonnull_cnt;
		else
			stats->stawidth = stats->attrtype->typlen;

		/*
		 * Count the number of values we found multiple times.  The track
		 * list is sorted by descending count, so the multiply-seen
		 * entries form a prefix; stop at the first count-1 entry.
		 */
		summultiple = 0;
		for (nmultiple = 0; nmultiple < track_cnt; nmultiple++)
		{
			if (track[nmultiple].count == 1)
				break;
			summultiple += track[nmultiple].count;
		}

		if (nmultiple == 0)
		{
			/* If we found no repeated values, assume it's a unique column */
			stats->stadistinct = -1.0;
		}
		else if (track_cnt < track_max && toowide_cnt == 0 &&
				 nmultiple == track_cnt)
		{
			/*
			 * Our track list includes every value in the sample, and
			 * every value appeared more than once.  Assume the column has
			 * just these values.
			 */
			stats->stadistinct = track_cnt;
		}
		else
		{
			/*----------
			 * Estimate the number of distinct values using the estimator
			 * proposed by Haas and Stokes in IBM Research Report RJ 10025:
			 *		n*d / (n - f1 + f1*n/N)
			 * where f1 is the number of distinct values that occurred
			 * exactly once in our sample of n rows (from a total of N),
			 * and d is the total number of distinct values in the sample.
			 * This is their Duj1 estimator; the other estimators they
			 * recommend are considerably more complex, and are numerically
			 * very unstable when n is much smaller than N.
			 *
			 * We assume (not very reliably!) that all the multiply-occurring
			 * values are reflected in the final track[] list, and the other
			 * nonnull values all appeared but once.  (XXX this usually
			 * results in a drastic overestimate of ndistinct.      Can we do
			 * any better?)
			 *----------
			 */
			int			f1 = nonnull_cnt - summultiple;
			int			d = f1 + nmultiple;
			double		numer,
						denom,
						stadistinct;

			numer = (double) samplerows *(double) d;

			denom = (double) (samplerows - f1) +
				(double) f1 *(double) samplerows / totalrows;

			stadistinct = numer / denom;
			/* Clamp to sane range in case of roundoff error */
			if (stadistinct < (double) d)
				stadistinct = (double) d;
			if (stadistinct > totalrows)
				stadistinct = totalrows;
			stats->stadistinct = floor(stadistinct + 0.5);
		}

		/*
		 * If we estimated the number of distinct values at more than 10%
		 * of the total row count (a very arbitrary limit), then assume
		 * that stadistinct should scale with the row count rather than be
		 * a fixed value.  (A negative stadistinct stores minus the ratio
		 * of distinct values to total rows.)
		 */
		if (stats->stadistinct > 0.1 * totalrows)
			stats->stadistinct = -(stats->stadistinct / totalrows);

		/*
		 * Decide how many values are worth storing as most-common values.
		 * If we are able to generate a complete MCV list (all the values
		 * in the sample will fit, and we think these are all the ones in
		 * the table), then do so.      Otherwise, store only those values
		 * that are significantly more common than the (estimated)
		 * average. We set the threshold rather arbitrarily at 25% more
		 * than average, with at least 2 instances in the sample.
		 */
		if (track_cnt < track_max && toowide_cnt == 0 &&
			stats->stadistinct > 0 &&
			track_cnt <= num_mcv)
		{
			/* Track list includes all values seen, and all will fit */
			num_mcv = track_cnt;
		}
		else
		{
			double		ndistinct = stats->stadistinct;
			double		avgcount,
						mincount;

			if (ndistinct < 0)
				ndistinct = -ndistinct * totalrows;
			/* estimate # of occurrences in sample of a typical value */
			avgcount = (double) samplerows / ndistinct;
			/* set minimum threshold count to store a value */
			mincount = avgcount * 1.25;
			if (mincount < 2)
				mincount = 2;
			if (num_mcv > track_cnt)
				num_mcv = track_cnt;
			/* truncate the list at the first entry below the threshold */
			for (i = 0; i < num_mcv; i++)
			{
				if (track[i].count < mincount)
				{
					num_mcv = i;
					break;
				}
			}
		}

		/* Generate MCV slot entry */
		if (num_mcv > 0)
		{
			MemoryContext old_context;
			Datum	   *mcv_values;
			float4	   *mcv_freqs;

			/* Must copy the target values into anl_context */
			old_context = MemoryContextSwitchTo(stats->anl_context);
			mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
			mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
			for (i = 0; i < num_mcv; i++)
			{
				mcv_values[i] = datumCopy(track[i].value,
										  stats->attr->attbyval,
										  stats->attr->attlen);
				mcv_freqs[i] = (double) track[i].count / (double) samplerows;
			}
			MemoryContextSwitchTo(old_context);

			stats->stakind[0] = STATISTIC_KIND_MCV;
			stats->staop[0] = mystats->eqopr;
			stats->stanumbers[0] = mcv_freqs;
			stats->numnumbers[0] = num_mcv;
			stats->stavalues[0] = mcv_values;
			stats->numvalues[0] = num_mcv;
		}
	}
	else if (null_cnt > 0)
	{
		/* We found only nulls; assume the column is entirely null */
		stats->stats_valid = true;
		stats->stanullfrac = 1.0;
		if (is_varwidth)
			stats->stawidth = 0;				/* "unknown" */
		else
			stats->stawidth = stats->attrtype->typlen;
		stats->stadistinct = 0.0;				/* "unknown" */
	}

	/* We don't need to bother cleaning up any of our temporary palloc's */
}
1733
1734
1735 /*
1736  *      compute_scalar_stats() -- compute column statistics
1737  *
1738  *      We use this when we can find "=" and "<" operators for the datatype.
1739  *
1740  *      We determine the fraction of non-null rows, the average width, the
1741  *      most common values, the (estimated) number of distinct values, the
1742  *      distribution histogram, and the correlation of physical to logical order.
1743  *
1744  *      The desired stats can be determined fairly easily after sorting the
1745  *      data values into order.
1746  */
1747 static void
1748 compute_scalar_stats(VacAttrStatsP stats,
1749                                          AnalyzeAttrFetchFunc fetchfunc,
1750                                          int samplerows,
1751                                          double totalrows)
1752 {
1753         int                     i;
1754         int                     null_cnt = 0;
1755         int                     nonnull_cnt = 0;
1756         int                     toowide_cnt = 0;
1757         double          total_width = 0;
1758         bool            is_varlena = (!stats->attr->attbyval &&
1759                                                           stats->attr->attlen == -1);
1760         bool            is_varwidth = (!stats->attr->attbyval &&
1761                                                            stats->attr->attlen < 0);
1762         double          corr_xysum;
1763         RegProcedure cmpFn;
1764         SortFunctionKind cmpFnKind;
1765         FmgrInfo        f_cmpfn;
1766         ScalarItem *values;
1767         int                     values_cnt = 0;
1768         int                *tupnoLink;
1769         ScalarMCVItem *track;
1770         int                     track_cnt = 0;
1771         int                     num_mcv = stats->attr->attstattarget;
1772         int                     num_bins = stats->attr->attstattarget;
1773         StdAnalyzeData *mystats = (StdAnalyzeData *) stats->extra_data;
1774
1775         values = (ScalarItem *) palloc(samplerows * sizeof(ScalarItem));
1776         tupnoLink = (int *) palloc(samplerows * sizeof(int));
1777         track = (ScalarMCVItem *) palloc(num_mcv * sizeof(ScalarMCVItem));
1778
1779         SelectSortFunction(mystats->ltopr, &cmpFn, &cmpFnKind);
1780         fmgr_info(cmpFn, &f_cmpfn);
1781
1782         /* Initial scan to find sortable values */
1783         for (i = 0; i < samplerows; i++)
1784         {
1785                 Datum           value;
1786                 bool            isnull;
1787
1788                 vacuum_delay_point();
1789
1790                 value = fetchfunc(stats, i, &isnull);
1791
1792                 /* Check for null/nonnull */
1793                 if (isnull)
1794                 {
1795                         null_cnt++;
1796                         continue;
1797                 }
1798                 nonnull_cnt++;
1799
1800                 /*
1801                  * If it's a variable-width field, add up widths for average width
1802                  * calculation.  Note that if the value is toasted, we use the
1803                  * toasted width.  We don't bother with this calculation if it's a
1804                  * fixed-width type.
1805                  */
1806                 if (is_varlena)
1807                 {
1808                         total_width += VARSIZE(DatumGetPointer(value));
1809
1810                         /*
1811                          * If the value is toasted, we want to detoast it just once to
1812                          * avoid repeated detoastings and resultant excess memory
1813                          * usage during the comparisons.  Also, check to see if the
1814                          * value is excessively wide, and if so don't detoast at all
1815                          * --- just ignore the value.
1816                          */
1817                         if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
1818                         {
1819                                 toowide_cnt++;
1820                                 continue;
1821                         }
1822                         value = PointerGetDatum(PG_DETOAST_DATUM(value));
1823                 }
1824                 else if (is_varwidth)
1825                 {
1826                         /* must be cstring */
1827                         total_width += strlen(DatumGetCString(value)) + 1;
1828                 }
1829
1830                 /* Add it to the list to be sorted */
1831                 values[values_cnt].value = value;
1832                 values[values_cnt].tupno = values_cnt;
1833                 tupnoLink[values_cnt] = values_cnt;
1834                 values_cnt++;
1835         }
1836
1837         /* We can only compute real stats if we found some sortable values. */
1838         if (values_cnt > 0)
1839         {
1840                 int                     ndistinct,      /* # distinct values in sample */
1841                                         nmultiple,      /* # that appear multiple times */
1842                                         num_hist,
1843                                         dups_cnt;
1844                 int                     slot_idx = 0;
1845
1846                 /* Sort the collected values */
1847                 datumCmpFn = &f_cmpfn;
1848                 datumCmpFnKind = cmpFnKind;
1849                 datumCmpTupnoLink = tupnoLink;
1850                 qsort((void *) values, values_cnt,
1851                           sizeof(ScalarItem), compare_scalars);
1852
1853                 /*
1854                  * Now scan the values in order, find the most common ones, and
1855                  * also accumulate ordering-correlation statistics.
1856                  *
1857                  * To determine which are most common, we first have to count the
1858                  * number of duplicates of each value.  The duplicates are
1859                  * adjacent in the sorted list, so a brute-force approach is to
1860                  * compare successive datum values until we find two that are not
1861                  * equal. However, that requires N-1 invocations of the datum
1862                  * comparison routine, which are completely redundant with work
1863                  * that was done during the sort.  (The sort algorithm must at
1864                  * some point have compared each pair of items that are adjacent
1865                  * in the sorted order; otherwise it could not know that it's
1866                  * ordered the pair correctly.) We exploit this by having
1867                  * compare_scalars remember the highest tupno index that each
1868                  * ScalarItem has been found equal to.  At the end of the sort, a
1869                  * ScalarItem's tupnoLink will still point to itself if and only
1870                  * if it is the last item of its group of duplicates (since the
1871                  * group will be ordered by tupno).
1872                  */
1873                 corr_xysum = 0;
1874                 ndistinct = 0;
1875                 nmultiple = 0;
1876                 dups_cnt = 0;
1877                 for (i = 0; i < values_cnt; i++)
1878                 {
1879                         int                     tupno = values[i].tupno;
1880
1881                         corr_xysum += ((double) i) * ((double) tupno);
1882                         dups_cnt++;
1883                         if (tupnoLink[tupno] == tupno)
1884                         {
1885                                 /* Reached end of duplicates of this value */
1886                                 ndistinct++;
1887                                 if (dups_cnt > 1)
1888                                 {
1889                                         nmultiple++;
1890                                         if (track_cnt < num_mcv ||
1891                                                 dups_cnt > track[track_cnt - 1].count)
1892                                         {
1893                                                 /*
1894                                                  * Found a new item for the mcv list; find its
1895                                                  * position, bubbling down old items if needed.
1896                                                  * Loop invariant is that j points at an empty/
1897                                                  * replaceable slot.
1898                                                  */
1899                                                 int                     j;
1900
1901                                                 if (track_cnt < num_mcv)
1902                                                         track_cnt++;
1903                                                 for (j = track_cnt - 1; j > 0; j--)
1904                                                 {
1905                                                         if (dups_cnt <= track[j - 1].count)
1906                                                                 break;
1907                                                         track[j].count = track[j - 1].count;
1908                                                         track[j].first = track[j - 1].first;
1909                                                 }
1910                                                 track[j].count = dups_cnt;
1911                                                 track[j].first = i + 1 - dups_cnt;
1912                                         }
1913                                 }
1914                                 dups_cnt = 0;
1915                         }
1916                 }
1917
1918                 stats->stats_valid = true;
1919                 /* Do the simple null-frac and width stats */
1920                 stats->stanullfrac = (double) null_cnt / (double) samplerows;
1921                 if (is_varwidth)
1922                         stats->stawidth = total_width / (double) nonnull_cnt;
1923                 else
1924                         stats->stawidth = stats->attrtype->typlen;
1925
1926                 if (nmultiple == 0)
1927                 {
1928                         /* If we found no repeated values, assume it's a unique column */
1929                         stats->stadistinct = -1.0;
1930                 }
1931                 else if (toowide_cnt == 0 && nmultiple == ndistinct)
1932                 {
1933                         /*
1934                          * Every value in the sample appeared more than once.  Assume
1935                          * the column has just these values.
1936                          */
1937                         stats->stadistinct = ndistinct;
1938                 }
1939                 else
1940                 {
1941                         /*----------
1942                          * Estimate the number of distinct values using the estimator
1943                          * proposed by Haas and Stokes in IBM Research Report RJ 10025:
1944                          *              n*d / (n - f1 + f1*n/N)
1945                          * where f1 is the number of distinct values that occurred
1946                          * exactly once in our sample of n rows (from a total of N),
1947                          * and d is the total number of distinct values in the sample.
1948                          * This is their Duj1 estimator; the other estimators they
1949                          * recommend are considerably more complex, and are numerically
1950                          * very unstable when n is much smaller than N.
1951                          *
1952                          * Overwidth values are assumed to have been distinct.
1953                          *----------
1954                          */
1955                         int                     f1 = ndistinct - nmultiple + toowide_cnt;
1956                         int                     d = f1 + nmultiple;
1957                         double          numer,
1958                                                 denom,
1959                                                 stadistinct;
1960
1961                         numer = (double) samplerows *(double) d;
1962
1963                         denom = (double) (samplerows - f1) +
1964                                 (double) f1 *(double) samplerows / totalrows;
1965
1966                         stadistinct = numer / denom;
1967                         /* Clamp to sane range in case of roundoff error */
1968                         if (stadistinct < (double) d)
1969                                 stadistinct = (double) d;
1970                         if (stadistinct > totalrows)
1971                                 stadistinct = totalrows;
1972                         stats->stadistinct = floor(stadistinct + 0.5);
1973                 }
1974
1975                 /*
1976                  * If we estimated the number of distinct values at more than 10%
1977                  * of the total row count (a very arbitrary limit), then assume
1978                  * that stadistinct should scale with the row count rather than be
1979                  * a fixed value.
1980                  */
1981                 if (stats->stadistinct > 0.1 * totalrows)
1982                         stats->stadistinct = -(stats->stadistinct / totalrows);
1983
1984                 /*
1985                  * Decide how many values are worth storing as most-common values.
1986                  * If we are able to generate a complete MCV list (all the values
1987                  * in the sample will fit, and we think these are all the ones in
1988                  * the table), then do so.      Otherwise, store only those values
1989                  * that are significantly more common than the (estimated)
1990                  * average. We set the threshold rather arbitrarily at 25% more
1991                  * than average, with at least 2 instances in the sample.  Also,
1992                  * we won't suppress values that have a frequency of at least 1/K
1993                  * where K is the intended number of histogram bins; such values
1994                  * might otherwise cause us to emit duplicate histogram bin
1995                  * boundaries.
1996                  */
1997                 if (track_cnt == ndistinct && toowide_cnt == 0 &&
1998                         stats->stadistinct > 0 &&
1999                         track_cnt <= num_mcv)
2000                 {
2001                         /* Track list includes all values seen, and all will fit */
2002                         num_mcv = track_cnt;
2003                 }
2004                 else
2005                 {
2006                         double          ndistinct = stats->stadistinct;
2007                         double          avgcount,
2008                                                 mincount,
2009                                                 maxmincount;
2010
2011                         if (ndistinct < 0)
2012                                 ndistinct = -ndistinct * totalrows;
2013                         /* estimate # of occurrences in sample of a typical value */
2014                         avgcount = (double) samplerows / ndistinct;
2015                         /* set minimum threshold count to store a value */
2016                         mincount = avgcount * 1.25;
2017                         if (mincount < 2)
2018                                 mincount = 2;
2019                         /* don't let threshold exceed 1/K, however */
2020                         maxmincount = (double) samplerows / (double) num_bins;
2021                         if (mincount > maxmincount)
2022                                 mincount = maxmincount;
2023                         if (num_mcv > track_cnt)
2024                                 num_mcv = track_cnt;
2025                         for (i = 0; i < num_mcv; i++)
2026                         {
2027                                 if (track[i].count < mincount)
2028                                 {
2029                                         num_mcv = i;
2030                                         break;
2031                                 }
2032                         }
2033                 }
2034
2035                 /* Generate MCV slot entry */
2036                 if (num_mcv > 0)
2037                 {
2038                         MemoryContext old_context;
2039                         Datum      *mcv_values;
2040                         float4     *mcv_freqs;
2041
2042                         /* Must copy the target values into anl_context */
2043                         old_context = MemoryContextSwitchTo(stats->anl_context);
2044                         mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
2045                         mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
2046                         for (i = 0; i < num_mcv; i++)
2047                         {
2048                                 mcv_values[i] = datumCopy(values[track[i].first].value,
2049                                                                                   stats->attr->attbyval,
2050                                                                                   stats->attr->attlen);
2051                                 mcv_freqs[i] = (double) track[i].count / (double) samplerows;
2052                         }
2053                         MemoryContextSwitchTo(old_context);
2054
2055                         stats->stakind[slot_idx] = STATISTIC_KIND_MCV;
2056                         stats->staop[slot_idx] = mystats->eqopr;
2057                         stats->stanumbers[slot_idx] = mcv_freqs;
2058                         stats->numnumbers[slot_idx] = num_mcv;
2059                         stats->stavalues[slot_idx] = mcv_values;
2060                         stats->numvalues[slot_idx] = num_mcv;
2061                         slot_idx++;
2062                 }
2063
2064                 /*
2065                  * Generate a histogram slot entry if there are at least two
2066                  * distinct values not accounted for in the MCV list.  (This
2067                  * ensures the histogram won't collapse to empty or a singleton.)
2068                  */
2069                 num_hist = ndistinct - num_mcv;
2070                 if (num_hist > num_bins)
2071                         num_hist = num_bins + 1;
2072                 if (num_hist >= 2)
2073                 {
2074                         MemoryContext old_context;
2075                         Datum      *hist_values;
2076                         int                     nvals;
2077
2078                         /* Sort the MCV items into position order to speed next loop */
2079                         qsort((void *) track, num_mcv,
2080                                   sizeof(ScalarMCVItem), compare_mcvs);
2081
2082                         /*
2083                          * Collapse out the MCV items from the values[] array.
2084                          *
2085                          * Note we destroy the values[] array here... but we don't need
2086                          * it for anything more.  We do, however, still need
2087                          * values_cnt. nvals will be the number of remaining entries
2088                          * in values[].
2089                          */
2090                         if (num_mcv > 0)
2091                         {
2092                                 int                     src,
2093                                                         dest;
2094                                 int                     j;
2095
2096                                 src = dest = 0;
2097                                 j = 0;                  /* index of next interesting MCV item */
2098                                 while (src < values_cnt)
2099                                 {
2100                                         int                     ncopy;
2101
2102                                         if (j < num_mcv)
2103                                         {
2104                                                 int                     first = track[j].first;
2105
2106                                                 if (src >= first)
2107                                                 {
2108                                                         /* advance past this MCV item */
2109                                                         src = first + track[j].count;
2110                                                         j++;
2111                                                         continue;
2112                                                 }
2113                                                 ncopy = first - src;
2114                                         }
2115                                         else
2116                                                 ncopy = values_cnt - src;
2117                                         memmove(&values[dest], &values[src],
2118                                                         ncopy * sizeof(ScalarItem));
2119                                         src += ncopy;
2120                                         dest += ncopy;
2121                                 }
2122                                 nvals = dest;
2123                         }
2124                         else
2125                                 nvals = values_cnt;
2126                         Assert(nvals >= num_hist);
2127
2128                         /* Must copy the target values into anl_context */
2129                         old_context = MemoryContextSwitchTo(stats->anl_context);
2130                         hist_values = (Datum *) palloc(num_hist * sizeof(Datum));
2131                         for (i = 0; i < num_hist; i++)
2132                         {
2133                                 int                     pos;
2134
2135                                 pos = (i * (nvals - 1)) / (num_hist - 1);
2136                                 hist_values[i] = datumCopy(values[pos].value,
2137                                                                                    stats->attr->attbyval,
2138                                                                                    stats->attr->attlen);
2139                         }
2140                         MemoryContextSwitchTo(old_context);
2141
2142                         stats->stakind[slot_idx] = STATISTIC_KIND_HISTOGRAM;
2143                         stats->staop[slot_idx] = mystats->ltopr;
2144                         stats->stavalues[slot_idx] = hist_values;
2145                         stats->numvalues[slot_idx] = num_hist;
2146                         slot_idx++;
2147                 }
2148
2149                 /* Generate a correlation entry if there are multiple values */
2150                 if (values_cnt > 1)
2151                 {
2152                         MemoryContext old_context;
2153                         float4     *corrs;
2154                         double          corr_xsum,
2155                                                 corr_x2sum;
2156
2157                         /* Must copy the target values into anl_context */
2158                         old_context = MemoryContextSwitchTo(stats->anl_context);
2159                         corrs = (float4 *) palloc(sizeof(float4));
2160                         MemoryContextSwitchTo(old_context);
2161
2162                         /*----------
2163                          * Since we know the x and y value sets are both
2164                          *              0, 1, ..., values_cnt-1
2165                          * we have sum(x) = sum(y) =
2166                          *              (values_cnt-1)*values_cnt / 2
2167                          * and sum(x^2) = sum(y^2) =
2168                          *              (values_cnt-1)*values_cnt*(2*values_cnt-1) / 6.
2169                          *----------
2170                          */
2171                         corr_xsum = ((double) (values_cnt - 1)) *
2172                                 ((double) values_cnt) / 2.0;
2173                         corr_x2sum = ((double) (values_cnt - 1)) *
2174                                 ((double) values_cnt) * (double) (2 * values_cnt - 1) / 6.0;
2175
2176                         /* And the correlation coefficient reduces to */
2177                         corrs[0] = (values_cnt * corr_xysum - corr_xsum * corr_xsum) /
2178                                 (values_cnt * corr_x2sum - corr_xsum * corr_xsum);
2179
2180                         stats->stakind[slot_idx] = STATISTIC_KIND_CORRELATION;
2181                         stats->staop[slot_idx] = mystats->ltopr;
2182                         stats->stanumbers[slot_idx] = corrs;
2183                         stats->numnumbers[slot_idx] = 1;
2184                         slot_idx++;
2185                 }
2186         }
2187         else if (nonnull_cnt == 0 && null_cnt > 0)
2188         {
2189                 /* We found only nulls; assume the column is entirely null */
2190                 stats->stats_valid = true;
2191                 stats->stanullfrac = 1.0;
2192                 if (is_varwidth)
2193                         stats->stawidth = 0;                            /* "unknown" */
2194                 else
2195                         stats->stawidth = stats->attrtype->typlen;
2196                 stats->stadistinct = 0.0;                               /* "unknown" */
2197         }
2198
2199         /* We don't need to bother cleaning up any of our temporary palloc's */
2200 }
2201
2202 /*
2203  * qsort comparator for sorting ScalarItems
2204  *
2205  * Aside from sorting the items, we update the datumCmpTupnoLink[] array
2206  * whenever two ScalarItems are found to contain equal datums.  The array
2207  * is indexed by tupno; for each ScalarItem, it contains the highest
2208  * tupno that that item's datum has been found to be equal to.  This allows
2209  * us to avoid additional comparisons in compute_scalar_stats().
2210  */
2211 static int
2212 compare_scalars(const void *a, const void *b)
2213 {
2214         Datum           da = ((ScalarItem *) a)->value;
2215         int                     ta = ((ScalarItem *) a)->tupno;
2216         Datum           db = ((ScalarItem *) b)->value;
2217         int                     tb = ((ScalarItem *) b)->tupno;
2218         int32           compare;
2219
2220         compare = ApplySortFunction(datumCmpFn, datumCmpFnKind,
2221                                                                 da, false, db, false);
2222         if (compare != 0)
2223                 return compare;
2224
2225         /*
2226          * The two datums are equal, so update datumCmpTupnoLink[].
2227          */
2228         if (datumCmpTupnoLink[ta] < tb)
2229                 datumCmpTupnoLink[ta] = tb;
2230         if (datumCmpTupnoLink[tb] < ta)
2231                 datumCmpTupnoLink[tb] = ta;
2232
2233         /*
2234          * For equal datums, sort by tupno
2235          */
2236         return ta - tb;
2237 }
2238
2239 /*
2240  * qsort comparator for sorting ScalarMCVItems by position
2241  */
2242 static int
2243 compare_mcvs(const void *a, const void *b)
2244 {
2245         int                     da = ((ScalarMCVItem *) a)->first;
2246         int                     db = ((ScalarMCVItem *) b)->first;
2247
2248         return da - db;
2249 }