]> granicus.if.org Git - postgresql/blob - src/backend/commands/analyze.c
Teach ANALYZE to clear pg_class.relhassubclass when appropriate.
[postgresql] / src / backend / commands / analyze.c
1 /*-------------------------------------------------------------------------
2  *
3  * analyze.c
4  *        the Postgres statistics generator
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/commands/analyze.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <math.h>
18
19 #include "access/transam.h"
20 #include "access/tupconvert.h"
21 #include "access/tuptoaster.h"
22 #include "access/xact.h"
23 #include "catalog/index.h"
24 #include "catalog/indexing.h"
25 #include "catalog/pg_collation.h"
26 #include "catalog/pg_inherits_fn.h"
27 #include "catalog/pg_namespace.h"
28 #include "commands/dbcommands.h"
29 #include "commands/tablecmds.h"
30 #include "commands/vacuum.h"
31 #include "executor/executor.h"
32 #include "miscadmin.h"
33 #include "nodes/nodeFuncs.h"
34 #include "parser/parse_oper.h"
35 #include "parser/parse_relation.h"
36 #include "pgstat.h"
37 #include "postmaster/autovacuum.h"
38 #include "storage/bufmgr.h"
39 #include "storage/lmgr.h"
40 #include "storage/procarray.h"
41 #include "utils/acl.h"
42 #include "utils/attoptcache.h"
43 #include "utils/datum.h"
44 #include "utils/lsyscache.h"
45 #include "utils/memutils.h"
46 #include "utils/pg_rusage.h"
47 #include "utils/syscache.h"
48 #include "utils/tuplesort.h"
49 #include "utils/tqual.h"
50
51
52 /*
 * Data structure for Algorithm S from Knuth 3.4.2
 *
 * Holds the running state of a selection of n blocks out of N: t is the
 * current block number and m is the count of blocks selected so far.
 * The BlockSampler_Init/_HasMore/_Next routines declared later in this
 * file take a pointer to this struct.
 */
53 typedef struct
54 {
55         BlockNumber N;                          /* number of blocks, known in advance */
56         int                     n;                              /* desired sample size */
57         BlockNumber t;                          /* current block number */
58         int                     m;                              /* blocks selected so far */
59 } BlockSamplerData;
60
/* Pointer form, as accepted by the BlockSampler_* routines */
61 typedef BlockSamplerData *BlockSampler;
62
63 /*
 * Per-index data for ANALYZE
 *
 * do_analyze_rel() allocates one zero-filled entry per open index.
 * vacattrstats and attr_cnt are only filled in for indexes that have
 * expression columns (and only when ANALYZE was given no explicit column
 * list); otherwise they keep their zero-filled values.
 */
64 typedef struct AnlIndexData
65 {
66         IndexInfo  *indexInfo;          /* BuildIndexInfo result */
67         double          tupleFract;             /* fraction of rows for partial index */
68         VacAttrStats **vacattrstats;    /* index attrs to analyze */
69         int                     attr_cnt;       /* number of entries in vacattrstats */
70 } AnlIndexData;
71
72
73 /* Default statistics target (GUC parameter) */
74 int                     default_statistics_target = 100;
75
76 /* A few variables that don't seem worth passing around as parameters */
/* Message level for progress reports; analyze_rel() sets it to INFO or DEBUG2 */
77 static int      elevel = -1;
78
/*
 * Working memory context; created at the start of do_analyze_rel() and
 * deleted (and reset to NULL) at its end
 */
79 static MemoryContext anl_context = NULL;
80
/*
 * Buffer access strategy handed to analyze_rel(); passed on to the index
 * AMs through IndexVacuumInfo
 */
81 static BufferAccessStrategy vac_strategy;
82
83
/*
 * Prototypes for routines private to this file.  do_analyze_rel() does the
 * per-relation work for analyze_rel() and in turn calls examine_attribute(),
 * the two acquire_*sample_rows() routines, compute_index_stats() and
 * update_attstats().
 */
84 static void do_analyze_rel(Relation onerel, VacuumStmt *vacstmt, bool inh);
85 static void BlockSampler_Init(BlockSampler bs, BlockNumber nblocks,
86                                   int samplesize);
87 static bool BlockSampler_HasMore(BlockSampler bs);
88 static BlockNumber BlockSampler_Next(BlockSampler bs);
89 static void compute_index_stats(Relation onerel, double totalrows,
90                                         AnlIndexData *indexdata, int nindexes,
91                                         HeapTuple *rows, int numrows,
92                                         MemoryContext col_context);
93 static VacAttrStats *examine_attribute(Relation onerel, int attnum,
94                                   Node *index_expr);
95 static int acquire_sample_rows(Relation onerel, HeapTuple *rows,
96                                         int targrows, double *totalrows, double *totaldeadrows);
97 static double random_fract(void);
98 static double init_selection_state(int n);
99 static double get_next_S(double t, int n, double *stateptr);
100 static int      compare_rows(const void *a, const void *b);
101 static int acquire_inherited_sample_rows(Relation onerel,
102                                                           HeapTuple *rows, int targrows,
103                                                           double *totalrows, double *totaldeadrows);
104 static void update_attstats(Oid relid, bool inh,
105                                 int natts, VacAttrStats **vacattrstats);
/* Row-fetch callbacks; std_fetch_func is the one handed to compute_stats for table columns */
106 static Datum std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);
107 static Datum ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);
108
109 static bool std_typanalyze(VacAttrStats *stats);
110
111
112 /*
113  *      analyze_rel() -- analyze one relation
114  */
115 void
116 analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy)
117 {
118         Relation        onerel;
119
120         /* Set up static variables */
121         if (vacstmt->options & VACOPT_VERBOSE)
122                 elevel = INFO;
123         else
124                 elevel = DEBUG2;
125
126         vac_strategy = bstrategy;
127
128         /*
129          * Check for user-requested abort.
130          */
131         CHECK_FOR_INTERRUPTS();
132
133         /*
134          * Open the relation, getting ShareUpdateExclusiveLock to ensure that two
135          * ANALYZEs don't run on it concurrently.  (This also locks out a
136          * concurrent VACUUM, which doesn't matter much at the moment but might
137          * matter if we ever try to accumulate stats on dead tuples.) If the rel
138          * has been dropped since we last saw it, we don't need to process it.
139          */
140         if (!(vacstmt->options & VACOPT_NOWAIT))
141                 onerel = try_relation_open(relid, ShareUpdateExclusiveLock);
142         else if (ConditionalLockRelationOid(relid, ShareUpdateExclusiveLock))
143                 onerel = try_relation_open(relid, NoLock);
144         else
145         {
146                 onerel = NULL;
147                 if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
148                         ereport(LOG,
149                                         (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
150                                   errmsg("skipping analyze of \"%s\" --- lock not available",
151                                                  vacstmt->relation->relname)));
152         }
153         if (!onerel)
154                 return;
155
156         /*
157          * Check permissions --- this should match vacuum's check!
158          */
159         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
160                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
161         {
162                 /* No need for a WARNING if we already complained during VACUUM */
163                 if (!(vacstmt->options & VACOPT_VACUUM))
164                 {
165                         if (onerel->rd_rel->relisshared)
166                                 ereport(WARNING,
167                                  (errmsg("skipping \"%s\" --- only superuser can analyze it",
168                                                  RelationGetRelationName(onerel))));
169                         else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
170                                 ereport(WARNING,
171                                                 (errmsg("skipping \"%s\" --- only superuser or database owner can analyze it",
172                                                                 RelationGetRelationName(onerel))));
173                         else
174                                 ereport(WARNING,
175                                                 (errmsg("skipping \"%s\" --- only table or database owner can analyze it",
176                                                                 RelationGetRelationName(onerel))));
177                 }
178                 relation_close(onerel, ShareUpdateExclusiveLock);
179                 return;
180         }
181
182         /*
183          * Check that it's a plain table; we used to do this in get_rel_oids() but
184          * seems safer to check after we've locked the relation.
185          */
186         if (onerel->rd_rel->relkind != RELKIND_RELATION)
187         {
188                 /* No need for a WARNING if we already complained during VACUUM */
189                 if (!(vacstmt->options & VACOPT_VACUUM))
190                         ereport(WARNING,
191                                         (errmsg("skipping \"%s\" --- cannot analyze non-tables or special system tables",
192                                                         RelationGetRelationName(onerel))));
193                 relation_close(onerel, ShareUpdateExclusiveLock);
194                 return;
195         }
196
197         /*
198          * Silently ignore tables that are temp tables of other backends ---
199          * trying to analyze these is rather pointless, since their contents are
200          * probably not up-to-date on disk.  (We don't throw a warning here; it
201          * would just lead to chatter during a database-wide ANALYZE.)
202          */
203         if (RELATION_IS_OTHER_TEMP(onerel))
204         {
205                 relation_close(onerel, ShareUpdateExclusiveLock);
206                 return;
207         }
208
209         /*
210          * We can ANALYZE any table except pg_statistic. See update_attstats
211          */
212         if (RelationGetRelid(onerel) == StatisticRelationId)
213         {
214                 relation_close(onerel, ShareUpdateExclusiveLock);
215                 return;
216         }
217
218         /*
219          * OK, let's do it.  First let other backends know I'm in ANALYZE.
220          */
221         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
222         MyProc->vacuumFlags |= PROC_IN_ANALYZE;
223         LWLockRelease(ProcArrayLock);
224
225         /*
226          * Do the normal non-recursive ANALYZE.
227          */
228         do_analyze_rel(onerel, vacstmt, false);
229
230         /*
231          * If there are child tables, do recursive ANALYZE.
232          */
233         if (onerel->rd_rel->relhassubclass)
234                 do_analyze_rel(onerel, vacstmt, true);
235
236         /*
237          * Close source relation now, but keep lock so that no one deletes it
238          * before we commit.  (If someone did, they'd fail to clean up the entries
239          * we made in pg_statistic.  Also, releasing the lock before commit would
240          * expose us to concurrent-update failures in update_attstats.)
241          */
242         relation_close(onerel, NoLock);
243
244         /*
245          * Reset my PGPROC flag.  Note: we need this here, and not in vacuum_rel,
246          * because the vacuum flag is cleared by the end-of-xact code.
247          */
248         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
249         MyProc->vacuumFlags &= ~PROC_IN_ANALYZE;
250         LWLockRelease(ProcArrayLock);
251 }
252
253 /*
254  *      do_analyze_rel() -- analyze one relation, recursively or not
 *
 * onerel is the already-opened target table; vacstmt is the command being
 * executed (its options and optional va_cols column list are consulted).
 * With inh = false the table itself is sampled via acquire_sample_rows()
 * and its indexes are processed too.  With inh = true the sample comes
 * from acquire_inherited_sample_rows(), the indexes are not opened, and
 * neither pg_class nor the stats collector is updated.
 *
 * The work is done in a private memory context (anl_context) and under the
 * table owner's userid with SECURITY_RESTRICTED_OPERATION set.  The
 * caller's memory context, userid, security context and GUC state are put
 * back before returning.  Column statistics are only written out when at
 * least one sample row was obtained.
255  */
256 static void
257 do_analyze_rel(Relation onerel, VacuumStmt *vacstmt, bool inh)
258 {
259         int                     attr_cnt,
260                                 tcnt,
261                                 i,
262                                 ind;
263         Relation   *Irel;
264         int                     nindexes;
265         bool            hasindex;
266         VacAttrStats **vacattrstats;
267         AnlIndexData *indexdata;
268         int                     targrows,
269                                 numrows;
270         double          totalrows,
271                                 totaldeadrows;
272         HeapTuple  *rows;
273         PGRUsage        ru0;
274         TimestampTz starttime = 0;
275         MemoryContext caller_context;
276         Oid                     save_userid;
277         int                     save_sec_context;
278         int                     save_nestlevel;
279
280         if (inh)
281                 ereport(elevel,
282                                 (errmsg("analyzing \"%s.%s\" inheritance tree",
283                                                 get_namespace_name(RelationGetNamespace(onerel)),
284                                                 RelationGetRelationName(onerel))));
285         else
286                 ereport(elevel,
287                                 (errmsg("analyzing \"%s.%s\"",
288                                                 get_namespace_name(RelationGetNamespace(onerel)),
289                                                 RelationGetRelationName(onerel))));
290
291         /*
292          * Set up a working context so that we can easily free whatever junk gets
293          * created.
294          */
295         anl_context = AllocSetContextCreate(CurrentMemoryContext,
296                                                                                 "Analyze",
297                                                                                 ALLOCSET_DEFAULT_MINSIZE,
298                                                                                 ALLOCSET_DEFAULT_INITSIZE,
299                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
300         caller_context = MemoryContextSwitchTo(anl_context);
301
302         /*
303          * Switch to the table owner's userid, so that any index functions are run
304          * as that user.  Also lock down security-restricted operations and
305          * arrange to make GUC variable changes local to this command.
306          */
307         GetUserIdAndSecContext(&save_userid, &save_sec_context);
308         SetUserIdAndSecContext(onerel->rd_rel->relowner,
309                                                    save_sec_context | SECURITY_RESTRICTED_OPERATION);
310         save_nestlevel = NewGUCNestLevel();
311
312         /* measure elapsed time iff autovacuum logging requires it */
313         if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
314         {
315                 pg_rusage_init(&ru0);
                /* with a threshold of 0 every run is logged, so no start time is needed */
316                 if (Log_autovacuum_min_duration > 0)
317                         starttime = GetCurrentTimestamp();
318         }
319
320         /*
321          * Determine which columns to analyze
322          *
323          * Note that system attributes are never analyzed.
324          */
325         if (vacstmt->va_cols != NIL)
326         {
327                 ListCell   *le;
328
329                 vacattrstats = (VacAttrStats **) palloc(list_length(vacstmt->va_cols) *
330                                                                                                 sizeof(VacAttrStats *));
331                 tcnt = 0;
                /*
                 * NOTE(review): nothing here rejects a column named twice in
                 * va_cols, so it would get two vacattrstats entries --- confirm
                 * that update_attstats copes with that.
                 */
332                 foreach(le, vacstmt->va_cols)
333                 {
334                         char       *col = strVal(lfirst(le));
335
336                         i = attnameAttNum(onerel, col, false);
337                         if (i == InvalidAttrNumber)
338                                 ereport(ERROR,
339                                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
340                                         errmsg("column \"%s\" of relation \"%s\" does not exist",
341                                                    col, RelationGetRelationName(onerel))));
342                         vacattrstats[tcnt] = examine_attribute(onerel, i, NULL);
                        /* a NULL result is not counted; its slot is reused next time */
343                         if (vacattrstats[tcnt] != NULL)
344                                 tcnt++;
345                 }
346                 attr_cnt = tcnt;
347         }
348         else
349         {
                /* no column list: consider every attribute, numbered from 1 */
350                 attr_cnt = onerel->rd_att->natts;
351                 vacattrstats = (VacAttrStats **)
352                         palloc(attr_cnt * sizeof(VacAttrStats *));
353                 tcnt = 0;
354                 for (i = 1; i <= attr_cnt; i++)
355                 {
356                         vacattrstats[tcnt] = examine_attribute(onerel, i, NULL);
357                         if (vacattrstats[tcnt] != NULL)
358                                 tcnt++;
359                 }
                /* from here on attr_cnt is the number of columns actually kept */
360                 attr_cnt = tcnt;
361         }
362
363         /*
364          * Open all indexes of the relation, and see if there are any analyzable
365          * columns in the indexes.      We do not analyze index columns if there was
366          * an explicit column list in the ANALYZE command, however.  If we are
367          * doing a recursive scan, we don't want to touch the parent's indexes at
368          * all.
369          */
370         if (!inh)
371                 vac_open_indexes(onerel, AccessShareLock, &nindexes, &Irel);
372         else
373         {
374                 Irel = NULL;
375                 nindexes = 0;
376         }
377         hasindex = (nindexes > 0);
378         indexdata = NULL;
379         if (hasindex)
380         {
                /* zero-filled, so attr_cnt stays 0 for indexes we set nothing up for */
381                 indexdata = (AnlIndexData *) palloc0(nindexes * sizeof(AnlIndexData));
382                 for (ind = 0; ind < nindexes; ind++)
383                 {
384                         AnlIndexData *thisdata = &indexdata[ind];
385                         IndexInfo  *indexInfo;
386
387                         thisdata->indexInfo = indexInfo = BuildIndexInfo(Irel[ind]);
388                         thisdata->tupleFract = 1.0; /* fix later if partial */
389                         if (indexInfo->ii_Expressions != NIL && vacstmt->va_cols == NIL)
390                         {
391                                 ListCell   *indexpr_item = list_head(indexInfo->ii_Expressions);
392
393                                 thisdata->vacattrstats = (VacAttrStats **)
394                                         palloc(indexInfo->ii_NumIndexAttrs * sizeof(VacAttrStats *));
395                                 tcnt = 0;
396                                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
397                                 {
398                                         int                     keycol = indexInfo->ii_KeyAttrNumbers[i];
399
400                                         if (keycol == 0)
401                                         {
402                                                 /* Found an index expression */
403                                                 Node       *indexkey;
404
405                                                 if (indexpr_item == NULL)               /* shouldn't happen */
406                                                         elog(ERROR, "too few entries in indexprs list");
407                                                 indexkey = (Node *) lfirst(indexpr_item);
408                                                 indexpr_item = lnext(indexpr_item);
409                                                 thisdata->vacattrstats[tcnt] =
410                                                         examine_attribute(Irel[ind], i + 1, indexkey);
411                                                 if (thisdata->vacattrstats[tcnt] != NULL)
412                                                         tcnt++;
413                                         }
414                                 }
415                                 thisdata->attr_cnt = tcnt;
416                         }
417                 }
418         }
419
420         /*
421          * Determine how many rows we need to sample, using the worst case from
422          * all analyzable columns.      We use a lower bound of 100 rows to avoid
423          * possible overflow in Vitter's algorithm.  (Note: that will also be
424          * the target in the corner case where there are no analyzable columns.)
425          */
426         targrows = 100;
427         for (i = 0; i < attr_cnt; i++)
428         {
429                 if (targrows < vacattrstats[i]->minrows)
430                         targrows = vacattrstats[i]->minrows;
431         }
432         for (ind = 0; ind < nindexes; ind++)
433         {
434                 AnlIndexData *thisdata = &indexdata[ind];
435
436                 for (i = 0; i < thisdata->attr_cnt; i++)
437                 {
438                         if (targrows < thisdata->vacattrstats[i]->minrows)
439                                 targrows = thisdata->vacattrstats[i]->minrows;
440                 }
441         }
442
443         /*
444          * Acquire the sample rows
445          */
446         rows = (HeapTuple *) palloc(targrows * sizeof(HeapTuple));
447         if (inh)
448                 numrows = acquire_inherited_sample_rows(onerel, rows, targrows,
449                                                                                                 &totalrows, &totaldeadrows);
450         else
451                 numrows = acquire_sample_rows(onerel, rows, targrows,
452                                                                           &totalrows, &totaldeadrows);
453
454         /*
455          * Compute the statistics.      Temporary results during the calculations for
456          * each column are stored in a child context.  The calc routines are
457          * responsible to make sure that whatever they store into the VacAttrStats
458          * structure is allocated in anl_context.
459          */
460         if (numrows > 0)
461         {
462                 MemoryContext col_context,
463                                         old_context;
464
465                 col_context = AllocSetContextCreate(anl_context,
466                                                                                         "Analyze Column",
467                                                                                         ALLOCSET_DEFAULT_MINSIZE,
468                                                                                         ALLOCSET_DEFAULT_INITSIZE,
469                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
470                 old_context = MemoryContextSwitchTo(col_context);
471
472                 for (i = 0; i < attr_cnt; i++)
473                 {
474                         VacAttrStats *stats = vacattrstats[i];
475                         AttributeOpts *aopt =
476                         get_attribute_options(onerel->rd_id, stats->attr->attnum);
477
478                         stats->rows = rows;
479                         stats->tupDesc = onerel->rd_att;
480                         (*stats->compute_stats) (stats,
481                                                                          std_fetch_func,
482                                                                          numrows,
483                                                                          totalrows);
484
485                         /*
486                          * If the appropriate flavor of the n_distinct option is
487                          * specified, override with the corresponding value.
488                          */
489                         if (aopt != NULL)
490                         {
491                                 float8          n_distinct =
492                                 inh ? aopt->n_distinct_inherited : aopt->n_distinct;
493
                                /* 0.0 is treated as "option not set" */
494                                 if (n_distinct != 0.0)
495                                         stats->stadistinct = n_distinct;
496                         }
497
                        /* discard this column's scratch data before the next one */
498                         MemoryContextResetAndDeleteChildren(col_context);
499                 }
500
501                 if (hasindex)
502                         compute_index_stats(onerel, totalrows,
503                                                                 indexdata, nindexes,
504                                                                 rows, numrows,
505                                                                 col_context);
506
507                 MemoryContextSwitchTo(old_context);
508                 MemoryContextDelete(col_context);
509
510                 /*
511                  * Emit the completed stats rows into pg_statistic, replacing any
512                  * previous statistics for the target columns.  (If there are stats in
513                  * pg_statistic for columns we didn't process, we leave them alone.)
514                  */
515                 update_attstats(RelationGetRelid(onerel), inh,
516                                                 attr_cnt, vacattrstats);
517
                /* index stats are always stored with inh = false */
518                 for (ind = 0; ind < nindexes; ind++)
519                 {
520                         AnlIndexData *thisdata = &indexdata[ind];
521
522                         update_attstats(RelationGetRelid(Irel[ind]), false,
523                                                         thisdata->attr_cnt, thisdata->vacattrstats);
524                 }
525         }
526
527         /*
528          * Update pages/tuples stats in pg_class ... but not if we're doing
529          * inherited stats.
530          */
531         if (!inh)
532                 vac_update_relstats(onerel,
533                                                         RelationGetNumberOfBlocks(onerel),
534                                                         totalrows, hasindex, InvalidTransactionId);
535
536         /*
537          * Same for indexes. Vacuum always scans all indexes, so if we're part of
538          * VACUUM ANALYZE, don't overwrite the accurate count already inserted by
539          * VACUUM.
540          */
541         if (!inh && !(vacstmt->options & VACOPT_VACUUM))
542         {
543                 for (ind = 0; ind < nindexes; ind++)
544                 {
545                         AnlIndexData *thisdata = &indexdata[ind];
546                         double          totalindexrows;
547
                        /* scale the table's row estimate by the index's row fraction */
548                         totalindexrows = ceil(thisdata->tupleFract * totalrows);
549                         vac_update_relstats(Irel[ind],
550                                                                 RelationGetNumberOfBlocks(Irel[ind]),
551                                                                 totalindexrows, false, InvalidTransactionId);
552                 }
553         }
554
555         /*
556          * Report ANALYZE to the stats collector, too.  However, if doing
557          * inherited stats we shouldn't report, because the stats collector only
558          * tracks per-table stats.
559          */
560         if (!inh)
561                 pgstat_report_analyze(onerel, totalrows, totaldeadrows);
562
563         /* If this isn't part of VACUUM ANALYZE, let index AMs do cleanup */
564         if (!(vacstmt->options & VACOPT_VACUUM))
565         {
566                 for (ind = 0; ind < nindexes; ind++)
567                 {
568                         IndexBulkDeleteResult *stats;
569                         IndexVacuumInfo ivinfo;
570
571                         ivinfo.index = Irel[ind];
572                         ivinfo.analyze_only = true;
573                         ivinfo.estimated_count = true;
574                         ivinfo.message_level = elevel;
575                         ivinfo.num_heap_tuples = onerel->rd_rel->reltuples;
576                         ivinfo.strategy = vac_strategy;
577
578                         stats = index_vacuum_cleanup(&ivinfo, NULL);
579
                        /* the result is not used here, only freed */
580                         if (stats)
581                                 pfree(stats);
582                 }
583         }
584
585         /* Done with indexes */
586         vac_close_indexes(nindexes, Irel, NoLock);
587
588         /* Log the action if appropriate */
589         if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
590         {
591                 if (Log_autovacuum_min_duration == 0 ||
592                         TimestampDifferenceExceeds(starttime, GetCurrentTimestamp(),
593                                                                            Log_autovacuum_min_duration))
594                         ereport(LOG,
595                                         (errmsg("automatic analyze of table \"%s.%s.%s\" system usage: %s",
596                                                         get_database_name(MyDatabaseId),
597                                                         get_namespace_name(RelationGetNamespace(onerel)),
598                                                         RelationGetRelationName(onerel),
599                                                         pg_rusage_show(&ru0))));
600         }
601
602         /* Roll back any GUC changes executed by index functions */
603         AtEOXact_GUC(false, save_nestlevel);
604
605         /* Restore userid and security context */
606         SetUserIdAndSecContext(save_userid, save_sec_context);
607
608         /* Restore current context and release memory */
609         MemoryContextSwitchTo(caller_context);
610         MemoryContextDelete(anl_context);
611         anl_context = NULL;
612 }
613
614 /*
615  * Compute statistics about indexes of a relation
616  */
617 static void
618 compute_index_stats(Relation onerel, double totalrows,
619                                         AnlIndexData *indexdata, int nindexes,
620                                         HeapTuple *rows, int numrows,
621                                         MemoryContext col_context)
622 {
623         MemoryContext ind_context,
624                                 old_context;
625         Datum           values[INDEX_MAX_KEYS];
626         bool            isnull[INDEX_MAX_KEYS];
627         int                     ind,
628                                 i;
629
630         ind_context = AllocSetContextCreate(anl_context,
631                                                                                 "Analyze Index",
632                                                                                 ALLOCSET_DEFAULT_MINSIZE,
633                                                                                 ALLOCSET_DEFAULT_INITSIZE,
634                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
635         old_context = MemoryContextSwitchTo(ind_context);
636
637         for (ind = 0; ind < nindexes; ind++)
638         {
639                 AnlIndexData *thisdata = &indexdata[ind];
640                 IndexInfo  *indexInfo = thisdata->indexInfo;
641                 int                     attr_cnt = thisdata->attr_cnt;
642                 TupleTableSlot *slot;
643                 EState     *estate;
644                 ExprContext *econtext;
645                 List       *predicate;
646                 Datum      *exprvals;
647                 bool       *exprnulls;
648                 int                     numindexrows,
649                                         tcnt,
650                                         rowno;
651                 double          totalindexrows;
652
653                 /* Ignore index if no columns to analyze and not partial */
654                 if (attr_cnt == 0 && indexInfo->ii_Predicate == NIL)
655                         continue;
656
657                 /*
658                  * Need an EState for evaluation of index expressions and
659                  * partial-index predicates.  Create it in the per-index context to be
660                  * sure it gets cleaned up at the bottom of the loop.
661                  */
662                 estate = CreateExecutorState();
663                 econtext = GetPerTupleExprContext(estate);
664                 /* Need a slot to hold the current heap tuple, too */
665                 slot = MakeSingleTupleTableSlot(RelationGetDescr(onerel));
666
667                 /* Arrange for econtext's scan tuple to be the tuple under test */
668                 econtext->ecxt_scantuple = slot;
669
670                 /* Set up execution state for predicate. */
671                 predicate = (List *)
672                         ExecPrepareExpr((Expr *) indexInfo->ii_Predicate,
673                                                         estate);
674
675                 /* Compute and save index expression values */
676                 exprvals = (Datum *) palloc(numrows * attr_cnt * sizeof(Datum));
677                 exprnulls = (bool *) palloc(numrows * attr_cnt * sizeof(bool));
678                 numindexrows = 0;
679                 tcnt = 0;
680                 for (rowno = 0; rowno < numrows; rowno++)
681                 {
682                         HeapTuple       heapTuple = rows[rowno];
683
684                         /*
685                          * Reset the per-tuple context each time, to reclaim any cruft
686                          * left behind by evaluating the predicate or index expressions.
687                          */
688                         ResetExprContext(econtext);
689
690                         /* Set up for predicate or expression evaluation */
691                         ExecStoreTuple(heapTuple, slot, InvalidBuffer, false);
692
693                         /* If index is partial, check predicate */
694                         if (predicate != NIL)
695                         {
696                                 if (!ExecQual(predicate, econtext, false))
697                                         continue;
698                         }
699                         numindexrows++;
700
701                         if (attr_cnt > 0)
702                         {
703                                 /*
704                                  * Evaluate the index row to compute expression values. We
705                                  * could do this by hand, but FormIndexDatum is convenient.
706                                  */
707                                 FormIndexDatum(indexInfo,
708                                                            slot,
709                                                            estate,
710                                                            values,
711                                                            isnull);
712
713                                 /*
714                                  * Save just the columns we care about.  We copy the values
715                                  * into ind_context from the estate's per-tuple context.
716                                  */
717                                 for (i = 0; i < attr_cnt; i++)
718                                 {
719                                         VacAttrStats *stats = thisdata->vacattrstats[i];
720                                         int                     attnum = stats->attr->attnum;
721
722                                         if (isnull[attnum - 1])
723                                         {
724                                                 exprvals[tcnt] = (Datum) 0;
725                                                 exprnulls[tcnt] = true;
726                                         }
727                                         else
728                                         {
729                                                 exprvals[tcnt] = datumCopy(values[attnum - 1],
730                                                                                                    stats->attrtype->typbyval,
731                                                                                                    stats->attrtype->typlen);
732                                                 exprnulls[tcnt] = false;
733                                         }
734                                         tcnt++;
735                                 }
736                         }
737                 }
738
739                 /*
740                  * Having counted the number of rows that pass the predicate in the
741                  * sample, we can estimate the total number of rows in the index.
742                  */
743                 thisdata->tupleFract = (double) numindexrows / (double) numrows;
744                 totalindexrows = ceil(thisdata->tupleFract * totalrows);
745
746                 /*
747                  * Now we can compute the statistics for the expression columns.
748                  */
749                 if (numindexrows > 0)
750                 {
751                         MemoryContextSwitchTo(col_context);
752                         for (i = 0; i < attr_cnt; i++)
753                         {
754                                 VacAttrStats *stats = thisdata->vacattrstats[i];
755                                 AttributeOpts *aopt =
756                                 get_attribute_options(stats->attr->attrelid,
757                                                                           stats->attr->attnum);
758
759                                 stats->exprvals = exprvals + i;
760                                 stats->exprnulls = exprnulls + i;
761                                 stats->rowstride = attr_cnt;
762                                 (*stats->compute_stats) (stats,
763                                                                                  ind_fetch_func,
764                                                                                  numindexrows,
765                                                                                  totalindexrows);
766
767                                 /*
768                                  * If the n_distinct option is specified, it overrides the
769                                  * above computation.  For indices, we always use just
770                                  * n_distinct, not n_distinct_inherited.
771                                  */
772                                 if (aopt != NULL && aopt->n_distinct != 0.0)
773                                         stats->stadistinct = aopt->n_distinct;
774
775                                 MemoryContextResetAndDeleteChildren(col_context);
776                         }
777                 }
778
779                 /* And clean up */
780                 MemoryContextSwitchTo(ind_context);
781
782                 ExecDropSingleTupleTableSlot(slot);
783                 FreeExecutorState(estate);
784                 MemoryContextResetAndDeleteChildren(ind_context);
785         }
786
787         MemoryContextSwitchTo(old_context);
788         MemoryContextDelete(ind_context);
789 }
790
791 /*
792  * examine_attribute -- pre-analysis of a single column
793  *
794  * Determine whether the column is analyzable; if so, create and initialize
795  * a VacAttrStats struct for it.  If not, return NULL.
796  *
797  * If index_expr isn't NULL, then we're trying to analyze an expression index,
798  * and index_expr is the expression tree representing the column's data.
799  */
800 static VacAttrStats *
801 examine_attribute(Relation onerel, int attnum, Node *index_expr)
802 {
803         Form_pg_attribute attr = onerel->rd_att->attrs[attnum - 1];
804         HeapTuple       typtuple;
805         VacAttrStats *stats;
806         int                     i;
807         bool            ok;
808
809         /* Never analyze dropped columns */
810         if (attr->attisdropped)
811                 return NULL;
812
813         /* Don't analyze column if user has specified not to */
814         if (attr->attstattarget == 0)
815                 return NULL;
816
817         /*
818          * Create the VacAttrStats struct.      Note that we only have a copy of the
819          * fixed fields of the pg_attribute tuple.
820          */
821         stats = (VacAttrStats *) palloc0(sizeof(VacAttrStats));
822         stats->attr = (Form_pg_attribute) palloc(ATTRIBUTE_FIXED_PART_SIZE);
823         memcpy(stats->attr, attr, ATTRIBUTE_FIXED_PART_SIZE);
824
825         /*
826          * When analyzing an expression index, believe the expression tree's type
827          * not the column datatype --- the latter might be the opckeytype storage
828          * type of the opclass, which is not interesting for our purposes.      (Note:
829          * if we did anything with non-expression index columns, we'd need to
830          * figure out where to get the correct type info from, but for now that's
831          * not a problem.)      It's not clear whether anyone will care about the
832          * typmod, but we store that too just in case.
833          */
834         if (index_expr)
835         {
836                 stats->attrtypid = exprType(index_expr);
837                 stats->attrtypmod = exprTypmod(index_expr);
838         }
839         else
840         {
841                 stats->attrtypid = attr->atttypid;
842                 stats->attrtypmod = attr->atttypmod;
843         }
844
845         typtuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(stats->attrtypid));
846         if (!HeapTupleIsValid(typtuple))
847                 elog(ERROR, "cache lookup failed for type %u", stats->attrtypid);
848         stats->attrtype = (Form_pg_type) palloc(sizeof(FormData_pg_type));
849         memcpy(stats->attrtype, GETSTRUCT(typtuple), sizeof(FormData_pg_type));
850         ReleaseSysCache(typtuple);
851         stats->anl_context = anl_context;
852         stats->tupattnum = attnum;
853
854         /*
855          * The fields describing the stats->stavalues[n] element types default to
856          * the type of the data being analyzed, but the type-specific typanalyze
857          * function can change them if it wants to store something else.
858          */
859         for (i = 0; i < STATISTIC_NUM_SLOTS; i++)
860         {
861                 stats->statypid[i] = stats->attrtypid;
862                 stats->statyplen[i] = stats->attrtype->typlen;
863                 stats->statypbyval[i] = stats->attrtype->typbyval;
864                 stats->statypalign[i] = stats->attrtype->typalign;
865         }
866
867         /*
868          * Call the type-specific typanalyze function.  If none is specified, use
869          * std_typanalyze().
870          */
871         if (OidIsValid(stats->attrtype->typanalyze))
872                 ok = DatumGetBool(OidFunctionCall1(stats->attrtype->typanalyze,
873                                                                                    PointerGetDatum(stats)));
874         else
875                 ok = std_typanalyze(stats);
876
877         if (!ok || stats->compute_stats == NULL || stats->minrows <= 0)
878         {
879                 pfree(stats->attrtype);
880                 pfree(stats->attr);
881                 pfree(stats);
882                 return NULL;
883         }
884
885         return stats;
886 }
887
888 /*
889  * BlockSampler_Init -- prepare for random sampling of blocknumbers
890  *
891  * BlockSampler is used for stage one of our new two-stage tuple
892  * sampling mechanism as discussed on pgsql-hackers 2004-04-02 (subject
893  * "Large DB").  It selects a random sample of samplesize blocks out of
894  * the nblocks blocks in the table.  If the table has less than
895  * samplesize blocks, all blocks are selected.
896  *
897  * Since we know the total number of blocks in advance, we can use the
898  * straightforward Algorithm S from Knuth 3.4.2, rather than Vitter's
899  * algorithm.
900  */
901 static void
902 BlockSampler_Init(BlockSampler bs, BlockNumber nblocks, int samplesize)
903 {
904         bs->N = nblocks;                        /* measured table size */
905
906         /*
907          * If we decide to reduce samplesize for tables that have less or not much
908          * more than samplesize blocks, here is the place to do it.
909          */
910         bs->n = samplesize;
911         bs->t = 0;                                      /* blocks scanned so far */
912         bs->m = 0;                                      /* blocks selected so far */
913 }
914
915 static bool
916 BlockSampler_HasMore(BlockSampler bs)
917 {
918         return (bs->t < bs->N) && (bs->m < bs->n);
919 }
920
921 static BlockNumber
922 BlockSampler_Next(BlockSampler bs)
923 {
924         BlockNumber K = bs->N - bs->t;          /* remaining blocks */
925         int                     k = bs->n - bs->m;              /* blocks still to sample */
926         double          p;                              /* probability to skip block */
927         double          V;                              /* random */
928
929         Assert(BlockSampler_HasMore(bs));       /* hence K > 0 and k > 0 */
930
931         if ((BlockNumber) k >= K)
932         {
933                 /* need all the rest */
934                 bs->m++;
935                 return bs->t++;
936         }
937
938         /*----------
939          * It is not obvious that this code matches Knuth's Algorithm S.
940          * Knuth says to skip the current block with probability 1 - k/K.
941          * If we are to skip, we should advance t (hence decrease K), and
942          * repeat the same probabilistic test for the next block.  The naive
943          * implementation thus requires a random_fract() call for each block
944          * number.      But we can reduce this to one random_fract() call per
945          * selected block, by noting that each time the while-test succeeds,
946          * we can reinterpret V as a uniform random number in the range 0 to p.
947          * Therefore, instead of choosing a new V, we just adjust p to be
948          * the appropriate fraction of its former value, and our next loop
949          * makes the appropriate probabilistic test.
950          *
951          * We have initially K > k > 0.  If the loop reduces K to equal k,
952          * the next while-test must fail since p will become exactly zero
953          * (we assume there will not be roundoff error in the division).
954          * (Note: Knuth suggests a "<=" loop condition, but we use "<" just
955          * to be doubly sure about roundoff error.)  Therefore K cannot become
956          * less than k, which means that we cannot fail to select enough blocks.
957          *----------
958          */
959         V = random_fract();
960         p = 1.0 - (double) k / (double) K;
961         while (V < p)
962         {
963                 /* skip */
964                 bs->t++;
965                 K--;                                    /* keep K == N - t */
966
967                 /* adjust p to be new cutoff point in reduced range */
968                 p *= 1.0 - (double) k / (double) K;
969         }
970
971         /* select */
972         bs->m++;
973         return bs->t++;
974 }
975
976 /*
977  * acquire_sample_rows -- acquire a random sample of rows from the table
978  *
979  * Selected rows are returned in the caller-allocated array rows[], which
980  * must have at least targrows entries.
981  * The actual number of rows selected is returned as the function result.
982  * We also estimate the total numbers of live and dead rows in the table,
983  * and return them into *totalrows and *totaldeadrows, respectively.
984  *
985  * The returned list of tuples is in order by physical position in the table.
986  * (We will rely on this later to derive correlation estimates.)
987  *
988  * As of May 2004 we use a new two-stage method:  Stage one selects up
989  * to targrows random blocks (or all blocks, if there aren't so many).
990  * Stage two scans these blocks and uses the Vitter algorithm to create
991  * a random sample of targrows rows (or less, if there are less in the
992  * sample of blocks).  The two stages are executed simultaneously: each
993  * block is processed as soon as stage one returns its number and while
994  * the rows are read stage two controls which ones are to be inserted
995  * into the sample.
996  *
997  * Although every row has an equal chance of ending up in the final
998  * sample, this sampling method is not perfect: not every possible
999  * sample has an equal chance of being selected.  For large relations
1000  * the number of different blocks represented by the sample tends to be
1001  * too small.  We can live with that for now.  Improvements are welcome.
1002  *
1003  * An important property of this sampling method is that because we do
1004  * look at a statistically unbiased set of blocks, we should get
1005  * unbiased estimates of the average numbers of live and dead rows per
1006  * block.  The previous sampling method put too much credence in the row
1007  * density near the start of the table.
1008  */
1009 static int
1010 acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
1011                                         double *totalrows, double *totaldeadrows)
1012 {
1013         int                     numrows = 0;    /* # rows now in reservoir */
1014         double          samplerows = 0; /* total # rows collected */
1015         double          liverows = 0;   /* # live rows seen */
1016         double          deadrows = 0;   /* # dead rows seen */
1017         double          rowstoskip = -1;        /* -1 means not set yet */
1018         BlockNumber totalblocks;
1019         TransactionId OldestXmin;
1020         BlockSamplerData bs;
1021         double          rstate;
1022
1023         Assert(targrows > 0);
1024
1025         totalblocks = RelationGetNumberOfBlocks(onerel);
1026
1027         /* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
1028         OldestXmin = GetOldestXmin(onerel->rd_rel->relisshared, true);
1029
1030         /* Prepare for sampling block numbers */
1031         BlockSampler_Init(&bs, totalblocks, targrows);
1032         /* Prepare for sampling rows */
1033         rstate = init_selection_state(targrows);
1034
1035         /* Outer loop over blocks to sample */
1036         while (BlockSampler_HasMore(&bs))
1037         {
1038                 BlockNumber targblock = BlockSampler_Next(&bs);
1039                 Buffer          targbuffer;
1040                 Page            targpage;
1041                 OffsetNumber targoffset,
1042                                         maxoffset;
1043
1044                 vacuum_delay_point();
1045
1046                 /*
1047                  * We must maintain a pin on the target page's buffer to ensure that
1048                  * the maxoffset value stays good (else concurrent VACUUM might delete
1049                  * tuples out from under us).  Hence, pin the page until we are done
1050                  * looking at it.  We also choose to hold sharelock on the buffer
1051                  * throughout --- we could release and re-acquire sharelock for each
1052                  * tuple, but since we aren't doing much work per tuple, the extra
1053                  * lock traffic is probably better avoided.
1054                  */
1055                 targbuffer = ReadBufferExtended(onerel, MAIN_FORKNUM, targblock,
1056                                                                                 RBM_NORMAL, vac_strategy);
1057                 LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
1058                 targpage = BufferGetPage(targbuffer);
1059                 maxoffset = PageGetMaxOffsetNumber(targpage);
1060
1061                 /* Inner loop over all tuples on the selected page */
1062                 for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
1063                 {
1064                         ItemId          itemid;
1065                         HeapTupleData targtuple;
1066                         bool            sample_it = false;
1067
1068                         itemid = PageGetItemId(targpage, targoffset);
1069
1070                         /*
1071                          * We ignore unused and redirect line pointers.  DEAD line
1072                          * pointers should be counted as dead, because we need vacuum to
1073                          * run to get rid of them.      Note that this rule agrees with the
1074                          * way that heap_page_prune() counts things.
1075                          */
1076                         if (!ItemIdIsNormal(itemid))
1077                         {
1078                                 if (ItemIdIsDead(itemid))
1079                                         deadrows += 1;
1080                                 continue;
1081                         }
1082
1083                         ItemPointerSet(&targtuple.t_self, targblock, targoffset);
1084
1085                         targtuple.t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
1086                         targtuple.t_len = ItemIdGetLength(itemid);
1087
1088                         switch (HeapTupleSatisfiesVacuum(targtuple.t_data,
1089                                                                                          OldestXmin,
1090                                                                                          targbuffer))
1091                         {
1092                                 case HEAPTUPLE_LIVE:
1093                                         sample_it = true;
1094                                         liverows += 1;
1095                                         break;
1096
1097                                 case HEAPTUPLE_DEAD:
1098                                 case HEAPTUPLE_RECENTLY_DEAD:
1099                                         /* Count dead and recently-dead rows */
1100                                         deadrows += 1;
1101                                         break;
1102
1103                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1104
1105                                         /*
1106                                          * Insert-in-progress rows are not counted.  We assume
1107                                          * that when the inserting transaction commits or aborts,
1108                                          * it will send a stats message to increment the proper
1109                                          * count.  This works right only if that transaction ends
1110                                          * after we finish analyzing the table; if things happen
1111                                          * in the other order, its stats update will be
1112                                          * overwritten by ours.  However, the error will be large
1113                                          * only if the other transaction runs long enough to
1114                                          * insert many tuples, so assuming it will finish after us
1115                                          * is the safer option.
1116                                          *
1117                                          * A special case is that the inserting transaction might
1118                                          * be our own.  In this case we should count and sample
1119                                          * the row, to accommodate users who load a table and
1120                                          * analyze it in one transaction.  (pgstat_report_analyze
1121                                          * has to adjust the numbers we send to the stats
1122                                          * collector to make this come out right.)
1123                                          */
1124                                         if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple.t_data)))
1125                                         {
1126                                                 sample_it = true;
1127                                                 liverows += 1;
1128                                         }
1129                                         break;
1130
1131                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1132
1133                                         /*
1134                                          * We count delete-in-progress rows as still live, using
1135                                          * the same reasoning given above; but we don't bother to
1136                                          * include them in the sample.
1137                                          *
1138                                          * If the delete was done by our own transaction, however,
1139                                          * we must count the row as dead to make
1140                                          * pgstat_report_analyze's stats adjustments come out
1141                                          * right.  (Note: this works out properly when the row was
1142                                          * both inserted and deleted in our xact.)
1143                                          */
1144                                         if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(targtuple.t_data)))
1145                                                 deadrows += 1;
1146                                         else
1147                                                 liverows += 1;
1148                                         break;
1149
1150                                 default:
1151                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1152                                         break;
1153                         }
1154
1155                         if (sample_it)
1156                         {
1157                                 /*
1158                                  * The first targrows sample rows are simply copied into the
1159                                  * reservoir. Then we start replacing tuples in the sample
1160                                  * until we reach the end of the relation.      This algorithm is
1161                                  * from Jeff Vitter's paper (see full citation below). It
1162                                  * works by repeatedly computing the number of tuples to skip
1163                                  * before selecting a tuple, which replaces a randomly chosen
1164                                  * element of the reservoir (current set of tuples).  At all
1165                                  * times the reservoir is a true random sample of the tuples
1166                                  * we've passed over so far, so when we fall off the end of
1167                                  * the relation we're done.
1168                                  */
1169                                 if (numrows < targrows)
1170                                         rows[numrows++] = heap_copytuple(&targtuple);
1171                                 else
1172                                 {
1173                                         /*
1174                                          * t in Vitter's paper is the number of records already
1175                                          * processed.  If we need to compute a new S value, we
1176                                          * must use the not-yet-incremented value of samplerows as
1177                                          * t.
1178                                          */
1179                                         if (rowstoskip < 0)
1180                                                 rowstoskip = get_next_S(samplerows, targrows, &rstate);
1181
1182                                         if (rowstoskip <= 0)
1183                                         {
1184                                                 /*
1185                                                  * Found a suitable tuple, so save it, replacing one
1186                                                  * old tuple at random
1187                                                  */
1188                                                 int                     k = (int) (targrows * random_fract());
1189
1190                                                 Assert(k >= 0 && k < targrows);
1191                                                 heap_freetuple(rows[k]);
1192                                                 rows[k] = heap_copytuple(&targtuple);
1193                                         }
1194
1195                                         rowstoskip -= 1;
1196                                 }
1197
1198                                 samplerows += 1;
1199                         }
1200                 }
1201
1202                 /* Now release the lock and pin on the page */
1203                 UnlockReleaseBuffer(targbuffer);
1204         }
1205
1206         /*
1207          * If we didn't find as many tuples as we wanted then we're done. No sort
1208          * is needed, since they're already in order.
1209          *
1210          * Otherwise we need to sort the collected tuples by position
1211          * (itempointer). It's not worth worrying about corner cases where the
1212          * tuples are already sorted.
1213          */
1214         if (numrows == targrows)
1215                 qsort((void *) rows, numrows, sizeof(HeapTuple), compare_rows);
1216
1217         /*
1218          * Estimate total numbers of rows in relation.  For live rows, use
1219          * vac_estimate_reltuples; for dead rows, we have no source of old
1220          * information, so we have to assume the density is the same in unseen
1221          * pages as in the pages we scanned.
1222          */
1223         *totalrows = vac_estimate_reltuples(onerel, true,
1224                                                                                 totalblocks,
1225                                                                                 bs.m,
1226                                                                                 liverows);
1227         if (bs.m > 0)
1228                 *totaldeadrows = floor((deadrows / bs.m) * totalblocks + 0.5);
1229         else
1230                 *totaldeadrows = 0.0;
1231
1232         /*
1233          * Emit some interesting relation info
1234          */
1235         ereport(elevel,
1236                         (errmsg("\"%s\": scanned %d of %u pages, "
1237                                         "containing %.0f live rows and %.0f dead rows; "
1238                                         "%d rows in sample, %.0f estimated total rows",
1239                                         RelationGetRelationName(onerel),
1240                                         bs.m, totalblocks,
1241                                         liverows, deadrows,
1242                                         numrows, *totalrows)));
1243
1244         return numrows;
1245 }
1246
1247 /* Select a random value R uniformly distributed in (0 - 1) */
1248 static double
1249 random_fract(void)
1250 {
1251         return ((double) random() + 1) / ((double) MAX_RANDOM_VALUE + 2);
1252 }
1253
1254 /*
1255  * These two routines embody Algorithm Z from "Random sampling with a
1256  * reservoir" by Jeffrey S. Vitter, in ACM Trans. Math. Softw. 11, 1
1257  * (Mar. 1985), Pages 37-57.  Vitter describes his algorithm in terms
1258  * of the count S of records to skip before processing another record.
1259  * It is computed primarily based on t, the number of records already read.
1260  * The only extra state needed between calls is W, a random state variable.
1261  *
1262  * init_selection_state computes the initial W value.
1263  *
1264  * Given that we've already read t records (t >= n), get_next_S
1265  * determines the number of records to skip before the next record is
1266  * processed.
1267  */
static double
init_selection_state(int n)
{
	double		u = random_fract();

	/* Starting value of W, used the first time Algorithm Z is applied */
	return exp(-log(u) / n);
}
1274
/*
 * get_next_S -- number of records to skip before processing the next one
 *
 * t is the number of records read so far, n is the reservoir (sample) size,
 * and *stateptr holds the random state variable W.  *stateptr is read and
 * rewritten only when the Algorithm Z branch is taken; the Algorithm X
 * branch leaves it untouched.
 */
static double
get_next_S(double t, int n, double *stateptr)
{
	double		skip;
	double		W;
	double		term;

	/* The magic constant here is T from Vitter's paper */
	if (t <= (22.0 * n))
	{
		/* t is not large enough yet: process records using Algorithm X */
		double		V = random_fract();		/* Generate V */
		double		ratio;

		skip = 0;
		t += 1;
		/* Note: "num" in Vitter's code is always equal to t - n */
		ratio = (t - (double) n) / t;
		/* Find the minimum skip count satisfying (4.1) */
		while (ratio > V)
		{
			skip += 1;
			t += 1;
			ratio *= (t - (double) n) / t;
		}
		return skip;
	}

	/* Otherwise apply Algorithm Z, starting from the saved W */
	W = *stateptr;
	term = t - (double) n + 1;

	for (;;)
	{
		/* Generate U and X */
		double		U = random_fract();
		double		X = t * (W - 1.0);
		double		scale,
					lhs,
					rhs,
					y,
					numer,
					numer_lim,
					denom;

		/* The skip count is tentatively set to floor(X) */
		skip = floor(X);

		/* Test if U <= h(S)/cg(X) in the manner of (6.3) */
		scale = (t + 1) / term;
		lhs = exp(log(((U * scale * scale) * (term + skip)) / (t + X)) / n);
		rhs = (((t + X) / (term + skip)) * term) / t;
		if (lhs <= rhs)
		{
			W = rhs / lhs;
			break;
		}

		/* Test if U <= f(S)/cg(X) */
		y = (((U * (t + 1)) / term) * (t + skip + 1)) / (t + X);
		if ((double) n < skip)
		{
			denom = t;
			numer_lim = term + skip;
		}
		else
		{
			denom = t - (double) n + skip;
			numer_lim = t + 1;
		}
		numer = t + skip;
		while (numer >= numer_lim)
		{
			y *= numer / denom;
			denom -= 1;
			numer -= 1;
		}

		/* Generate W in advance, before deciding whether to accept */
		W = exp(-log(random_fract()) / n);
		if (exp(log(y) / n) <= (t + X) / t)
			break;
	}

	/* Save the state variable for the next call */
	*stateptr = W;
	return skip;
}
1356
1357 /*
1358  * qsort comparator for sorting rows[] array
1359  */
1360 static int
1361 compare_rows(const void *a, const void *b)
1362 {
1363         HeapTuple       ha = *(HeapTuple *) a;
1364         HeapTuple       hb = *(HeapTuple *) b;
1365         BlockNumber ba = ItemPointerGetBlockNumber(&ha->t_self);
1366         OffsetNumber oa = ItemPointerGetOffsetNumber(&ha->t_self);
1367         BlockNumber bb = ItemPointerGetBlockNumber(&hb->t_self);
1368         OffsetNumber ob = ItemPointerGetOffsetNumber(&hb->t_self);
1369
1370         if (ba < bb)
1371                 return -1;
1372         if (ba > bb)
1373                 return 1;
1374         if (oa < ob)
1375                 return -1;
1376         if (oa > ob)
1377                 return 1;
1378         return 0;
1379 }
1380
1381
1382 /*
1383  * acquire_inherited_sample_rows -- acquire sample rows from inheritance tree
1384  *
1385  * This has the same API as acquire_sample_rows, except that rows are
1386  * collected from all inheritance children as well as the specified table.
1387  * We fail and return zero if there are no inheritance children.
1388  */
1389 static int
1390 acquire_inherited_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
1391                                                           double *totalrows, double *totaldeadrows)
1392 {
1393         List       *tableOIDs;
1394         Relation   *rels;
1395         double     *relblocks;
1396         double          totalblocks;
1397         int                     numrows,
1398                                 nrels,
1399                                 i;
1400         ListCell   *lc;
1401
1402         /*
1403          * Find all members of inheritance set.  We only need AccessShareLock on
1404          * the children.
1405          */
1406         tableOIDs =
1407                 find_all_inheritors(RelationGetRelid(onerel), AccessShareLock, NULL);
1408
1409         /*
1410          * Check that there's at least one descendant, else fail.  This could
1411          * happen despite analyze_rel's relhassubclass check, if table once had a
1412          * child but no longer does.  In that case, we can clear the
1413          * relhassubclass field so as not to make the same mistake again later.
1414          * (This is safe because we hold ShareUpdateExclusiveLock.)
1415          */
1416         if (list_length(tableOIDs) < 2)
1417         {
1418                 /* CCI because we already updated the pg_class row in this command */
1419                 CommandCounterIncrement();
1420                 SetRelationHasSubclass(RelationGetRelid(onerel), false);
1421                 return 0;
1422         }
1423
1424         /*
1425          * Count the blocks in all the relations.  The result could overflow
1426          * BlockNumber, so we use double arithmetic.
1427          */
1428         rels = (Relation *) palloc(list_length(tableOIDs) * sizeof(Relation));
1429         relblocks = (double *) palloc(list_length(tableOIDs) * sizeof(double));
1430         totalblocks = 0;
1431         nrels = 0;
1432         foreach(lc, tableOIDs)
1433         {
1434                 Oid                     childOID = lfirst_oid(lc);
1435                 Relation        childrel;
1436
1437                 /* We already got the needed lock */
1438                 childrel = heap_open(childOID, NoLock);
1439
1440                 /* Ignore if temp table of another backend */
1441                 if (RELATION_IS_OTHER_TEMP(childrel))
1442                 {
1443                         /* ... but release the lock on it */
1444                         Assert(childrel != onerel);
1445                         heap_close(childrel, AccessShareLock);
1446                         continue;
1447                 }
1448
1449                 rels[nrels] = childrel;
1450                 relblocks[nrels] = (double) RelationGetNumberOfBlocks(childrel);
1451                 totalblocks += relblocks[nrels];
1452                 nrels++;
1453         }
1454
1455         /*
1456          * Now sample rows from each relation, proportionally to its fraction of
1457          * the total block count.  (This might be less than desirable if the child
1458          * rels have radically different free-space percentages, but it's not
1459          * clear that it's worth working harder.)
1460          */
1461         numrows = 0;
1462         *totalrows = 0;
1463         *totaldeadrows = 0;
1464         for (i = 0; i < nrels; i++)
1465         {
1466                 Relation        childrel = rels[i];
1467                 double          childblocks = relblocks[i];
1468
1469                 if (childblocks > 0)
1470                 {
1471                         int                     childtargrows;
1472
1473                         childtargrows = (int) rint(targrows * childblocks / totalblocks);
1474                         /* Make sure we don't overrun due to roundoff error */
1475                         childtargrows = Min(childtargrows, targrows - numrows);
1476                         if (childtargrows > 0)
1477                         {
1478                                 int                     childrows;
1479                                 double          trows,
1480                                                         tdrows;
1481
1482                                 /* Fetch a random sample of the child's rows */
1483                                 childrows = acquire_sample_rows(childrel,
1484                                                                                                 rows + numrows,
1485                                                                                                 childtargrows,
1486                                                                                                 &trows,
1487                                                                                                 &tdrows);
1488
1489                                 /* We may need to convert from child's rowtype to parent's */
1490                                 if (childrows > 0 &&
1491                                         !equalTupleDescs(RelationGetDescr(childrel),
1492                                                                          RelationGetDescr(onerel)))
1493                                 {
1494                                         TupleConversionMap *map;
1495
1496                                         map = convert_tuples_by_name(RelationGetDescr(childrel),
1497                                                                                                  RelationGetDescr(onerel),
1498                                                                  gettext_noop("could not convert row type"));
1499                                         if (map != NULL)
1500                                         {
1501                                                 int                     j;
1502
1503                                                 for (j = 0; j < childrows; j++)
1504                                                 {
1505                                                         HeapTuple       newtup;
1506
1507                                                         newtup = do_convert_tuple(rows[numrows + j], map);
1508                                                         heap_freetuple(rows[numrows + j]);
1509                                                         rows[numrows + j] = newtup;
1510                                                 }
1511                                                 free_conversion_map(map);
1512                                         }
1513                                 }
1514
1515                                 /* And add to counts */
1516                                 numrows += childrows;
1517                                 *totalrows += trows;
1518                                 *totaldeadrows += tdrows;
1519                         }
1520                 }
1521
1522                 /*
1523                  * Note: we cannot release the child-table locks, since we may have
1524                  * pointers to their TOAST tables in the sampled rows.
1525                  */
1526                 heap_close(childrel, NoLock);
1527         }
1528
1529         return numrows;
1530 }
1531
1532
1533 /*
1534  *      update_attstats() -- update attribute statistics for one relation
1535  *
1536  *              Statistics are stored in several places: the pg_class row for the
1537  *              relation has stats about the whole relation, and there is a
1538  *              pg_statistic row for each (non-system) attribute that has ever
1539  *              been analyzed.  The pg_class values are updated by VACUUM, not here.
1540  *
1541  *              pg_statistic rows are just added or updated normally.  This means
1542  *              that pg_statistic will probably contain some deleted rows at the
1543  *              completion of a vacuum cycle, unless it happens to get vacuumed last.
1544  *
1545  *              To keep things simple, we punt for pg_statistic, and don't try
1546  *              to compute or store rows for pg_statistic itself in pg_statistic.
1547  *              This could possibly be made to work, but it's not worth the trouble.
1548  *              Note analyze_rel() has seen to it that we won't come here when
1549  *              vacuuming pg_statistic itself.
1550  *
1551  *              Note: there would be a race condition here if two backends could
1552  *              ANALYZE the same table concurrently.  Presently, we lock that out
1553  *              by taking a self-exclusive lock on the relation in analyze_rel().
1554  */
1555 static void
1556 update_attstats(Oid relid, bool inh, int natts, VacAttrStats **vacattrstats)
1557 {
1558         Relation        sd;
1559         int                     attno;
1560
1561         if (natts <= 0)
1562                 return;                                 /* nothing to do */
1563
1564         sd = heap_open(StatisticRelationId, RowExclusiveLock);
1565
1566         for (attno = 0; attno < natts; attno++)
1567         {
1568                 VacAttrStats *stats = vacattrstats[attno];
1569                 HeapTuple       stup,
1570                                         oldtup;
1571                 int                     i,
1572                                         k,
1573                                         n;
1574                 Datum           values[Natts_pg_statistic];
1575                 bool            nulls[Natts_pg_statistic];
1576                 bool            replaces[Natts_pg_statistic];
1577
1578                 /* Ignore attr if we weren't able to collect stats */
1579                 if (!stats->stats_valid)
1580                         continue;
1581
1582                 /*
1583                  * Construct a new pg_statistic tuple
1584                  */
1585                 for (i = 0; i < Natts_pg_statistic; ++i)
1586                 {
1587                         nulls[i] = false;
1588                         replaces[i] = true;
1589                 }
1590
1591                 values[Anum_pg_statistic_starelid - 1] = ObjectIdGetDatum(relid);
1592                 values[Anum_pg_statistic_staattnum - 1] = Int16GetDatum(stats->attr->attnum);
1593                 values[Anum_pg_statistic_stainherit - 1] = BoolGetDatum(inh);
1594                 values[Anum_pg_statistic_stanullfrac - 1] = Float4GetDatum(stats->stanullfrac);
1595                 values[Anum_pg_statistic_stawidth - 1] = Int32GetDatum(stats->stawidth);
1596                 values[Anum_pg_statistic_stadistinct - 1] = Float4GetDatum(stats->stadistinct);
1597                 i = Anum_pg_statistic_stakind1 - 1;
1598                 for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
1599                 {
1600                         values[i++] = Int16GetDatum(stats->stakind[k]);         /* stakindN */
1601                 }
1602                 i = Anum_pg_statistic_staop1 - 1;
1603                 for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
1604                 {
1605                         values[i++] = ObjectIdGetDatum(stats->staop[k]);        /* staopN */
1606                 }
1607                 i = Anum_pg_statistic_stanumbers1 - 1;
1608                 for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
1609                 {
1610                         int                     nnum = stats->numnumbers[k];
1611
1612                         if (nnum > 0)
1613                         {
1614                                 Datum      *numdatums = (Datum *) palloc(nnum * sizeof(Datum));
1615                                 ArrayType  *arry;
1616
1617                                 for (n = 0; n < nnum; n++)
1618                                         numdatums[n] = Float4GetDatum(stats->stanumbers[k][n]);
1619                                 /* XXX knows more than it should about type float4: */
1620                                 arry = construct_array(numdatums, nnum,
1621                                                                            FLOAT4OID,
1622                                                                            sizeof(float4), FLOAT4PASSBYVAL, 'i');
1623                                 values[i++] = PointerGetDatum(arry);    /* stanumbersN */
1624                         }
1625                         else
1626                         {
1627                                 nulls[i] = true;
1628                                 values[i++] = (Datum) 0;
1629                         }
1630                 }
1631                 i = Anum_pg_statistic_stavalues1 - 1;
1632                 for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
1633                 {
1634                         if (stats->numvalues[k] > 0)
1635                         {
1636                                 ArrayType  *arry;
1637
1638                                 arry = construct_array(stats->stavalues[k],
1639                                                                            stats->numvalues[k],
1640                                                                            stats->statypid[k],
1641                                                                            stats->statyplen[k],
1642                                                                            stats->statypbyval[k],
1643                                                                            stats->statypalign[k]);
1644                                 values[i++] = PointerGetDatum(arry);    /* stavaluesN */
1645                         }
1646                         else
1647                         {
1648                                 nulls[i] = true;
1649                                 values[i++] = (Datum) 0;
1650                         }
1651                 }
1652
1653                 /* Is there already a pg_statistic tuple for this attribute? */
1654                 oldtup = SearchSysCache3(STATRELATTINH,
1655                                                                  ObjectIdGetDatum(relid),
1656                                                                  Int16GetDatum(stats->attr->attnum),
1657                                                                  BoolGetDatum(inh));
1658
1659                 if (HeapTupleIsValid(oldtup))
1660                 {
1661                         /* Yes, replace it */
1662                         stup = heap_modify_tuple(oldtup,
1663                                                                          RelationGetDescr(sd),
1664                                                                          values,
1665                                                                          nulls,
1666                                                                          replaces);
1667                         ReleaseSysCache(oldtup);
1668                         simple_heap_update(sd, &stup->t_self, stup);
1669                 }
1670                 else
1671                 {
1672                         /* No, insert new tuple */
1673                         stup = heap_form_tuple(RelationGetDescr(sd), values, nulls);
1674                         simple_heap_insert(sd, stup);
1675                 }
1676
1677                 /* update indexes too */
1678                 CatalogUpdateIndexes(sd, stup);
1679
1680                 heap_freetuple(stup);
1681         }
1682
1683         heap_close(sd, RowExclusiveLock);
1684 }
1685
1686 /*
1687  * Standard fetch function for use by compute_stats subroutines.
1688  *
1689  * This exists to provide some insulation between compute_stats routines
1690  * and the actual storage of the sample data.
1691  */
1692 static Datum
1693 std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull)
1694 {
1695         int                     attnum = stats->tupattnum;
1696         HeapTuple       tuple = stats->rows[rownum];
1697         TupleDesc       tupDesc = stats->tupDesc;
1698
1699         return heap_getattr(tuple, attnum, tupDesc, isNull);
1700 }
1701
1702 /*
1703  * Fetch function for analyzing index expressions.
1704  *
1705  * We have not bothered to construct index tuples, instead the data is
1706  * just in Datum arrays.
1707  */
1708 static Datum
1709 ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull)
1710 {
1711         int                     i;
1712
1713         /* exprvals and exprnulls are already offset for proper column */
1714         i = rownum * stats->rowstride;
1715         *isNull = stats->exprnulls[i];
1716         return stats->exprvals[i];
1717 }
1718
1719
1720 /*==========================================================================
1721  *
1722  * Code below this point represents the "standard" type-specific statistics
1723  * analysis algorithms.  This code can be replaced on a per-data-type basis
1724  * by setting a nonzero value in pg_type.typanalyze.
1725  *
1726  *==========================================================================
1727  */
1728
1729
/*
 * Varlena datums wider than WIDTH_THRESHOLD (measured after detoasting!)
 * are ignored, so that analysis does not consume too much memory and the
 * resulting pg_statistic rows do not take too much space.  This is
 * legitimate for MCV and distinct-value calculations since a wide value is
 * unlikely to be duplicated at all, much less be a most-common value.  For
 * the same reason, ignoring wide values will not affect our estimates of
 * histogram bin boundaries very much.
 */
#define WIDTH_THRESHOLD  1024

/* Exchange the contents of two int lvalues */
#define swapInt(a,b) \
	do { \
		int		_tmp = (a); \
		(a) = (b); \
		(b) = _tmp; \
	} while (0)

/* Exchange the contents of two Datum lvalues */
#define swapDatum(a,b) \
	do { \
		Datum	_tmp = (a); \
		(a) = (b); \
		(b) = _tmp; \
	} while (0)
1743
1744 /*
1745  * Extra information used by the default analysis routines
1746  */
1747 typedef struct
1748 {
1749         Oid                     eqopr;                  /* '=' operator for datatype, if any */
1750         Oid                     eqfunc;                 /* and associated function */
1751         Oid                     ltopr;                  /* '<' operator for datatype, if any */
1752 } StdAnalyzeData;
1753
1754 typedef struct
1755 {
1756         Datum           value;                  /* a data value */
1757         int                     tupno;                  /* position index for tuple it came from */
1758 } ScalarItem;
1759
1760 typedef struct
1761 {
1762         int                     count;                  /* # of duplicates */
1763         int                     first;                  /* values[] index of first occurrence */
1764 } ScalarMCVItem;
1765
1766 typedef struct
1767 {
1768         FmgrInfo   *cmpFn;
1769         int                     cmpFlags;
1770         int                *tupnoLink;
1771 } CompareScalarsContext;
1772
1773
1774 static void compute_minimal_stats(VacAttrStatsP stats,
1775                                           AnalyzeAttrFetchFunc fetchfunc,
1776                                           int samplerows,
1777                                           double totalrows);
1778 static void compute_scalar_stats(VacAttrStatsP stats,
1779                                          AnalyzeAttrFetchFunc fetchfunc,
1780                                          int samplerows,
1781                                          double totalrows);
1782 static int      compare_scalars(const void *a, const void *b, void *arg);
1783 static int      compare_mcvs(const void *a, const void *b);
1784
1785
1786 /*
1787  * std_typanalyze -- the default type-specific typanalyze function
1788  */
1789 static bool
1790 std_typanalyze(VacAttrStats *stats)
1791 {
1792         Form_pg_attribute attr = stats->attr;
1793         Oid                     ltopr;
1794         Oid                     eqopr;
1795         StdAnalyzeData *mystats;
1796
1797         /* If the attstattarget column is negative, use the default value */
1798         /* NB: it is okay to scribble on stats->attr since it's a copy */
1799         if (attr->attstattarget < 0)
1800                 attr->attstattarget = default_statistics_target;
1801
1802         /* Look for default "<" and "=" operators for column's type */
1803         get_sort_group_operators(stats->attrtypid,
1804                                                          false, false, false,
1805                                                          &ltopr, &eqopr, NULL,
1806                                                          NULL);
1807
1808         /* If column has no "=" operator, we can't do much of anything */
1809         if (!OidIsValid(eqopr))
1810                 return false;
1811
1812         /* Save the operator info for compute_stats routines */
1813         mystats = (StdAnalyzeData *) palloc(sizeof(StdAnalyzeData));
1814         mystats->eqopr = eqopr;
1815         mystats->eqfunc = get_opcode(eqopr);
1816         mystats->ltopr = ltopr;
1817         stats->extra_data = mystats;
1818
1819         /*
1820          * Determine which standard statistics algorithm to use
1821          */
1822         if (OidIsValid(ltopr))
1823         {
1824                 /* Seems to be a scalar datatype */
1825                 stats->compute_stats = compute_scalar_stats;
1826                 /*--------------------
1827                  * The following choice of minrows is based on the paper
1828                  * "Random sampling for histogram construction: how much is enough?"
1829                  * by Surajit Chaudhuri, Rajeev Motwani and Vivek Narasayya, in
1830                  * Proceedings of ACM SIGMOD International Conference on Management
1831                  * of Data, 1998, Pages 436-447.  Their Corollary 1 to Theorem 5
1832                  * says that for table size n, histogram size k, maximum relative
1833                  * error in bin size f, and error probability gamma, the minimum
1834                  * random sample size is
1835                  *              r = 4 * k * ln(2*n/gamma) / f^2
1836                  * Taking f = 0.5, gamma = 0.01, n = 10^6 rows, we obtain
1837                  *              r = 305.82 * k
1838                  * Note that because of the log function, the dependence on n is
1839                  * quite weak; even at n = 10^12, a 300*k sample gives <= 0.66
1840                  * bin size error with probability 0.99.  So there's no real need to
1841                  * scale for n, which is a good thing because we don't necessarily
1842                  * know it at this point.
1843                  *--------------------
1844                  */
1845                 stats->minrows = 300 * attr->attstattarget;
1846         }
1847         else
1848         {
1849                 /* Can't do much but the minimal stuff */
1850                 stats->compute_stats = compute_minimal_stats;
1851                 /* Might as well use the same minrows as above */
1852                 stats->minrows = 300 * attr->attstattarget;
1853         }
1854
1855         return true;
1856 }
1857
1858 /*
1859  *      compute_minimal_stats() -- compute minimal column statistics
1860  *
1861  *      We use this when we can find only an "=" operator for the datatype.
1862  *
1863  *      We determine the fraction of non-null rows, the average width, the
1864  *      most common values, and the (estimated) number of distinct values.
1865  *
1866  *      The most common values are determined by brute force: we keep a list
1867  *      of previously seen values, ordered by number of times seen, as we scan
1868  *      the samples.  A newly seen value is inserted just after the last
1869  *      multiply-seen value, causing the bottommost (oldest) singly-seen value
1870  *      to drop off the list.  The accuracy of this method, and also its cost,
1871  *      depend mainly on the length of the list we are willing to keep.
1872  */
1873 static void
1874 compute_minimal_stats(VacAttrStatsP stats,
1875                                           AnalyzeAttrFetchFunc fetchfunc,
1876                                           int samplerows,
1877                                           double totalrows)
1878 {
1879         int                     i;
1880         int                     null_cnt = 0;
1881         int                     nonnull_cnt = 0;
1882         int                     toowide_cnt = 0;
1883         double          total_width = 0;
1884         bool            is_varlena = (!stats->attrtype->typbyval &&
1885                                                           stats->attrtype->typlen == -1);
1886         bool            is_varwidth = (!stats->attrtype->typbyval &&
1887                                                            stats->attrtype->typlen < 0);
1888         FmgrInfo        f_cmpeq;
1889         typedef struct
1890         {
1891                 Datum           value;
1892                 int                     count;
1893         } TrackItem;
1894         TrackItem  *track;
1895         int                     track_cnt,
1896                                 track_max;
1897         int                     num_mcv = stats->attr->attstattarget;
1898         StdAnalyzeData *mystats = (StdAnalyzeData *) stats->extra_data;
1899
1900         /*
1901          * We track up to 2*n values for an n-element MCV list; but at least 10
1902          */
1903         track_max = 2 * num_mcv;
1904         if (track_max < 10)
1905                 track_max = 10;
1906         track = (TrackItem *) palloc(track_max * sizeof(TrackItem));
1907         track_cnt = 0;
1908
1909         fmgr_info(mystats->eqfunc, &f_cmpeq);
1910
1911         for (i = 0; i < samplerows; i++)
1912         {
1913                 Datum           value;
1914                 bool            isnull;
1915                 bool            match;
1916                 int                     firstcount1,
1917                                         j;
1918
1919                 vacuum_delay_point();
1920
1921                 value = fetchfunc(stats, i, &isnull);
1922
1923                 /* Check for null/nonnull */
1924                 if (isnull)
1925                 {
1926                         null_cnt++;
1927                         continue;
1928                 }
1929                 nonnull_cnt++;
1930
1931                 /*
1932                  * If it's a variable-width field, add up widths for average width
1933                  * calculation.  Note that if the value is toasted, we use the toasted
1934                  * width.  We don't bother with this calculation if it's a fixed-width
1935                  * type.
1936                  */
1937                 if (is_varlena)
1938                 {
1939                         total_width += VARSIZE_ANY(DatumGetPointer(value));
1940
1941                         /*
1942                          * If the value is toasted, we want to detoast it just once to
1943                          * avoid repeated detoastings and resultant excess memory usage
1944                          * during the comparisons.      Also, check to see if the value is
1945                          * excessively wide, and if so don't detoast at all --- just
1946                          * ignore the value.
1947                          */
1948                         if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
1949                         {
1950                                 toowide_cnt++;
1951                                 continue;
1952                         }
1953                         value = PointerGetDatum(PG_DETOAST_DATUM(value));
1954                 }
1955                 else if (is_varwidth)
1956                 {
1957                         /* must be cstring */
1958                         total_width += strlen(DatumGetCString(value)) + 1;
1959                 }
1960
1961                 /*
1962                  * See if the value matches anything we're already tracking.
1963                  */
1964                 match = false;
1965                 firstcount1 = track_cnt;
1966                 for (j = 0; j < track_cnt; j++)
1967                 {
1968                         /* We always use the default collation for statistics */
1969                         if (DatumGetBool(FunctionCall2Coll(&f_cmpeq,
1970                                                                                            DEFAULT_COLLATION_OID,
1971                                                                                            value, track[j].value)))
1972                         {
1973                                 match = true;
1974                                 break;
1975                         }
1976                         if (j < firstcount1 && track[j].count == 1)
1977                                 firstcount1 = j;
1978                 }
1979
1980                 if (match)
1981                 {
1982                         /* Found a match */
1983                         track[j].count++;
1984                         /* This value may now need to "bubble up" in the track list */
1985                         while (j > 0 && track[j].count > track[j - 1].count)
1986                         {
1987                                 swapDatum(track[j].value, track[j - 1].value);
1988                                 swapInt(track[j].count, track[j - 1].count);
1989                                 j--;
1990                         }
1991                 }
1992                 else
1993                 {
1994                         /* No match.  Insert at head of count-1 list */
1995                         if (track_cnt < track_max)
1996                                 track_cnt++;
1997                         for (j = track_cnt - 1; j > firstcount1; j--)
1998                         {
1999                                 track[j].value = track[j - 1].value;
2000                                 track[j].count = track[j - 1].count;
2001                         }
2002                         if (firstcount1 < track_cnt)
2003                         {
2004                                 track[firstcount1].value = value;
2005                                 track[firstcount1].count = 1;
2006                         }
2007                 }
2008         }
2009
2010         /* We can only compute real stats if we found some non-null values. */
2011         if (nonnull_cnt > 0)
2012         {
2013                 int                     nmultiple,
2014                                         summultiple;
2015
2016                 stats->stats_valid = true;
2017                 /* Do the simple null-frac and width stats */
2018                 stats->stanullfrac = (double) null_cnt / (double) samplerows;
2019                 if (is_varwidth)
2020                         stats->stawidth = total_width / (double) nonnull_cnt;
2021                 else
2022                         stats->stawidth = stats->attrtype->typlen;
2023
2024                 /* Count the number of values we found multiple times */
2025                 summultiple = 0;
2026                 for (nmultiple = 0; nmultiple < track_cnt; nmultiple++)
2027                 {
2028                         if (track[nmultiple].count == 1)
2029                                 break;
2030                         summultiple += track[nmultiple].count;
2031                 }
2032
2033                 if (nmultiple == 0)
2034                 {
2035                         /* If we found no repeated values, assume it's a unique column */
2036                         stats->stadistinct = -1.0;
2037                 }
2038                 else if (track_cnt < track_max && toowide_cnt == 0 &&
2039                                  nmultiple == track_cnt)
2040                 {
2041                         /*
2042                          * Our track list includes every value in the sample, and every
2043                          * value appeared more than once.  Assume the column has just
2044                          * these values.
2045                          */
2046                         stats->stadistinct = track_cnt;
2047                 }
2048                 else
2049                 {
2050                         /*----------
2051                          * Estimate the number of distinct values using the estimator
2052                          * proposed by Haas and Stokes in IBM Research Report RJ 10025:
2053                          *              n*d / (n - f1 + f1*n/N)
2054                          * where f1 is the number of distinct values that occurred
2055                          * exactly once in our sample of n rows (from a total of N),
2056                          * and d is the total number of distinct values in the sample.
2057                          * This is their Duj1 estimator; the other estimators they
2058                          * recommend are considerably more complex, and are numerically
2059                          * very unstable when n is much smaller than N.
2060                          *
2061                          * We assume (not very reliably!) that all the multiply-occurring
2062                          * values are reflected in the final track[] list, and the other
2063                          * nonnull values all appeared but once.  (XXX this usually
2064                          * results in a drastic overestimate of ndistinct.  Can we do
2065                          * any better?)
2066                          *----------
2067                          */
2068                         int                     f1 = nonnull_cnt - summultiple;
2069                         int                     d = f1 + nmultiple;
2070                         double          numer,
2071                                                 denom,
2072                                                 stadistinct;
2073
2074                         numer = (double) samplerows *(double) d;
2075
2076                         denom = (double) (samplerows - f1) +
2077                                 (double) f1 *(double) samplerows / totalrows;
2078
2079                         stadistinct = numer / denom;
2080                         /* Clamp to sane range in case of roundoff error */
2081                         if (stadistinct < (double) d)
2082                                 stadistinct = (double) d;
2083                         if (stadistinct > totalrows)
2084                                 stadistinct = totalrows;
2085                         stats->stadistinct = floor(stadistinct + 0.5);
2086                 }
2087
2088                 /*
2089                  * If we estimated the number of distinct values at more than 10% of
2090                  * the total row count (a very arbitrary limit), then assume that
2091                  * stadistinct should scale with the row count rather than be a fixed
2092                  * value.
2093                  */
2094                 if (stats->stadistinct > 0.1 * totalrows)
2095                         stats->stadistinct = -(stats->stadistinct / totalrows);
2096
2097                 /*
2098                  * Decide how many values are worth storing as most-common values. If
2099                  * we are able to generate a complete MCV list (all the values in the
2100                  * sample will fit, and we think these are all the ones in the table),
2101                  * then do so.  Otherwise, store only those values that are
2102                  * significantly more common than the (estimated) average. We set the
2103                  * threshold rather arbitrarily at 25% more than average, with at
2104                  * least 2 instances in the sample.
2105                  */
2106                 if (track_cnt < track_max && toowide_cnt == 0 &&
2107                         stats->stadistinct > 0 &&
2108                         track_cnt <= num_mcv)
2109                 {
2110                         /* Track list includes all values seen, and all will fit */
2111                         num_mcv = track_cnt;
2112                 }
2113                 else
2114                 {
2115                         double          ndistinct = stats->stadistinct;
2116                         double          avgcount,
2117                                                 mincount;
2118
2119                         if (ndistinct < 0)
2120                                 ndistinct = -ndistinct * totalrows;
2121                         /* estimate # of occurrences in sample of a typical value */
2122                         avgcount = (double) samplerows / ndistinct;
2123                         /* set minimum threshold count to store a value */
2124                         mincount = avgcount * 1.25;
2125                         if (mincount < 2)
2126                                 mincount = 2;
2127                         if (num_mcv > track_cnt)
2128                                 num_mcv = track_cnt;
2129                         for (i = 0; i < num_mcv; i++)
2130                         {
2131                                 if (track[i].count < mincount)
2132                                 {
2133                                         num_mcv = i;
2134                                         break;
2135                                 }
2136                         }
2137                 }
2138
2139                 /* Generate MCV slot entry */
2140                 if (num_mcv > 0)
2141                 {
2142                         MemoryContext old_context;
2143                         Datum      *mcv_values;
2144                         float4     *mcv_freqs;
2145
2146                         /* Must copy the target values into anl_context */
2147                         old_context = MemoryContextSwitchTo(stats->anl_context);
2148                         mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
2149                         mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
2150                         for (i = 0; i < num_mcv; i++)
2151                         {
2152                                 mcv_values[i] = datumCopy(track[i].value,
2153                                                                                   stats->attrtype->typbyval,
2154                                                                                   stats->attrtype->typlen);
2155                                 mcv_freqs[i] = (double) track[i].count / (double) samplerows;
2156                         }
2157                         MemoryContextSwitchTo(old_context);
2158
2159                         stats->stakind[0] = STATISTIC_KIND_MCV;
2160                         stats->staop[0] = mystats->eqopr;
2161                         stats->stanumbers[0] = mcv_freqs;
2162                         stats->numnumbers[0] = num_mcv;
2163                         stats->stavalues[0] = mcv_values;
2164                         stats->numvalues[0] = num_mcv;
2165
2166                         /*
2167                          * Accept the defaults for stats->statypid and others. They have
2168                          * been set before we were called (see vacuum.h)
2169                          */
2170                 }
2171         }
2172         else if (null_cnt > 0)
2173         {
2174                 /* We found only nulls; assume the column is entirely null */
2175                 stats->stats_valid = true;
2176                 stats->stanullfrac = 1.0;
2177                 if (is_varwidth)
2178                         stats->stawidth = 0;    /* "unknown" */
2179                 else
2180                         stats->stawidth = stats->attrtype->typlen;
2181                 stats->stadistinct = 0.0;               /* "unknown" */
2182         }
2183
2184         /* We don't need to bother cleaning up any of our temporary palloc's */
2185 }
2186
2187
2188 /*
2189  *      compute_scalar_stats() -- compute column statistics
2190  *
2191  *      We use this when we can find "=" and "<" operators for the datatype.
2192  *
2193  *      We determine the fraction of non-null rows, the average width, the
2194  *      most common values, the (estimated) number of distinct values, the
2195  *      distribution histogram, and the correlation of physical to logical order.
2196  *
2197  *      The desired stats can be determined fairly easily after sorting the
2198  *      data values into order.
2199  */
2200 static void
2201 compute_scalar_stats(VacAttrStatsP stats,
2202                                          AnalyzeAttrFetchFunc fetchfunc,
2203                                          int samplerows,
2204                                          double totalrows)
2205 {
2206         int                     i;
2207         int                     null_cnt = 0;
2208         int                     nonnull_cnt = 0;
2209         int                     toowide_cnt = 0;
2210         double          total_width = 0;
2211         bool            is_varlena = (!stats->attrtype->typbyval &&
2212                                                           stats->attrtype->typlen == -1);
2213         bool            is_varwidth = (!stats->attrtype->typbyval &&
2214                                                            stats->attrtype->typlen < 0);
2215         double          corr_xysum;
2216         Oid                     cmpFn;
2217         int                     cmpFlags;
2218         FmgrInfo        f_cmpfn;
2219         ScalarItem *values;
2220         int                     values_cnt = 0;
2221         int                *tupnoLink;
2222         ScalarMCVItem *track;
2223         int                     track_cnt = 0;
2224         int                     num_mcv = stats->attr->attstattarget;
2225         int                     num_bins = stats->attr->attstattarget;
2226         StdAnalyzeData *mystats = (StdAnalyzeData *) stats->extra_data;
2227
2228         values = (ScalarItem *) palloc(samplerows * sizeof(ScalarItem));
2229         tupnoLink = (int *) palloc(samplerows * sizeof(int));
2230         track = (ScalarMCVItem *) palloc(num_mcv * sizeof(ScalarMCVItem));
2231
2232         SelectSortFunction(mystats->ltopr, false, &cmpFn, &cmpFlags);
2233         fmgr_info(cmpFn, &f_cmpfn);
2234
2235         /* Initial scan to find sortable values */
2236         for (i = 0; i < samplerows; i++)
2237         {
2238                 Datum           value;
2239                 bool            isnull;
2240
2241                 vacuum_delay_point();
2242
2243                 value = fetchfunc(stats, i, &isnull);
2244
2245                 /* Check for null/nonnull */
2246                 if (isnull)
2247                 {
2248                         null_cnt++;
2249                         continue;
2250                 }
2251                 nonnull_cnt++;
2252
2253                 /*
2254                  * If it's a variable-width field, add up widths for average width
2255                  * calculation.  Note that if the value is toasted, we use the toasted
2256                  * width.  We don't bother with this calculation if it's a fixed-width
2257                  * type.
2258                  */
2259                 if (is_varlena)
2260                 {
2261                         total_width += VARSIZE_ANY(DatumGetPointer(value));
2262
2263                         /*
2264                          * If the value is toasted, we want to detoast it just once to
2265                          * avoid repeated detoastings and resultant excess memory usage
2266                          * during the comparisons.  Also, check to see if the value is
2267                          * excessively wide, and if so don't detoast at all --- just
2268                          * ignore the value.
2269                          */
2270                         if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
2271                         {
2272                                 toowide_cnt++;
2273                                 continue;
2274                         }
2275                         value = PointerGetDatum(PG_DETOAST_DATUM(value));
2276                 }
2277                 else if (is_varwidth)
2278                 {
2279                         /* must be cstring */
2280                         total_width += strlen(DatumGetCString(value)) + 1;
2281                 }
2282
2283                 /* Add it to the list to be sorted */
2284                 values[values_cnt].value = value;
2285                 values[values_cnt].tupno = values_cnt;
2286                 tupnoLink[values_cnt] = values_cnt;
2287                 values_cnt++;
2288         }
2289
2290         /* We can only compute real stats if we found some sortable values. */
2291         if (values_cnt > 0)
2292         {
2293                 int                     ndistinct,      /* # distinct values in sample */
2294                                         nmultiple,      /* # that appear multiple times */
2295                                         num_hist,
2296                                         dups_cnt;
2297                 int                     slot_idx = 0;
2298                 CompareScalarsContext cxt;
2299
2300                 /* Sort the collected values */
2301                 cxt.cmpFn = &f_cmpfn;
2302                 cxt.cmpFlags = cmpFlags;
2303                 cxt.tupnoLink = tupnoLink;
2304                 qsort_arg((void *) values, values_cnt, sizeof(ScalarItem),
2305                                   compare_scalars, (void *) &cxt);
2306
2307                 /*
2308                  * Now scan the values in order, find the most common ones, and also
2309                  * accumulate ordering-correlation statistics.
2310                  *
2311                  * To determine which are most common, we first have to count the
2312                  * number of duplicates of each value.  The duplicates are adjacent in
2313                  * the sorted list, so a brute-force approach is to compare successive
2314                  * datum values until we find two that are not equal. However, that
2315                  * requires N-1 invocations of the datum comparison routine, which are
2316                  * completely redundant with work that was done during the sort.  (The
2317                  * sort algorithm must at some point have compared each pair of items
2318                  * that are adjacent in the sorted order; otherwise it could not know
2319                  * that it's ordered the pair correctly.) We exploit this by having
2320                  * compare_scalars remember the highest tupno index that each
2321                  * ScalarItem has been found equal to.  At the end of the sort, a
2322                  * ScalarItem's tupnoLink will still point to itself if and only if it
2323                  * is the last item of its group of duplicates (since the group will
2324                  * be ordered by tupno).
2325                  */
2326                 corr_xysum = 0;
2327                 ndistinct = 0;
2328                 nmultiple = 0;
2329                 dups_cnt = 0;
2330                 for (i = 0; i < values_cnt; i++)
2331                 {
2332                         int                     tupno = values[i].tupno;
2333
2334                         corr_xysum += ((double) i) * ((double) tupno);
2335                         dups_cnt++;
2336                         if (tupnoLink[tupno] == tupno)
2337                         {
2338                                 /* Reached end of duplicates of this value */
2339                                 ndistinct++;
2340                                 if (dups_cnt > 1)
2341                                 {
2342                                         nmultiple++;
2343                                         if (track_cnt < num_mcv ||
2344                                                 dups_cnt > track[track_cnt - 1].count)
2345                                         {
2346                                                 /*
2347                                                  * Found a new item for the mcv list; find its
2348                                                  * position, bubbling down old items if needed. Loop
2349                                                  * invariant is that j points at an empty/replaceable
2350                                                  * slot.
2351                                                  */
2352                                                 int                     j;
2353
2354                                                 if (track_cnt < num_mcv)
2355                                                         track_cnt++;
2356                                                 for (j = track_cnt - 1; j > 0; j--)
2357                                                 {
2358                                                         if (dups_cnt <= track[j - 1].count)
2359                                                                 break;
2360                                                         track[j].count = track[j - 1].count;
2361                                                         track[j].first = track[j - 1].first;
2362                                                 }
2363                                                 track[j].count = dups_cnt;
2364                                                 track[j].first = i + 1 - dups_cnt;
2365                                         }
2366                                 }
2367                                 dups_cnt = 0;
2368                         }
2369                 }
2370
2371                 stats->stats_valid = true;
2372                 /* Do the simple null-frac and width stats */
2373                 stats->stanullfrac = (double) null_cnt / (double) samplerows;
2374                 if (is_varwidth)
2375                         stats->stawidth = total_width / (double) nonnull_cnt;
2376                 else
2377                         stats->stawidth = stats->attrtype->typlen;
2378
2379                 if (nmultiple == 0)
2380                 {
2381                         /* If we found no repeated values, assume it's a unique column */
2382                         stats->stadistinct = -1.0;
2383                 }
2384                 else if (toowide_cnt == 0 && nmultiple == ndistinct)
2385                 {
2386                         /*
2387                          * Every value in the sample appeared more than once.  Assume the
2388                          * column has just these values.
2389                          */
2390                         stats->stadistinct = ndistinct;
2391                 }
2392                 else
2393                 {
2394                         /*----------
2395                          * Estimate the number of distinct values using the estimator
2396                          * proposed by Haas and Stokes in IBM Research Report RJ 10025:
2397                          *              n*d / (n - f1 + f1*n/N)
2398                          * where f1 is the number of distinct values that occurred
2399                          * exactly once in our sample of n rows (from a total of N),
2400                          * and d is the total number of distinct values in the sample.
2401                          * This is their Duj1 estimator; the other estimators they
2402                          * recommend are considerably more complex, and are numerically
2403                          * very unstable when n is much smaller than N.
2404                          *
2405                          * Overwidth values are assumed to have been distinct.
2406                          *----------
2407                          */
2408                         int                     f1 = ndistinct - nmultiple + toowide_cnt;
2409                         int                     d = f1 + nmultiple;
2410                         double          numer,
2411                                                 denom,
2412                                                 stadistinct;
2413
2414                         numer = (double) samplerows *(double) d;
2415
2416                         denom = (double) (samplerows - f1) +
2417                                 (double) f1 *(double) samplerows / totalrows;
2418
2419                         stadistinct = numer / denom;
2420                         /* Clamp to sane range in case of roundoff error */
2421                         if (stadistinct < (double) d)
2422                                 stadistinct = (double) d;
2423                         if (stadistinct > totalrows)
2424                                 stadistinct = totalrows;
2425                         stats->stadistinct = floor(stadistinct + 0.5);
2426                 }
2427
2428                 /*
2429                  * If we estimated the number of distinct values at more than 10% of
2430                  * the total row count (a very arbitrary limit), then assume that
2431                  * stadistinct should scale with the row count rather than be a fixed
2432                  * value.
2433                  */
2434                 if (stats->stadistinct > 0.1 * totalrows)
2435                         stats->stadistinct = -(stats->stadistinct / totalrows);
2436
2437                 /*
2438                  * Decide how many values are worth storing as most-common values. If
2439                  * we are able to generate a complete MCV list (all the values in the
2440                  * sample will fit, and we think these are all the ones in the table),
2441                  * then do so.  Otherwise, store only those values that are
2442                  * significantly more common than the (estimated) average. We set the
2443                  * threshold rather arbitrarily at 25% more than average, with at
2444                  * least 2 instances in the sample.  Also, we won't suppress values
2445                  * that have a frequency of at least 1/K where K is the intended
2446                  * number of histogram bins; such values might otherwise cause us to
2447                  * emit duplicate histogram bin boundaries.  (We might end up with
2448                  * duplicate histogram entries anyway, if the distribution is skewed;
2449                  * but we prefer to treat such values as MCVs if at all possible.)
2450                  */
2451                 if (track_cnt == ndistinct && toowide_cnt == 0 &&
2452                         stats->stadistinct > 0 &&
2453                         track_cnt <= num_mcv)
2454                 {
2455                         /* Track list includes all values seen, and all will fit */
2456                         num_mcv = track_cnt;
2457                 }
2458                 else
2459                 {
2460                         double          ndistinct = stats->stadistinct;
2461                         double          avgcount,
2462                                                 mincount,
2463                                                 maxmincount;
2464
2465                         if (ndistinct < 0)
2466                                 ndistinct = -ndistinct * totalrows;
2467                         /* estimate # of occurrences in sample of a typical value */
2468                         avgcount = (double) samplerows / ndistinct;
2469                         /* set minimum threshold count to store a value */
2470                         mincount = avgcount * 1.25;
2471                         if (mincount < 2)
2472                                 mincount = 2;
2473                         /* don't let threshold exceed 1/K, however */
2474                         maxmincount = (double) samplerows / (double) num_bins;
2475                         if (mincount > maxmincount)
2476                                 mincount = maxmincount;
2477                         if (num_mcv > track_cnt)
2478                                 num_mcv = track_cnt;
2479                         for (i = 0; i < num_mcv; i++)
2480                         {
2481                                 if (track[i].count < mincount)
2482                                 {
2483                                         num_mcv = i;
2484                                         break;
2485                                 }
2486                         }
2487                 }
2488
2489                 /* Generate MCV slot entry */
2490                 if (num_mcv > 0)
2491                 {
2492                         MemoryContext old_context;
2493                         Datum      *mcv_values;
2494                         float4     *mcv_freqs;
2495
2496                         /* Must copy the target values into anl_context */
2497                         old_context = MemoryContextSwitchTo(stats->anl_context);
2498                         mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
2499                         mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
2500                         for (i = 0; i < num_mcv; i++)
2501                         {
2502                                 mcv_values[i] = datumCopy(values[track[i].first].value,
2503                                                                                   stats->attrtype->typbyval,
2504                                                                                   stats->attrtype->typlen);
2505                                 mcv_freqs[i] = (double) track[i].count / (double) samplerows;
2506                         }
2507                         MemoryContextSwitchTo(old_context);
2508
2509                         stats->stakind[slot_idx] = STATISTIC_KIND_MCV;
2510                         stats->staop[slot_idx] = mystats->eqopr;
2511                         stats->stanumbers[slot_idx] = mcv_freqs;
2512                         stats->numnumbers[slot_idx] = num_mcv;
2513                         stats->stavalues[slot_idx] = mcv_values;
2514                         stats->numvalues[slot_idx] = num_mcv;
2515
2516                         /*
2517                          * Accept the defaults for stats->statypid and others. They have
2518                          * been set before we were called (see vacuum.h)
2519                          */
2520                         slot_idx++;
2521                 }
2522
2523                 /*
2524                  * Generate a histogram slot entry if there are at least two distinct
2525                  * values not accounted for in the MCV list.  (This ensures the
2526                  * histogram won't collapse to empty or a singleton.)
2527                  */
2528                 num_hist = ndistinct - num_mcv;
2529                 if (num_hist > num_bins)
2530                         num_hist = num_bins + 1;
2531                 if (num_hist >= 2)
2532                 {
2533                         MemoryContext old_context;
2534                         Datum      *hist_values;
2535                         int                     nvals;
2536                         int                     pos,
2537                                                 posfrac,
2538                                                 delta,
2539                                                 deltafrac;
2540
2541                         /* Sort the MCV items into position order to speed next loop */
2542                         qsort((void *) track, num_mcv,
2543                                   sizeof(ScalarMCVItem), compare_mcvs);
2544
2545                         /*
2546                          * Collapse out the MCV items from the values[] array.
2547                          *
2548                          * Note we destroy the values[] array here... but we don't need it
2549                          * for anything more.  We do, however, still need values_cnt.
2550                          * nvals will be the number of remaining entries in values[].
2551                          */
2552                         if (num_mcv > 0)
2553                         {
2554                                 int                     src,
2555                                                         dest;
2556                                 int                     j;
2557
2558                                 src = dest = 0;
2559                                 j = 0;                  /* index of next interesting MCV item */
2560                                 while (src < values_cnt)
2561                                 {
2562                                         int                     ncopy;
2563
2564                                         if (j < num_mcv)
2565                                         {
2566                                                 int                     first = track[j].first;
2567
2568                                                 if (src >= first)
2569                                                 {
2570                                                         /* advance past this MCV item */
2571                                                         src = first + track[j].count;
2572                                                         j++;
2573                                                         continue;
2574                                                 }
2575                                                 ncopy = first - src;
2576                                         }
2577                                         else
2578                                                 ncopy = values_cnt - src;
2579                                         memmove(&values[dest], &values[src],
2580                                                         ncopy * sizeof(ScalarItem));
2581                                         src += ncopy;
2582                                         dest += ncopy;
2583                                 }
2584                                 nvals = dest;
2585                         }
2586                         else
2587                                 nvals = values_cnt;
2588                         Assert(nvals >= num_hist);
2589
2590                         /* Must copy the target values into anl_context */
2591                         old_context = MemoryContextSwitchTo(stats->anl_context);
2592                         hist_values = (Datum *) palloc(num_hist * sizeof(Datum));
2593
2594                         /*
2595                          * The object of this loop is to copy the first and last values[]
2596                          * entries along with evenly-spaced values in between.  So the
2597                          * i'th value is values[(i * (nvals - 1)) / (num_hist - 1)].  But
2598                          * computing that subscript directly risks integer overflow when
2599                          * the stats target is more than a couple thousand.  Instead we
2600                          * add (nvals - 1) / (num_hist - 1) to pos at each step, tracking
2601                          * the integral and fractional parts of the sum separately.
2602                          */
2603                         delta = (nvals - 1) / (num_hist - 1);
2604                         deltafrac = (nvals - 1) % (num_hist - 1);
2605                         pos = posfrac = 0;
2606
2607                         for (i = 0; i < num_hist; i++)
2608                         {
2609                                 hist_values[i] = datumCopy(values[pos].value,
2610                                                                                    stats->attrtype->typbyval,
2611                                                                                    stats->attrtype->typlen);
2612                                 pos += delta;
2613                                 posfrac += deltafrac;
2614                                 if (posfrac >= (num_hist - 1))
2615                                 {
2616                                         /* fractional part exceeds 1, carry to integer part */
2617                                         pos++;
2618                                         posfrac -= (num_hist - 1);
2619                                 }
2620                         }
2621
2622                         MemoryContextSwitchTo(old_context);
2623
2624                         stats->stakind[slot_idx] = STATISTIC_KIND_HISTOGRAM;
2625                         stats->staop[slot_idx] = mystats->ltopr;
2626                         stats->stavalues[slot_idx] = hist_values;
2627                         stats->numvalues[slot_idx] = num_hist;
2628
2629                         /*
2630                          * Accept the defaults for stats->statypid and others. They have
2631                          * been set before we were called (see vacuum.h)
2632                          */
2633                         slot_idx++;
2634                 }
2635
2636                 /* Generate a correlation entry if there are multiple values */
2637                 if (values_cnt > 1)
2638                 {
2639                         MemoryContext old_context;
2640                         float4     *corrs;
2641                         double          corr_xsum,
2642                                                 corr_x2sum;
2643
2644                         /* Must copy the target values into anl_context */
2645                         old_context = MemoryContextSwitchTo(stats->anl_context);
2646                         corrs = (float4 *) palloc(sizeof(float4));
2647                         MemoryContextSwitchTo(old_context);
2648
2649                         /*----------
2650                          * Since we know the x and y value sets are both
2651                          *              0, 1, ..., values_cnt-1
2652                          * we have sum(x) = sum(y) =
2653                          *              (values_cnt-1)*values_cnt / 2
2654                          * and sum(x^2) = sum(y^2) =
2655                          *              (values_cnt-1)*values_cnt*(2*values_cnt-1) / 6.
2656                          *----------
2657                          */
2658                         corr_xsum = ((double) (values_cnt - 1)) *
2659                                 ((double) values_cnt) / 2.0;
2660                         corr_x2sum = ((double) (values_cnt - 1)) *
2661                                 ((double) values_cnt) * (double) (2 * values_cnt - 1) / 6.0;
2662
2663                         /* And the correlation coefficient reduces to */
2664                         corrs[0] = (values_cnt * corr_xysum - corr_xsum * corr_xsum) /
2665                                 (values_cnt * corr_x2sum - corr_xsum * corr_xsum);
2666
2667                         stats->stakind[slot_idx] = STATISTIC_KIND_CORRELATION;
2668                         stats->staop[slot_idx] = mystats->ltopr;
2669                         stats->stanumbers[slot_idx] = corrs;
2670                         stats->numnumbers[slot_idx] = 1;
2671                         slot_idx++;
2672                 }
2673         }
2674         else if (nonnull_cnt == 0 && null_cnt > 0)
2675         {
2676                 /* We found only nulls; assume the column is entirely null */
2677                 stats->stats_valid = true;
2678                 stats->stanullfrac = 1.0;
2679                 if (is_varwidth)
2680                         stats->stawidth = 0;    /* "unknown" */
2681                 else
2682                         stats->stawidth = stats->attrtype->typlen;
2683                 stats->stadistinct = 0.0;               /* "unknown" */
2684         }
2685
2686         /* We don't need to bother cleaning up any of our temporary palloc's */
2687 }
2688
2689 /*
2690  * qsort_arg comparator for sorting ScalarItems
2691  *
2692  * Aside from sorting the items, we update the tupnoLink[] array
2693  * whenever two ScalarItems are found to contain equal datums.  The array
2694  * is indexed by tupno; for each ScalarItem, it contains the highest
2695  * tupno that that item's datum has been found to be equal to.  This allows
2696  * us to avoid additional comparisons in compute_scalar_stats().
2697  */
2698 static int
2699 compare_scalars(const void *a, const void *b, void *arg)
2700 {
2701         Datum           da = ((ScalarItem *) a)->value;
2702         int                     ta = ((ScalarItem *) a)->tupno;
2703         Datum           db = ((ScalarItem *) b)->value;
2704         int                     tb = ((ScalarItem *) b)->tupno;
2705         CompareScalarsContext *cxt = (CompareScalarsContext *) arg;
2706         int32           compare;
2707
2708         /* We always use the default collation for statistics */
2709         compare = ApplySortFunction(cxt->cmpFn, cxt->cmpFlags,
2710                                                                 DEFAULT_COLLATION_OID,
2711                                                                 da, false, db, false);
2712         if (compare != 0)
2713                 return compare;
2714
2715         /*
2716          * The two datums are equal, so update cxt->tupnoLink[].
2717          */
2718         if (cxt->tupnoLink[ta] < tb)
2719                 cxt->tupnoLink[ta] = tb;
2720         if (cxt->tupnoLink[tb] < ta)
2721                 cxt->tupnoLink[tb] = ta;
2722
2723         /*
2724          * For equal datums, sort by tupno
2725          */
2726         return ta - tb;
2727 }
2728
2729 /*
2730  * qsort comparator for sorting ScalarMCVItems by position
2731  */
2732 static int
2733 compare_mcvs(const void *a, const void *b)
2734 {
2735         int                     da = ((ScalarMCVItem *) a)->first;
2736         int                     db = ((ScalarMCVItem *) b)->first;
2737
2738         return da - db;
2739 }