/*
 * Source provenance: src/backend/commands/analyze.c from the PostgreSQL
 * git mirror, at the commit "Support column-level privileges, as required
 * by SQL standard."
 */
1 /*-------------------------------------------------------------------------
2  *
3  * analyze.c
4  *        the Postgres statistics generator
5  *
6  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.133 2009/01/22 20:16:01 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <math.h>
18
19 #include "access/heapam.h"
20 #include "access/transam.h"
21 #include "access/tuptoaster.h"
22 #include "access/xact.h"
23 #include "catalog/index.h"
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/pg_namespace.h"
27 #include "commands/dbcommands.h"
28 #include "commands/vacuum.h"
29 #include "executor/executor.h"
30 #include "miscadmin.h"
31 #include "nodes/nodeFuncs.h"
32 #include "parser/parse_oper.h"
33 #include "parser/parse_relation.h"
34 #include "pgstat.h"
35 #include "postmaster/autovacuum.h"
36 #include "storage/bufmgr.h"
37 #include "storage/proc.h"
38 #include "storage/procarray.h"
39 #include "utils/acl.h"
40 #include "utils/datum.h"
41 #include "utils/lsyscache.h"
42 #include "utils/memutils.h"
43 #include "utils/pg_rusage.h"
44 #include "utils/syscache.h"
45 #include "utils/tuplesort.h"
46 #include "utils/tqual.h"
47
48
/*
 * Data structure for Algorithm S from Knuth 3.4.2 (selection sampling):
 * choose n of the table's N blocks at random, visiting them in order.
 */
typedef struct
{
	BlockNumber N;				/* number of blocks, known in advance */
	int			n;				/* desired sample size */
	BlockNumber t;				/* current block number */
	int			m;				/* blocks selected so far */
} BlockSamplerData;
typedef BlockSamplerData *BlockSampler;
58
/*
 * Per-index data for ANALYZE: the index's metadata plus the stats slots
 * for whichever of its columns were selected for analysis.
 */
typedef struct AnlIndexData
{
	IndexInfo  *indexInfo;		/* BuildIndexInfo result */
	double		tupleFract;		/* fraction of rows for partial index */
	VacAttrStats **vacattrstats;	/* index attrs to analyze */
	int			attr_cnt;		/* number of valid entries in vacattrstats */
} AnlIndexData;
67
68
/* Default statistics target (GUC parameter) */
int			default_statistics_target = 100;

/* A few variables that don't seem worth passing around as parameters */
static int	elevel = -1;		/* message level; set per-call by analyze_rel */

static MemoryContext anl_context = NULL;	/* context for analysis results */

static BufferAccessStrategy vac_strategy;	/* caller-supplied buffer strategy */


static void BlockSampler_Init(BlockSampler bs, BlockNumber nblocks,
				  int samplesize);
static bool BlockSampler_HasMore(BlockSampler bs);
static BlockNumber BlockSampler_Next(BlockSampler bs);
static void compute_index_stats(Relation onerel, double totalrows,
					AnlIndexData *indexdata, int nindexes,
					HeapTuple *rows, int numrows,
					MemoryContext col_context);
static VacAttrStats *examine_attribute(Relation onerel, int attnum);
static int acquire_sample_rows(Relation onerel, HeapTuple *rows,
					int targrows, double *totalrows, double *totaldeadrows);
static double random_fract(void);
static double init_selection_state(int n);
static double get_next_S(double t, int n, double *stateptr);
static int	compare_rows(const void *a, const void *b);
static void update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats);
static Datum std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);
static Datum ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull);

static bool std_typanalyze(VacAttrStats *stats);
101
/*
 *	analyze_rel() -- analyze one relation
 *
 * If update_reltuples is true, we update reltuples and relpages columns
 * in pg_class.  Caller should pass false if we're part of VACUUM ANALYZE,
 * and the VACUUM didn't skip any pages.  We only have an approximate count,
 * so we don't want to overwrite the accurate values already inserted by the
 * VACUUM in that case.  VACUUM always scans all indexes, however, so the
 * pg_class entries for indexes are never updated if we're part of VACUUM
 * ANALYZE.
 *
 * "relid" identifies the relation to analyze; "vacstmt" supplies the user's
 * options (VERBOSE, an optional explicit column list, and whether we are
 * running as part of VACUUM); "bstrategy" is the buffer access strategy to
 * use while sampling.
 */
void
analyze_rel(Oid relid, VacuumStmt *vacstmt,
			BufferAccessStrategy bstrategy, bool update_reltuples)
{
	Relation	onerel;
	int			attr_cnt,
				tcnt,
				i,
				ind;
	Relation   *Irel;
	int			nindexes;
	bool		hasindex;
	bool		analyzableindex;
	VacAttrStats **vacattrstats;
	AnlIndexData *indexdata;
	int			targrows,
				numrows;
	double		totalrows,
				totaldeadrows;
	HeapTuple  *rows;
	PGRUsage	ru0;
	TimestampTz starttime = 0;
	Oid			save_userid;	/* userid to restore before returning */
	bool		save_secdefcxt; /* security context to restore */

	/* Message level depends on whether the user said VERBOSE */
	if (vacstmt->verbose)
		elevel = INFO;
	else
		elevel = DEBUG2;

	vac_strategy = bstrategy;

	/*
	 * Use the current context for storing analysis info.  vacuum.c ensures
	 * that this context will be cleared when I return, thus releasing the
	 * memory allocated here.
	 */
	anl_context = CurrentMemoryContext;

	/*
	 * Check for user-requested abort.	Note we want this to be inside a
	 * transaction, so xact.c doesn't issue useless WARNING.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Open the relation, getting ShareUpdateExclusiveLock to ensure that two
	 * ANALYZEs don't run on it concurrently.  (This also locks out a
	 * concurrent VACUUM, which doesn't matter much at the moment but might
	 * matter if we ever try to accumulate stats on dead tuples.) If the rel
	 * has been dropped since we last saw it, we don't need to process it.
	 */
	onerel = try_relation_open(relid, ShareUpdateExclusiveLock);
	if (!onerel)
		return;

	/*
	 * Check permissions --- this should match vacuum's check!
	 */
	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
	{
		/* No need for a WARNING if we already complained during VACUUM */
		if (!vacstmt->vacuum)
		{
			if (onerel->rd_rel->relisshared)
				ereport(WARNING,
						(errmsg("skipping \"%s\" --- only superuser can analyze it",
								RelationGetRelationName(onerel))));
			else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
				ereport(WARNING,
						(errmsg("skipping \"%s\" --- only superuser or database owner can analyze it",
								RelationGetRelationName(onerel))));
			else
				ereport(WARNING,
						(errmsg("skipping \"%s\" --- only table or database owner can analyze it",
								RelationGetRelationName(onerel))));
		}
		relation_close(onerel, ShareUpdateExclusiveLock);
		return;
	}

	/*
	 * Check that it's a plain table; we used to do this in get_rel_oids() but
	 * seems safer to check after we've locked the relation.
	 */
	if (onerel->rd_rel->relkind != RELKIND_RELATION)
	{
		/* No need for a WARNING if we already complained during VACUUM */
		if (!vacstmt->vacuum)
			ereport(WARNING,
					(errmsg("skipping \"%s\" --- cannot analyze indexes, views, or special system tables",
							RelationGetRelationName(onerel))));
		relation_close(onerel, ShareUpdateExclusiveLock);
		return;
	}

	/*
	 * Silently ignore tables that are temp tables of other backends ---
	 * trying to analyze these is rather pointless, since their contents are
	 * probably not up-to-date on disk.  (We don't throw a warning here; it
	 * would just lead to chatter during a database-wide ANALYZE.)
	 */
	if (isOtherTempNamespace(RelationGetNamespace(onerel)))
	{
		relation_close(onerel, ShareUpdateExclusiveLock);
		return;
	}

	/*
	 * We can ANALYZE any table except pg_statistic. See update_attstats
	 */
	if (RelationGetRelid(onerel) == StatisticRelationId)
	{
		relation_close(onerel, ShareUpdateExclusiveLock);
		return;
	}

	ereport(elevel,
			(errmsg("analyzing \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					RelationGetRelationName(onerel))));

	/*
	 * Switch to the table owner's userid, so that any index functions are
	 * run as that user.
	 */
	GetUserIdAndContext(&save_userid, &save_secdefcxt);
	SetUserIdAndContext(onerel->rd_rel->relowner, true);

	/* let others know what I'm doing */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->vacuumFlags |= PROC_IN_ANALYZE;
	LWLockRelease(ProcArrayLock);

	/* measure elapsed time iff autovacuum logging requires it */
	if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
	{
		pg_rusage_init(&ru0);
		if (Log_autovacuum_min_duration > 0)
			starttime = GetCurrentTimestamp();
	}

	/*
	 * Determine which columns to analyze
	 *
	 * Note that system attributes are never analyzed.
	 */
	if (vacstmt->va_cols != NIL)
	{
		/* Explicit column list given: look up each named column */
		ListCell   *le;

		vacattrstats = (VacAttrStats **) palloc(list_length(vacstmt->va_cols) *
												sizeof(VacAttrStats *));
		tcnt = 0;
		foreach(le, vacstmt->va_cols)
		{
			char	   *col = strVal(lfirst(le));

			i = attnameAttNum(onerel, col, false);
			if (i == InvalidAttrNumber)
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_COLUMN),
					errmsg("column \"%s\" of relation \"%s\" does not exist",
						   col, RelationGetRelationName(onerel))));
			vacattrstats[tcnt] = examine_attribute(onerel, i);
			if (vacattrstats[tcnt] != NULL)
				tcnt++;
		}
		attr_cnt = tcnt;
	}
	else
	{
		/* No column list: consider every column of the table */
		attr_cnt = onerel->rd_att->natts;
		vacattrstats = (VacAttrStats **)
			palloc(attr_cnt * sizeof(VacAttrStats *));
		tcnt = 0;
		for (i = 1; i <= attr_cnt; i++)
		{
			vacattrstats[tcnt] = examine_attribute(onerel, i);
			if (vacattrstats[tcnt] != NULL)
				tcnt++;
		}
		attr_cnt = tcnt;
	}

	/*
	 * Open all indexes of the relation, and see if there are any analyzable
	 * columns in the indexes.	We do not analyze index columns if there was
	 * an explicit column list in the ANALYZE command, however.
	 */
	vac_open_indexes(onerel, AccessShareLock, &nindexes, &Irel);
	hasindex = (nindexes > 0);
	indexdata = NULL;
	analyzableindex = false;
	if (hasindex)
	{
		indexdata = (AnlIndexData *) palloc0(nindexes * sizeof(AnlIndexData));
		for (ind = 0; ind < nindexes; ind++)
		{
			AnlIndexData *thisdata = &indexdata[ind];
			IndexInfo  *indexInfo;

			thisdata->indexInfo = indexInfo = BuildIndexInfo(Irel[ind]);
			thisdata->tupleFract = 1.0; /* fix later if partial */
			if (indexInfo->ii_Expressions != NIL && vacstmt->va_cols == NIL)
			{
				ListCell   *indexpr_item = list_head(indexInfo->ii_Expressions);

				thisdata->vacattrstats = (VacAttrStats **)
					palloc(indexInfo->ii_NumIndexAttrs * sizeof(VacAttrStats *));
				tcnt = 0;
				for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
				{
					int			keycol = indexInfo->ii_KeyAttrNumbers[i];

					if (keycol == 0)
					{
						/* Found an index expression */
						Node	   *indexkey;

						if (indexpr_item == NULL)		/* shouldn't happen */
							elog(ERROR, "too few entries in indexprs list");
						indexkey = (Node *) lfirst(indexpr_item);
						indexpr_item = lnext(indexpr_item);

						/*
						 * Can't analyze if the opclass uses a storage type
						 * different from the expression result type. We'd get
						 * confused because the type shown in pg_attribute for
						 * the index column doesn't match what we are getting
						 * from the expression. Perhaps this can be fixed
						 * someday, but for now, punt.
						 */
						if (exprType(indexkey) !=
							Irel[ind]->rd_att->attrs[i]->atttypid)
							continue;

						thisdata->vacattrstats[tcnt] =
							examine_attribute(Irel[ind], i + 1);
						if (thisdata->vacattrstats[tcnt] != NULL)
						{
							tcnt++;
							analyzableindex = true;
						}
					}
				}
				thisdata->attr_cnt = tcnt;
			}
		}
	}

	/*
	 * Quit if no analyzable columns
	 */
	if (attr_cnt <= 0 && !analyzableindex)
	{
		/*
		 * We report that the table is empty; this is just so that the
		 * autovacuum code doesn't go nuts trying to get stats about a
		 * zero-column table.
		 */
		if (update_reltuples)
			pgstat_report_analyze(onerel, 0, 0);
		goto cleanup;
	}

	/*
	 * Determine how many rows we need to sample, using the worst case from
	 * all analyzable columns.	We use a lower bound of 100 rows to avoid
	 * possible overflow in Vitter's algorithm.
	 */
	targrows = 100;
	for (i = 0; i < attr_cnt; i++)
	{
		if (targrows < vacattrstats[i]->minrows)
			targrows = vacattrstats[i]->minrows;
	}
	for (ind = 0; ind < nindexes; ind++)
	{
		AnlIndexData *thisdata = &indexdata[ind];

		for (i = 0; i < thisdata->attr_cnt; i++)
		{
			if (targrows < thisdata->vacattrstats[i]->minrows)
				targrows = thisdata->vacattrstats[i]->minrows;
		}
	}

	/*
	 * Acquire the sample rows
	 */
	rows = (HeapTuple *) palloc(targrows * sizeof(HeapTuple));
	numrows = acquire_sample_rows(onerel, rows, targrows,
								  &totalrows, &totaldeadrows);

	/*
	 * Compute the statistics.	Temporary results during the calculations for
	 * each column are stored in a child context.  The calc routines are
	 * responsible to make sure that whatever they store into the VacAttrStats
	 * structure is allocated in anl_context.
	 */
	if (numrows > 0)
	{
		MemoryContext col_context,
					old_context;

		col_context = AllocSetContextCreate(anl_context,
											"Analyze Column",
											ALLOCSET_DEFAULT_MINSIZE,
											ALLOCSET_DEFAULT_INITSIZE,
											ALLOCSET_DEFAULT_MAXSIZE);
		old_context = MemoryContextSwitchTo(col_context);

		for (i = 0; i < attr_cnt; i++)
		{
			VacAttrStats *stats = vacattrstats[i];

			stats->rows = rows;
			stats->tupDesc = onerel->rd_att;
			(*stats->compute_stats) (stats,
									 std_fetch_func,
									 numrows,
									 totalrows);
			/* free this column's temporary results before the next one */
			MemoryContextResetAndDeleteChildren(col_context);
		}

		if (hasindex)
			compute_index_stats(onerel, totalrows,
								indexdata, nindexes,
								rows, numrows,
								col_context);

		MemoryContextSwitchTo(old_context);
		MemoryContextDelete(col_context);

		/*
		 * Emit the completed stats rows into pg_statistic, replacing any
		 * previous statistics for the target columns.	(If there are stats in
		 * pg_statistic for columns we didn't process, we leave them alone.)
		 */
		update_attstats(relid, attr_cnt, vacattrstats);

		for (ind = 0; ind < nindexes; ind++)
		{
			AnlIndexData *thisdata = &indexdata[ind];

			update_attstats(RelationGetRelid(Irel[ind]),
							thisdata->attr_cnt, thisdata->vacattrstats);
		}
	}

	/*
	 * Update pages/tuples stats in pg_class.
	 */
	if (update_reltuples)
	{
		vac_update_relstats(onerel,
							RelationGetNumberOfBlocks(onerel),
							totalrows, hasindex, InvalidTransactionId);
		/* report results to the stats collector, too */
		pgstat_report_analyze(onerel, totalrows, totaldeadrows);
	}

	/*
	 * Same for indexes. Vacuum always scans all indexes, so if we're part of
	 * VACUUM ANALYZE, don't overwrite the accurate count already inserted by
	 * VACUUM.
	 */
	if (!vacstmt->vacuum)
	{
		for (ind = 0; ind < nindexes; ind++)
		{
			AnlIndexData *thisdata = &indexdata[ind];
			double		totalindexrows;

			totalindexrows = ceil(thisdata->tupleFract * totalrows);
			vac_update_relstats(Irel[ind],
								RelationGetNumberOfBlocks(Irel[ind]),
								totalindexrows, false, InvalidTransactionId);
		}
	}

	/* We skip to here if there were no analyzable columns */
cleanup:

	/* Done with indexes */
	vac_close_indexes(nindexes, Irel, NoLock);

	/*
	 * Close source relation now, but keep lock so that no one deletes it
	 * before we commit.  (If someone did, they'd fail to clean up the entries
	 * we made in pg_statistic.  Also, releasing the lock before commit would
	 * expose us to concurrent-update failures in update_attstats.)
	 */
	relation_close(onerel, NoLock);

	/* Log the action if appropriate */
	if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
	{
		if (Log_autovacuum_min_duration == 0 ||
			TimestampDifferenceExceeds(starttime, GetCurrentTimestamp(),
									   Log_autovacuum_min_duration))
			ereport(LOG,
					(errmsg("automatic analyze of table \"%s.%s.%s\" system usage: %s",
							get_database_name(MyDatabaseId),
							get_namespace_name(RelationGetNamespace(onerel)),
							RelationGetRelationName(onerel),
							pg_rusage_show(&ru0))));
	}

	/*
	 * Reset my PGPROC flag.  Note: we need this here, and not in vacuum_rel,
	 * because the vacuum flag is cleared by the end-of-xact code.
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->vacuumFlags &= ~PROC_IN_ANALYZE;
	LWLockRelease(ProcArrayLock);

	/* Restore userid */
	SetUserIdAndContext(save_userid, save_secdefcxt);
}
535
/*
 * Compute statistics about indexes of a relation
 *
 * For each index we compute tupleFract (the fraction of sample rows that
 * satisfy a partial index's predicate), and for indexes with analyzable
 * expression columns we evaluate the index expressions for each qualifying
 * sample row and feed the results through the regular compute_stats
 * routines via ind_fetch_func.
 */
static void
compute_index_stats(Relation onerel, double totalrows,
					AnlIndexData *indexdata, int nindexes,
					HeapTuple *rows, int numrows,
					MemoryContext col_context)
{
	MemoryContext ind_context,
				old_context;
	Datum		values[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	int			ind,
				i;

	/* Per-index working data lives here; reset at the bottom of each loop */
	ind_context = AllocSetContextCreate(anl_context,
										"Analyze Index",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);
	old_context = MemoryContextSwitchTo(ind_context);

	for (ind = 0; ind < nindexes; ind++)
	{
		AnlIndexData *thisdata = &indexdata[ind];
		IndexInfo  *indexInfo = thisdata->indexInfo;
		int			attr_cnt = thisdata->attr_cnt;
		TupleTableSlot *slot;
		EState	   *estate;
		ExprContext *econtext;
		List	   *predicate;
		Datum	   *exprvals;
		bool	   *exprnulls;
		int			numindexrows,
					tcnt,
					rowno;
		double		totalindexrows;

		/* Ignore index if no columns to analyze and not partial */
		if (attr_cnt == 0 && indexInfo->ii_Predicate == NIL)
			continue;

		/*
		 * Need an EState for evaluation of index expressions and
		 * partial-index predicates.  Create it in the per-index context to be
		 * sure it gets cleaned up at the bottom of the loop.
		 */
		estate = CreateExecutorState();
		econtext = GetPerTupleExprContext(estate);
		/* Need a slot to hold the current heap tuple, too */
		slot = MakeSingleTupleTableSlot(RelationGetDescr(onerel));

		/* Arrange for econtext's scan tuple to be the tuple under test */
		econtext->ecxt_scantuple = slot;

		/* Set up execution state for predicate. */
		predicate = (List *)
			ExecPrepareExpr((Expr *) indexInfo->ii_Predicate,
							estate);

		/*
		 * Compute and save index expression values.  exprvals/exprnulls are
		 * filled row-major, attr_cnt entries per qualifying sample row (the
		 * rowstride assignment below depends on this layout).
		 */
		exprvals = (Datum *) palloc(numrows * attr_cnt * sizeof(Datum));
		exprnulls = (bool *) palloc(numrows * attr_cnt * sizeof(bool));
		numindexrows = 0;
		tcnt = 0;
		for (rowno = 0; rowno < numrows; rowno++)
		{
			HeapTuple	heapTuple = rows[rowno];

			/* Set up for predicate or expression evaluation */
			ExecStoreTuple(heapTuple, slot, InvalidBuffer, false);

			/* If index is partial, check predicate */
			if (predicate != NIL)
			{
				if (!ExecQual(predicate, econtext, false))
					continue;
			}
			numindexrows++;

			if (attr_cnt > 0)
			{
				/*
				 * Evaluate the index row to compute expression values. We
				 * could do this by hand, but FormIndexDatum is convenient.
				 */
				FormIndexDatum(indexInfo,
							   slot,
							   estate,
							   values,
							   isnull);

				/*
				 * Save just the columns we care about.
				 */
				for (i = 0; i < attr_cnt; i++)
				{
					VacAttrStats *stats = thisdata->vacattrstats[i];
					int			attnum = stats->attr->attnum;

					exprvals[tcnt] = values[attnum - 1];
					exprnulls[tcnt] = isnull[attnum - 1];
					tcnt++;
				}
			}
		}

		/*
		 * Having counted the number of rows that pass the predicate in the
		 * sample, we can estimate the total number of rows in the index.
		 */
		thisdata->tupleFract = (double) numindexrows / (double) numrows;
		totalindexrows = ceil(thisdata->tupleFract * totalrows);

		/*
		 * Now we can compute the statistics for the expression columns.
		 */
		if (numindexrows > 0)
		{
			MemoryContextSwitchTo(col_context);
			for (i = 0; i < attr_cnt; i++)
			{
				VacAttrStats *stats = thisdata->vacattrstats[i];

				/* point this column's fetch state into the saved arrays */
				stats->exprvals = exprvals + i;
				stats->exprnulls = exprnulls + i;
				stats->rowstride = attr_cnt;
				(*stats->compute_stats) (stats,
										 ind_fetch_func,
										 numindexrows,
										 totalindexrows);
				MemoryContextResetAndDeleteChildren(col_context);
			}
		}

		/* And clean up */
		MemoryContextSwitchTo(ind_context);

		ExecDropSingleTupleTableSlot(slot);
		FreeExecutorState(estate);
		MemoryContextResetAndDeleteChildren(ind_context);
	}

	MemoryContextSwitchTo(old_context);
	MemoryContextDelete(ind_context);
}
683
684 /*
685  * examine_attribute -- pre-analysis of a single column
686  *
687  * Determine whether the column is analyzable; if so, create and initialize
688  * a VacAttrStats struct for it.  If not, return NULL.
689  */
690 static VacAttrStats *
691 examine_attribute(Relation onerel, int attnum)
692 {
693         Form_pg_attribute attr = onerel->rd_att->attrs[attnum - 1];
694         HeapTuple       typtuple;
695         VacAttrStats *stats;
696         int                     i;
697         bool            ok;
698
699         /* Never analyze dropped columns */
700         if (attr->attisdropped)
701                 return NULL;
702
703         /* Don't analyze column if user has specified not to */
704         if (attr->attstattarget == 0)
705                 return NULL;
706
707         /*
708          * Create the VacAttrStats struct.  Note that we only have a copy of
709          * the fixed fields of the pg_attribute tuple.
710          */
711         stats = (VacAttrStats *) palloc0(sizeof(VacAttrStats));
712         stats->attr = (Form_pg_attribute) palloc(ATTRIBUTE_FIXED_PART_SIZE);
713         memcpy(stats->attr, attr, ATTRIBUTE_FIXED_PART_SIZE);
714         typtuple = SearchSysCache(TYPEOID,
715                                                           ObjectIdGetDatum(attr->atttypid),
716                                                           0, 0, 0);
717         if (!HeapTupleIsValid(typtuple))
718                 elog(ERROR, "cache lookup failed for type %u", attr->atttypid);
719         stats->attrtype = (Form_pg_type) palloc(sizeof(FormData_pg_type));
720         memcpy(stats->attrtype, GETSTRUCT(typtuple), sizeof(FormData_pg_type));
721         ReleaseSysCache(typtuple);
722         stats->anl_context = anl_context;
723         stats->tupattnum = attnum;
724
725         /*
726          * The fields describing the stats->stavalues[n] element types default
727          * to the type of the field being analyzed, but the type-specific
728          * typanalyze function can change them if it wants to store something
729          * else.
730          */
731         for (i = 0; i < STATISTIC_NUM_SLOTS; i++)
732         {
733                 stats->statypid[i] = stats->attr->atttypid;
734                 stats->statyplen[i] = stats->attrtype->typlen;
735                 stats->statypbyval[i] = stats->attrtype->typbyval;
736                 stats->statypalign[i] = stats->attrtype->typalign;
737         }
738
739         /*
740          * Call the type-specific typanalyze function.  If none is specified, use
741          * std_typanalyze().
742          */
743         if (OidIsValid(stats->attrtype->typanalyze))
744                 ok = DatumGetBool(OidFunctionCall1(stats->attrtype->typanalyze,
745                                                                                    PointerGetDatum(stats)));
746         else
747                 ok = std_typanalyze(stats);
748
749         if (!ok || stats->compute_stats == NULL || stats->minrows <= 0)
750         {
751                 pfree(stats->attrtype);
752                 pfree(stats->attr);
753                 pfree(stats);
754                 return NULL;
755         }
756
757         return stats;
758 }
759
760 /*
761  * BlockSampler_Init -- prepare for random sampling of blocknumbers
762  *
763  * BlockSampler is used for stage one of our new two-stage tuple
764  * sampling mechanism as discussed on pgsql-hackers 2004-04-02 (subject
765  * "Large DB").  It selects a random sample of samplesize blocks out of
766  * the nblocks blocks in the table.  If the table has less than
767  * samplesize blocks, all blocks are selected.
768  *
769  * Since we know the total number of blocks in advance, we can use the
770  * straightforward Algorithm S from Knuth 3.4.2, rather than Vitter's
771  * algorithm.
772  */
773 static void
774 BlockSampler_Init(BlockSampler bs, BlockNumber nblocks, int samplesize)
775 {
776         bs->N = nblocks;                        /* measured table size */
777
778         /*
779          * If we decide to reduce samplesize for tables that have less or not much
780          * more than samplesize blocks, here is the place to do it.
781          */
782         bs->n = samplesize;
783         bs->t = 0;                                      /* blocks scanned so far */
784         bs->m = 0;                                      /* blocks selected so far */
785 }
786
787 static bool
788 BlockSampler_HasMore(BlockSampler bs)
789 {
790         return (bs->t < bs->N) && (bs->m < bs->n);
791 }
792
/*
 * BlockSampler_Next -- return the next sampled block number
 *
 * Performs one selection step of Knuth's Algorithm S and advances the
 * sampler state (bs->t always, bs->m when a block is selected, which is
 * every call).  Caller must ensure BlockSampler_HasMore(bs) is true.
 */
static BlockNumber
BlockSampler_Next(BlockSampler bs)
{
	BlockNumber K = bs->N - bs->t;		/* remaining blocks */
	int			k = bs->n - bs->m;		/* blocks still to sample */
	double		p;						/* probability to skip block */
	double		V;						/* random */

	Assert(BlockSampler_HasMore(bs));	/* hence K > 0 and k > 0 */

	if ((BlockNumber) k >= K)
	{
		/* need all the rest */
		bs->m++;
		return bs->t++;
	}

	/*----------
	 * It is not obvious that this code matches Knuth's Algorithm S.
	 * Knuth says to skip the current block with probability 1 - k/K.
	 * If we are to skip, we should advance t (hence decrease K), and
	 * repeat the same probabilistic test for the next block.  The naive
	 * implementation thus requires a random_fract() call for each block
	 * number.      But we can reduce this to one random_fract() call per
	 * selected block, by noting that each time the while-test succeeds,
	 * we can reinterpret V as a uniform random number in the range 0 to p.
	 * Therefore, instead of choosing a new V, we just adjust p to be
	 * the appropriate fraction of its former value, and our next loop
	 * makes the appropriate probabilistic test.
	 *
	 * We have initially K > k > 0.  If the loop reduces K to equal k,
	 * the next while-test must fail since p will become exactly zero
	 * (we assume there will not be roundoff error in the division).
	 * (Note: Knuth suggests a "<=" loop condition, but we use "<" just
	 * to be doubly sure about roundoff error.)  Therefore K cannot become
	 * less than k, which means that we cannot fail to select enough blocks.
	 *----------
	 */
	V = random_fract();
	p = 1.0 - (double) k / (double) K;
	while (V < p)
	{
		/* skip */
		bs->t++;
		K--;					/* keep K == N - t */

		/* adjust p to be new cutoff point in reduced range */
		p *= 1.0 - (double) k / (double) K;
	}

	/* select */
	bs->m++;
	return bs->t++;
}
847
848 /*
849  * acquire_sample_rows -- acquire a random sample of rows from the table
850  *
851  * As of May 2004 we use a new two-stage method:  Stage one selects up
852  * to targrows random blocks (or all blocks, if there aren't so many).
853  * Stage two scans these blocks and uses the Vitter algorithm to create
854  * a random sample of targrows rows (or less, if there are less in the
855  * sample of blocks).  The two stages are executed simultaneously: each
856  * block is processed as soon as stage one returns its number and while
857  * the rows are read stage two controls which ones are to be inserted
858  * into the sample.
859  *
860  * Although every row has an equal chance of ending up in the final
861  * sample, this sampling method is not perfect: not every possible
862  * sample has an equal chance of being selected.  For large relations
863  * the number of different blocks represented by the sample tends to be
864  * too small.  We can live with that for now.  Improvements are welcome.
865  *
866  * We also estimate the total numbers of live and dead rows in the table,
867  * and return them into *totalrows and *totaldeadrows, respectively.
868  *
869  * An important property of this sampling method is that because we do
870  * look at a statistically unbiased set of blocks, we should get
871  * unbiased estimates of the average numbers of live and dead rows per
872  * block.  The previous sampling method put too much credence in the row
873  * density near the start of the table.
874  *
875  * The returned list of tuples is in order by physical position in the table.
876  * (We will rely on this later to derive correlation estimates.)
877  */
static int
acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
					double *totalrows, double *totaldeadrows)
{
	int			numrows = 0;	/* # rows now in reservoir */
	double		samplerows = 0; /* total # rows collected */
	double		liverows = 0;	/* # live rows seen */
	double		deadrows = 0;	/* # dead rows seen */
	double		rowstoskip = -1;	/* -1 means not set yet */
	BlockNumber totalblocks;
	TransactionId OldestXmin;
	BlockSamplerData bs;
	double		rstate;

	Assert(targrows > 1);

	totalblocks = RelationGetNumberOfBlocks(onerel);

	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
	OldestXmin = GetOldestXmin(onerel->rd_rel->relisshared, true);

	/* Prepare for sampling block numbers (stage one: Knuth's Algorithm S) */
	BlockSampler_Init(&bs, totalblocks, targrows);
	/* Prepare for sampling rows (stage two: Vitter's reservoir algorithm) */
	rstate = init_selection_state(targrows);

	/* Outer loop over blocks to sample */
	while (BlockSampler_HasMore(&bs))
	{
		BlockNumber targblock = BlockSampler_Next(&bs);
		Buffer		targbuffer;
		Page		targpage;
		OffsetNumber targoffset,
					maxoffset;

		/* per-page housekeeping between page reads (see vacuum_delay_point) */
		vacuum_delay_point();

		/*
		 * We must maintain a pin on the target page's buffer to ensure that
		 * the maxoffset value stays good (else concurrent VACUUM might delete
		 * tuples out from under us).  Hence, pin the page until we are done
		 * looking at it.  We also choose to hold sharelock on the buffer
		 * throughout --- we could release and re-acquire sharelock for
		 * each tuple, but since we aren't doing much work per tuple, the
		 * extra lock traffic is probably better avoided.
		 */
		targbuffer = ReadBufferExtended(onerel, MAIN_FORKNUM, targblock,
										RBM_NORMAL, vac_strategy);
		LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
		targpage = BufferGetPage(targbuffer);
		maxoffset = PageGetMaxOffsetNumber(targpage);

		/* Inner loop over all tuples on the selected page */
		for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
		{
			ItemId		itemid;
			HeapTupleData targtuple;
			bool		sample_it = false;

			itemid = PageGetItemId(targpage, targoffset);

			/*
			 * We ignore unused and redirect line pointers.  DEAD line
			 * pointers should be counted as dead, because we need vacuum
			 * to run to get rid of them.  Note that this rule agrees with
			 * the way that heap_page_prune() counts things.
			 */
			if (!ItemIdIsNormal(itemid))
			{
				if (ItemIdIsDead(itemid))
					deadrows += 1;
				continue;
			}

			ItemPointerSet(&targtuple.t_self, targblock, targoffset);

			targtuple.t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
			targtuple.t_len = ItemIdGetLength(itemid);

			/* Classify the tuple's visibility to decide counting/sampling */
			switch (HeapTupleSatisfiesVacuum(targtuple.t_data,
											 OldestXmin,
											 targbuffer))
			{
				case HEAPTUPLE_LIVE:
					sample_it = true;
					liverows += 1;
					break;

				case HEAPTUPLE_DEAD:
				case HEAPTUPLE_RECENTLY_DEAD:
					/* Count dead and recently-dead rows */
					deadrows += 1;
					break;

				case HEAPTUPLE_INSERT_IN_PROGRESS:
					/*
					 * Insert-in-progress rows are not counted.  We assume
					 * that when the inserting transaction commits or aborts,
					 * it will send a stats message to increment the proper
					 * count.  This works right only if that transaction ends
					 * after we finish analyzing the table; if things happen
					 * in the other order, its stats update will be
					 * overwritten by ours.  However, the error will be
					 * large only if the other transaction runs long enough
					 * to insert many tuples, so assuming it will finish
					 * after us is the safer option.
					 *
					 * A special case is that the inserting transaction might
					 * be our own.  In this case we should count and sample
					 * the row, to accommodate users who load a table and
					 * analyze it in one transaction.  (pgstat_report_analyze
					 * has to adjust the numbers we send to the stats collector
					 * to make this come out right.)
					 */
					if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple.t_data)))
					{
						sample_it = true;
						liverows += 1;
					}
					break;

				case HEAPTUPLE_DELETE_IN_PROGRESS:
					/*
					 * We count delete-in-progress rows as still live, using
					 * the same reasoning given above; but we don't bother to
					 * include them in the sample.
					 *
					 * If the delete was done by our own transaction, however,
					 * we must count the row as dead to make
					 * pgstat_report_analyze's stats adjustments come out
					 * right.  (Note: this works out properly when the row
					 * was both inserted and deleted in our xact.)
					 */
					if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(targtuple.t_data)))
						deadrows += 1;
					else
						liverows += 1;
					break;

				default:
					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
					break;
			}

			if (sample_it)
			{
				/*
				 * The first targrows sample rows are simply copied into the
				 * reservoir. Then we start replacing tuples in the sample
				 * until we reach the end of the relation.	This algorithm is
				 * from Jeff Vitter's paper (see full citation below). It
				 * works by repeatedly computing the number of tuples to skip
				 * before selecting a tuple, which replaces a randomly chosen
				 * element of the reservoir (current set of tuples).  At all
				 * times the reservoir is a true random sample of the tuples
				 * we've passed over so far, so when we fall off the end of
				 * the relation we're done.
				 */
				if (numrows < targrows)
					rows[numrows++] = heap_copytuple(&targtuple);
				else
				{
					/*
					 * t in Vitter's paper is the number of records already
					 * processed.  If we need to compute a new S value, we
					 * must use the not-yet-incremented value of samplerows
					 * as t.
					 */
					if (rowstoskip < 0)
						rowstoskip = get_next_S(samplerows, targrows, &rstate);

					if (rowstoskip <= 0)
					{
						/*
						 * Found a suitable tuple, so save it, replacing one
						 * old tuple at random
						 */
						int			k = (int) (targrows * random_fract());

						Assert(k >= 0 && k < targrows);
						heap_freetuple(rows[k]);
						rows[k] = heap_copytuple(&targtuple);
					}

					rowstoskip -= 1;
				}

				samplerows += 1;
			}
		}

		/* Now release the lock and pin on the page */
		UnlockReleaseBuffer(targbuffer);
	}

	/*
	 * If we didn't find as many tuples as we wanted then we're done. No sort
	 * is needed, since they're already in order.
	 *
	 * Otherwise we need to sort the collected tuples by position
	 * (itempointer). It's not worth worrying about corner cases where the
	 * tuples are already sorted.
	 */
	if (numrows == targrows)
		qsort((void *) rows, numrows, sizeof(HeapTuple), compare_rows);

	/*
	 * Estimate total numbers of rows in relation, by extrapolating the
	 * per-sampled-block densities to the whole table (rounded to nearest).
	 */
	if (bs.m > 0)
	{
		*totalrows = floor((liverows * totalblocks) / bs.m + 0.5);
		*totaldeadrows = floor((deadrows * totalblocks) / bs.m + 0.5);
	}
	else
	{
		/* Empty table (no blocks sampled): report zero rows */
		*totalrows = 0.0;
		*totaldeadrows = 0.0;
	}

	/*
	 * Emit some interesting relation info
	 */
	ereport(elevel,
			(errmsg("\"%s\": scanned %d of %u pages, "
					"containing %.0f live rows and %.0f dead rows; "
					"%d rows in sample, %.0f estimated total rows",
					RelationGetRelationName(onerel),
					bs.m, totalblocks,
					liverows, deadrows,
					numrows, *totalrows)));

	return numrows;
}
1112
1113 /* Select a random value R uniformly distributed in (0 - 1) */
1114 static double
1115 random_fract(void)
1116 {
1117         return ((double) random() + 1) / ((double) MAX_RANDOM_VALUE + 2);
1118 }
1119
1120 /*
1121  * These two routines embody Algorithm Z from "Random sampling with a
1122  * reservoir" by Jeffrey S. Vitter, in ACM Trans. Math. Softw. 11, 1
1123  * (Mar. 1985), Pages 37-57.  Vitter describes his algorithm in terms
1124  * of the count S of records to skip before processing another record.
1125  * It is computed primarily based on t, the number of records already read.
1126  * The only extra state needed between calls is W, a random state variable.
1127  *
1128  * init_selection_state computes the initial W value.
1129  *
1130  * Given that we've already read t records (t >= n), get_next_S
1131  * determines the number of records to skip before the next record is
1132  * processed.
1133  */
static double
init_selection_state(int n)
{
	/*
	 * Initial W for Algorithm Z: exp(-log(U)/n) for uniform U in (0,1).
	 * random_fract() never returns 0, so log() is safe.
	 */
	double		u = random_fract();

	return exp(-log(u) / n);
}
1140
/*
 * get_next_S -- compute the number of records to skip before the next
 * record is selected into the reservoir.
 *
 * t is the number of records already read (t >= n), n is the reservoir
 * size, and *stateptr carries Vitter's W random state between calls (it
 * is updated here whenever Algorithm Z is used).
 */
static double
get_next_S(double t, int n, double *stateptr)
{
	double		S;

	/* The magic constant here is T from Vitter's paper */
	if (t <= (22.0 * n))
	{
		/* Process records using Algorithm X until t is large enough */
		double		V,
					quot;

		V = random_fract();		/* Generate V */
		S = 0;
		t += 1;
		/* Note: "num" in Vitter's code is always equal to t - n */
		quot = (t - (double) n) / t;
		/* Find min S satisfying (4.1) */
		while (quot > V)
		{
			S += 1;
			t += 1;
			quot *= (t - (double) n) / t;
		}
	}
	else
	{
		/* Now apply Algorithm Z */
		double		W = *stateptr;
		double		term = t - (double) n + 1;

		/* Rejection loop: iterate until a candidate S is accepted */
		for (;;)
		{
			double		numer,
						numer_lim,
						denom;
			double		U,
						X,
						lhs,
						rhs,
						y,
						tmp;

			/* Generate U and X */
			U = random_fract();
			X = t * (W - 1.0);
			S = floor(X);		/* S is tentatively set to floor(X) */
			/* Test if U <= h(S)/cg(X) in the manner of (6.3) */
			tmp = (t + 1) / term;
			lhs = exp(log(((U * tmp * tmp) * (term + S)) / (t + X)) / n);
			rhs = (((t + X) / (term + S)) * term) / t;
			if (lhs <= rhs)
			{
				/* quick acceptance: update W and stop */
				W = rhs / lhs;
				break;
			}
			/* Test if U <= f(S)/cg(X) */
			y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X);
			if ((double) n < S)
			{
				denom = t;
				numer_lim = term + S;
			}
			else
			{
				denom = t - (double) n + S;
				numer_lim = t + 1;
			}
			for (numer = t + S; numer >= numer_lim; numer -= 1)
			{
				y *= numer / denom;
				denom -= 1;
			}
			W = exp(-log(random_fract()) / n);	/* Generate W in advance */
			if (exp(log(y) / n) <= (t + X) / t)
				break;
		}
		*stateptr = W;
	}
	return S;
}
1222
1223 /*
1224  * qsort comparator for sorting rows[] array
1225  */
1226 static int
1227 compare_rows(const void *a, const void *b)
1228 {
1229         HeapTuple       ha = *(HeapTuple *) a;
1230         HeapTuple       hb = *(HeapTuple *) b;
1231         BlockNumber ba = ItemPointerGetBlockNumber(&ha->t_self);
1232         OffsetNumber oa = ItemPointerGetOffsetNumber(&ha->t_self);
1233         BlockNumber bb = ItemPointerGetBlockNumber(&hb->t_self);
1234         OffsetNumber ob = ItemPointerGetOffsetNumber(&hb->t_self);
1235
1236         if (ba < bb)
1237                 return -1;
1238         if (ba > bb)
1239                 return 1;
1240         if (oa < ob)
1241                 return -1;
1242         if (oa > ob)
1243                 return 1;
1244         return 0;
1245 }
1246
1247
1248 /*
1249  *      update_attstats() -- update attribute statistics for one relation
1250  *
1251  *              Statistics are stored in several places: the pg_class row for the
1252  *              relation has stats about the whole relation, and there is a
1253  *              pg_statistic row for each (non-system) attribute that has ever
1254  *              been analyzed.  The pg_class values are updated by VACUUM, not here.
1255  *
1256  *              pg_statistic rows are just added or updated normally.  This means
1257  *              that pg_statistic will probably contain some deleted rows at the
1258  *              completion of a vacuum cycle, unless it happens to get vacuumed last.
1259  *
1260  *              To keep things simple, we punt for pg_statistic, and don't try
1261  *              to compute or store rows for pg_statistic itself in pg_statistic.
1262  *              This could possibly be made to work, but it's not worth the trouble.
1263  *              Note analyze_rel() has seen to it that we won't come here when
1264  *              vacuuming pg_statistic itself.
1265  *
1266  *              Note: there would be a race condition here if two backends could
1267  *              ANALYZE the same table concurrently.  Presently, we lock that out
1268  *              by taking a self-exclusive lock on the relation in analyze_rel().
1269  */
static void
update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats)
{
	Relation	sd;
	int			attno;

	if (natts <= 0)
		return;					/* nothing to do */

	sd = heap_open(StatisticRelationId, RowExclusiveLock);

	for (attno = 0; attno < natts; attno++)
	{
		VacAttrStats *stats = vacattrstats[attno];
		HeapTuple	stup,
					oldtup;
		int			i,
					k,
					n;
		Datum		values[Natts_pg_statistic];
		bool		nulls[Natts_pg_statistic];
		bool		replaces[Natts_pg_statistic];

		/* Ignore attr if we weren't able to collect stats */
		if (!stats->stats_valid)
			continue;

		/*
		 * Construct a new pg_statistic tuple.  We overwrite every column
		 * of any pre-existing tuple, so all replaces[] entries are true.
		 */
		for (i = 0; i < Natts_pg_statistic; ++i)
		{
			nulls[i] = false;
			replaces[i] = true;
		}

		/*
		 * Fill values[] in the physical column order of pg_statistic;
		 * the i++ sequencing below must match that order exactly.
		 */
		i = 0;
		values[i++] = ObjectIdGetDatum(relid);	/* starelid */
		values[i++] = Int16GetDatum(stats->attr->attnum);		/* staattnum */
		values[i++] = Float4GetDatum(stats->stanullfrac);		/* stanullfrac */
		values[i++] = Int32GetDatum(stats->stawidth);	/* stawidth */
		values[i++] = Float4GetDatum(stats->stadistinct);		/* stadistinct */
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			values[i++] = Int16GetDatum(stats->stakind[k]);		/* stakindN */
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			values[i++] = ObjectIdGetDatum(stats->staop[k]);	/* staopN */
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			int			nnum = stats->numnumbers[k];

			if (nnum > 0)
			{
				Datum	   *numdatums = (Datum *) palloc(nnum * sizeof(Datum));
				ArrayType  *arry;

				for (n = 0; n < nnum; n++)
					numdatums[n] = Float4GetDatum(stats->stanumbers[k][n]);
				/* XXX knows more than it should about type float4: */
				arry = construct_array(numdatums, nnum,
									   FLOAT4OID,
									   sizeof(float4), FLOAT4PASSBYVAL, 'i');
				values[i++] = PointerGetDatum(arry);	/* stanumbersN */
			}
			else
			{
				/* empty slot: store a NULL, not an empty array */
				nulls[i] = true;
				values[i++] = (Datum) 0;
			}
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			if (stats->numvalues[k] > 0)
			{
				ArrayType  *arry;

				/* element type info was filled in by the typanalyze setup */
				arry = construct_array(stats->stavalues[k],
									   stats->numvalues[k],
									   stats->statypid[k],
									   stats->statyplen[k],
									   stats->statypbyval[k],
									   stats->statypalign[k]);
				values[i++] = PointerGetDatum(arry);	/* stavaluesN */
			}
			else
			{
				nulls[i] = true;
				values[i++] = (Datum) 0;
			}
		}

		/* Is there already a pg_statistic tuple for this attribute? */
		oldtup = SearchSysCache(STATRELATT,
								ObjectIdGetDatum(relid),
								Int16GetDatum(stats->attr->attnum),
								0, 0);

		if (HeapTupleIsValid(oldtup))
		{
			/* Yes, replace it */
			stup = heap_modify_tuple(oldtup,
									RelationGetDescr(sd),
									values,
									nulls,
									replaces);
			ReleaseSysCache(oldtup);
			simple_heap_update(sd, &stup->t_self, stup);
		}
		else
		{
			/* No, insert new tuple */
			stup = heap_form_tuple(RelationGetDescr(sd), values, nulls);
			simple_heap_insert(sd, stup);
		}

		/* update indexes too */
		CatalogUpdateIndexes(sd, stup);

		heap_freetuple(stup);
	}

	/* keep the lock until end of transaction, per usual catalog practice */
	heap_close(sd, RowExclusiveLock);
}
1396
1397 /*
1398  * Standard fetch function for use by compute_stats subroutines.
1399  *
1400  * This exists to provide some insulation between compute_stats routines
1401  * and the actual storage of the sample data.
1402  */
1403 static Datum
1404 std_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull)
1405 {
1406         int                     attnum = stats->tupattnum;
1407         HeapTuple       tuple = stats->rows[rownum];
1408         TupleDesc       tupDesc = stats->tupDesc;
1409
1410         return heap_getattr(tuple, attnum, tupDesc, isNull);
1411 }
1412
1413 /*
1414  * Fetch function for analyzing index expressions.
1415  *
1416  * We have not bothered to construct index tuples, instead the data is
1417  * just in Datum arrays.
1418  */
1419 static Datum
1420 ind_fetch_func(VacAttrStatsP stats, int rownum, bool *isNull)
1421 {
1422         int                     i;
1423
1424         /* exprvals and exprnulls are already offset for proper column */
1425         i = rownum * stats->rowstride;
1426         *isNull = stats->exprnulls[i];
1427         return stats->exprvals[i];
1428 }
1429
1430
1431 /*==========================================================================
1432  *
1433  * Code below this point represents the "standard" type-specific statistics
1434  * analysis algorithms.  This code can be replaced on a per-data-type basis
1435  * by setting a nonzero value in pg_type.typanalyze.
1436  *
1437  *==========================================================================
1438  */
1439
1440
1441 /*
1442  * To avoid consuming too much memory during analysis and/or too much space
1443  * in the resulting pg_statistic rows, we ignore varlena datums that are wider
1444  * than WIDTH_THRESHOLD (after detoasting!).  This is legitimate for MCV
1445  * and distinct-value calculations since a wide value is unlikely to be
1446  * duplicated at all, much less be a most-common value.  For the same reason,
1447  * ignoring wide values will not affect our estimates of histogram bin
1448  * boundaries very much.
1449  */
#define WIDTH_THRESHOLD  1024

/* swap two ints / Datums in place (multi-eval safe only for plain lvalues) */
#define swapInt(a,b)	do {int _tmp; _tmp=a; a=b; b=_tmp;} while(0)
#define swapDatum(a,b)	do {Datum _tmp; _tmp=a; a=b; b=_tmp;} while(0)

/*
 * Extra information used by the default analysis routines; built by
 * std_typanalyze and passed to compute_stats via stats->extra_data.
 */
typedef struct
{
	Oid			eqopr;			/* '=' operator for datatype, if any */
	Oid			eqfunc;			/* and associated function */
	Oid			ltopr;			/* '<' operator for datatype, if any */
} StdAnalyzeData;

/* one sortable sample value, tagged with its source row */
typedef struct
{
	Datum		value;			/* a data value */
	int			tupno;			/* position index for tuple it came from */
} ScalarItem;

/* candidate most-common-value: a run of duplicates in the sorted sample */
typedef struct
{
	int			count;			/* # of duplicates */
	int			first;			/* values[] index of first occurrence */
} ScalarMCVItem;

/* qsort_arg context for compare_scalars */
typedef struct
{
	FmgrInfo   *cmpFn;			/* comparison function to apply */
	int			cmpFlags;		/* sort-direction flags for the comparison */
	int		   *tupnoLink;		/* workspace for duplicate detection */
} CompareScalarsContext;


static void compute_minimal_stats(VacAttrStatsP stats,
						  AnalyzeAttrFetchFunc fetchfunc,
						  int samplerows,
						  double totalrows);
static void compute_scalar_stats(VacAttrStatsP stats,
						 AnalyzeAttrFetchFunc fetchfunc,
						 int samplerows,
						 double totalrows);
static int	compare_scalars(const void *a, const void *b, void *arg);
static int	compare_mcvs(const void *a, const void *b);
1496
1497 /*
1498  * std_typanalyze -- the default type-specific typanalyze function
1499  */
1500 static bool
1501 std_typanalyze(VacAttrStats *stats)
1502 {
1503         Form_pg_attribute attr = stats->attr;
1504         Oid                     ltopr;
1505         Oid                     eqopr;
1506         StdAnalyzeData *mystats;
1507
1508         /* If the attstattarget column is negative, use the default value */
1509         /* NB: it is okay to scribble on stats->attr since it's a copy */
1510         if (attr->attstattarget < 0)
1511                 attr->attstattarget = default_statistics_target;
1512
1513         /* Look for default "<" and "=" operators for column's type */
1514         get_sort_group_operators(attr->atttypid,
1515                                                          false, false, false,
1516                                                          &ltopr, &eqopr, NULL);
1517
1518         /* If column has no "=" operator, we can't do much of anything */
1519         if (!OidIsValid(eqopr))
1520                 return false;
1521
1522         /* Save the operator info for compute_stats routines */
1523         mystats = (StdAnalyzeData *) palloc(sizeof(StdAnalyzeData));
1524         mystats->eqopr = eqopr;
1525         mystats->eqfunc = get_opcode(eqopr);
1526         mystats->ltopr = ltopr;
1527         stats->extra_data = mystats;
1528
1529         /*
1530          * Determine which standard statistics algorithm to use
1531          */
1532         if (OidIsValid(ltopr))
1533         {
1534                 /* Seems to be a scalar datatype */
1535                 stats->compute_stats = compute_scalar_stats;
1536                 /*--------------------
1537                  * The following choice of minrows is based on the paper
1538                  * "Random sampling for histogram construction: how much is enough?"
1539                  * by Surajit Chaudhuri, Rajeev Motwani and Vivek Narasayya, in
1540                  * Proceedings of ACM SIGMOD International Conference on Management
1541                  * of Data, 1998, Pages 436-447.  Their Corollary 1 to Theorem 5
1542                  * says that for table size n, histogram size k, maximum relative
1543                  * error in bin size f, and error probability gamma, the minimum
1544                  * random sample size is
1545                  *              r = 4 * k * ln(2*n/gamma) / f^2
1546                  * Taking f = 0.5, gamma = 0.01, n = 10^6 rows, we obtain
1547                  *              r = 305.82 * k
1548                  * Note that because of the log function, the dependence on n is
1549                  * quite weak; even at n = 10^12, a 300*k sample gives <= 0.66
1550                  * bin size error with probability 0.99.  So there's no real need to
1551                  * scale for n, which is a good thing because we don't necessarily
1552                  * know it at this point.
1553                  *--------------------
1554                  */
1555                 stats->minrows = 300 * attr->attstattarget;
1556         }
1557         else
1558         {
1559                 /* Can't do much but the minimal stuff */
1560                 stats->compute_stats = compute_minimal_stats;
1561                 /* Might as well use the same minrows as above */
1562                 stats->minrows = 300 * attr->attstattarget;
1563         }
1564
1565         return true;
1566 }
1567
/*
 *	compute_minimal_stats() -- compute minimal column statistics
 *
 *	We use this when we can find only an "=" operator for the datatype.
 *
 *	We determine the fraction of non-null rows, the average width, the
 *	most common values, and the (estimated) number of distinct values.
 *
 *	The most common values are determined by brute force: we keep a list
 *	of previously seen values, ordered by number of times seen, as we scan
 *	the samples.  A newly seen value is inserted just after the last
 *	multiply-seen value, causing the bottommost (oldest) singly-seen value
 *	to drop off the list.  The accuracy of this method, and also its cost,
 *	depend mainly on the length of the list we are willing to keep.
 */
static void
compute_minimal_stats(VacAttrStatsP stats,
					  AnalyzeAttrFetchFunc fetchfunc,
					  int samplerows,
					  double totalrows)
{
	int			i;
	int			null_cnt = 0;
	int			nonnull_cnt = 0;
	int			toowide_cnt = 0;
	double		total_width = 0;
	bool		is_varlena = (!stats->attr->attbyval &&
							  stats->attr->attlen == -1);
	bool		is_varwidth = (!stats->attr->attbyval &&
							   stats->attr->attlen < 0);
	FmgrInfo	f_cmpeq;
	/* one entry in the most-common-value tracking list */
	typedef struct
	{
		Datum		value;
		int			count;
	} TrackItem;
	TrackItem  *track;
	int			track_cnt,
				track_max;
	int			num_mcv = stats->attr->attstattarget;
	StdAnalyzeData *mystats = (StdAnalyzeData *) stats->extra_data;

	/*
	 * We track up to 2*n values for an n-element MCV list; but at least 10
	 */
	track_max = 2 * num_mcv;
	if (track_max < 10)
		track_max = 10;
	track = (TrackItem *) palloc(track_max * sizeof(TrackItem));
	track_cnt = 0;

	/* set up to call the datatype's "=" function directly */
	fmgr_info(mystats->eqfunc, &f_cmpeq);

	for (i = 0; i < samplerows; i++)
	{
		Datum		value;
		bool		isnull;
		bool		match;
		int			firstcount1,
					j;

		vacuum_delay_point();

		value = fetchfunc(stats, i, &isnull);

		/* Check for null/nonnull */
		if (isnull)
		{
			null_cnt++;
			continue;
		}
		nonnull_cnt++;

		/*
		 * If it's a variable-width field, add up widths for average width
		 * calculation.  Note that if the value is toasted, we use the toasted
		 * width.  We don't bother with this calculation if it's a fixed-width
		 * type.
		 */
		if (is_varlena)
		{
			total_width += VARSIZE_ANY(DatumGetPointer(value));

			/*
			 * If the value is toasted, we want to detoast it just once to
			 * avoid repeated detoastings and resultant excess memory usage
			 * during the comparisons.	Also, check to see if the value is
			 * excessively wide, and if so don't detoast at all --- just
			 * ignore the value.
			 */
			if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
			{
				toowide_cnt++;
				continue;
			}
			value = PointerGetDatum(PG_DETOAST_DATUM(value));
		}
		else if (is_varwidth)
		{
			/* must be cstring */
			total_width += strlen(DatumGetCString(value)) + 1;
		}

		/*
		 * See if the value matches anything we're already tracking.
		 * firstcount1 will end up as the index of the first entry with
		 * count 1, i.e. the insertion point for a newly seen value.
		 */
		match = false;
		firstcount1 = track_cnt;
		for (j = 0; j < track_cnt; j++)
		{
			if (DatumGetBool(FunctionCall2(&f_cmpeq, value, track[j].value)))
			{
				match = true;
				break;
			}
			if (j < firstcount1 && track[j].count == 1)
				firstcount1 = j;
		}

		if (match)
		{
			/* Found a match */
			track[j].count++;
			/* This value may now need to "bubble up" in the track list */
			while (j > 0 && track[j].count > track[j - 1].count)
			{
				swapDatum(track[j].value, track[j - 1].value);
				swapInt(track[j].count, track[j - 1].count);
				j--;
			}
		}
		else
		{
			/* No match.  Insert at head of count-1 list */
			if (track_cnt < track_max)
				track_cnt++;
			/* shift the count-1 entries down, dropping the oldest if full */
			for (j = track_cnt - 1; j > firstcount1; j--)
			{
				track[j].value = track[j - 1].value;
				track[j].count = track[j - 1].count;
			}
			if (firstcount1 < track_cnt)
			{
				track[firstcount1].value = value;
				track[firstcount1].count = 1;
			}
		}
	}

	/* We can only compute real stats if we found some non-null values. */
	if (nonnull_cnt > 0)
	{
		int			nmultiple,
					summultiple;

		stats->stats_valid = true;
		/* Do the simple null-frac and width stats */
		stats->stanullfrac = (double) null_cnt / (double) samplerows;
		if (is_varwidth)
			stats->stawidth = total_width / (double) nonnull_cnt;
		else
			stats->stawidth = stats->attrtype->typlen;

		/*
		 * Count the number of values we found multiple times; the track
		 * list is ordered by descending count, so stop at the first
		 * singly-seen entry.
		 */
		summultiple = 0;
		for (nmultiple = 0; nmultiple < track_cnt; nmultiple++)
		{
			if (track[nmultiple].count == 1)
				break;
			summultiple += track[nmultiple].count;
		}

		if (nmultiple == 0)
		{
			/* If we found no repeated values, assume it's a unique column */
			stats->stadistinct = -1.0;
		}
		else if (track_cnt < track_max && toowide_cnt == 0 &&
				 nmultiple == track_cnt)
		{
			/*
			 * Our track list includes every value in the sample, and every
			 * value appeared more than once.  Assume the column has just
			 * these values.
			 */
			stats->stadistinct = track_cnt;
		}
		else
		{
			/*----------
			 * Estimate the number of distinct values using the estimator
			 * proposed by Haas and Stokes in IBM Research Report RJ 10025:
			 *		n*d / (n - f1 + f1*n/N)
			 * where f1 is the number of distinct values that occurred
			 * exactly once in our sample of n rows (from a total of N),
			 * and d is the total number of distinct values in the sample.
			 * This is their Duj1 estimator; the other estimators they
			 * recommend are considerably more complex, and are numerically
			 * very unstable when n is much smaller than N.
			 *
			 * We assume (not very reliably!) that all the multiply-occurring
			 * values are reflected in the final track[] list, and the other
			 * nonnull values all appeared but once.  (XXX this usually
			 * results in a drastic overestimate of ndistinct.	Can we do
			 * any better?)
			 *----------
			 */
			int			f1 = nonnull_cnt - summultiple;
			int			d = f1 + nmultiple;
			double		numer,
						denom,
						stadistinct;

			numer = (double) samplerows *(double) d;

			denom = (double) (samplerows - f1) +
				(double) f1 *(double) samplerows / totalrows;

			stadistinct = numer / denom;
			/* Clamp to sane range in case of roundoff error */
			if (stadistinct < (double) d)
				stadistinct = (double) d;
			if (stadistinct > totalrows)
				stadistinct = totalrows;
			stats->stadistinct = floor(stadistinct + 0.5);
		}

		/*
		 * If we estimated the number of distinct values at more than 10% of
		 * the total row count (a very arbitrary limit), then assume that
		 * stadistinct should scale with the row count rather than be a fixed
		 * value.  (A negative stadistinct is interpreted as -fraction of
		 * the row count.)
		 */
		if (stats->stadistinct > 0.1 * totalrows)
			stats->stadistinct = -(stats->stadistinct / totalrows);

		/*
		 * Decide how many values are worth storing as most-common values. If
		 * we are able to generate a complete MCV list (all the values in the
		 * sample will fit, and we think these are all the ones in the table),
		 * then do so.  Otherwise, store only those values that are
		 * significantly more common than the (estimated) average. We set the
		 * threshold rather arbitrarily at 25% more than average, with at
		 * least 2 instances in the sample.
		 */
		if (track_cnt < track_max && toowide_cnt == 0 &&
			stats->stadistinct > 0 &&
			track_cnt <= num_mcv)
		{
			/* Track list includes all values seen, and all will fit */
			num_mcv = track_cnt;
		}
		else
		{
			double		ndistinct = stats->stadistinct;
			double		avgcount,
						mincount;

			if (ndistinct < 0)
				ndistinct = -ndistinct * totalrows;
			/* estimate # of occurrences in sample of a typical value */
			avgcount = (double) samplerows / ndistinct;
			/* set minimum threshold count to store a value */
			mincount = avgcount * 1.25;
			if (mincount < 2)
				mincount = 2;
			if (num_mcv > track_cnt)
				num_mcv = track_cnt;
			/* cut the list off at the first entry below threshold */
			for (i = 0; i < num_mcv; i++)
			{
				if (track[i].count < mincount)
				{
					num_mcv = i;
					break;
				}
			}
		}

		/* Generate MCV slot entry */
		if (num_mcv > 0)
		{
			MemoryContext old_context;
			Datum	   *mcv_values;
			float4	   *mcv_freqs;

			/* Must copy the target values into anl_context */
			old_context = MemoryContextSwitchTo(stats->anl_context);
			mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
			mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
			for (i = 0; i < num_mcv; i++)
			{
				mcv_values[i] = datumCopy(track[i].value,
										  stats->attr->attbyval,
										  stats->attr->attlen);
				mcv_freqs[i] = (double) track[i].count / (double) samplerows;
			}
			MemoryContextSwitchTo(old_context);

			stats->stakind[0] = STATISTIC_KIND_MCV;
			stats->staop[0] = mystats->eqopr;
			stats->stanumbers[0] = mcv_freqs;
			stats->numnumbers[0] = num_mcv;
			stats->stavalues[0] = mcv_values;
			stats->numvalues[0] = num_mcv;
			/*
			 * Accept the defaults for stats->statypid and others.
			 * They have been set before we were called (see vacuum.h)
			 */
		}
	}
	else if (null_cnt > 0)
	{
		/* We found only nulls; assume the column is entirely null */
		stats->stats_valid = true;
		stats->stanullfrac = 1.0;
		if (is_varwidth)
			stats->stawidth = 0;	/* "unknown" */
		else
			stats->stawidth = stats->attrtype->typlen;
		stats->stadistinct = 0.0;		/* "unknown" */
	}

	/* We don't need to bother cleaning up any of our temporary palloc's */
}
1892
1893
1894 /*
1895  *      compute_scalar_stats() -- compute column statistics
1896  *
1897  *      We use this when we can find "=" and "<" operators for the datatype.
1898  *
1899  *      We determine the fraction of non-null rows, the average width, the
1900  *      most common values, the (estimated) number of distinct values, the
1901  *      distribution histogram, and the correlation of physical to logical order.
1902  *
1903  *      The desired stats can be determined fairly easily after sorting the
1904  *      data values into order.
1905  */
1906 static void
1907 compute_scalar_stats(VacAttrStatsP stats,
1908                                          AnalyzeAttrFetchFunc fetchfunc,
1909                                          int samplerows,
1910                                          double totalrows)
1911 {
1912         int                     i;
1913         int                     null_cnt = 0;
1914         int                     nonnull_cnt = 0;
1915         int                     toowide_cnt = 0;
1916         double          total_width = 0;
1917         bool            is_varlena = (!stats->attr->attbyval &&
1918                                                           stats->attr->attlen == -1);
1919         bool            is_varwidth = (!stats->attr->attbyval &&
1920                                                            stats->attr->attlen < 0);
1921         double          corr_xysum;
1922         Oid                     cmpFn;
1923         int                     cmpFlags;
1924         FmgrInfo        f_cmpfn;
1925         ScalarItem *values;
1926         int                     values_cnt = 0;
1927         int                *tupnoLink;
1928         ScalarMCVItem *track;
1929         int                     track_cnt = 0;
1930         int                     num_mcv = stats->attr->attstattarget;
1931         int                     num_bins = stats->attr->attstattarget;
1932         StdAnalyzeData *mystats = (StdAnalyzeData *) stats->extra_data;
1933
1934         values = (ScalarItem *) palloc(samplerows * sizeof(ScalarItem));
1935         tupnoLink = (int *) palloc(samplerows * sizeof(int));
1936         track = (ScalarMCVItem *) palloc(num_mcv * sizeof(ScalarMCVItem));
1937
1938         SelectSortFunction(mystats->ltopr, false, &cmpFn, &cmpFlags);
1939         fmgr_info(cmpFn, &f_cmpfn);
1940
1941         /* Initial scan to find sortable values */
1942         for (i = 0; i < samplerows; i++)
1943         {
1944                 Datum           value;
1945                 bool            isnull;
1946
1947                 vacuum_delay_point();
1948
1949                 value = fetchfunc(stats, i, &isnull);
1950
1951                 /* Check for null/nonnull */
1952                 if (isnull)
1953                 {
1954                         null_cnt++;
1955                         continue;
1956                 }
1957                 nonnull_cnt++;
1958
1959                 /*
1960                  * If it's a variable-width field, add up widths for average width
1961                  * calculation.  Note that if the value is toasted, we use the toasted
1962                  * width.  We don't bother with this calculation if it's a fixed-width
1963                  * type.
1964                  */
1965                 if (is_varlena)
1966                 {
1967                         total_width += VARSIZE_ANY(DatumGetPointer(value));
1968
1969                         /*
1970                          * If the value is toasted, we want to detoast it just once to
1971                          * avoid repeated detoastings and resultant excess memory usage
1972                          * during the comparisons.      Also, check to see if the value is
1973                          * excessively wide, and if so don't detoast at all --- just
1974                          * ignore the value.
1975                          */
1976                         if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
1977                         {
1978                                 toowide_cnt++;
1979                                 continue;
1980                         }
1981                         value = PointerGetDatum(PG_DETOAST_DATUM(value));
1982                 }
1983                 else if (is_varwidth)
1984                 {
1985                         /* must be cstring */
1986                         total_width += strlen(DatumGetCString(value)) + 1;
1987                 }
1988
1989                 /* Add it to the list to be sorted */
1990                 values[values_cnt].value = value;
1991                 values[values_cnt].tupno = values_cnt;
1992                 tupnoLink[values_cnt] = values_cnt;
1993                 values_cnt++;
1994         }
1995
1996         /* We can only compute real stats if we found some sortable values. */
1997         if (values_cnt > 0)
1998         {
1999                 int                     ndistinct,      /* # distinct values in sample */
2000                                         nmultiple,      /* # that appear multiple times */
2001                                         num_hist,
2002                                         dups_cnt;
2003                 int                     slot_idx = 0;
2004                 CompareScalarsContext cxt;
2005
2006                 /* Sort the collected values */
2007                 cxt.cmpFn = &f_cmpfn;
2008                 cxt.cmpFlags = cmpFlags;
2009                 cxt.tupnoLink = tupnoLink;
2010                 qsort_arg((void *) values, values_cnt, sizeof(ScalarItem),
2011                                   compare_scalars, (void *) &cxt);
2012
2013                 /*
2014                  * Now scan the values in order, find the most common ones, and also
2015                  * accumulate ordering-correlation statistics.
2016                  *
2017                  * To determine which are most common, we first have to count the
2018                  * number of duplicates of each value.  The duplicates are adjacent in
2019                  * the sorted list, so a brute-force approach is to compare successive
2020                  * datum values until we find two that are not equal. However, that
2021                  * requires N-1 invocations of the datum comparison routine, which are
2022                  * completely redundant with work that was done during the sort.  (The
2023                  * sort algorithm must at some point have compared each pair of items
2024                  * that are adjacent in the sorted order; otherwise it could not know
2025                  * that it's ordered the pair correctly.) We exploit this by having
2026                  * compare_scalars remember the highest tupno index that each
2027                  * ScalarItem has been found equal to.  At the end of the sort, a
2028                  * ScalarItem's tupnoLink will still point to itself if and only if it
2029                  * is the last item of its group of duplicates (since the group will
2030                  * be ordered by tupno).
2031                  */
2032                 corr_xysum = 0;
2033                 ndistinct = 0;
2034                 nmultiple = 0;
2035                 dups_cnt = 0;
2036                 for (i = 0; i < values_cnt; i++)
2037                 {
2038                         int                     tupno = values[i].tupno;
2039
2040                         corr_xysum += ((double) i) * ((double) tupno);
2041                         dups_cnt++;
2042                         if (tupnoLink[tupno] == tupno)
2043                         {
2044                                 /* Reached end of duplicates of this value */
2045                                 ndistinct++;
2046                                 if (dups_cnt > 1)
2047                                 {
2048                                         nmultiple++;
2049                                         if (track_cnt < num_mcv ||
2050                                                 dups_cnt > track[track_cnt - 1].count)
2051                                         {
2052                                                 /*
2053                                                  * Found a new item for the mcv list; find its
2054                                                  * position, bubbling down old items if needed. Loop
2055                                                  * invariant is that j points at an empty/ replaceable
2056                                                  * slot.
2057                                                  */
2058                                                 int                     j;
2059
2060                                                 if (track_cnt < num_mcv)
2061                                                         track_cnt++;
2062                                                 for (j = track_cnt - 1; j > 0; j--)
2063                                                 {
2064                                                         if (dups_cnt <= track[j - 1].count)
2065                                                                 break;
2066                                                         track[j].count = track[j - 1].count;
2067                                                         track[j].first = track[j - 1].first;
2068                                                 }
2069                                                 track[j].count = dups_cnt;
2070                                                 track[j].first = i + 1 - dups_cnt;
2071                                         }
2072                                 }
2073                                 dups_cnt = 0;
2074                         }
2075                 }
2076
2077                 stats->stats_valid = true;
2078                 /* Do the simple null-frac and width stats */
2079                 stats->stanullfrac = (double) null_cnt / (double) samplerows;
2080                 if (is_varwidth)
2081                         stats->stawidth = total_width / (double) nonnull_cnt;
2082                 else
2083                         stats->stawidth = stats->attrtype->typlen;
2084
2085                 if (nmultiple == 0)
2086                 {
2087                         /* If we found no repeated values, assume it's a unique column */
2088                         stats->stadistinct = -1.0;
2089                 }
2090                 else if (toowide_cnt == 0 && nmultiple == ndistinct)
2091                 {
2092                         /*
2093                          * Every value in the sample appeared more than once.  Assume the
2094                          * column has just these values.
2095                          */
2096                         stats->stadistinct = ndistinct;
2097                 }
2098                 else
2099                 {
2100                         /*----------
2101                          * Estimate the number of distinct values using the estimator
2102                          * proposed by Haas and Stokes in IBM Research Report RJ 10025:
2103                          *              n*d / (n - f1 + f1*n/N)
2104                          * where f1 is the number of distinct values that occurred
2105                          * exactly once in our sample of n rows (from a total of N),
2106                          * and d is the total number of distinct values in the sample.
2107                          * This is their Duj1 estimator; the other estimators they
2108                          * recommend are considerably more complex, and are numerically
2109                          * very unstable when n is much smaller than N.
2110                          *
2111                          * Overwidth values are assumed to have been distinct.
2112                          *----------
2113                          */
2114                         int                     f1 = ndistinct - nmultiple + toowide_cnt;
2115                         int                     d = f1 + nmultiple;
2116                         double          numer,
2117                                                 denom,
2118                                                 stadistinct;
2119
2120                         numer = (double) samplerows *(double) d;
2121
2122                         denom = (double) (samplerows - f1) +
2123                                 (double) f1 *(double) samplerows / totalrows;
2124
2125                         stadistinct = numer / denom;
2126                         /* Clamp to sane range in case of roundoff error */
2127                         if (stadistinct < (double) d)
2128                                 stadistinct = (double) d;
2129                         if (stadistinct > totalrows)
2130                                 stadistinct = totalrows;
2131                         stats->stadistinct = floor(stadistinct + 0.5);
2132                 }
2133
2134                 /*
2135                  * If we estimated the number of distinct values at more than 10% of
2136                  * the total row count (a very arbitrary limit), then assume that
2137                  * stadistinct should scale with the row count rather than be a fixed
2138                  * value.
2139                  */
2140                 if (stats->stadistinct > 0.1 * totalrows)
2141                         stats->stadistinct = -(stats->stadistinct / totalrows);
2142
2143                 /*
2144                  * Decide how many values are worth storing as most-common values. If
2145                  * we are able to generate a complete MCV list (all the values in the
2146                  * sample will fit, and we think these are all the ones in the table),
2147                  * then do so.  Otherwise, store only those values that are
2148                  * significantly more common than the (estimated) average. We set the
2149                  * threshold rather arbitrarily at 25% more than average, with at
2150                  * least 2 instances in the sample.  Also, we won't suppress values
2151                  * that have a frequency of at least 1/K where K is the intended
2152                  * number of histogram bins; such values might otherwise cause us to
2153                  * emit duplicate histogram bin boundaries.  (We might end up with
2154                  * duplicate histogram entries anyway, if the distribution is skewed;
2155                  * but we prefer to treat such values as MCVs if at all possible.)
2156                  */
2157                 if (track_cnt == ndistinct && toowide_cnt == 0 &&
2158                         stats->stadistinct > 0 &&
2159                         track_cnt <= num_mcv)
2160                 {
2161                         /* Track list includes all values seen, and all will fit */
2162                         num_mcv = track_cnt;
2163                 }
2164                 else
2165                 {
2166                         double          ndistinct = stats->stadistinct;
2167                         double          avgcount,
2168                                                 mincount,
2169                                                 maxmincount;
2170
2171                         if (ndistinct < 0)
2172                                 ndistinct = -ndistinct * totalrows;
2173                         /* estimate # of occurrences in sample of a typical value */
2174                         avgcount = (double) samplerows / ndistinct;
2175                         /* set minimum threshold count to store a value */
2176                         mincount = avgcount * 1.25;
2177                         if (mincount < 2)
2178                                 mincount = 2;
2179                         /* don't let threshold exceed 1/K, however */
2180                         maxmincount = (double) samplerows / (double) num_bins;
2181                         if (mincount > maxmincount)
2182                                 mincount = maxmincount;
2183                         if (num_mcv > track_cnt)
2184                                 num_mcv = track_cnt;
2185                         for (i = 0; i < num_mcv; i++)
2186                         {
2187                                 if (track[i].count < mincount)
2188                                 {
2189                                         num_mcv = i;
2190                                         break;
2191                                 }
2192                         }
2193                 }
2194
2195                 /* Generate MCV slot entry */
2196                 if (num_mcv > 0)
2197                 {
2198                         MemoryContext old_context;
2199                         Datum      *mcv_values;
2200                         float4     *mcv_freqs;
2201
2202                         /* Must copy the target values into anl_context */
2203                         old_context = MemoryContextSwitchTo(stats->anl_context);
2204                         mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
2205                         mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
2206                         for (i = 0; i < num_mcv; i++)
2207                         {
2208                                 mcv_values[i] = datumCopy(values[track[i].first].value,
2209                                                                                   stats->attr->attbyval,
2210                                                                                   stats->attr->attlen);
2211                                 mcv_freqs[i] = (double) track[i].count / (double) samplerows;
2212                         }
2213                         MemoryContextSwitchTo(old_context);
2214
2215                         stats->stakind[slot_idx] = STATISTIC_KIND_MCV;
2216                         stats->staop[slot_idx] = mystats->eqopr;
2217                         stats->stanumbers[slot_idx] = mcv_freqs;
2218                         stats->numnumbers[slot_idx] = num_mcv;
2219                         stats->stavalues[slot_idx] = mcv_values;
2220                         stats->numvalues[slot_idx] = num_mcv;
2221                         /*
2222                          * Accept the defaults for stats->statypid and others.
2223                          * They have been set before we were called (see vacuum.h)
2224                          */
2225                         slot_idx++;
2226                 }
2227
2228                 /*
2229                  * Generate a histogram slot entry if there are at least two distinct
2230                  * values not accounted for in the MCV list.  (This ensures the
2231                  * histogram won't collapse to empty or a singleton.)
2232                  */
2233                 num_hist = ndistinct - num_mcv;
2234                 if (num_hist > num_bins)
2235                         num_hist = num_bins + 1;
2236                 if (num_hist >= 2)
2237                 {
2238                         MemoryContext old_context;
2239                         Datum      *hist_values;
2240                         int                     nvals;
2241
2242                         /* Sort the MCV items into position order to speed next loop */
2243                         qsort((void *) track, num_mcv,
2244                                   sizeof(ScalarMCVItem), compare_mcvs);
2245
2246                         /*
2247                          * Collapse out the MCV items from the values[] array.
2248                          *
2249                          * Note we destroy the values[] array here... but we don't need it
2250                          * for anything more.  We do, however, still need values_cnt.
2251                          * nvals will be the number of remaining entries in values[].
2252                          */
2253                         if (num_mcv > 0)
2254                         {
2255                                 int                     src,
2256                                                         dest;
2257                                 int                     j;
2258
2259                                 src = dest = 0;
2260                                 j = 0;                  /* index of next interesting MCV item */
2261                                 while (src < values_cnt)
2262                                 {
2263                                         int                     ncopy;
2264
2265                                         if (j < num_mcv)
2266                                         {
2267                                                 int                     first = track[j].first;
2268
2269                                                 if (src >= first)
2270                                                 {
2271                                                         /* advance past this MCV item */
2272                                                         src = first + track[j].count;
2273                                                         j++;
2274                                                         continue;
2275                                                 }
2276                                                 ncopy = first - src;
2277                                         }
2278                                         else
2279                                                 ncopy = values_cnt - src;
2280                                         memmove(&values[dest], &values[src],
2281                                                         ncopy * sizeof(ScalarItem));
2282                                         src += ncopy;
2283                                         dest += ncopy;
2284                                 }
2285                                 nvals = dest;
2286                         }
2287                         else
2288                                 nvals = values_cnt;
2289                         Assert(nvals >= num_hist);
2290
2291                         /* Must copy the target values into anl_context */
2292                         old_context = MemoryContextSwitchTo(stats->anl_context);
2293                         hist_values = (Datum *) palloc(num_hist * sizeof(Datum));
2294                         for (i = 0; i < num_hist; i++)
2295                         {
2296                                 int                     pos;
2297
2298                                 pos = (i * (nvals - 1)) / (num_hist - 1);
2299                                 hist_values[i] = datumCopy(values[pos].value,
2300                                                                                    stats->attr->attbyval,
2301                                                                                    stats->attr->attlen);
2302                         }
2303                         MemoryContextSwitchTo(old_context);
2304
2305                         stats->stakind[slot_idx] = STATISTIC_KIND_HISTOGRAM;
2306                         stats->staop[slot_idx] = mystats->ltopr;
2307                         stats->stavalues[slot_idx] = hist_values;
2308                         stats->numvalues[slot_idx] = num_hist;
2309                         /*
2310                          * Accept the defaults for stats->statypid and others.
2311                          * They have been set before we were called (see vacuum.h)
2312                          */
2313                         slot_idx++;
2314                 }
2315
2316                 /* Generate a correlation entry if there are multiple values */
2317                 if (values_cnt > 1)
2318                 {
2319                         MemoryContext old_context;
2320                         float4     *corrs;
2321                         double          corr_xsum,
2322                                                 corr_x2sum;
2323
2324                         /* Must copy the target values into anl_context */
2325                         old_context = MemoryContextSwitchTo(stats->anl_context);
2326                         corrs = (float4 *) palloc(sizeof(float4));
2327                         MemoryContextSwitchTo(old_context);
2328
2329                         /*----------
2330                          * Since we know the x and y value sets are both
2331                          *              0, 1, ..., values_cnt-1
2332                          * we have sum(x) = sum(y) =
2333                          *              (values_cnt-1)*values_cnt / 2
2334                          * and sum(x^2) = sum(y^2) =
2335                          *              (values_cnt-1)*values_cnt*(2*values_cnt-1) / 6.
2336                          *----------
2337                          */
2338                         corr_xsum = ((double) (values_cnt - 1)) *
2339                                 ((double) values_cnt) / 2.0;
2340                         corr_x2sum = ((double) (values_cnt - 1)) *
2341                                 ((double) values_cnt) * (double) (2 * values_cnt - 1) / 6.0;
2342
2343                         /* And the correlation coefficient reduces to */
2344                         corrs[0] = (values_cnt * corr_xysum - corr_xsum * corr_xsum) /
2345                                 (values_cnt * corr_x2sum - corr_xsum * corr_xsum);
2346
2347                         stats->stakind[slot_idx] = STATISTIC_KIND_CORRELATION;
2348                         stats->staop[slot_idx] = mystats->ltopr;
2349                         stats->stanumbers[slot_idx] = corrs;
2350                         stats->numnumbers[slot_idx] = 1;
2351                         slot_idx++;
2352                 }
2353         }
2354         else if (nonnull_cnt == 0 && null_cnt > 0)
2355         {
2356                 /* We found only nulls; assume the column is entirely null */
2357                 stats->stats_valid = true;
2358                 stats->stanullfrac = 1.0;
2359                 if (is_varwidth)
2360                         stats->stawidth = 0;    /* "unknown" */
2361                 else
2362                         stats->stawidth = stats->attrtype->typlen;
2363                 stats->stadistinct = 0.0;               /* "unknown" */
2364         }
2365
2366         /* We don't need to bother cleaning up any of our temporary palloc's */
2367 }
2368
2369 /*
2370  * qsort_arg comparator for sorting ScalarItems
2371  *
2372  * Aside from sorting the items, we update the tupnoLink[] array
2373  * whenever two ScalarItems are found to contain equal datums.  The array
2374  * is indexed by tupno; for each ScalarItem, it contains the highest
2375  * tupno that that item's datum has been found to be equal to.  This allows
2376  * us to avoid additional comparisons in compute_scalar_stats().
2377  */
2378 static int
2379 compare_scalars(const void *a, const void *b, void *arg)
2380 {
2381         Datum           da = ((ScalarItem *) a)->value;
2382         int                     ta = ((ScalarItem *) a)->tupno;
2383         Datum           db = ((ScalarItem *) b)->value;
2384         int                     tb = ((ScalarItem *) b)->tupno;
2385         CompareScalarsContext *cxt = (CompareScalarsContext *) arg;
2386         int32           compare;
2387
2388         compare = ApplySortFunction(cxt->cmpFn, cxt->cmpFlags,
2389                                                                 da, false, db, false);
2390         if (compare != 0)
2391                 return compare;
2392
2393         /*
2394          * The two datums are equal, so update cxt->tupnoLink[].
2395          */
2396         if (cxt->tupnoLink[ta] < tb)
2397                 cxt->tupnoLink[ta] = tb;
2398         if (cxt->tupnoLink[tb] < ta)
2399                 cxt->tupnoLink[tb] = ta;
2400
2401         /*
2402          * For equal datums, sort by tupno
2403          */
2404         return ta - tb;
2405 }
2406
2407 /*
2408  * qsort comparator for sorting ScalarMCVItems by position
2409  */
2410 static int
2411 compare_mcvs(const void *a, const void *b)
2412 {
2413         int                     da = ((ScalarMCVItem *) a)->first;
2414         int                     db = ((ScalarMCVItem *) b)->first;
2415
2416         return da - db;
2417 }