]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Fix for indexing problems.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *        the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.81 1998/09/02 23:05:25 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/types.h>
15 #include <sys/file.h>
16 #include <string.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include "postgres.h"
22
23 #include "miscadmin.h"
24 #include "access/genam.h"
25 #include "access/heapam.h"
26 #include "access/transam.h"
27 #include "access/xact.h"
28 #include "catalog/catalog.h"
29 #include "catalog/catname.h"
30 #include "catalog/index.h"
31 #include "catalog/pg_class.h"
32 #include "catalog/pg_index.h"
33 #include "catalog/pg_operator.h"
34 #include "catalog/pg_statistic.h"
35 #include "catalog/pg_type.h"
36 #include "commands/vacuum.h"
37 #include "fmgr.h"
38 #include "parser/parse_oper.h"
39 #include "storage/bufmgr.h"
40 #include "storage/bufpage.h"
41 #include "storage/shmem.h"
42 #include "storage/smgr.h"
43 #include "storage/itemptr.h"
44 #include "storage/lmgr.h"
45 #include "utils/builtins.h"
46 #include "utils/inval.h"
47 #include "utils/mcxt.h"
48 #include "utils/portal.h"
49 #include "utils/syscache.h"
50
51 #ifndef HAVE_GETRUSAGE
52 #include <rusagestub.h>
53 #else
54 #include <sys/time.h>
55 #include <sys/resource.h>
56 #endif
57
58  /* #include <port-protos.h> *//* Why? */
59
60 extern int      BlowawayRelationBuffers(Relation rel, BlockNumber block);
61
62 bool            VacuumRunning = false;
63
64 static Portal vc_portal;
65
66 static int      MESSAGE_LEVEL;          /* message level */
67
68 #define swapLong(a,b)   {long tmp; tmp=a; a=b; b=tmp;}
69 #define swapInt(a,b)    {int tmp; tmp=a; a=b; b=tmp;}
70 #define swapDatum(a,b)  {Datum tmp; tmp=a; a=b; b=tmp;}
71 #define VacAttrStatsEqValid(stats) ( stats->f_cmpeq.fn_addr != NULL )
72 #define VacAttrStatsLtGtValid(stats) ( stats->f_cmplt.fn_addr != NULL && \
73                                                                    stats->f_cmpgt.fn_addr != NULL && \
74                                                                    RegProcedureIsValid(stats->outfunc) )
75
76
77 /* non-export function prototypes */
78 static void vc_init(void);
79 static void vc_shutdown(void);
80 static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
81 static VRelList vc_getrels(NameData *VacRelP);
82 static void vc_vacone(Oid relid, bool analyze, List *va_cols);
83 static void vc_scanheap(VRelStats *vacrelstats, Relation onerel, VPageList vacuum_pages, VPageList fraged_pages);
84 static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel, VPageList vacuum_pages, VPageList fraged_pages, int nindices, Relation *Irel);
85 static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vpl);
86 static void vc_vacpage(Page page, VPageDescr vpd);
87 static void vc_vaconeind(VPageList vpl, Relation indrel, int num_tuples);
88 static void vc_scanoneind(Relation indrel, int num_tuples);
89 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple tuple);
90 static void vc_bucketcpy(Form_pg_attribute attr, Datum value, Datum *bucket, int16 *bucket_len);
91 static void vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *vacrelstats);
92 static void vc_delhilowstats(Oid relid, int attcnt, int *attnums);
93 static void vc_setpagelock(Relation rel, BlockNumber blkno);
94 static VPageDescr vc_tidreapped(ItemPointer itemptr, VPageList vpl);
95 static void vc_reappage(VPageList vpl, VPageDescr vpc);
96 static void vc_vpinsert(VPageList vpl, VPageDescr vpnew);
97 static void vc_free(VRelList vrl);
98 static void vc_getindices(Oid relid, int *nindices, Relation **Irel);
99 static void vc_clsindices(int nindices, Relation *Irel);
100 static void vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
101 static char *vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *));
102 static int      vc_cmp_blk(char *left, char *right);
103 static int      vc_cmp_offno(char *left, char *right);
104 static bool vc_enough_space(VPageDescr vpd, Size len);
105
106 void
107 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
108 {
109         char       *pname;
110         MemoryContext old;
111         PortalVariableMemory pmem;
112         NameData        VacRel;
113         List       *le;
114         List       *va_cols = NIL;
115
116         /*
117          * Create a portal for safe memory across transctions.  We need to
118          * palloc the name space for it because our hash function expects the
119          * name to be on a longword boundary.  CreatePortal copies the name to
120          * safe storage for us.
121          */
122         pname = (char *) palloc(strlen(VACPNAME) + 1);
123         strcpy(pname, VACPNAME);
124         vc_portal = CreatePortal(pname);
125         pfree(pname);
126
127         if (verbose)
128                 MESSAGE_LEVEL = NOTICE;
129         else
130                 MESSAGE_LEVEL = DEBUG;
131
132         /* vacrel gets de-allocated on transaction commit */
133         if (vacrel)
134                 strcpy(VacRel.data, vacrel);
135
136         pmem = PortalGetVariableMemory(vc_portal);
137         old = MemoryContextSwitchTo((MemoryContext) pmem);
138
139         if (va_spec != NIL && !analyze)
140                 elog(ERROR, "Can't vacuum columns, only tables.  You can 'vacuum analyze' columns.");
141
142         foreach(le, va_spec)
143         {
144                 char       *col = (char *) lfirst(le);
145                 char       *dest;
146
147                 dest = (char *) palloc(strlen(col) + 1);
148                 strcpy(dest, col);
149                 va_cols = lappend(va_cols, dest);
150         }
151         MemoryContextSwitchTo(old);
152
153         /* initialize vacuum cleaner */
154         vc_init();
155
156         /* vacuum the database */
157         if (vacrel)
158                 vc_vacuum(&VacRel, analyze, va_cols);
159         else
160                 vc_vacuum(NULL, analyze, NIL);
161
162         PortalDestroy(&vc_portal);
163
164         /* clean up */
165         vc_shutdown();
166 }
167
168 /*
169  *      vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
170  *
171  *              We run exactly one vacuum cleaner at a time.  We use the file system
172  *              to guarantee an exclusive lock on vacuuming, since a single vacuum
173  *              cleaner instantiation crosses transaction boundaries, and we'd lose
174  *              postgres-style locks at the end of every transaction.
175  *
176  *              The strangeness with committing and starting transactions in the
177  *              init and shutdown routines is due to the fact that the vacuum cleaner
178  *              is invoked via a sql command, and so is already executing inside
179  *              a transaction.  We need to leave ourselves in a predictable state
180  *              on entry and exit to the vacuum cleaner.  We commit the transaction
181  *              started in PostgresMain() inside vc_init(), and start one in
182  *              vc_shutdown() to match the commit waiting for us back in
183  *              PostgresMain().
184  */
185 static void
186 vc_init()
187 {
188         int                     fd;
189
190         if ((fd = open("pg_vlock", O_CREAT | O_EXCL, 0600)) < 0)
191         {
192                 elog(ERROR, "Can't create lock file.  Is another vacuum cleaner running?\n\
193 \tIf not, you may remove the pg_vlock file in the %s\n\
194 \tdirectory", DatabasePath);
195         }
196         close(fd);
197
198         /*
199          * By here, exclusive open on the lock file succeeded.  If we abort
200          * for any reason during vacuuming, we need to remove the lock file.
201          * This global variable is checked in the transaction manager on xact
202          * abort, and the routine vc_abort() is called if necessary.
203          */
204
205         VacuumRunning = true;
206
207         /* matches the StartTransaction in PostgresMain() */
208         CommitTransactionCommand();
209 }
210
211 static void
212 vc_shutdown()
213 {
214         /* on entry, not in a transaction */
215         if (unlink("pg_vlock") < 0)
216                 elog(ERROR, "vacuum: can't destroy lock file!");
217
218         /* okay, we're done */
219         VacuumRunning = false;
220
221         /* matches the CommitTransaction in PostgresMain() */
222         StartTransactionCommand();
223
224 }
225
226 void
227 vc_abort()
228 {
229         /* on abort, remove the vacuum cleaner lock file */
230         unlink("pg_vlock");
231
232         VacuumRunning = false;
233 }
234
235 /*
236  *      vc_vacuum() -- vacuum the database.
237  *
238  *              This routine builds a list of relations to vacuum, and then calls
239  *              code that vacuums them one at a time.  We are careful to vacuum each
240  *              relation in a separate transaction in order to avoid holding too many
241  *              locks at one time.
242  */
243 static void
244 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
245 {
246         VRelList        vrl,
247                                 cur;
248
249         /* get list of relations */
250         vrl = vc_getrels(VacRelP);
251
252         if (analyze && VacRelP == NULL && vrl != NULL)
253                 vc_delhilowstats(InvalidOid, 0, NULL);
254
255         /* vacuum each heap relation */
256         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
257                 vc_vacone(cur->vrl_relid, analyze, va_cols);
258
259         vc_free(vrl);
260 }
261
262 static VRelList
263 vc_getrels(NameData *VacRelP)
264 {
265         Relation        rel;
266         TupleDesc       tupdesc;
267         HeapScanDesc scan;
268         HeapTuple       tuple;
269         PortalVariableMemory portalmem;
270         MemoryContext old;
271         VRelList        vrl,
272                                 cur;
273         Datum           d;
274         char       *rname;
275         char            rkind;
276         bool            n;
277         bool            found = false;
278         ScanKeyData key;
279
280         StartTransactionCommand();
281
282         if (VacRelP->data)
283         {
284                 ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
285                                                            F_NAMEEQ,
286                                                            PointerGetDatum(VacRelP->data));
287         }
288         else
289         {
290                 ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
291                                                            F_CHAREQ, CharGetDatum('r'));
292         }
293
294         portalmem = PortalGetVariableMemory(vc_portal);
295         vrl = cur = (VRelList) NULL;
296
297         rel = heap_openr(RelationRelationName);
298         tupdesc = RelationGetDescr(rel);
299
300         scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);
301
302         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
303         {
304                 found = true;
305
306                 d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
307                 rname = (char *) d;
308
309                 /*
310                  * don't vacuum large objects for now - something breaks when we
311                  * do
312                  */
313                 if ((strlen(rname) >= 5) && rname[0] == 'x' &&
314                         rname[1] == 'i' && rname[2] == 'n' &&
315                         (rname[3] == 'v' || rname[3] == 'x') &&
316                         rname[4] >= '0' && rname[4] <= '9')
317                 {
318                         elog(NOTICE, "Rel %s: can't vacuum LargeObjects now",
319                                  rname);
320                         continue;
321                 }
322
323                 d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
324
325                 rkind = DatumGetChar(d);
326
327                 /* skip system relations */
328                 if (rkind != 'r')
329                 {
330                         elog(NOTICE, "Vacuum: can not process index and certain system tables");
331                         continue;
332                 }
333
334                 /* get a relation list entry for this guy */
335                 old = MemoryContextSwitchTo((MemoryContext) portalmem);
336                 if (vrl == (VRelList) NULL)
337                         vrl = cur = (VRelList) palloc(sizeof(VRelListData));
338                 else
339                 {
340                         cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
341                         cur = cur->vrl_next;
342                 }
343                 MemoryContextSwitchTo(old);
344
345                 cur->vrl_relid = tuple->t_oid;
346                 cur->vrl_next = (VRelList) NULL;
347         }
348         if (found == false)
349                 elog(NOTICE, "Vacuum: table not found");
350
351
352         heap_endscan(scan);
353         heap_close(rel);
354
355         CommitTransactionCommand();
356
357         return vrl;
358 }
359
360 /*
361  *      vc_vacone() -- vacuum one heap relation
362  *
363  *              This routine vacuums a single heap, cleans out its indices, and
364  *              updates its statistics num_pages and num_tuples statistics.
365  *
366  *              Doing one heap at a time incurs extra overhead, since we need to
367  *              check that the heap exists again just before we vacuum it.      The
368  *              reason that we do this is so that vacuuming can be spread across
369  *              many small transactions.  Otherwise, two-phase locking would require
370  *              us to lock the entire database during one pass of the vacuum cleaner.
371  */
372 static void
373 vc_vacone(Oid relid, bool analyze, List *va_cols)
374 {
375         Relation        rel;
376         TupleDesc       tupdesc;
377         HeapTuple       tuple,
378                                 typetuple;
379         Relation        onerel;
380         VPageListData vacuum_pages; /* List of pages to vacuum and/or clean
381                                                                  * indices */
382         VPageListData fraged_pages; /* List of pages with space enough for
383                                                                  * re-using */
384         VPageDescr *vpp;
385         Relation   *Irel;
386         int32           nindices,
387                                 i;
388         VRelStats  *vacrelstats;
389
390         StartTransactionCommand();
391
392         rel = heap_openr(RelationRelationName);
393         tupdesc = RelationGetDescr(rel);
394
395         /*
396          * Race condition -- if the pg_class tuple has gone away since the
397          * last time we saw it, we don't need to vacuum it.
398          */
399         tuple = SearchSysCacheTuple(RELOID,
400                                                                 ObjectIdGetDatum(relid),
401                                                                 0, 0, 0);
402         if (!HeapTupleIsValid(tuple))
403         {
404                 heap_close(rel);
405                 CommitTransactionCommand();
406                 return;
407         }
408
409         /* now open the class and vacuum it */
410         onerel = heap_open(relid);
411
412         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
413         vacrelstats->relid = relid;
414         vacrelstats->num_pages = vacrelstats->num_tuples = 0;
415         vacrelstats->hasindex = false;
416         if (analyze && !IsSystemRelationName((RelationGetRelationName(onerel))->data))
417         {
418                 int                     attr_cnt,
419                                    *attnums = NULL;
420                 Form_pg_attribute *attr;
421
422                 attr_cnt = onerel->rd_att->natts;
423                 attr = onerel->rd_att->attrs;
424
425                 if (va_cols != NIL)
426                 {
427                         int                     tcnt = 0;
428                         List       *le;
429
430                         if (length(va_cols) > attr_cnt)
431                                 elog(ERROR, "vacuum: too many attributes specified for relation %s",
432                                          (RelationGetRelationName(onerel))->data);
433                         attnums = (int *) palloc(attr_cnt * sizeof(int));
434                         foreach(le, va_cols)
435                         {
436                                 char       *col = (char *) lfirst(le);
437
438                                 for (i = 0; i < attr_cnt; i++)
439                                 {
440                                         if (namestrcmp(&(attr[i]->attname), col) == 0)
441                                                 break;
442                                 }
443                                 if (i < attr_cnt)               /* found */
444                                         attnums[tcnt++] = i;
445                                 else
446                                 {
447                                         elog(ERROR, "vacuum: there is no attribute %s in %s",
448                                                  col, (RelationGetRelationName(onerel))->data);
449                                 }
450                         }
451                         attr_cnt = tcnt;
452                 }
453
454                 vacrelstats->vacattrstats =
455                         (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));
456
457                 for (i = 0; i < attr_cnt; i++)
458                 {
459                         Operator        func_operator;
460                         Form_pg_operator pgopform;
461                         VacAttrStats *stats;
462
463                         stats = &vacrelstats->vacattrstats[i];
464                         stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
465                         memmove(stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
466                         stats->best = stats->guess1 = stats->guess2 = 0;
467                         stats->max = stats->min = 0;
468                         stats->best_len = stats->guess1_len = stats->guess2_len = 0;
469                         stats->max_len = stats->min_len = 0;
470                         stats->initialized = false;
471                         stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
472                         stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
473
474                         func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
475                         if (func_operator != NULL)
476                         {
477                                 pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
478                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpeq));
479                         }
480                         else
481                                 stats->f_cmpeq.fn_addr = NULL;
482
483                         func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
484                         if (func_operator != NULL)
485                         {
486                                 pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
487                                 fmgr_info(pgopform->oprcode, &(stats->f_cmplt));
488                         }
489                         else
490                                 stats->f_cmplt.fn_addr = NULL;
491
492                         func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
493                         if (func_operator != NULL)
494                         {
495                                 pgopform = (Form_pg_operator) GETSTRUCT(func_operator);
496                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpgt));
497                         }
498                         else
499                                 stats->f_cmpgt.fn_addr = NULL;
500
501                         typetuple = SearchSysCacheTuple(TYPOID,
502                                                                  ObjectIdGetDatum(stats->attr->atttypid),
503                                                                                         0, 0, 0);
504                         if (HeapTupleIsValid(typetuple))
505                                 stats->outfunc = ((Form_pg_type) GETSTRUCT(typetuple))->typoutput;
506                         else
507                                 stats->outfunc = InvalidOid;
508                 }
509                 vacrelstats->va_natts = attr_cnt;
510                 vc_delhilowstats(relid, ((attnums) ? attr_cnt : 0), attnums);
511                 if (attnums)
512                         pfree(attnums);
513         }
514         else
515         {
516                 vacrelstats->va_natts = 0;
517                 vacrelstats->vacattrstats = (VacAttrStats *) NULL;
518         }
519
520         /* we require the relation to be locked until the indices are cleaned */
521         RelationSetLockForWrite(onerel);
522
523         /* scan it */
524         vacuum_pages.vpl_num_pages = fraged_pages.vpl_num_pages = 0;
525         vc_scanheap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
526
527         /* Now open indices */
528         Irel = (Relation *) NULL;
529         vc_getindices(vacrelstats->relid, &nindices, &Irel);
530
531         if (nindices > 0)
532                 vacrelstats->hasindex = true;
533         else
534                 vacrelstats->hasindex = false;
535
536         /* Clean/scan index relation(s) */
537         if (Irel != (Relation *) NULL)
538         {
539                 if (vacuum_pages.vpl_num_pages > 0)
540                 {
541                         for (i = 0; i < nindices; i++)
542                                 vc_vaconeind(&vacuum_pages, Irel[i], vacrelstats->num_tuples);
543                 }
544                 else
545 /* just scan indices to update statistic */
546                 {
547                         for (i = 0; i < nindices; i++)
548                                 vc_scanoneind(Irel[i], vacrelstats->num_tuples);
549                 }
550         }
551
552         if (fraged_pages.vpl_num_pages > 0) /* Try to shrink heap */
553                 vc_rpfheap(vacrelstats, onerel, &vacuum_pages, &fraged_pages, nindices, Irel);
554         else
555         {
556                 if (Irel != (Relation *) NULL)
557                         vc_clsindices(nindices, Irel);
558                 if (vacuum_pages.vpl_num_pages > 0)             /* Clean pages from
559                                                                                                  * vacuum_pages list */
560                         vc_vacheap(vacrelstats, onerel, &vacuum_pages);
561         }
562
563         /* ok - free vacuum_pages list of reapped pages */
564         if (vacuum_pages.vpl_num_pages > 0)
565         {
566                 vpp = vacuum_pages.vpl_pagedesc;
567                 for (i = 0; i < vacuum_pages.vpl_num_pages; i++, vpp++)
568                         pfree(*vpp);
569                 pfree(vacuum_pages.vpl_pagedesc);
570                 if (fraged_pages.vpl_num_pages > 0)
571                         pfree(fraged_pages.vpl_pagedesc);
572         }
573
574         /* all done with this class */
575         heap_close(onerel);
576         heap_close(rel);
577
578         /* update statistics in pg_class */
579         vc_updstats(vacrelstats->relid, vacrelstats->num_pages, vacrelstats->num_tuples,
580                                 vacrelstats->hasindex, vacrelstats);
581
582         /* next command frees attribute stats */
583
584         CommitTransactionCommand();
585 }
586
587 /*
588  *      vc_scanheap() -- scan an open heap relation
589  *
590  *              This routine sets commit times, constructs vacuum_pages list of
591  *              empty/uninitialized pages and pages with dead tuples and
592  *              ~LP_USED line pointers, constructs fraged_pages list of pages
593  *              appropriate for purposes of shrinking and maintains statistics
594  *              on the number of live tuples in a heap.
595  */
596 static void
597 vc_scanheap(VRelStats *vacrelstats, Relation onerel,
598                         VPageList vacuum_pages, VPageList fraged_pages)
599 {
600         int                     nblocks,
601                                 blkno;
602         ItemId          itemid;
603         ItemPointer itemptr;
604         Buffer          buf;
605         HeapTuple       tuple;
606         Page            page,
607                                 tempPage = NULL;
608         OffsetNumber offnum,
609                                 maxoff;
610         bool            pgchanged,
611                                 tupgone,
612                                 dobufrel,
613                                 notup;
614         char       *relname;
615         VPageDescr      vpc,
616                                 vp;
617         uint32          tups_vacuumed,
618                                 num_tuples,
619                                 nunused,
620                                 ncrash,
621                                 empty_pages,
622                                 new_pages,
623                                 changed_pages,
624                                 empty_end_pages;
625         Size            free_size,
626                                 usable_free_size;
627         Size            min_tlen = MAXTUPLEN;
628         Size            max_tlen = 0;
629         int32           i;
630         struct rusage ru0,
631                                 ru1;
632         bool            do_shrinking = true;
633
634         getrusage(RUSAGE_SELF, &ru0);
635
636         tups_vacuumed = num_tuples = nunused = ncrash = empty_pages =
637                 new_pages = changed_pages = empty_end_pages = 0;
638         free_size = usable_free_size = 0;
639
640         relname = (RelationGetRelationName(onerel))->data;
641
642         nblocks = RelationGetNumberOfBlocks(onerel);
643
644         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
645         vpc->vpd_offsets_used = 0;
646
647         elog(MESSAGE_LEVEL, "--Relation %s--", relname);
648
649         for (blkno = 0; blkno < nblocks; blkno++)
650         {
651                 buf = ReadBuffer(onerel, blkno);
652                 page = BufferGetPage(buf);
653                 vpc->vpd_blkno = blkno;
654                 vpc->vpd_offsets_free = 0;
655
656                 if (PageIsNew(page))
657                 {
658                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
659                                  relname, blkno);
660                         PageInit(page, BufferGetPageSize(buf), 0);
661                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
662                         free_size += (vpc->vpd_free - sizeof(ItemIdData));
663                         new_pages++;
664                         empty_end_pages++;
665                         vc_reappage(vacuum_pages, vpc);
666                         WriteBuffer(buf);
667                         continue;
668                 }
669
670                 if (PageIsEmpty(page))
671                 {
672                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
673                         free_size += (vpc->vpd_free - sizeof(ItemIdData));
674                         empty_pages++;
675                         empty_end_pages++;
676                         vc_reappage(vacuum_pages, vpc);
677                         ReleaseBuffer(buf);
678                         continue;
679                 }
680
681                 pgchanged = false;
682                 notup = true;
683                 maxoff = PageGetMaxOffsetNumber(page);
684                 for (offnum = FirstOffsetNumber;
685                          offnum <= maxoff;
686                          offnum = OffsetNumberNext(offnum))
687                 {
688                         itemid = PageGetItemId(page, offnum);
689
690                         /*
691                          * Collect un-used items too - it's possible to have indices
692                          * pointing here after crash.
693                          */
694                         if (!ItemIdIsUsed(itemid))
695                         {
696                                 vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum;
697                                 nunused++;
698                                 continue;
699                         }
700
701                         tuple = (HeapTuple) PageGetItem(page, itemid);
702                         tupgone = false;
703
704                         if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
705                         {
706                                 if (tuple->t_infomask & HEAP_XMIN_INVALID)
707                                         tupgone = true;
708                                 else
709                                 {
710                                         if (TransactionIdDidAbort(tuple->t_xmin))
711                                                 tupgone = true;
712                                         else if (TransactionIdDidCommit(tuple->t_xmin))
713                                         {
714                                                 tuple->t_infomask |= HEAP_XMIN_COMMITTED;
715                                                 pgchanged = true;
716                                         }
717                                         else if (!TransactionIdIsInProgress(tuple->t_xmin))
718                                         {
719
720                                                 /*
721                                                  * Not Aborted, Not Committed, Not in Progress -
722                                                  * so it's from crashed process. - vadim 11/26/96
723                                                  */
724                                                 ncrash++;
725                                                 tupgone = true;
726                                         }
727                                         else
728                                         {
729                                                 elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
730                                                          relname, blkno, offnum, tuple->t_xmin);
731                                                 do_shrinking = false;
732                                         }
733                                 }
734                         }
735
736                         /*
737                          * here we are concerned about tuples with xmin committed and
738                          * xmax unknown or committed
739                          */
740                         if (tuple->t_infomask & HEAP_XMIN_COMMITTED &&
741                                 !(tuple->t_infomask & HEAP_XMAX_INVALID))
742                         {
743                                 if (tuple->t_infomask & HEAP_XMAX_COMMITTED)
744                                         tupgone = true;
745                                 else if (TransactionIdDidAbort(tuple->t_xmax))
746                                 {
747                                         tuple->t_infomask |= HEAP_XMAX_INVALID;
748                                         pgchanged = true;
749                                 }
750                                 else if (TransactionIdDidCommit(tuple->t_xmax))
751                                         tupgone = true;
752                                 else if (!TransactionIdIsInProgress(tuple->t_xmax))
753                                 {
754
755                                         /*
756                                          * Not Aborted, Not Committed, Not in Progress - so it
757                                          * from crashed process. - vadim 06/02/97
758                                          */
759                                         tuple->t_infomask |= HEAP_XMAX_INVALID;;
760                                         pgchanged = true;
761                                 }
762                                 else
763                                 {
764                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
765                                                  relname, blkno, offnum, tuple->t_xmax);
766                                         do_shrinking = false;
767                                 }
768                         }
769
770                         /*
771                          * It's possibly! But from where it comes ? And should we fix
772                          * it ?  - vadim 11/28/96
773                          */
774                         itemptr = &(tuple->t_ctid);
775                         if (!ItemPointerIsValid(itemptr) ||
776                                 BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno)
777                         {
778                                 elog(NOTICE, "Rel %s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.",
779                                          relname, blkno, offnum,
780                                          BlockIdGetBlockNumber(&(itemptr->ip_blkid)),
781                                          itemptr->ip_posid, tupgone);
782                         }
783
784                         /*
785                          * Other checks...
786                          */
787                         if (tuple->t_len != itemid->lp_len)
788                         {
789                                 elog(NOTICE, "Rel %s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.",
790                                          relname, blkno, offnum,
791                                          itemid->lp_len, tuple->t_len, tupgone);
792                         }
793                         if (!OidIsValid(tuple->t_oid))
794                         {
795                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
796                                          relname, blkno, offnum, tupgone);
797                         }
798
799                         if (tupgone)
800                         {
801                                 ItemId          lpp;
802
803                                 if (tempPage == (Page) NULL)
804                                 {
805                                         Size            pageSize;
806
807                                         pageSize = PageGetPageSize(page);
808                                         tempPage = (Page) palloc(pageSize);
809                                         memmove(tempPage, page, pageSize);
810                                 }
811
812                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
813
814                                 /* mark it unused */
815                                 lpp->lp_flags &= ~LP_USED;
816
817                                 vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum;
818                                 tups_vacuumed++;
819
820                         }
821                         else
822                         {
823                                 num_tuples++;
824                                 notup = false;
825                                 if (tuple->t_len < min_tlen)
826                                         min_tlen = tuple->t_len;
827                                 if (tuple->t_len > max_tlen)
828                                         max_tlen = tuple->t_len;
829                                 vc_attrstats(onerel, vacrelstats, tuple);
830                         }
831                 }
832
833                 if (pgchanged)
834                 {
835                         WriteBuffer(buf);
836                         dobufrel = false;
837                         changed_pages++;
838                 }
839                 else
840                         dobufrel = true;
841                 if (tempPage != (Page) NULL)
842                 {                                               /* Some tuples are gone */
843                         PageRepairFragmentation(tempPage);
844                         vpc->vpd_free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
845                         free_size += vpc->vpd_free;
846                         vc_reappage(vacuum_pages, vpc);
847                         pfree(tempPage);
848                         tempPage = (Page) NULL;
849                 }
850                 else if (vpc->vpd_offsets_free > 0)
851                 {                                               /* there are only ~LP_USED line pointers */
852                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
853                         free_size += vpc->vpd_free;
854                         vc_reappage(vacuum_pages, vpc);
855                 }
856                 if (dobufrel)
857                         ReleaseBuffer(buf);
858                 if (notup)
859                         empty_end_pages++;
860                 else
861                         empty_end_pages = 0;
862         }
863
864         pfree(vpc);
865
866         /* save stats in the rel list for use later */
867         vacrelstats->num_tuples = num_tuples;
868         vacrelstats->num_pages = nblocks;
869 /*        vacrelstats->natts = attr_cnt;*/
870         if (num_tuples == 0)
871                 min_tlen = max_tlen = 0;
872         vacrelstats->min_tlen = min_tlen;
873         vacrelstats->max_tlen = max_tlen;
874
875         vacuum_pages->vpl_empty_end_pages = empty_end_pages;
876         fraged_pages->vpl_empty_end_pages = empty_end_pages;
877
878         /*
879          * Try to make fraged_pages keeping in mind that we can't use free
880          * space of "empty" end-pages and last page if it reapped.
881          */
882         if (do_shrinking && vacuum_pages->vpl_num_pages - empty_end_pages > 0)
883         {
884                 int                     nusf;           /* blocks usefull for re-using */
885
886                 nusf = vacuum_pages->vpl_num_pages - empty_end_pages;
887                 if ((vacuum_pages->vpl_pagedesc[nusf - 1])->vpd_blkno == nblocks - empty_end_pages - 1)
888                         nusf--;
889
890                 for (i = 0; i < nusf; i++)
891                 {
892                         vp = vacuum_pages->vpl_pagedesc[i];
893                         if (vc_enough_space(vp, min_tlen))
894                         {
895                                 vc_vpinsert(fraged_pages, vp);
896                                 usable_free_size += vp->vpd_free;
897                         }
898                 }
899         }
900
901         getrusage(RUSAGE_SELF, &ru1);
902
903         elog(MESSAGE_LEVEL, "Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
904 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
905                  nblocks, changed_pages, vacuum_pages->vpl_num_pages, empty_pages, new_pages,
906                  num_tuples, tups_vacuumed, ncrash, nunused, min_tlen, max_tlen,
907                  free_size, usable_free_size, empty_end_pages, fraged_pages->vpl_num_pages,
908                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
909                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
910
911 }       /* vc_scanheap */
912
913
914 /*
915  *      vc_rpfheap() -- try to repaire relation' fragmentation
916  *
917  *              This routine marks dead tuples as unused and tries re-use dead space
918  *              by moving tuples (and inserting indices if needed). It constructs
919  *              Nvpl list of free-ed pages (moved tuples) and clean indices
920  *              for them after committing (in hack-manner - without losing locks
921  *              and freeing memory!) current transaction. It truncates relation
922  *              if some end-blocks are gone away.
923  */
924 static void
925 vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
926                    VPageList vacuum_pages, VPageList fraged_pages, int nindices, Relation *Irel)
927 {
928         TransactionId myXID;
929         CommandId       myCID;
930         Buffer          buf,
931                                 cur_buffer;
932         int                     nblocks,
933                                 blkno;
934         Page            page,
935                                 ToPage = NULL;
936         OffsetNumber offnum = 0,
937                                 maxoff = 0,
938                                 newoff,
939                                 max_offset;
940         ItemId          itemid,
941                                 newitemid;
942         HeapTuple       tuple,
943                                 newtup;
944         TupleDesc       tupdesc = NULL;
945         Datum      *idatum = NULL;
946         char       *inulls = NULL;
947         InsertIndexResult iresult;
948         VPageListData Nvpl;
949         VPageDescr      cur_page = NULL,
950                                 last_fraged_page,
951                                 last_vacuum_page,
952                                 vpc,
953                            *vpp;
954         int                     cur_item = 0;
955         IndDesc    *Idesc,
956                            *idcur;
957         int                     last_fraged_block,
958                                 last_vacuum_block,
959                                 i;
960         Size            tuple_len;
961         int                     num_moved,
962                                 num_fraged_pages,
963                                 vacuumed_pages;
964         int                     checked_moved,
965                                 num_tuples;
966         bool            isempty,
967                                 dowrite;
968         struct rusage ru0,
969                                 ru1;
970
971         getrusage(RUSAGE_SELF, &ru0);
972
973         myXID = GetCurrentTransactionId();
974         myCID = GetCurrentCommandId();
975
976         if (Irel != (Relation *) NULL)          /* preparation for index' inserts */
977         {
978                 vc_mkindesc(onerel, nindices, Irel, &Idesc);
979                 tupdesc = RelationGetDescr(onerel);
980                 idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof(*idatum));
981                 inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof(*inulls));
982         }
983
984         Nvpl.vpl_num_pages = 0;
985         num_fraged_pages = fraged_pages->vpl_num_pages;
986         last_fraged_page = fraged_pages->vpl_pagedesc[num_fraged_pages - 1];
987         last_fraged_block = last_fraged_page->vpd_blkno;
988         Assert(vacuum_pages->vpl_num_pages > vacuum_pages->vpl_empty_end_pages);
989         vacuumed_pages = vacuum_pages->vpl_num_pages - vacuum_pages->vpl_empty_end_pages;
990         last_vacuum_page = vacuum_pages->vpl_pagedesc[vacuumed_pages - 1];
991         last_vacuum_block = last_vacuum_page->vpd_blkno;
992         Assert(last_vacuum_block >= last_fraged_block);
993         cur_buffer = InvalidBuffer;
994         num_moved = 0;
995
996         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
997         vpc->vpd_offsets_used = vpc->vpd_offsets_free = 0;
998
999         nblocks = vacrelstats->num_pages;
1000         for (blkno = nblocks - vacuum_pages->vpl_empty_end_pages - 1;; blkno--)
1001         {
1002                 /* if it's reapped page and it was used by me - quit */
1003                 if (blkno == last_fraged_block && last_fraged_page->vpd_offsets_used > 0)
1004                         break;
1005
1006                 buf = ReadBuffer(onerel, blkno);
1007                 page = BufferGetPage(buf);
1008
1009                 vpc->vpd_offsets_free = 0;
1010
1011                 isempty = PageIsEmpty(page);
1012
1013                 dowrite = false;
1014                 if (blkno == last_vacuum_block) /* it's reapped page */
1015                 {
1016                         if (last_vacuum_page->vpd_offsets_free > 0) /* there are dead tuples */
1017                         {                                       /* on this page - clean */
1018                                 Assert(!isempty);
1019                                 vc_vacpage(page, last_vacuum_page);
1020                                 dowrite = true;
1021                         }
1022                         else
1023                                 Assert(isempty);
1024                         --vacuumed_pages;
1025                         Assert(vacuumed_pages > 0);
1026                         /* get prev reapped page from vacuum_pages */
1027                         last_vacuum_page = vacuum_pages->vpl_pagedesc[vacuumed_pages - 1];
1028                         last_vacuum_block = last_vacuum_page->vpd_blkno;
1029                         if (blkno == last_fraged_block)         /* this page in
1030                                                                                                  * fraged_pages too */
1031                         {
1032                                 --num_fraged_pages;
1033                                 Assert(num_fraged_pages > 0);
1034                                 Assert(last_fraged_page->vpd_offsets_used == 0);
1035                                 /* get prev reapped page from fraged_pages */
1036                                 last_fraged_page = fraged_pages->vpl_pagedesc[num_fraged_pages - 1];
1037                                 last_fraged_block = last_fraged_page->vpd_blkno;
1038                         }
1039                         Assert(last_fraged_block <= last_vacuum_block);
1040                         if (isempty)
1041                         {
1042                                 ReleaseBuffer(buf);
1043                                 continue;
1044                         }
1045                 }
1046                 else
1047                         Assert(!isempty);
1048
1049                 vpc->vpd_blkno = blkno;
1050                 maxoff = PageGetMaxOffsetNumber(page);
1051                 for (offnum = FirstOffsetNumber;
1052                          offnum <= maxoff;
1053                          offnum = OffsetNumberNext(offnum))
1054                 {
1055                         itemid = PageGetItemId(page, offnum);
1056
1057                         if (!ItemIdIsUsed(itemid))
1058                                 continue;
1059
1060                         tuple = (HeapTuple) PageGetItem(page, itemid);
1061                         tuple_len = tuple->t_len;
1062
1063                         /* try to find new page for this tuple */
1064                         if (cur_buffer == InvalidBuffer ||
1065                                 !vc_enough_space(cur_page, tuple_len))
1066                         {
1067                                 if (cur_buffer != InvalidBuffer)
1068                                 {
1069                                         WriteBuffer(cur_buffer);
1070                                         cur_buffer = InvalidBuffer;
1071
1072                                         /*
1073                                          * If no one tuple can't be added to this page -
1074                                          * remove page from fraged_pages. - vadim 11/27/96
1075                                          *
1076                                          * But we can't remove last page - this is our
1077                                          * "show-stopper" !!!   - vadim 02/25/98
1078                                          */
1079                                         if (cur_page != last_fraged_page &&
1080                                                 !vc_enough_space(cur_page, vacrelstats->min_tlen))
1081                                         {
1082                                                 Assert(num_fraged_pages > cur_item + 1);
1083                                                 memmove(fraged_pages->vpl_pagedesc + cur_item,
1084                                                                 fraged_pages->vpl_pagedesc + cur_item + 1,
1085                                                                 sizeof(VPageDescr *) * (num_fraged_pages - cur_item - 1));
1086                                                 num_fraged_pages--;
1087                                                 Assert(last_fraged_page == fraged_pages->vpl_pagedesc[num_fraged_pages - 1]);
1088                                         }
1089                                 }
1090                                 for (i = 0; i < num_fraged_pages; i++)
1091                                 {
1092                                         if (vc_enough_space(fraged_pages->vpl_pagedesc[i], tuple_len))
1093                                                 break;
1094                                 }
1095                                 if (i == num_fraged_pages)
1096                                         break;          /* can't move item anywhere */
1097                                 cur_item = i;
1098                                 cur_page = fraged_pages->vpl_pagedesc[cur_item];
1099                                 cur_buffer = ReadBuffer(onerel, cur_page->vpd_blkno);
1100                                 ToPage = BufferGetPage(cur_buffer);
1101                                 /* if this page was not used before - clean it */
1102                                 if (!PageIsEmpty(ToPage) && cur_page->vpd_offsets_used == 0)
1103                                         vc_vacpage(ToPage, cur_page);
1104                         }
1105
1106                         /* copy tuple */
1107                         newtup = (HeapTuple) palloc(tuple_len);
1108                         memmove((char *) newtup, (char *) tuple, tuple_len);
1109
1110                         /* store transaction information */
1111                         TransactionIdStore(myXID, &(newtup->t_xmin));
1112                         newtup->t_cmin = myCID;
1113                         StoreInvalidTransactionId(&(newtup->t_xmax));
1114                         /* set xmin to unknown and xmax to invalid */
1115                         newtup->t_infomask &= ~(HEAP_XACT_MASK);
1116                         newtup->t_infomask |= HEAP_XMAX_INVALID;
1117
1118                         /* add tuple to the page */
1119                         newoff = PageAddItem(ToPage, (Item) newtup, tuple_len,
1120                                                                  InvalidOffsetNumber, LP_USED);
1121                         if (newoff == InvalidOffsetNumber)
1122                         {
1123                                 elog(ERROR, "\
1124 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1125                                          tuple_len, cur_page->vpd_blkno, cur_page->vpd_free,
1126                                  cur_page->vpd_offsets_used, cur_page->vpd_offsets_free);
1127                         }
1128                         newitemid = PageGetItemId(ToPage, newoff);
1129                         pfree(newtup);
1130                         newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1131                         ItemPointerSet(&(newtup->t_ctid), cur_page->vpd_blkno, newoff);
1132
1133                         /* now logically delete end-tuple */
1134                         TransactionIdStore(myXID, &(tuple->t_xmax));
1135                         tuple->t_cmax = myCID;
1136                         /* set xmax to unknown */
1137                         tuple->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED);
1138
1139                         cur_page->vpd_offsets_used++;
1140                         num_moved++;
1141                         cur_page->vpd_free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1142                         vpc->vpd_offsets[vpc->vpd_offsets_free++] = offnum;
1143
1144                         /* insert index' tuples if needed */
1145                         if (Irel != (Relation *) NULL)
1146                         {
1147                                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1148                                 {
1149                                         FormIndexDatum(idcur->natts,
1150                                                            (AttrNumber *) &(idcur->tform->indkey[0]),
1151                                                                    newtup,
1152                                                                    tupdesc,
1153                                                                    idatum,
1154                                                                    inulls,
1155                                                                    idcur->finfoP);
1156                                         iresult = index_insert(Irel[i],
1157                                                                                    idatum,
1158                                                                                    inulls,
1159                                                                                    &newtup->t_ctid,
1160                                                                                    onerel);
1161                                         if (iresult)
1162                                                 pfree(iresult);
1163                                 }
1164                         }
1165
1166                 }                                               /* walk along page */
1167
1168                 if (vpc->vpd_offsets_free > 0)  /* some tuples were moved */
1169                 {
1170                         vc_reappage(&Nvpl, vpc);
1171                         WriteBuffer(buf);
1172                 }
1173                 else if (dowrite)
1174                         WriteBuffer(buf);
1175                 else
1176                         ReleaseBuffer(buf);
1177
1178                 if (offnum <= maxoff)
1179                         break;                          /* some item(s) left */
1180
1181         }                                                       /* walk along relation */
1182
1183         blkno++;                                        /* new number of blocks */
1184
1185         if (cur_buffer != InvalidBuffer)
1186         {
1187                 Assert(num_moved > 0);
1188                 WriteBuffer(cur_buffer);
1189         }
1190
1191         if (num_moved > 0)
1192         {
1193
1194                 /*
1195                  * We have to commit our tuple' movings before we'll truncate
1196                  * relation, but we shouldn't lose our locks. And so - quick hack:
1197                  * flush buffers and record status of current transaction as
1198                  * committed, and continue. - vadim 11/13/96
1199                  */
1200                 FlushBufferPool(!TransactionFlushEnabled());
1201                 TransactionIdCommit(myXID);
1202                 FlushBufferPool(!TransactionFlushEnabled());
1203         }
1204
1205         /*
1206          * Clean uncleaned reapped pages from vacuum_pages list and set xmin
1207          * committed for inserted tuples
1208          */
1209         checked_moved = 0;
1210         for (i = 0, vpp = vacuum_pages->vpl_pagedesc; i < vacuumed_pages; i++, vpp++)
1211         {
1212                 Assert((*vpp)->vpd_blkno < blkno);
1213                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1214                 page = BufferGetPage(buf);
1215                 if ((*vpp)->vpd_offsets_used == 0)              /* this page was not used */
1216                 {
1217
1218                         /*
1219                          * noff == 0 in empty pages only - such pages should be
1220                          * re-used
1221                          */
1222                         Assert((*vpp)->vpd_offsets_free > 0);
1223                         vc_vacpage(page, *vpp);
1224                 }
1225                 else
1226 /* this page was used */
1227                 {
1228                         num_tuples = 0;
1229                         max_offset = PageGetMaxOffsetNumber(page);
1230                         for (newoff = FirstOffsetNumber;
1231                                  newoff <= max_offset;
1232                                  newoff = OffsetNumberNext(newoff))
1233                         {
1234                                 itemid = PageGetItemId(page, newoff);
1235                                 if (!ItemIdIsUsed(itemid))
1236                                         continue;
1237                                 tuple = (HeapTuple) PageGetItem(page, itemid);
1238                                 if (TransactionIdEquals((TransactionId) tuple->t_xmin, myXID))
1239                                 {
1240                                         tuple->t_infomask |= HEAP_XMIN_COMMITTED;
1241                                         num_tuples++;
1242                                 }
1243                         }
1244                         Assert((*vpp)->vpd_offsets_used == num_tuples);
1245                         checked_moved += num_tuples;
1246                 }
1247                 WriteBuffer(buf);
1248         }
1249         Assert(num_moved == checked_moved);
1250
1251         getrusage(RUSAGE_SELF, &ru1);
1252
1253         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. \
1254 Elapsed %u/%u sec.",
1255                  (RelationGetRelationName(onerel))->data,
1256                  nblocks, blkno, num_moved,
1257                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1258                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1259
1260         if (Nvpl.vpl_num_pages > 0)
1261         {
1262                 /* vacuum indices again if needed */
1263                 if (Irel != (Relation *) NULL)
1264                 {
1265                         VPageDescr *vpleft,
1266                                            *vpright,
1267                                                 vpsave;
1268
1269                         /* re-sort Nvpl.vpl_pagedesc */
1270                         for (vpleft = Nvpl.vpl_pagedesc,
1271                                  vpright = Nvpl.vpl_pagedesc + Nvpl.vpl_num_pages - 1;
1272                                  vpleft < vpright; vpleft++, vpright--)
1273                         {
1274                                 vpsave = *vpleft;
1275                                 *vpleft = *vpright;
1276                                 *vpright = vpsave;
1277                         }
1278                         for (i = 0; i < nindices; i++)
1279                                 vc_vaconeind(&Nvpl, Irel[i], vacrelstats->num_tuples);
1280                 }
1281
1282                 /*
1283                  * clean moved tuples from last page in Nvpl list if some tuples
1284                  * left there
1285                  */
1286                 if (vpc->vpd_offsets_free > 0 && offnum <= maxoff)
1287                 {
1288                         Assert(vpc->vpd_blkno == blkno - 1);
1289                         buf = ReadBuffer(onerel, vpc->vpd_blkno);
1290                         page = BufferGetPage(buf);
1291                         num_tuples = 0;
1292                         maxoff = offnum;
1293                         for (offnum = FirstOffsetNumber;
1294                                  offnum < maxoff;
1295                                  offnum = OffsetNumberNext(offnum))
1296                         {
1297                                 itemid = PageGetItemId(page, offnum);
1298                                 if (!ItemIdIsUsed(itemid))
1299                                         continue;
1300                                 tuple = (HeapTuple) PageGetItem(page, itemid);
1301                                 Assert(TransactionIdEquals((TransactionId) tuple->t_xmax, myXID));
1302                                 itemid->lp_flags &= ~LP_USED;
1303                                 num_tuples++;
1304                         }
1305                         Assert(vpc->vpd_offsets_free == num_tuples);
1306                         PageRepairFragmentation(page);
1307                         WriteBuffer(buf);
1308                 }
1309
1310                 /* now - free new list of reapped pages */
1311                 vpp = Nvpl.vpl_pagedesc;
1312                 for (i = 0; i < Nvpl.vpl_num_pages; i++, vpp++)
1313                         pfree(*vpp);
1314                 pfree(Nvpl.vpl_pagedesc);
1315         }
1316
1317         /* truncate relation */
1318         if (blkno < nblocks)
1319         {
1320                 i = BlowawayRelationBuffers(onerel, blkno);
1321                 if (i < 0)
1322                         elog(FATAL, "VACUUM (vc_rpfheap): BlowawayRelationBuffers returned %d", i);
1323                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
1324                 Assert(blkno >= 0);
1325                 vacrelstats->num_pages = blkno; /* set new number of blocks */
1326         }
1327
1328         if (Irel != (Relation *) NULL)          /* pfree index' allocations */
1329         {
1330                 pfree(Idesc);
1331                 pfree(idatum);
1332                 pfree(inulls);
1333                 vc_clsindices(nindices, Irel);
1334         }
1335
1336         pfree(vpc);
1337
1338 }       /* vc_rpfheap */
1339
1340 /*
1341  *      vc_vacheap() -- free dead tuples
1342  *
1343  *              This routine marks dead tuples as unused and truncates relation
1344  *              if there are "empty" end-blocks.
1345  */
1346 static void
1347 vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vacuum_pages)
1348 {
1349         Buffer          buf;
1350         Page            page;
1351         VPageDescr *vpp;
1352         int                     nblocks;
1353         int                     i;
1354
1355         nblocks = vacuum_pages->vpl_num_pages;
1356         nblocks -= vacuum_pages->vpl_empty_end_pages;           /* nothing to do with
1357                                                                                                                  * them */
1358
1359         for (i = 0, vpp = vacuum_pages->vpl_pagedesc; i < nblocks; i++, vpp++)
1360         {
1361                 if ((*vpp)->vpd_offsets_free > 0)
1362                 {
1363                         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1364                         page = BufferGetPage(buf);
1365                         vc_vacpage(page, *vpp);
1366                         WriteBuffer(buf);
1367                 }
1368         }
1369
1370         /* truncate relation if there are some empty end-pages */
1371         if (vacuum_pages->vpl_empty_end_pages > 0)
1372         {
1373                 Assert(vacrelstats->num_pages >= vacuum_pages->vpl_empty_end_pages);
1374                 nblocks = vacrelstats->num_pages - vacuum_pages->vpl_empty_end_pages;
1375                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1376                          (RelationGetRelationName(onerel))->data,
1377                          vacrelstats->num_pages, nblocks);
1378
1379                 /*
1380                  * we have to flush "empty" end-pages (if changed, but who knows
1381                  * it) before truncation
1382                  */
1383                 FlushBufferPool(!TransactionFlushEnabled());
1384
1385                 i = BlowawayRelationBuffers(onerel, nblocks);
1386                 if (i < 0)
1387                         elog(FATAL, "VACUUM (vc_vacheap): BlowawayRelationBuffers returned %d", i);
1388
1389                 nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
1390                 Assert(nblocks >= 0);
1391                 vacrelstats->num_pages = nblocks;               /* set new number of
1392                                                                                                  * blocks */
1393         }
1394
1395 }       /* vc_vacheap */
1396
1397 /*
1398  *      vc_vacpage() -- free dead tuples on a page
1399  *                                       and repaire its fragmentation.
1400  */
1401 static void
1402 vc_vacpage(Page page, VPageDescr vpd)
1403 {
1404         ItemId          itemid;
1405         int                     i;
1406
1407         Assert(vpd->vpd_offsets_used == 0);
1408         for (i = 0; i < vpd->vpd_offsets_free; i++)
1409         {
1410                 itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_offsets[i] - 1]);
1411                 itemid->lp_flags &= ~LP_USED;
1412         }
1413         PageRepairFragmentation(page);
1414
1415 }       /* vc_vacpage */
1416
1417 /*
1418  *      _vc_scanoneind() -- scan one index relation to update statistic.
1419  *
1420  */
1421 static void
1422 vc_scanoneind(Relation indrel, int num_tuples)
1423 {
1424         RetrieveIndexResult res;
1425         IndexScanDesc iscan;
1426         int                     nitups;
1427         int                     nipages;
1428         struct rusage ru0,
1429                                 ru1;
1430
1431         getrusage(RUSAGE_SELF, &ru0);
1432
1433         /* walk through the entire index */
1434         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1435         nitups = 0;
1436
1437         while ((res = index_getnext(iscan, ForwardScanDirection))
1438                    != (RetrieveIndexResult) NULL)
1439         {
1440                 nitups++;
1441                 pfree(res);
1442         }
1443
1444         index_endscan(iscan);
1445
1446         /* now update statistics in pg_class */
1447         nipages = RelationGetNumberOfBlocks(indrel);
1448         vc_updstats(RelationGetRelid(indrel), nipages, nitups, false, NULL);
1449
1450         getrusage(RUSAGE_SELF, &ru1);
1451
1452         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1453                  indrel->rd_rel->relname.data, nipages, nitups,
1454                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1455                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1456
1457         if (nitups != num_tuples)
1458                 elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1459                          indrel->rd_rel->relname.data, nitups, num_tuples);
1460
1461 }       /* vc_scanoneind */
1462
1463 /*
1464  *      vc_vaconeind() -- vacuum one index relation.
1465  *
1466  *              Vpl is the VPageList of the heap we're currently vacuuming.
1467  *              It's locked. Indrel is an index relation on the vacuumed heap.
1468  *              We don't set locks on the index relation here, since the indexed
1469  *              access methods support locking at different granularities.
1470  *              We let them handle it.
1471  *
1472  *              Finally, we arrange to update the index relation's statistics in
1473  *              pg_class.
1474  */
1475 static void
1476 vc_vaconeind(VPageList vpl, Relation indrel, int num_tuples)
1477 {
1478         RetrieveIndexResult res;
1479         IndexScanDesc iscan;
1480         ItemPointer heapptr;
1481         int                     tups_vacuumed;
1482         int                     num_index_tuples;
1483         int                     num_pages;
1484         VPageDescr      vp;
1485         struct rusage ru0,
1486                                 ru1;
1487
1488         getrusage(RUSAGE_SELF, &ru0);
1489
1490         /* walk through the entire index */
1491         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1492         tups_vacuumed = 0;
1493         num_index_tuples = 0;
1494
1495         while ((res = index_getnext(iscan, ForwardScanDirection))
1496                    != (RetrieveIndexResult) NULL)
1497         {
1498                 heapptr = &res->heap_iptr;
1499
1500                 if ((vp = vc_tidreapped(heapptr, vpl)) != (VPageDescr) NULL)
1501                 {
1502 #if 0
1503                         elog(DEBUG, "<%x,%x> -> <%x,%x>",
1504                                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1505                                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1506                                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1507                                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1508 #endif
1509                         if (vp->vpd_offsets_free == 0)
1510                         {                                       /* this is EmptyPage !!! */
1511                                 elog(NOTICE, "Index %s: pointer to EmptyPage (blk %u off %u) - fixing",
1512                                          indrel->rd_rel->relname.data,
1513                                          vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1514                         }
1515                         ++tups_vacuumed;
1516                         index_delete(indrel, &res->index_iptr);
1517                 }
1518                 else
1519                         num_index_tuples++;
1520
1521                 pfree(res);
1522         }
1523
1524         index_endscan(iscan);
1525
1526         /* now update statistics in pg_class */
1527         num_pages = RelationGetNumberOfBlocks(indrel);
1528         vc_updstats(RelationGetRelid(indrel), num_pages, num_index_tuples, false, NULL);
1529
1530         getrusage(RUSAGE_SELF, &ru1);
1531
1532         elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1533                  indrel->rd_rel->relname.data, num_pages, num_index_tuples, tups_vacuumed,
1534                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1535                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1536
1537         if (num_index_tuples != num_tuples)
1538                 elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1539                          indrel->rd_rel->relname.data, num_index_tuples, num_tuples);
1540
1541 }       /* vc_vaconeind */
1542
1543 /*
1544  *      vc_tidreapped() -- is a particular tid reapped?
1545  *
1546  *              vpl->VPageDescr_array is sorted in right order.
1547  */
1548 static VPageDescr
1549 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1550 {
1551         OffsetNumber ioffno;
1552         OffsetNumber *voff;
1553         VPageDescr      vp,
1554                            *vpp;
1555         VPageDescrData vpd;
1556
1557         vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1558         ioffno = ItemPointerGetOffsetNumber(itemptr);
1559
1560         vp = &vpd;
1561         vpp = (VPageDescr *) vc_find_eq((char *) (vpl->vpl_pagedesc),
1562                                         vpl->vpl_num_pages, sizeof(VPageDescr), (char *) &vp,
1563                                                                         vc_cmp_blk);
1564
1565         if (vpp == (VPageDescr *) NULL)
1566                 return (VPageDescr) NULL;
1567         vp = *vpp;
1568
1569         /* ok - we are on true page */
1570
1571         if (vp->vpd_offsets_free == 0)
1572         {                                                       /* this is EmptyPage !!! */
1573                 return vp;
1574         }
1575
1576         voff = (OffsetNumber *) vc_find_eq((char *) (vp->vpd_offsets),
1577                         vp->vpd_offsets_free, sizeof(OffsetNumber), (char *) &ioffno,
1578                                                                            vc_cmp_offno);
1579
1580         if (voff == (OffsetNumber *) NULL)
1581                 return (VPageDescr) NULL;
1582
1583         return vp;
1584
1585 }       /* vc_tidreapped */
1586
1587 /*
1588  *      vc_attrstats() -- compute column statistics used by the optimzer
1589  *
1590  *      We compute the column min, max, null and non-null counts.
1591  *      Plus we attempt to find the count of the value that occurs most
1592  *      frequently in each column
1593  *      These figures are used to compute the selectivity of the column
1594  *
1595  *      We use a three-bucked cache to get the most frequent item
1596  *      The 'guess' buckets count hits.  A cache miss causes guess1
1597  *      to get the most hit 'guess' item in the most recent cycle, and
1598  *      the new item goes into guess2.  Whenever the total count of hits
1599  *      of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1600  *
1601  *      This method works perfectly for columns with unique values, and columns
1602  *      with only two unique values, plus nulls.
1603  *
1604  *      It becomes less perfect as the number of unique values increases and
1605  *      their distribution in the table becomes more random.
1606  *
1607  */
1608 static void
1609 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple tuple)
1610 {
1611         int                     i,
1612                                 attr_cnt = vacrelstats->va_natts;
1613         VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1614         TupleDesc       tupDesc = onerel->rd_att;
1615         Datum           value;
1616         bool            isnull;
1617
1618         for (i = 0; i < attr_cnt; i++)
1619         {
1620                 VacAttrStats *stats = &vacattrstats[i];
1621                 bool            value_hit = true;
1622
1623                 value = heap_getattr(tuple,
1624                                                          stats->attr->attnum, tupDesc, &isnull);
1625
1626                 if (!VacAttrStatsEqValid(stats))
1627                         continue;
1628
1629                 if (isnull)
1630                         stats->null_cnt++;
1631                 else
1632                 {
1633                         stats->nonnull_cnt++;
1634                         if (stats->initialized == false)
1635                         {
1636                                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1637                                 /* best_cnt gets incremented later */
1638                                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1639                                 stats->guess1_cnt = stats->guess1_hits = 1;
1640                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1641                                 stats->guess2_hits = 1;
1642                                 if (VacAttrStatsLtGtValid(stats))
1643                                 {
1644                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1645                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1646                                 }
1647                                 stats->initialized = true;
1648                         }
1649                         if (VacAttrStatsLtGtValid(stats))
1650                         {
1651                                 if ((*fmgr_faddr(&stats->f_cmplt)) (value, stats->min))
1652                                 {
1653                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1654                                         stats->min_cnt = 0;
1655                                 }
1656                                 if ((*fmgr_faddr(&stats->f_cmpgt)) (value, stats->max))
1657                                 {
1658                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1659                                         stats->max_cnt = 0;
1660                                 }
1661                                 if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->min))
1662                                         stats->min_cnt++;
1663                                 else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->max))
1664                                         stats->max_cnt++;
1665                         }
1666                         if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->best))
1667                                 stats->best_cnt++;
1668                         else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess1))
1669                         {
1670                                 stats->guess1_cnt++;
1671                                 stats->guess1_hits++;
1672                         }
1673                         else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess2))
1674                                 stats->guess2_hits++;
1675                         else
1676                                 value_hit = false;
1677
1678                         if (stats->guess2_hits > stats->guess1_hits)
1679                         {
1680                                 swapDatum(stats->guess1, stats->guess2);
1681                                 swapInt(stats->guess1_len, stats->guess2_len);
1682                                 stats->guess1_cnt = stats->guess2_hits;
1683                                 swapLong(stats->guess1_hits, stats->guess2_hits);
1684                         }
1685                         if (stats->guess1_cnt > stats->best_cnt)
1686                         {
1687                                 swapDatum(stats->best, stats->guess1);
1688                                 swapInt(stats->best_len, stats->guess1_len);
1689                                 swapLong(stats->best_cnt, stats->guess1_cnt);
1690                                 stats->guess1_hits = 1;
1691                                 stats->guess2_hits = 1;
1692                         }
1693                         if (!value_hit)
1694                         {
1695                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1696                                 stats->guess1_hits = 1;
1697                                 stats->guess2_hits = 1;
1698                         }
1699                 }
1700         }
1701         return;
1702 }
1703
1704 /*
1705  *      vc_bucketcpy() -- update pg_class statistics for one relation
1706  *
1707  */
1708 static void
1709 vc_bucketcpy(Form_pg_attribute attr, Datum value, Datum *bucket, int16 *bucket_len)
1710 {
1711         if (attr->attbyval && attr->attlen != -1)
1712                 *bucket = value;
1713         else
1714         {
1715                 int                     len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1716
1717                 if (len > *bucket_len)
1718                 {
1719                         if (*bucket_len != 0)
1720                                 pfree(DatumGetPointer(*bucket));
1721                         *bucket = PointerGetDatum(palloc(len));
1722                         *bucket_len = len;
1723                 }
1724                 memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1725         }
1726 }
1727
1728 /*
1729  *      vc_updstats() -- update pg_class statistics for one relation
1730  *
1731  *              This routine works for both index and heap relation entries in
1732  *              pg_class.  We violate no-overwrite semantics here by storing new
1733  *              values for num_tuples, num_pages, and hasindex directly in the pg_class
1734  *              tuple that's already on the page.  The reason for this is that if
1735  *              we updated these tuples in the usual way, then every tuple in pg_class
1736  *              would be replaced every day.  This would make planning and executing
1737  *              historical queries very expensive.
1738  */
1739 static void
1740 vc_updstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *vacrelstats)
1741 {
1742         Relation        rd,
1743                                 ad,
1744                                 sd;
1745         HeapScanDesc scan;
1746         HeapTuple       rtup,
1747                                 atup,
1748                                 stup;
1749         Form_pg_class pgcform;
1750         ScanKeyData askey;
1751         Form_pg_attribute attp;
1752         Buffer          buffer;
1753
1754         /*
1755          * update number of tuples and number of pages in pg_class
1756          */
1757         rtup = SearchSysCacheTuple(RELOID,
1758                                                            ObjectIdGetDatum(relid),
1759                                                            0, 0, 0);
1760         if (!HeapTupleIsValid(rtup))
1761                 elog(ERROR, "pg_class entry for relid %d vanished during vacuuming",
1762                          relid);
1763
1764         rd = heap_openr(RelationRelationName);
1765
1766         /* get the buffer cache tuple */
1767         rtup = heap_fetch(rd, SnapshotNow, &rtup->t_ctid, &buffer);
1768
1769         /* overwrite the existing statistics in the tuple */
1770         vc_setpagelock(rd, ItemPointerGetBlockNumber(&rtup->t_ctid));
1771         pgcform = (Form_pg_class) GETSTRUCT(rtup);
1772         pgcform->reltuples = num_tuples;
1773         pgcform->relpages = num_pages;
1774         pgcform->relhasindex = hasindex;
1775
1776         if (vacrelstats != NULL && vacrelstats->va_natts > 0)
1777         {
1778                 VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1779                 int                     natts = vacrelstats->va_natts;
1780
1781                 ad = heap_openr(AttributeRelationName);
1782                 sd = heap_openr(StatisticRelationName);
1783                 ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1784                                                            F_INT4EQ, relid);
1785
1786                 scan = heap_beginscan(ad, false, SnapshotNow, 1, &askey);
1787
1788                 while (HeapTupleIsValid(atup = heap_getnext(scan, 0)))
1789                 {
1790                         int                     i;
1791                         float32data selratio;           /* average ratio of rows selected
1792                                                                                  * for a random constant */
1793                         VacAttrStats *stats;
1794                         Datum           values[Natts_pg_statistic];
1795                         char            nulls[Natts_pg_statistic];
1796
1797                         attp = (Form_pg_attribute) GETSTRUCT(atup);
1798                         if (attp->attnum <= 0)          /* skip system attributes for now, */
1799                                 /* they are unique anyway */
1800                                 continue;
1801
1802                         for (i = 0; i < natts; i++)
1803                         {
1804                                 if (attp->attnum == vacattrstats[i].attr->attnum)
1805                                         break;
1806                         }
1807                         if (i >= natts)
1808                                 continue;
1809                         stats = &(vacattrstats[i]);
1810
1811                         /* overwrite the existing statistics in the tuple */
1812                         if (VacAttrStatsEqValid(stats))
1813                         {
1814
1815                                 vc_setpagelock(ad, ItemPointerGetBlockNumber(&atup->t_ctid));
1816
1817                                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1818                                         (stats->null_cnt <= 1 && stats->best_cnt == 1))
1819                                         selratio = 0;
1820                                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1821                                 {
1822                                         double          min_cnt_d = stats->min_cnt,
1823                                                                 max_cnt_d = stats->max_cnt,
1824                                                                 null_cnt_d = stats->null_cnt,
1825                                                                 nonnullcnt_d = stats->nonnull_cnt;              /* prevent overflow */
1826
1827                                         selratio = (min_cnt_d * min_cnt_d + max_cnt_d * max_cnt_d + null_cnt_d * null_cnt_d) /
1828                                                 (nonnullcnt_d + null_cnt_d) / (nonnullcnt_d + null_cnt_d);
1829                                 }
1830                                 else
1831                                 {
1832                                         double          most = (double) (stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1833                                         double          total = ((double) stats->nonnull_cnt) + ((double) stats->null_cnt);
1834
1835                                         /*
1836                                          * we assume count of other values are 20% of best
1837                                          * count in table
1838                                          */
1839                                         selratio = (most * most + 0.20 * most * (total - most)) / total / total;
1840                                 }
1841                                 if (selratio > 1.0)
1842                                         selratio = 1.0;
1843                                 attp->attdisbursion = selratio;
1844                                 WriteNoReleaseBuffer(ItemPointerGetBlockNumber(&atup->t_ctid));
1845
1846                                 /* DO PG_STATISTIC INSERTS */
1847
1848                                 /*
1849                                  * doing system relations, especially pg_statistic is a
1850                                  * problem
1851                                  */
1852                                 if (VacAttrStatsLtGtValid(stats) && stats->initialized  /* &&
1853                                                                                                                                                  * !IsSystemRelationName(
1854                                                                                                                                                  *
1855                                          pgcform->relname.data) */ )
1856                                 {
1857                                         FmgrInfo        out_function;
1858                                         char       *out_string;
1859
1860                                         for (i = 0; i < Natts_pg_statistic; ++i)
1861                                                 nulls[i] = ' ';
1862
1863                                         /* ----------------
1864                                          *      initialize values[]
1865                                          * ----------------
1866                                          */
1867                                         i = 0;
1868                                         values[i++] = (Datum) relid;            /* 1 */
1869                                         values[i++] = (Datum) attp->attnum; /* 2 */
1870                                         values[i++] = (Datum) InvalidOid;       /* 3 */
1871                                         fmgr_info(stats->outfunc, &out_function);
1872                                         out_string = (*fmgr_faddr(&out_function)) (stats->min, stats->attr->atttypid);
1873                                         values[i++] = (Datum) fmgr(F_TEXTIN, out_string);
1874                                         pfree(out_string);
1875                                         out_string = (char *) (*fmgr_faddr(&out_function)) (stats->max, stats->attr->atttypid);
1876                                         values[i++] = (Datum) fmgr(F_TEXTIN, out_string);
1877                                         pfree(out_string);
1878
1879                                         stup = heap_formtuple(sd->rd_att, values, nulls);
1880
1881                                         /* ----------------
1882                                          *      insert the tuple in the relation and get the tuple's oid.
1883                                          * ----------------
1884                                          */
1885                                         heap_insert(sd, stup);
1886                                         pfree(DatumGetPointer(values[3]));
1887                                         pfree(DatumGetPointer(values[4]));
1888                                         pfree(stup);
1889                                 }
1890                         }
1891                 }
1892                 heap_endscan(scan);
1893                 heap_close(ad);
1894                 heap_close(sd);
1895         }
1896
1897         /* XXX -- after write, should invalidate relcache in other backends */
1898         WriteNoReleaseBuffer(ItemPointerGetBlockNumber(&rtup->t_ctid));
1899
1900         /*
1901          * invalidating system relations confuses the function cache of
1902          * pg_operator and pg_opclass, bjm
1903          */
1904         if (!IsSystemRelationName(pgcform->relname.data))
1905                 RelationInvalidateHeapTuple(rd, rtup);
1906
1907         ReleaseBuffer(buffer);
1908         heap_close(rd);
1909 }
1910
1911 /*
1912  *      vc_delhilowstats() -- delete pg_statistics rows
1913  *
1914  */
1915 static void
1916 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
1917 {
1918         Relation        pgstatistic;
1919         HeapScanDesc scan;
1920         HeapTuple       tuple;
1921         ScanKeyData key;
1922
1923         pgstatistic = heap_openr(StatisticRelationName);
1924
1925         if (relid != InvalidOid)
1926         {
1927                 ScanKeyEntryInitialize(&key, 0x0, Anum_pg_statistic_starelid,
1928                                                            F_OIDEQ,
1929                                                            ObjectIdGetDatum(relid));
1930                 scan = heap_beginscan(pgstatistic, false, SnapshotNow, 1, &key);
1931         }
1932         else
1933                 scan = heap_beginscan(pgstatistic, false, SnapshotNow, 0, NULL);
1934
1935         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
1936         {
1937                 if (attcnt > 0)
1938                 {
1939                         Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(tuple);
1940                         int                     i;
1941
1942                         for (i = 0; i < attcnt; i++)
1943                         {
1944                                 if (pgs->staattnum == attnums[i] + 1)
1945                                         break;
1946                         }
1947                         if (i >= attcnt)
1948                                 continue;               /* don't delete it */
1949                 }
1950                 heap_delete(pgstatistic, &tuple->t_ctid);
1951         }
1952
1953         heap_endscan(scan);
1954         heap_close(pgstatistic);
1955 }
1956
1957 static void
1958 vc_setpagelock(Relation rel, BlockNumber blkno)
1959 {
1960         ItemPointerData itm;
1961
1962         ItemPointerSet(&itm, blkno, 1);
1963
1964         RelationSetLockForWritePage(rel, &itm);
1965 }
1966
1967 /*
1968  *      vc_reappage() -- save a page on the array of reapped pages.
1969  *
1970  *              As a side effect of the way that the vacuuming loop for a given
1971  *              relation works, higher pages come after lower pages in the array
1972  *              (and highest tid on a page is last).
1973  */
1974 static void
1975 vc_reappage(VPageList vpl, VPageDescr vpc)
1976 {
1977         VPageDescr      newvpd;
1978
1979         /* allocate a VPageDescrData entry */
1980         newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_offsets_free * sizeof(OffsetNumber));
1981
1982         /* fill it in */
1983         if (vpc->vpd_offsets_free > 0)
1984                 memmove(newvpd->vpd_offsets, vpc->vpd_offsets, vpc->vpd_offsets_free * sizeof(OffsetNumber));
1985         newvpd->vpd_blkno = vpc->vpd_blkno;
1986         newvpd->vpd_free = vpc->vpd_free;
1987         newvpd->vpd_offsets_used = vpc->vpd_offsets_used;
1988         newvpd->vpd_offsets_free = vpc->vpd_offsets_free;
1989
1990         /* insert this page into vpl list */
1991         vc_vpinsert(vpl, newvpd);
1992
1993 }       /* vc_reappage */
1994
1995 static void
1996 vc_vpinsert(VPageList vpl, VPageDescr vpnew)
1997 {
1998
1999         /* allocate a VPageDescr entry if needed */
2000         if (vpl->vpl_num_pages == 0)
2001                 vpl->vpl_pagedesc = (VPageDescr *) palloc(100 * sizeof(VPageDescr));
2002         else if (vpl->vpl_num_pages % 100 == 0)
2003                 vpl->vpl_pagedesc = (VPageDescr *) repalloc(vpl->vpl_pagedesc, (vpl->vpl_num_pages + 100) * sizeof(VPageDescr));
2004         vpl->vpl_pagedesc[vpl->vpl_num_pages] = vpnew;
2005         (vpl->vpl_num_pages)++;
2006
2007 }
2008
2009 static void
2010 vc_free(VRelList vrl)
2011 {
2012         VRelList        p_vrl;
2013         MemoryContext old;
2014         PortalVariableMemory pmem;
2015
2016         pmem = PortalGetVariableMemory(vc_portal);
2017         old = MemoryContextSwitchTo((MemoryContext) pmem);
2018
2019         while (vrl != (VRelList) NULL)
2020         {
2021
2022                 /* free rel list entry */
2023                 p_vrl = vrl;
2024                 vrl = vrl->vrl_next;
2025                 pfree(p_vrl);
2026         }
2027
2028         MemoryContextSwitchTo(old);
2029 }
2030
2031 static char *
2032 vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *))
2033 {
2034         int                     res;
2035         int                     last = nelem - 1;
2036         int                     celm = nelem / 2;
2037         bool            last_move,
2038                                 first_move;
2039
2040         last_move = first_move = true;
2041         for (;;)
2042         {
2043                 if (first_move == true)
2044                 {
2045                         res = compar(bot, elm);
2046                         if (res > 0)
2047                                 return NULL;
2048                         if (res == 0)
2049                                 return bot;
2050                         first_move = false;
2051                 }
2052                 if (last_move == true)
2053                 {
2054                         res = compar(elm, bot + last * size);
2055                         if (res > 0)
2056                                 return NULL;
2057                         if (res == 0)
2058                                 return bot + last * size;
2059                         last_move = false;
2060                 }
2061                 res = compar(elm, bot + celm * size);
2062                 if (res == 0)
2063                         return bot + celm * size;
2064                 if (res < 0)
2065                 {
2066                         if (celm == 0)
2067                                 return NULL;
2068                         last = celm - 1;
2069                         celm = celm / 2;
2070                         last_move = true;
2071                         continue;
2072                 }
2073
2074                 if (celm == last)
2075                         return NULL;
2076
2077                 last = last - celm - 1;
2078                 bot = bot + (celm + 1) * size;
2079                 celm = (last + 1) / 2;
2080                 first_move = true;
2081         }
2082
2083 }       /* vc_find_eq */
2084
2085 static int
2086 vc_cmp_blk(char *left, char *right)
2087 {
2088         BlockNumber lblk,
2089                                 rblk;
2090
2091         lblk = (*((VPageDescr *) left))->vpd_blkno;
2092         rblk = (*((VPageDescr *) right))->vpd_blkno;
2093
2094         if (lblk < rblk)
2095                 return -1;
2096         if (lblk == rblk)
2097                 return 0;
2098         return 1;
2099
2100 }       /* vc_cmp_blk */
2101
2102 static int
2103 vc_cmp_offno(char *left, char *right)
2104 {
2105
2106         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2107                 return -1;
2108         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2109                 return 0;
2110         return 1;
2111
2112 }       /* vc_cmp_offno */
2113
2114
2115 static void
2116 vc_getindices(Oid relid, int *nindices, Relation **Irel)
2117 {
2118         Relation        pgindex;
2119         Relation        irel;
2120         TupleDesc       tupdesc;
2121         HeapTuple       tuple;
2122         HeapScanDesc scan;
2123         Datum           d;
2124         int                     i,
2125                                 k;
2126         bool            n;
2127         ScanKeyData key;
2128         Oid                *ioid;
2129
2130         *nindices = i = 0;
2131
2132         ioid = (Oid *) palloc(10 * sizeof(Oid));
2133
2134         /* prepare a heap scan on the pg_index relation */
2135         pgindex = heap_openr(IndexRelationName);
2136         tupdesc = RelationGetDescr(pgindex);
2137
2138         ScanKeyEntryInitialize(&key, 0x0, Anum_pg_index_indrelid,
2139                                                    F_OIDEQ,
2140                                                    ObjectIdGetDatum(relid));
2141
2142         scan = heap_beginscan(pgindex, false, SnapshotNow, 1, &key);
2143
2144         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
2145         {
2146                 d = heap_getattr(tuple, Anum_pg_index_indexrelid,
2147                                                  tupdesc, &n);
2148                 i++;
2149                 if (i % 10 == 0)
2150                         ioid = (Oid *) repalloc(ioid, (i + 10) * sizeof(Oid));
2151                 ioid[i - 1] = DatumGetObjectId(d);
2152         }
2153
2154         heap_endscan(scan);
2155         heap_close(pgindex);
2156
2157         if (i == 0)
2158         {                                                       /* No one index found */
2159                 pfree(ioid);
2160                 return;
2161         }
2162
2163         if (Irel != (Relation **) NULL)
2164                 *Irel = (Relation *) palloc(i * sizeof(Relation));
2165
2166         for (k = 0; i > 0;)
2167         {
2168                 irel = index_open(ioid[--i]);
2169                 if (irel != (Relation) NULL)
2170                 {
2171                         if (Irel != (Relation **) NULL)
2172                                 (*Irel)[k] = irel;
2173                         else
2174                                 index_close(irel);
2175                         k++;
2176                 }
2177                 else
2178                         elog(NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2179         }
2180         *nindices = k;
2181         pfree(ioid);
2182
2183         if (Irel != (Relation **) NULL && *nindices == 0)
2184         {
2185                 pfree(*Irel);
2186                 *Irel = (Relation *) NULL;
2187         }
2188
2189 }       /* vc_getindices */
2190
2191
2192 static void
2193 vc_clsindices(int nindices, Relation *Irel)
2194 {
2195
2196         if (Irel == (Relation *) NULL)
2197                 return;
2198
2199         while (nindices--)
2200                 index_close(Irel[nindices]);
2201         pfree(Irel);
2202
2203 }       /* vc_clsindices */
2204
2205
2206 static void
2207 vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2208 {
2209         IndDesc    *idcur;
2210         HeapTuple       cachetuple;
2211         AttrNumber *attnumP;
2212         int                     natts;
2213         int                     i;
2214
2215         *Idesc = (IndDesc *) palloc(nindices * sizeof(IndDesc));
2216
2217         for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++)
2218         {
2219                 cachetuple = SearchSysCacheTupleCopy(INDEXRELID,
2220                                                          ObjectIdGetDatum(RelationGetRelid(Irel[i])),
2221                                                                                          0, 0, 0);
2222                 Assert(cachetuple);
2223
2224                 /*
2225                  * we never free the copy we make, because Idesc needs it for
2226                  * later
2227                  */
2228                 idcur->tform = (Form_pg_index) GETSTRUCT(cachetuple);
2229                 for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2230                          *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2231                          attnumP++, natts++);
2232                 if (idcur->tform->indproc != InvalidOid)
2233                 {
2234                         idcur->finfoP = &(idcur->finfo);
2235                         FIgetnArgs(idcur->finfoP) = natts;
2236                         natts = 1;
2237                         FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2238                         *(FIgetname(idcur->finfoP)) = '\0';
2239                 }
2240                 else
2241                         idcur->finfoP = (FuncIndexInfo *) NULL;
2242
2243                 idcur->natts = natts;
2244         }
2245
2246 }       /* vc_mkindesc */
2247
2248
2249 static bool
2250 vc_enough_space(VPageDescr vpd, Size len)
2251 {
2252
2253         len = DOUBLEALIGN(len);
2254
2255         if (len > vpd->vpd_free)
2256                 return false;
2257
2258         if (vpd->vpd_offsets_used < vpd->vpd_offsets_free)      /* there are free
2259                                                                                                                  * itemid(s) */
2260                 return true;                    /* and len <= free_space */
2261
2262         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2263         if (len <= vpd->vpd_free - sizeof(ItemIdData))
2264                 return true;
2265
2266         return false;
2267
2268 }       /* vc_enough_space */