1 /*-------------------------------------------------------------------------
4 * the postgres vacuum cleaner
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.34 1997/06/06 03:41:16 momjian Exp $
12 *-------------------------------------------------------------------------
16 #include <sys/types.h>
24 #include <utils/portal.h>
25 #include <access/genam.h>
26 #include <access/heapam.h>
27 #include <access/xact.h>
28 #include <storage/bufmgr.h>
29 #include <access/transam.h>
30 #include <catalog/pg_index.h>
31 #include <catalog/index.h>
32 #include <catalog/catname.h>
33 #include <catalog/catalog.h>
34 #include <catalog/pg_class.h>
35 #include <catalog/pg_proc.h>
36 #include <catalog/pg_statistic.h>
37 #include <catalog/pg_type.h>
38 #include <catalog/pg_operator.h>
39 #include <storage/smgr.h>
40 #include <storage/lmgr.h>
41 #include <utils/inval.h>
42 #include <utils/mcxt.h>
43 #include <utils/inval.h>
44 #include <utils/syscache.h>
45 #include <utils/builtins.h>
46 #include <commands/vacuum.h>
47 #include <parser/catalog_utils.h>
48 #include <storage/bufpage.h>
49 #include "storage/shmem.h"
50 #ifndef HAVE_GETRUSAGE
51 # include <rusagestub.h>
53 # include <sys/time.h>
54 # include <sys/resource.h>
57 #include <port-protos.h>
59 bool VacuumRunning = false;
61 static Portal vc_portal;
63 static int MESSAGE_LEVEL; /* message level */
65 #define swapLong(a,b) {long tmp; tmp=a; a=b; b=tmp;}
66 #define swapInt(a,b) {int tmp; tmp=a; a=b; b=tmp;}
67 #define swapDatum(a,b) {Datum tmp; tmp=a; a=b; b=tmp;}
68 #define VacAttrStatsEqValid(stats) ( stats->f_cmpeq != NULL )
69 #define VacAttrStatsLtGtValid(stats) ( stats->f_cmplt != NULL && \
70 stats->f_cmpgt != NULL && \
71 RegProcedureIsValid(stats->outfunc) )
74 /* non-export function prototypes */
75 static void vc_init(void);
76 static void vc_shutdown(void);
77 static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
78 static VRelList vc_getrels(NameData *VacRelP);
79 static void vc_vacone (Oid relid, bool analyze, List *va_cols);
80 static void vc_scanheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
81 static void vc_rpfheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
82 static void vc_vacheap (VRelStats *vacrelstats, Relation onerel, VPageList vpl);
83 static void vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
84 static void vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
85 static void vc_scanoneind (Relation indrel, int nhtups);
86 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup);
87 static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
88 static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats);
89 static void vc_delhilowstats (Oid relid, int attcnt, int *attnums);
90 static void vc_setpagelock(Relation rel, BlockNumber blkno);
91 static VPageDescr vc_tidreapped (ItemPointer itemptr, VPageList vpl);
92 static void vc_reappage (VPageList vpl, VPageDescr vpc);
93 static void vc_vpinsert (VPageList vpl, VPageDescr vpnew);
94 static void vc_free(VRelList vrl);
95 static void vc_getindices (Oid relid, int *nindices, Relation **Irel);
96 static void vc_clsindices (int nindices, Relation *Irel);
97 static Relation vc_getarchrel(Relation heaprel);
98 static void vc_archive(Relation archrel, HeapTuple htup);
99 static bool vc_isarchrel(char *rname);
100 static void vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
101 static char * vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
102 static int vc_cmp_blk (char *left, char *right);
103 static int vc_cmp_offno (char *left, char *right);
104 static bool vc_enough_space (VPageDescr vpd, Size len);
107 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
111 PortalVariableMemory pmem;
117 * Create a portal for safe memory across transactions. We need to
118 * palloc the name space for it because our hash function expects
119 * the name to be on a longword boundary. CreatePortal copies the
120 * name to safe storage for us.
122 pname = (char *) palloc(strlen(VACPNAME) + 1);
123 strcpy(pname, VACPNAME);
124 vc_portal = CreatePortal(pname);
128 MESSAGE_LEVEL = NOTICE;
130 MESSAGE_LEVEL = DEBUG;
132 /* vacrel gets de-allocated on transaction commit */
134 strcpy(VacRel.data,vacrel);
136 pmem = PortalGetVariableMemory(vc_portal);
137 old = MemoryContextSwitchTo((MemoryContext)pmem);
139 Assert ( va_spec == NIL || analyze );
140 foreach (le, va_spec)
142 char *col = (char*)lfirst(le);
145 dest = (char*) palloc (strlen (col) + 1);
147 va_cols = lappend (va_cols, dest);
149 (void) MemoryContextSwitchTo(old);
151 /* initialize vacuum cleaner */
154 /* vacuum the database */
156 vc_vacuum (&VacRel, analyze, va_cols);
158 vc_vacuum (NULL, analyze, NIL);
160 PortalDestroy (&vc_portal);
167 * vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
169 * We run exactly one vacuum cleaner at a time. We use the file system
170 * to guarantee an exclusive lock on vacuuming, since a single vacuum
171 * cleaner instantiation crosses transaction boundaries, and we'd lose
172 * postgres-style locks at the end of every transaction.
174 * The strangeness with committing and starting transactions in the
175 * init and shutdown routines is due to the fact that the vacuum cleaner
176 * is invoked via a sql command, and so is already executing inside
177 * a transaction. We need to leave ourselves in a predictable state
178 * on entry and exit to the vacuum cleaner. We commit the transaction
179 * started in PostgresMain() inside vc_init(), and start one in
180 * vc_shutdown() to match the commit waiting for us back in
188 if ((fd = open("pg_vlock", O_CREAT|O_EXCL, 0600)) < 0)
189 elog(WARN, "can't create lock file -- another vacuum cleaner running?");
194 * By here, exclusive open on the lock file succeeded. If we abort
195 * for any reason during vacuuming, we need to remove the lock file.
196 * This global variable is checked in the transaction manager on xact
197 * abort, and the routine vc_abort() is called if necessary.
200 VacuumRunning = true;
202 /* matches the StartTransaction in PostgresMain() */
203 CommitTransactionCommand();
209 /* on entry, not in a transaction */
210 if (unlink("pg_vlock") < 0)
211 elog(WARN, "vacuum: can't destroy lock file!");
213 /* okay, we're done */
214 VacuumRunning = false;
216 /* matches the CommitTransaction in PostgresMain() */
217 StartTransactionCommand();
224 /* on abort, remove the vacuum cleaner lock file */
225 (void) unlink("pg_vlock");
227 VacuumRunning = false;
231 * vc_vacuum() -- vacuum the database.
233 * This routine builds a list of relations to vacuum, and then calls
234 * code that vacuums them one at a time. We are careful to vacuum each
235 * relation in a separate transaction in order to avoid holding too many
239 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
243 /* get list of relations */
244 vrl = vc_getrels(VacRelP);
246 if ( analyze && VacRelP == NULL && vrl != NULL )
247 vc_delhilowstats (InvalidOid, 0, NULL);
249 /* vacuum each heap relation */
250 for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
251 vc_vacone (cur->vrl_relid, analyze, va_cols);
257 vc_getrels(NameData *VacRelP)
261 HeapScanDesc pgcscan;
264 PortalVariableMemory portalmem;
275 StartTransactionCommand();
278 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
279 NameEqualRegProcedure,
280 PointerGetDatum(VacRelP->data));
282 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
283 CharacterEqualRegProcedure, CharGetDatum('r'));
286 portalmem = PortalGetVariableMemory(vc_portal);
287 vrl = cur = (VRelList) NULL;
289 pgclass = heap_openr(RelationRelationName);
290 pgcdesc = RelationGetTupleDescriptor(pgclass);
292 pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
294 while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf))) {
299 * We have to be careful not to vacuum the archive (since it
300 * already contains vacuumed tuples), and not to vacuum
301 * relations on write-once storage managers like the Sony
302 * jukebox at Berkeley.
305 d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relname,
309 /* skip archive relations */
310 if (vc_isarchrel(rname)) {
315 /* don't vacuum large objects for now - something breaks when we do */
316 if ( (strlen(rname) > 4) && rname[0] == 'x' &&
317 rname[1] == 'i' && rname[2] == 'n' &&
318 (rname[3] == 'v' || rname[3] == 'x'))
320 elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now",
326 d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
328 smgrno = DatumGetInt16(d);
330 /* skip write-once storage managers */
331 if (smgriswo(smgrno)) {
336 d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relkind,
339 rkind = DatumGetChar(d);
341 /* skip system relations */
344 elog(NOTICE, "Vacuum: can not process index and certain system tables" );
348 /* get a relation list entry for this guy */
349 old = MemoryContextSwitchTo((MemoryContext)portalmem);
350 if (vrl == (VRelList) NULL) {
351 vrl = cur = (VRelList) palloc(sizeof(VRelListData));
353 cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
356 (void) MemoryContextSwitchTo(old);
358 cur->vrl_relid = pgctup->t_oid;
359 cur->vrl_next = (VRelList) NULL;
361 /* wei hates it if you forget to do this */
365 elog(NOTICE, "Vacuum: table not found" );
368 heap_endscan(pgcscan);
371 CommitTransactionCommand();
377 * vc_vacone() -- vacuum one heap relation
379 * This routine vacuums a single heap, cleans out its indices, and
380 * updates its npages and ntups statistics.
382 * Doing one heap at a time incurs extra overhead, since we need to
383 * check that the heap exists again just before we vacuum it. The
384 * reason that we do this is so that vacuuming can be spread across
385 * many small transactions. Otherwise, two-phase locking would require
386 * us to lock the entire database during one pass of the vacuum cleaner.
389 vc_vacone (Oid relid, bool analyze, List *va_cols)
393 HeapTuple pgctup, pgttup;
395 HeapScanDesc pgcscan;
398 VPageListData Vvpl; /* List of pages to vacuum and/or clean indices */
399 VPageListData Fvpl; /* List of pages with space enough for re-using */
403 VRelStats *vacrelstats;
405 StartTransactionCommand();
407 ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
408 ObjectIdEqualRegProcedure,
409 ObjectIdGetDatum(relid));
411 pgclass = heap_openr(RelationRelationName);
412 pgcdesc = RelationGetTupleDescriptor(pgclass);
413 pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
416 * Race condition -- if the pg_class tuple has gone away since the
417 * last time we saw it, we don't need to vacuum it.
420 if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf))) {
421 heap_endscan(pgcscan);
423 CommitTransactionCommand();
427 /* now open the class and vacuum it */
428 onerel = heap_open(relid);
430 vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
431 vacrelstats->relid = relid;
432 vacrelstats->npages = vacrelstats->ntups = 0;
433 vacrelstats->hasindex = false;
434 if ( analyze && !IsSystemRelationName ((RelationGetRelationName (onerel))->data) )
436 int attr_cnt, *attnums = NULL;
437 AttributeTupleForm *attr;
439 attr_cnt = onerel->rd_att->natts;
440 attr = onerel->rd_att->attrs;
442 if ( va_cols != NIL )
447 if ( length (va_cols) > attr_cnt )
448 elog (WARN, "vacuum: too many attributes specified for relation %.*s",
449 NAMEDATALEN, (RelationGetRelationName(onerel))->data);
450 attnums = (int*) palloc (attr_cnt * sizeof (int));
451 foreach (le, va_cols)
453 char *col = (char*) lfirst(le);
455 for (i = 0; i < attr_cnt; i++)
457 if ( namestrcmp (&(attr[i]->attname), col) == 0 )
460 if ( i < attr_cnt ) /* found */
464 elog (WARN, "vacuum: there is no attribute %s in %.*s",
465 col, NAMEDATALEN, (RelationGetRelationName(onerel))->data);
471 vacrelstats->vacattrstats =
472 (VacAttrStats *) palloc (attr_cnt * sizeof(VacAttrStats));
474 for (i = 0; i < attr_cnt; i++)
476 Operator func_operator;
477 OperatorTupleForm pgopform;
480 stats = &vacrelstats->vacattrstats[i];
481 stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
482 memmove (stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
483 stats->best = stats->guess1 = stats->guess2 = 0;
484 stats->max = stats->min = 0;
485 stats->best_len = stats->guess1_len = stats->guess2_len = 0;
486 stats->max_len = stats->min_len = 0;
487 stats->initialized = false;
488 stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
489 stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
491 func_operator = oper("=",stats->attr->atttypid,stats->attr->atttypid,true);
492 if (func_operator != NULL)
496 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
497 fmgr_info (pgopform->oprcode, &(stats->f_cmpeq), &nargs);
500 stats->f_cmpeq = NULL;
502 func_operator = oper("<",stats->attr->atttypid,stats->attr->atttypid,true);
503 if (func_operator != NULL)
507 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
508 fmgr_info (pgopform->oprcode, &(stats->f_cmplt), &nargs);
511 stats->f_cmplt = NULL;
513 func_operator = oper(">",stats->attr->atttypid,stats->attr->atttypid,true);
514 if (func_operator != NULL)
518 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
519 fmgr_info (pgopform->oprcode, &(stats->f_cmpgt), &nargs);
522 stats->f_cmpgt = NULL;
524 pgttup = SearchSysCacheTuple(TYPOID,
525 ObjectIdGetDatum(stats->attr->atttypid),
527 if (HeapTupleIsValid(pgttup))
528 stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
530 stats->outfunc = InvalidOid;
532 vacrelstats->va_natts = attr_cnt;
533 vc_delhilowstats (relid, ((attnums) ? attr_cnt : 0), attnums);
539 vacrelstats->va_natts = 0;
540 vacrelstats->vacattrstats = (VacAttrStats *) NULL;
543 /* we require the relation to be locked until the indices are cleaned */
544 RelationSetLockForWrite(onerel);
547 Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
548 vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
550 /* Now open indices */
551 Irel = (Relation *) NULL;
552 vc_getindices(vacrelstats->relid, &nindices, &Irel);
555 vacrelstats->hasindex = true;
557 vacrelstats->hasindex = false;
559 /* Clean/scan index relation(s) */
560 if ( Irel != (Relation*) NULL )
562 if ( Vvpl.vpl_npages > 0 )
564 for (i = 0; i < nindices; i++)
565 vc_vaconeind (&Vvpl, Irel[i], vacrelstats->ntups);
567 else /* just scan indices to update statistic */
569 for (i = 0; i < nindices; i++)
570 vc_scanoneind (Irel[i], vacrelstats->ntups);
574 if ( Fvpl.vpl_npages > 0 ) /* Try to shrink heap */
575 vc_rpfheap (vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
578 if ( Irel != (Relation*) NULL )
579 vc_clsindices (nindices, Irel);
580 if ( Vvpl.vpl_npages > 0 ) /* Clean pages from Vvpl list */
581 vc_vacheap (vacrelstats, onerel, &Vvpl);
584 /* ok - free Vvpl list of reaped pages */
585 if ( Vvpl.vpl_npages > 0 )
587 vpp = Vvpl.vpl_pgdesc;
588 for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
590 pfree (Vvpl.vpl_pgdesc);
591 if ( Fvpl.vpl_npages > 0 )
592 pfree (Fvpl.vpl_pgdesc);
595 /* all done with this class */
597 heap_endscan(pgcscan);
600 /* update statistics in pg_class */
601 vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
602 vacrelstats->hasindex, vacrelstats);
604 /* next command frees attribute stats */
606 CommitTransactionCommand();
610 * vc_scanheap() -- scan an open heap relation
612 * This routine sets commit times, constructs Vvpl list of
613 * empty/uninitialized pages and pages with dead tuples and
614 * ~LP_USED line pointers, constructs Fvpl list of pages
615 * appropriate for purposes of shrinking and maintains statistics
616 * on the number of live tuples in a heap.
619 vc_scanheap (VRelStats *vacrelstats, Relation onerel,
620 VPageList Vvpl, VPageList Fvpl)
627 Page page, tempPage = NULL;
628 OffsetNumber offnum, maxoff;
629 bool pgchanged, tupgone, dobufrel, notup;
632 uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
634 Size min_tlen = MAXTUPLEN;
636 int32 i/*, attr_cnt*/;
637 struct rusage ru0, ru1;
638 bool do_shrinking = true;
640 getrusage(RUSAGE_SELF, &ru0);
642 nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
645 relname = (RelationGetRelationName(onerel))->data;
647 nblocks = RelationGetNumberOfBlocks(onerel);
649 vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
652 for (blkno = 0; blkno < nblocks; blkno++) {
653 buf = ReadBuffer(onerel, blkno);
654 page = BufferGetPage(buf);
655 vpc->vpd_blkno = blkno;
658 if (PageIsNew(page)) {
659 elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing",
660 NAMEDATALEN, relname, blkno);
661 PageInit (page, BufferGetPageSize (buf), 0);
662 vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
663 frsize += (vpc->vpd_free - sizeof (ItemIdData));
666 vc_reappage (Vvpl, vpc);
671 if (PageIsEmpty(page)) {
672 vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
673 frsize += (vpc->vpd_free - sizeof (ItemIdData));
676 vc_reappage (Vvpl, vpc);
683 maxoff = PageGetMaxOffsetNumber(page);
684 for (offnum = FirstOffsetNumber;
686 offnum = OffsetNumberNext(offnum)) {
687 itemid = PageGetItemId(page, offnum);
690 * Collect un-used items too - it's possible to have
691 * indices pointing here after crash.
693 if (!ItemIdIsUsed(itemid)) {
694 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
699 htup = (HeapTuple) PageGetItem(page, itemid);
702 if (!AbsoluteTimeIsBackwardCompatiblyValid(htup->t_tmin) &&
703 TransactionIdIsValid((TransactionId)htup->t_xmin)) {
705 if (TransactionIdDidAbort(htup->t_xmin)) {
707 } else if (TransactionIdDidCommit(htup->t_xmin)) {
708 htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
710 } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
712 * Not Aborted, Not Committed, Not in Progress -
713 * so it is from a crashed process. - vadim 11/26/96
720 elog (NOTICE, "Rel %.*s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
721 NAMEDATALEN, relname, blkno, offnum, htup->t_xmin);
722 do_shrinking = false;
726 if (TransactionIdIsValid((TransactionId)htup->t_xmax))
728 if (TransactionIdDidAbort(htup->t_xmax))
730 StoreInvalidTransactionId(&(htup->t_xmax));
733 else if (TransactionIdDidCommit(htup->t_xmax))
735 else if ( !TransactionIdIsInProgress (htup->t_xmax) ) {
737 * Not Aborted, Not Committed, Not in Progress -
738 * so it is from a crashed process. - vadim 06/02/97
740 StoreInvalidTransactionId(&(htup->t_xmax));
745 elog (NOTICE, "Rel %.*s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
746 NAMEDATALEN, relname, blkno, offnum, htup->t_xmax);
747 do_shrinking = false;
752 * Is it possible at all ? - vadim 11/26/96
754 if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
756 elog (NOTICE, "Rel %.*s: TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
757 DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.",
758 NAMEDATALEN, relname, blkno, offnum,
759 TransactionIdIsValid((TransactionId)htup->t_xmax),
764 * It's possible! But where does it come from?
765 * And should we fix it? - vadim 11/28/96
767 itemptr = &(htup->t_ctid);
768 if ( !ItemPointerIsValid (itemptr) ||
769 BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno )
771 elog (NOTICE, "Rel %.*s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.",
772 NAMEDATALEN, relname, blkno, offnum,
773 BlockIdGetBlockNumber(&(itemptr->ip_blkid)),
774 itemptr->ip_posid, tupgone);
780 if ( htup->t_len != itemid->lp_len )
782 elog (NOTICE, "Rel %.*s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.",
783 NAMEDATALEN, relname, blkno, offnum,
784 itemid->lp_len, htup->t_len, tupgone);
786 if ( !OidIsValid(htup->t_oid) )
788 elog (NOTICE, "Rel %.*s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
789 NAMEDATALEN, relname, blkno, offnum, tupgone);
795 if ( tempPage == (Page) NULL )
799 pageSize = PageGetPageSize(page);
800 tempPage = (Page) palloc(pageSize);
801 memmove (tempPage, page, pageSize);
804 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
807 lpp->lp_flags &= ~LP_USED;
809 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
815 if ( htup->t_len < min_tlen )
816 min_tlen = htup->t_len;
817 if ( htup->t_len > max_tlen )
818 max_tlen = htup->t_len;
819 vc_attrstats(onerel, vacrelstats, htup);
830 if ( tempPage != (Page) NULL )
831 { /* Some tuples are gone */
832 PageRepairFragmentation(tempPage);
833 vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
834 frsize += vpc->vpd_free;
835 vc_reappage (Vvpl, vpc);
837 tempPage = (Page) NULL;
839 else if ( vpc->vpd_noff > 0 )
840 { /* there are only ~LP_USED line pointers */
841 vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
842 frsize += vpc->vpd_free;
843 vc_reappage (Vvpl, vpc);
855 /* save stats in the rel list for use later */
856 vacrelstats->ntups = ntups;
857 vacrelstats->npages = nblocks;
858 /* vacrelstats->natts = attr_cnt;*/
860 min_tlen = max_tlen = 0;
861 vacrelstats->min_tlen = min_tlen;
862 vacrelstats->max_tlen = max_tlen;
864 Vvpl->vpl_nemend = nemend;
865 Fvpl->vpl_nemend = nemend;
868 * Try to build Fvpl, keeping in mind that we can't use the free space
869 * of "empty" end-pages, nor of the last page if it was reaped.
871 if ( do_shrinking && Vvpl->vpl_npages - nemend > 0 )
873 int nusf; /* blocks usefull for re-using */
875 nusf = Vvpl->vpl_npages - nemend;
876 if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
879 for (i = 0; i < nusf; i++)
881 vp = Vvpl->vpl_pgdesc[i];
882 if ( vc_enough_space (vp, min_tlen) )
884 vc_vpinsert (Fvpl, vp);
885 frsusf += vp->vpd_free;
890 getrusage(RUSAGE_SELF, &ru1);
892 elog (MESSAGE_LEVEL, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
893 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
894 NAMEDATALEN, relname,
895 nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
896 ntups, nvac, ncrash, nunused, min_tlen, max_tlen,
897 frsize, frsusf, nemend, Fvpl->vpl_npages,
898 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
899 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
905 * vc_rpfheap() -- try to repair the relation's fragmentation
907 * This routine marks dead tuples as unused and tries to re-use dead space
908 * by moving tuples (and inserting index tuples if needed). It constructs
909 * the Nvpl list of freed pages (moved tuples) and cleans indices
910 * for them after committing (in a hackish manner - without losing locks
911 * and without freeing memory!) the current transaction. It truncates the
912 * relation if some end-blocks have gone away.
915 vc_rpfheap (VRelStats *vacrelstats, Relation onerel,
916 VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
920 AbsoluteTime myCTM = 0;
923 Page page, ToPage = NULL;
924 OffsetNumber offnum = 0, maxoff = 0, newoff, moff;
925 ItemId itemid, newitemid;
926 HeapTuple htup, newtup;
927 TupleDesc tupdesc = NULL;
928 Datum *idatum = NULL;
930 InsertIndexResult iresult;
932 VPageDescr ToVpd = NULL, Fvplast, Vvplast, vpc, *vpp;
934 IndDesc *Idesc, *idcur;
935 int Fblklast, Vblklast, i;
937 int nmoved, Fnpages, Vnpages;
939 bool isempty, dowrite;
941 struct rusage ru0, ru1;
943 getrusage(RUSAGE_SELF, &ru0);
945 myXID = GetCurrentTransactionId();
946 myCID = GetCurrentCommandId();
948 if ( Irel != (Relation*) NULL ) /* preparation for index' inserts */
950 vc_mkindesc (onerel, nindices, Irel, &Idesc);
951 tupdesc = RelationGetTupleDescriptor(onerel);
952 idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
953 inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
956 /* if the relation has an archive, open it */
957 if (onerel->rd_rel->relarch != 'n')
959 archrel = vc_getarchrel(onerel);
960 /* Archive tuples from "empty" end-pages */
961 for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1,
962 i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
964 if ( (*vpp)->vpd_noff > 0 )
966 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
967 page = BufferGetPage(buf);
968 Assert ( !PageIsEmpty(page) );
969 vc_vacpage (page, *vpp, archrel);
975 archrel = (Relation) NULL;
978 Fnpages = Fvpl->vpl_npages;
979 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
980 Fblklast = Fvplast->vpd_blkno;
981 Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
982 Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
983 Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
984 Vblklast = Vvplast->vpd_blkno;
985 Assert ( Vblklast >= Fblklast );
986 ToBuf = InvalidBuffer;
989 vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
990 vpc->vpd_nusd = vpc->vpd_noff = 0;
992 nblocks = vacrelstats->npages;
993 for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
995 /* if it's a reaped page and it was used by me - quit */
996 if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
999 buf = ReadBuffer(onerel, blkno);
1000 page = BufferGetPage(buf);
1004 isempty = PageIsEmpty(page);
1007 if ( blkno == Vblklast ) /* it's reapped page */
1009 if ( Vvplast->vpd_noff > 0 ) /* there are dead tuples */
1010 { /* on this page - clean */
1011 Assert ( ! isempty );
1012 vc_vacpage (page, Vvplast, archrel);
1020 Assert ( Vnpages > 0 );
1021 /* get prev reaped page from Vvpl */
1022 Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1023 Vblklast = Vvplast->vpd_blkno;
1024 if ( blkno == Fblklast ) /* this page in Fvpl too */
1027 Assert ( Fnpages > 0 );
1028 Assert ( Fvplast->vpd_nusd == 0 );
1029 /* get prev reaped page from Fvpl */
1030 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1031 Fblklast = Fvplast->vpd_blkno;
1033 Assert ( Fblklast <= Vblklast );
1042 Assert ( ! isempty );
1045 vpc->vpd_blkno = blkno;
1046 maxoff = PageGetMaxOffsetNumber(page);
1047 for (offnum = FirstOffsetNumber;
1049 offnum = OffsetNumberNext(offnum))
1051 itemid = PageGetItemId(page, offnum);
1053 if (!ItemIdIsUsed(itemid))
1056 htup = (HeapTuple) PageGetItem(page, itemid);
1059 /* try to find new page for this tuple */
1060 if ( ToBuf == InvalidBuffer ||
1061 ! vc_enough_space (ToVpd, tlen) )
1063 if ( ToBuf != InvalidBuffer )
1066 ToBuf = InvalidBuffer;
1068 * If not even one tuple can be added to this page -
1069 * remove the page from Fvpl. - vadim 11/27/96
1071 if ( !vc_enough_space (ToVpd, vacrelstats->min_tlen) )
1073 if ( ToVpd != Fvplast )
1075 Assert ( Fnpages > ToVpI + 1 );
1076 memmove (Fvpl->vpl_pgdesc + ToVpI,
1077 Fvpl->vpl_pgdesc + ToVpI + 1,
1078 sizeof (VPageDescr*) * (Fnpages - ToVpI - 1));
1080 Assert ( Fnpages >= 1 );
1084 /* get prev reaped page from Fvpl */
1085 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1086 Fblklast = Fvplast->vpd_blkno;
1089 for (i=0; i < Fnpages; i++)
1091 if ( vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
1095 break; /* can't move item anywhere */
1097 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1098 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1099 ToPage = BufferGetPage(ToBuf);
1100 /* if this page was not used before - clean it */
1101 if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
1102 vc_vacpage (ToPage, ToVpd, archrel);
1106 newtup = (HeapTuple) palloc (tlen);
1107 memmove((char *) newtup, (char *) htup, tlen);
1109 /* store transaction information */
1110 TransactionIdStore(myXID, &(newtup->t_xmin));
1111 newtup->t_cmin = myCID;
1112 StoreInvalidTransactionId(&(newtup->t_xmax));
1113 newtup->t_tmin = INVALID_ABSTIME;
1114 newtup->t_tmax = CURRENT_ABSTIME;
1115 ItemPointerSetInvalid(&newtup->t_chain);
1117 /* add tuple to the page */
1118 newoff = PageAddItem (ToPage, (Item)newtup, tlen,
1119 InvalidOffsetNumber, LP_USED);
1120 if ( newoff == InvalidOffsetNumber )
1123 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1124 tlen, ToVpd->vpd_blkno, ToVpd->vpd_free,
1125 ToVpd->vpd_nusd, ToVpd->vpd_noff);
1127 newitemid = PageGetItemId(ToPage, newoff);
1129 newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1130 ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1132 /* now logically delete end-tuple */
1133 TransactionIdStore(myXID, &(htup->t_xmax));
1134 htup->t_cmax = myCID;
1135 memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
1139 ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
1140 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1142 /* insert index tuples if needed */
1143 if ( Irel != (Relation*) NULL )
1145 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1149 (AttrNumber *)&(idcur->tform->indkey[0]),
1156 iresult = index_insert (
1162 if (iresult) pfree(iresult);
1166 } /* walk along page */
1168 if ( vpc->vpd_noff > 0 ) /* some tuples were moved */
1170 vc_reappage (&Nvpl, vpc);
1178 if ( offnum <= maxoff )
1179 break; /* some item(s) left */
1181 } /* walk along relation */
1183 blkno++; /* new number of blocks */
1185 if ( ToBuf != InvalidBuffer )
1187 Assert (nmoved > 0);
1194 * We have to commit our tuple moves before we truncate the
1195 * relation, but we shouldn't lose our locks. And so - quick hack:
1196 * flush buffers and record the status of the current transaction
1197 * as committed, and continue. - vadim 11/13/96
1199 FlushBufferPool(!TransactionFlushEnabled());
1200 TransactionIdCommit(myXID);
1201 FlushBufferPool(!TransactionFlushEnabled());
1202 myCTM = TransactionIdGetCommitTime(myXID);
1206 * Clean uncleaned reaped pages from Vvpl list
1207 * and set commit times for inserted tuples
1210 for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1212 Assert ( (*vpp)->vpd_blkno < blkno );
1213 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1214 page = BufferGetPage(buf);
1215 if ( (*vpp)->vpd_nusd == 0 ) /* this page was not used */
1217 /* noff == 0 in empty pages only - such pages should be re-used */
1218 Assert ( (*vpp)->vpd_noff > 0 );
1219 vc_vacpage (page, *vpp, archrel);
1221 else /* this page was used */
1224 moff = PageGetMaxOffsetNumber(page);
1225 for (newoff = FirstOffsetNumber;
1227 newoff = OffsetNumberNext(newoff))
1229 itemid = PageGetItemId(page, newoff);
1230 if (!ItemIdIsUsed(itemid))
1232 htup = (HeapTuple) PageGetItem(page, itemid);
1233 if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
1235 htup->t_tmin = myCTM;
1239 Assert ( (*vpp)->vpd_nusd == ntups );
1244 Assert ( nmoved == nchkmvd );
1246 getrusage(RUSAGE_SELF, &ru1);
1248 elog (MESSAGE_LEVEL, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \
1249 Elapsed %u/%u sec.",
1250 NAMEDATALEN, (RelationGetRelationName(onerel))->data,
1251 nblocks, blkno, nmoved,
1252 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1253 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1255 if ( Nvpl.vpl_npages > 0 )
1257 /* vacuum indices again if needed */
1258 if ( Irel != (Relation*) NULL )
1260 VPageDescr *vpleft, *vpright, vpsave;
1262 /* re-sort Nvpl.vpl_pgdesc */
1263 for (vpleft = Nvpl.vpl_pgdesc,
1264 vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1265 vpleft < vpright; vpleft++, vpright--)
1267 vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
1269 for (i = 0; i < nindices; i++)
1270 vc_vaconeind (&Nvpl, Irel[i], vacrelstats->ntups);
1274 * clean moved tuples from last page in Nvpl list
1275 * if some tuples left there
1277 if ( vpc->vpd_noff > 0 && offnum <= maxoff )
1279 Assert (vpc->vpd_blkno == blkno - 1);
1280 buf = ReadBuffer(onerel, vpc->vpd_blkno);
1281 page = BufferGetPage (buf);
1284 for (offnum = FirstOffsetNumber;
1286 offnum = OffsetNumberNext(offnum))
1288 itemid = PageGetItemId(page, offnum);
1289 if (!ItemIdIsUsed(itemid))
1291 htup = (HeapTuple) PageGetItem(page, itemid);
1292 Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
1293 itemid->lp_flags &= ~LP_USED;
1296 Assert ( vpc->vpd_noff == ntups );
1297 PageRepairFragmentation(page);
1301 /* now - free new list of reaped pages */
1302 vpp = Nvpl.vpl_pgdesc;
1303 for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1305 pfree (Nvpl.vpl_pgdesc);
1308 /* truncate relation */
1309 if ( blkno < nblocks )
1311 blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
1312 Assert ( blkno >= 0 );
1313 vacrelstats->npages = blkno; /* set new number of blocks */
1316 if ( archrel != (Relation) NULL )
1317 heap_close(archrel);
1319 if ( Irel != (Relation*) NULL ) /* pfree index' allocations */
1324 vc_clsindices (nindices, Irel);
1332 * vc_vacheap() -- free dead tuples
1334 * This routine marks dead tuples as unused and truncates relation
1335 * if there are "empty" end-blocks.
/*
 * vc_vacheap: run vc_vacpage() over the pages listed in Vvpl and then cut
 * off the trailing empty pages of the relation.
 *
 * vacrelstats - per-relation stats; npages is read and, after a truncate,
 *               overwritten with the value smgrtruncate() returns
 * onerel      - heap relation being vacuumed
 * Vvpl        - page descriptor list; its last vpl_nemend entries are
 *               excluded from the page loop
 *
 * An archive relation is opened through vc_getarchrel() when
 * onerel->rd_rel->relarch != 'n' and closed again before returning.
 *
 * NOTE(review): buffer write/release calls for the pages handled in the
 * loop are not visible here - confirm against the complete function.
 */
1338 vc_vacheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1347 nblocks = Vvpl->vpl_npages;
1348 /* if the relation has an archive, open it */
1349 if (onerel->rd_rel->relarch != 'n')
1350 archrel = vc_getarchrel(onerel);
1353 archrel = (Relation) NULL;
1354 nblocks -= Vvpl->vpl_nemend; /* nothing to do with them */
/* only descriptors with at least one recorded offset are passed to vc_vacpage() */
1357 for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1359 if ( (*vpp)->vpd_noff > 0 )
1361 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1362 page = BufferGetPage (buf);
1363 vc_vacpage (page, *vpp, archrel);
1368 /* truncate relation if there are some empty end-pages */
1369 if ( Vvpl->vpl_nemend > 0 )
1371 Assert ( vacrelstats->npages >= Vvpl->vpl_nemend );
1372 nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1373 elog (MESSAGE_LEVEL, "Rel %.*s: Pages: %u --> %u.",
1374 NAMEDATALEN, (RelationGetRelationName(onerel))->data,
1375 vacrelstats->npages, nblocks);
1378 * we have to flush "empty" end-pages (if changed, but who knows it)
1381 FlushBufferPool(!TransactionFlushEnabled());
/* smgrtruncate() hands back the resulting block count, stored below as the new npages */
1383 nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
1384 Assert ( nblocks >= 0 );
1385 vacrelstats->npages = nblocks; /* set new number of blocks */
1388 if ( archrel != (Relation) NULL )
1389 heap_close(archrel);
1394 * vc_vacpage() -- free (and archive if needed) dead tuples on a page
1395 * and repair its fragmentation.
/*
 * vc_vacpage: clear the LP_USED flag on every line pointer listed in
 * vpd->vpd_voff[0 .. vpd_noff-1] and then call PageRepairFragmentation().
 *
 * page    - page whose line pointers are modified in place
 * vpd     - descriptor naming the offsets to free; vpd_nusd must be 0
 * archrel - when non-NULL, each listed item that is still marked used is
 *           handed to vc_archive() before its flag is cleared
 */
1398 vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
1404 Assert ( vpd->vpd_nusd == 0 );
1405 for (i=0; i < vpd->vpd_noff; i++)
/* the -1 turns the recorded offset number into a pd_linp[] array index */
1407 itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1408 if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
1410 htup = (HeapTuple) PageGetItem (page, itemid);
1411 vc_archive (archrel, htup);
1413 itemid->lp_flags &= ~LP_USED;
1415 PageRepairFragmentation(page);
1420 * vc_scanoneind() -- scan one index relation to update statistics.
/*
 * vc_scanoneind: walk a whole index without deleting anything, then store
 * its page and tuple counts through vc_updstats().
 *
 * indrel - index relation to scan
 * nhtups - tuple count of the heap; a NOTICE is raised when the index
 *          tuple count (nitups) differs from it
 *
 * Elapsed getrusage() seconds are reported at MESSAGE_LEVEL.
 */
1424 vc_scanoneind (Relation indrel, int nhtups)
1426 RetrieveIndexResult res;
1427 IndexScanDesc iscan;
1430 struct rusage ru0, ru1;
1432 getrusage(RUSAGE_SELF, &ru0);
1434 /* walk through the entire index */
1435 iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
/* NOTE(review): loop body not visible here; nitups is presumably counted once per returned entry - confirm */
1438 while ((res = index_getnext(iscan, ForwardScanDirection))
1439 != (RetrieveIndexResult) NULL)
1445 index_endscan(iscan);
1447 /* now update statistics in pg_class */
1448 nipages = RelationGetNumberOfBlocks(indrel);
1449 vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1451 getrusage(RUSAGE_SELF, &ru1);
/* NOTE(review): tv_sec differences are printed with %u; confirm that matches the tv_sec type on all ports */
1453 elog (MESSAGE_LEVEL, "Ind %.*s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1454 NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups,
1455 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1456 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1458 if ( nitups != nhtups )
1459 elog (NOTICE, "Ind %.*s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1460 NAMEDATALEN, indrel->rd_rel->relname.data, nitups, nhtups);
1462 } /* vc_scanoneind */
1465 * vc_vaconeind() -- vacuum one index relation.
1467 * Vpl is the VPageList of the heap we're currently vacuuming.
1468 * It's locked. Indrel is an index relation on the vacuumed heap.
1469 * We don't set locks on the index relation here, since the indexed
1470 * access methods support locking at different granularities.
1471 * We let them handle it.
1473 * Finally, we arrange to update the index relation's statistics in
/*
 * vc_vaconeind: scan every entry of indrel and call index_delete() on the
 * entries whose heap pointer vc_tidreapped() finds in vpl.
 *
 * vpl    - page descriptor list of the heap being vacuumed
 * indrel - index relation to clean
 * nhtups - heap tuple count; a NOTICE is raised when the index tuple
 *          count (nitups) differs from it
 *
 * Afterwards the index page/tuple counts are stored via vc_updstats() and
 * the elapsed getrusage() seconds plus the deleted count (nvac) are logged.
 *
 * NOTE(review): the statements that maintain nvac and nitups are not
 * visible here - confirm against the complete function.
 */
1477 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1479 RetrieveIndexResult res;
1480 IndexScanDesc iscan;
1481 ItemPointer heapptr;
1486 struct rusage ru0, ru1;
1488 getrusage(RUSAGE_SELF, &ru0);
1490 /* walk through the entire index */
1491 iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1495 while ((res = index_getnext(iscan, ForwardScanDirection))
1496 != (RetrieveIndexResult) NULL) {
1497 heapptr = &res->heap_iptr;
/* a non-NULL result means the heap tuple this entry points at is listed in vpl */
1499 if ( (vp = vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
1502 elog(DEBUG, "<%x,%x> -> <%x,%x>",
1503 ItemPointerGetBlockNumber(&(res->index_iptr)),
1504 ItemPointerGetOffsetNumber(&(res->index_iptr)),
1505 ItemPointerGetBlockNumber(&(res->heap_iptr)),
1506 ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1508 if ( vp->vpd_noff == 0 )
1509 { /* this is EmptyPage !!! */
1510 elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
1511 NAMEDATALEN, indrel->rd_rel->relname.data,
1512 vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1515 index_delete(indrel, &res->index_iptr);
1524 index_endscan(iscan);
1526 /* now update statistics in pg_class */
1527 nipages = RelationGetNumberOfBlocks(indrel);
1528 vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1530 getrusage(RUSAGE_SELF, &ru1);
1532 elog (MESSAGE_LEVEL, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1533 NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac,
1534 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1535 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1537 if ( nitups != nhtups )
1538 elog (NOTICE, "Ind %.*s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1539 NAMEDATALEN, indrel->rd_rel->relname.data, nitups, nhtups);
1541 } /* vc_vaconeind */
1544 * vc_tidreapped() -- is a particular tid reapped?
1546 * vpl->vpl_pgdesc is sorted in the right order.
/*
 * vc_tidreapped: look an item pointer up in a page descriptor list.
 *
 * itemptr - heap item pointer; its block number and offset number are the
 *           two search keys
 * vpl     - page descriptor list searched with vc_find_eq()
 *
 * Two lookups are made: first for the block in vpl->vpl_pgdesc, then for
 * the offset in that descriptor's vpd_voff array.  (VPageDescr) NULL is
 * returned when either lookup finds nothing.
 *
 * NOTE(review): the comparator arguments, the result for a descriptor with
 * vpd_noff == 0, and the final return are not visible here - confirm.
 */
1549 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1551 OffsetNumber ioffno;
1553 VPageDescr vp, *vpp;
1556 vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1557 ioffno = ItemPointerGetOffsetNumber(itemptr);
/* the search key is passed as the address of a VPageDescr, matching the element type of vpl_pgdesc */
1560 vpp = (VPageDescr*) vc_find_eq ((char*)(vpl->vpl_pgdesc),
1561 vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp,
1564 if ( vpp == (VPageDescr*) NULL )
1565 return ((VPageDescr)NULL);
1568 /* ok - we are on true page */
1570 if ( vp->vpd_noff == 0 ) { /* this is EmptyPage !!! */
1574 voff = (OffsetNumber*) vc_find_eq ((char*)(vp->vpd_voff),
1575 vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno,
1578 if ( voff == (OffsetNumber*) NULL )
1579 return ((VPageDescr)NULL);
1583 } /* vc_tidreapped */
1586 * vc_attrstats() -- compute column statistics used by the optimizer
1588 * We compute the column min, max, null and non-null counts.
1589 * Plus we attempt to find the count of the value that occurs most
1590 * frequently in each column
1591 * These figures are used to compute the selectivity of the column
1593 * We use a three-bucket cache to get the most frequent item
1594 * The 'guess' buckets count hits. A cache miss causes guess1
1595 * to get the most hit 'guess' item in the most recent cycle, and
1596 * the new item goes into guess2. Whenever the total count of hits
1597 * of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1599 * This method works perfectly for columns with unique values, and columns
1600 * with only two unique values, plus nulls.
1602 * It becomes less perfect as the number of unique values increases and
1603 * their distribution in the table becomes more random.
/*
 * vc_attrstats: fold one heap tuple into the per-column statistics kept in
 * vacrelstats->vacattrstats[0 .. va_natts-1].
 *
 * onerel      - relation the tuple belongs to; supplies the tuple descriptor
 * vacrelstats - holds the array of VacAttrStats that is updated in place
 * htup        - tuple whose column values are examined
 *
 * For each column the value is fetched with heap_getattr() and compared,
 * through the column's f_cmpeq/f_cmplt/f_cmpgt functions, against the
 * stored min, max, best, guess1 and guess2 buckets.  Bucket contents are
 * replaced with vc_bucketcpy().
 *
 * NOTE(review): null handling and the counter increments on the min/max/
 * best branches are not visible here - confirm against the complete
 * function.
 */
1607 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup)
1609 int i, attr_cnt = vacrelstats->va_natts;
1610 VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1611 TupleDesc tupDesc = onerel->rd_att;
1615 for (i = 0; i < attr_cnt; i++) {
1616 VacAttrStats *stats = &vacattrstats[i];
1617 bool value_hit = true;
1619 value = (Datum) heap_getattr (htup, InvalidBuffer,
1620 stats->attr->attnum, tupDesc, &isnull);
1622 if (!VacAttrStatsEqValid(stats))
1628 stats->nonnull_cnt++;
/* first value seen for this column: seed every bucket with it */
1629 if (stats->initialized == false) {
1630 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1631 /* best_cnt gets incremented later */
1632 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1633 stats->guess1_cnt = stats->guess1_hits = 1;
1634 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1635 stats->guess2_hits = 1;
1636 if (VacAttrStatsLtGtValid(stats)) {
1637 vc_bucketcpy(stats->attr, value, &stats->max , &stats->max_len);
1638 vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1640 stats->initialized = true;
1642 if (VacAttrStatsLtGtValid(stats)) {
1643 if ( (*(stats->f_cmplt)) (value,stats->min) ) {
1644 vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1647 if ( (*(stats->f_cmpgt)) (value,stats->max) ) {
1648 vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1651 if ( (*(stats->f_cmpeq)) (value,stats->min) )
1653 else if ( (*(stats->f_cmpeq)) (value,stats->max) )
1656 if ( (*(stats->f_cmpeq)) (value,stats->best) )
1658 else if ( (*(stats->f_cmpeq)) (value,stats->guess1) ) {
1659 stats->guess1_cnt++;
1660 stats->guess1_hits++;
1662 else if ( (*(stats->f_cmpeq)) (value,stats->guess2) )
1663 stats->guess2_hits++;
1664 else value_hit = false;
/* guess2 overtakes guess1 once it has collected more hits */
1666 if (stats->guess2_hits > stats->guess1_hits) {
1667 swapDatum(stats->guess1,stats->guess2);
1668 swapInt(stats->guess1_len,stats->guess2_len);
1669 stats->guess1_cnt = stats->guess2_hits;
1670 swapLong(stats->guess1_hits, stats->guess2_hits);
/* guess1 becomes the new best when its total count exceeds best_cnt; hit counters restart at 1 */
1672 if (stats->guess1_cnt > stats->best_cnt) {
1673 swapDatum(stats->best,stats->guess1);
1674 swapInt(stats->best_len,stats->guess1_len);
1675 swapLong(stats->best_cnt,stats->guess1_cnt);
1676 stats->guess1_hits = 1;
1677 stats->guess2_hits = 1;
/* NOTE(review): presumably guarded by a value_hit test that is not visible here - confirm */
1680 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1681 stats->guess1_hits = 1;
1682 stats->guess2_hits = 1;
1690 * vc_bucketcpy() -- copy a datum value into a statistics bucket
/*
 * vc_bucketcpy: store value in *bucket.
 *
 * attr       - attribute description; attbyval and attlen select the path
 * value      - datum to store
 * bucket     - destination datum, overwritten or refilled in place
 * bucket_len - length associated with the buffer behind *bucket;
 *              0 is treated as "nothing allocated yet"
 *
 * On the by-reference path the length is attr->attlen, or VARSIZE(value)
 * when attlen is -1.  A new buffer is palloc'd (after pfree of the old
 * one) only when that length exceeds *bucket_len; the bytes are then
 * copied with memmove().
 *
 * NOTE(review): the pass-by-value branch and the update of *bucket_len
 * are not visible here - confirm against the complete function.
 */
1694 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1696 if (attr->attbyval && attr->attlen != -1)
1699 int len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1701 if (len > *bucket_len)
1703 if (*bucket_len != 0)
1704 pfree(DatumGetPointer(*bucket));
1705 *bucket = PointerGetDatum(palloc(len));
1708 memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1713 * vc_updstats() -- update pg_class statistics for one relation
1715 * This routine works for both index and heap relation entries in
1716 * pg_class. We violate no-overwrite semantics here by storing new
1717 * values for ntups, npages, and hasindex directly in the pg_class
1718 * tuple that's already on the page. The reason for this is that if
1719 * we updated these tuples in the usual way, then every tuple in pg_class
1720 * would be replaced every day. This would make planning and executing
1721 * historical queries very expensive.
/*
 * vc_updstats: overwrite the statistics of one relation in place.
 *
 * relid       - oid of the pg_class row to update
 * npages      - stored into relpages
 * ntups       - stored into reltuples
 * hasindex    - stored into relhasindex
 * vacrelstats - optional per-column statistics; when non-NULL with
 *               va_natts > 0, pg_attribute.attnvals is overwritten for the
 *               matching user columns and min/max rows are inserted into
 *               pg_statistic
 *
 * elog(WARN) is raised when no pg_class row is found for relid.
 *
 * NOTE(review): the closing of ad/sd/rd and several assignments are not
 * visible here - confirm against the complete function.
 */
1724 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats)
1726 Relation rd, ad, sd;
1727 HeapScanDesc rsdesc, asdesc;
1729 HeapTuple rtup, atup, stup;
1731 Form_pg_class pgcform;
1732 ScanKeyData rskey, askey;
1733 AttributeTupleForm attp;
1736 * update number of tuples and number of pages in pg_class
1738 ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1739 ObjectIdEqualRegProcedure,
1740 ObjectIdGetDatum(relid));
/* scan pg_class for the single row whose oid equals relid */
1742 rd = heap_openr(RelationRelationName);
1743 rsdesc = heap_beginscan(rd, false, NowTimeQual, 1, &rskey);
1745 if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1746 elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1749 /* overwrite the existing statistics in the tuple */
1750 vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1751 pgcform = (Form_pg_class) GETSTRUCT(rtup);
1752 pgcform->reltuples = ntups;
1753 pgcform->relpages = npages;
1754 pgcform->relhasindex = hasindex;
/* per-column statistics are written only when the caller supplied some */
1756 if ( vacrelstats != NULL && vacrelstats->va_natts > 0 )
1758 VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1759 int natts = vacrelstats->va_natts;
1761 ad = heap_openr(AttributeRelationName);
1762 sd = heap_openr(StatisticRelationName);
1763 ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1766 asdesc = heap_beginscan(ad, false, NowTimeQual, 1, &askey);
1768 while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1771 double selratio; /* average ratio of rows selected for a random constant */
1772 VacAttrStats *stats;
1773 Datum values[ Natts_pg_statistic ];
1774 char nulls[ Natts_pg_statistic ];
1776 attp = (AttributeTupleForm) GETSTRUCT(atup);
1777 if ( attp->attnum <= 0) /* skip system attributes for now, */
1778 /* they are unique anyway */
/* find the collected stats entry whose attnum matches this pg_attribute row */
1781 for (i = 0; i < natts; i++)
1783 if ( attp->attnum == vacattrstats[i].attr->attnum )
1788 stats = &(vacattrstats[i]);
1790 /* overwrite the existing statistics in the tuple */
1791 if (VacAttrStatsEqValid(stats)) {
1793 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
/* three cases for selratio: no rows or all-distinct; only min and max values present; general estimate */
1795 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1796 (stats->null_cnt <= 1 && stats->best_cnt == 1))
1798 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1800 double min_cnt_d = stats->min_cnt,
1801 max_cnt_d = stats->max_cnt,
1802 null_cnt_d = stats->null_cnt,
1803 nonnullcnt_d = stats->nonnull_cnt; /* prevent overflow */
1804 selratio = (min_cnt_d*min_cnt_d+max_cnt_d*max_cnt_d+null_cnt_d*null_cnt_d)/
1805 (nonnullcnt_d+null_cnt_d)/(nonnullcnt_d+null_cnt_d);
/* general case: "most" is the larger of the best-value count and the null count */
1808 double most = (double)(stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1809 double total = ((double)stats->nonnull_cnt)+((double)stats->null_cnt);
1810 /* we assume count of other values are 20%
1811 of best count in table */
1812 selratio = (most*most + 0.20*most*(total-most))/total/total;
1816 attp->attnvals = (selratio ? (selratio * ATTNVALS_SCALE) : 0);
1817 WriteNoReleaseBuffer(abuf);
1819 /* DO PG_STATISTIC INSERTS */
1821 /* doing system relations, especially pg_statistic is a problem */
1822 if (VacAttrStatsLtGtValid(stats) && stats->initialized /* &&
1823 !IsSystemRelationName(pgcform->relname.data)*/) {
1824 func_ptr out_function;
1828 for (i = 0; i < Natts_pg_statistic; ++i) nulls[i] = ' ';
1831 * initialize values[]
1835 values[i++] = (Datum) relid; /* 1 */
1836 values[i++] = (Datum) attp->attnum; /* 2 */
1837 values[i++] = (Datum) InvalidOid; /* 3 */
1838 fmgr_info(stats->outfunc, &out_function, &dummy);
1839 out_string = (*out_function)(stats->min, stats->attr->atttypid);
1840 values[i++] = (Datum) fmgr(TextInRegProcedure,out_string);
1842 out_string = (char *)(*out_function)(stats->max, stats->attr->atttypid);
1843 values[i++] = (Datum) fmgr(TextInRegProcedure,out_string);
1848 stup = heap_formtuple(sdesc, values, nulls);
1851 * insert the tuple in the relation and get the tuple's oid.
1854 heap_insert(sd, stup);
/* values[3] and values[4] are the two text datums built above, assuming i started at 0 - confirm */
1855 pfree(DatumGetPointer(values[3]));
1856 pfree(DatumGetPointer(values[4]));
1861 heap_endscan(asdesc);
1866 /* XXX -- after write, should invalidate relcache in other backends */
1867 WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1869 /* invalidating system relations confuses the function cache
1870 of pg_operator and pg_opclass */
1871 if ( !IsSystemRelationName(pgcform->relname.data))
1872 RelationInvalidateHeapTuple(rd, rtup);
1874 /* that's all, folks */
1875 heap_endscan(rsdesc);
1880 * vc_delhilowstats() -- delete pg_statistics rows
/*
 * vc_delhilowstats: delete rows from pg_statistic.
 *
 * relid   - when not InvalidOid, only rows whose starelid equals relid are
 *           scanned; otherwise the whole relation is scanned
 * attcnt  - number of entries in attnums
 * attnums - column numbers to match; each entry is compared as
 *           attnums[i] + 1 against staattnum
 *
 * NOTE(review): the guard around the attnums loop and the test deciding
 * when the "don't delete it" continue fires are not visible here - confirm
 * against the complete function.
 */
1884 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
1886 Relation pgstatistic;
1887 HeapScanDesc pgsscan;
1891 pgstatistic = heap_openr(StatisticRelationName);
1893 if (relid != InvalidOid ) {
1894 ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
1895 ObjectIdEqualRegProcedure,
1896 ObjectIdGetDatum(relid));
1897 pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 1, &pgskey);
1900 pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 0, NULL);
1902 while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
1906 Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT (pgstup);
1909 for (i = 0; i < attcnt; i++)
1911 if ( pgs->staattnum == attnums[i] + 1 )
1915 continue; /* don't delete it */
1917 heap_delete(pgstatistic, &pgstup->t_ctid);
1920 heap_endscan(pgsscan);
1921 heap_close(pgstatistic);
/*
 * vc_setpagelock: request a page-level write lock on block blkno of rel.
 *
 * A temporary item pointer is built for (blkno, offset 1) because
 * RelationSetLockForWritePage() takes an item pointer as its argument.
 */
1924 static void vc_setpagelock(Relation rel, BlockNumber blkno)
1926 ItemPointerData itm;
1928 ItemPointerSet(&itm, blkno, 1);
1930 RelationSetLockForWritePage(rel, &itm);
1934 * vc_reappage() -- save a page on the array of reapped pages.
1936 * As a side effect of the way that the vacuuming loop for a given
1937 * relation works, higher pages come after lower pages in the array
1938 * (and highest tid on a page is last).
/*
 * vc_reappage: append a private copy of the page descriptor vpc to vpl.
 *
 * vpl - list that receives the copy (through vc_vpinsert)
 * vpc - descriptor to copy; the caller's structure is only read
 *
 * The copy is palloc'd with room for vpc->vpd_noff offset numbers after
 * the fixed part of the descriptor.
 */
1941 vc_reappage(VPageList vpl, VPageDescr vpc)
1945 /* allocate a VPageDescrData entry */
1946 newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
1949 if ( vpc->vpd_noff > 0 )
1950 memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
1951 newvpd->vpd_blkno = vpc->vpd_blkno;
1952 newvpd->vpd_free = vpc->vpd_free;
1953 newvpd->vpd_nusd = vpc->vpd_nusd;
1954 newvpd->vpd_noff = vpc->vpd_noff;
1956 /* insert this page into vpl list */
1957 vc_vpinsert (vpl, newvpd);
/*
 * vc_vpinsert: append vpnew to the descriptor array of vpl.
 *
 * The array grows in steps of 100 slots: it is palloc'd when the list is
 * empty and repalloc'd each time vpl_npages reaches a multiple of 100.
 * The pointer vpnew itself is stored, not a copy.
 */
1962 vc_vpinsert (VPageList vpl, VPageDescr vpnew)
1965 /* allocate a VPageDescr entry if needed */
1966 if ( vpl->vpl_npages == 0 )
1967 vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
1968 else if ( vpl->vpl_npages % 100 == 0 )
1969 vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
1970 vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
1971 (vpl->vpl_npages)++;
/*
 * vc_free: walk the relation list vrl entry by entry.
 *
 * The walk runs with the variable memory of vc_portal as the current
 * memory context; the previous context is restored before returning.
 *
 * NOTE(review): the statements that release each entry are not visible
 * here - confirm against the complete function.
 */
1976 vc_free(VRelList vrl)
1980 PortalVariableMemory pmem;
1982 pmem = PortalGetVariableMemory(vc_portal);
1983 old = MemoryContextSwitchTo((MemoryContext)pmem);
1985 while (vrl != (VRelList) NULL) {
1987 /* free rel list entry */
1989 vrl = vrl->vrl_next;
1993 (void) MemoryContextSwitchTo(old);
1997 * vc_getarchrel() -- open the archive relation for a heap relation
1999 * The archive relation is named 'a,XXXXX' for the heap relation
2000 * whose relid is XXXXX.
/* name prefix shared by vc_getarchrel() and vc_isarchrel() */
2003 #define ARCHIVE_PREFIX "a,"
/*
 * vc_getarchrel: build the archive relation name for heaprel
 * (ARCHIVE_PREFIX followed by heaprel->rd_id in decimal) and open that
 * relation with heap_openr().
 *
 * The name buffer is sizeof(ARCHIVE_PREFIX) + NAMEDATALEN bytes.
 *
 * NOTE(review): rd_id is formatted with %d, while other Oid values in this
 * file are printed with %u; if rd_id is an unsigned Oid the conversion
 * should be %u - confirm.
 */
2006 vc_getarchrel(Relation heaprel)
2011 archrelname = palloc(sizeof(ARCHIVE_PREFIX) + NAMEDATALEN); /* bogus */
2012 sprintf(archrelname, "%s%d", ARCHIVE_PREFIX, heaprel->rd_id);
2014 archrel = heap_openr(archrelname);
2021 * vc_archive() -- write a tuple to an archive relation
2023 * In the future, this will invoke the archive access method. For
2024 * now, archive relations are on mag disk.
/*
 * vc_archive: hand the tuple htup to doinsert() for the archive relation
 * archrel.  No other work is done here.
 */
2027 vc_archive(Relation archrel, HeapTuple htup)
2029 doinsert(archrel, htup);
/*
 * vc_isarchrel: test whether rname begins with ARCHIVE_PREFIX.
 *
 * Only the first strlen(ARCHIVE_PREFIX) characters are compared.
 *
 * NOTE(review): the values returned on match and mismatch are not visible
 * here - confirm against the complete function.
 */
2033 vc_isarchrel(char *rname)
2035 if (strncmp(ARCHIVE_PREFIX, rname,strlen(ARCHIVE_PREFIX)) == 0)
/*
 * vc_find_eq: search an array for an element that compares equal to elm.
 *
 * bot    - address of the first element of the range being searched
 * nelem  - number of elements
 * size   - size of one element in bytes
 * elm    - key to look for
 * compar - comparison callback; it is called as compar(bot, elm) for the
 *          first element and as compar(elm, element) for the last and
 *          middle elements
 *
 * A match is returned as a pointer to the matching element.  When the
 * search moves to the upper part, bot advances past the middle element
 * and last/celm are recomputed for the remaining range.
 *
 * NOTE(review): the loop construct, the handling of each compar() result
 * and the not-found return are not visible here; this looks like a binary
 * search over a sorted array - confirm against the complete function.
 */
2042 vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *))
2045 int last = nelem - 1;
2046 int celm = nelem / 2;
2047 bool last_move, first_move;
2049 last_move = first_move = true;
2052 if ( first_move == true )
2054 res = compar (bot, elm);
2061 if ( last_move == true )
2063 res = compar (elm, bot + last*size);
2067 return (bot + last*size);
2070 res = compar (elm, bot + celm*size);
2072 return (bot + celm*size);
2086 last = last - celm - 1;
2087 bot = bot + (celm+1)*size;
2088 celm = (last + 1) / 2;
/*
 * vc_cmp_blk: comparison callback on page descriptors.
 *
 * left and right each point at a VPageDescr (hence the double
 * indirection); the vpd_blkno fields of the two descriptors are fetched
 * for comparison.
 *
 * NOTE(review): the returned values are not visible here - confirm the
 * sign convention against the complete function.
 */
2095 vc_cmp_blk (char *left, char *right)
2097 BlockNumber lblk, rblk;
2099 lblk = (*((VPageDescr*)left))->vpd_blkno;
2100 rblk = (*((VPageDescr*)right))->vpd_blkno;
/*
 * vc_cmp_offno: comparison callback on offset numbers.
 *
 * left and right each point at an OffsetNumber; the two values are tested
 * for less-than and then for equality.
 *
 * NOTE(review): the returned values are not visible here - confirm the
 * sign convention against the complete function.
 */
2111 vc_cmp_offno (char *left, char *right)
2114 if ( *(OffsetNumber*)left < *(OffsetNumber*)right )
2116 if ( *(OffsetNumber*)left == *(OffsetNumber*)right )
2120 } /* vc_cmp_offno */
/*
 * vc_getindices: find the indices defined on relation relid and open them.
 *
 * relid    - oid matched against pg_index.indrelid
 * nindices - out: its value is tested at the end to decide whether *Irel
 *            is reset to NULL
 * Irel     - optional out: when non-NULL, receives a palloc'd array with
 *            one Relation slot per index oid found
 *
 * Index oids are gathered from the indexrelid column into ioid[], which
 * starts with room for 10 entries and is enlarged with repalloc.  The
 * oids are then opened with index_open() from the last collected to the
 * first; an index that cannot be opened only produces a NOTICE.
 *
 * NOTE(review): the counter updates, the early exit for "no index found"
 * and the stores into *nindices / (*Irel)[k] are not visible here -
 * confirm against the complete function.
 */
2124 vc_getindices (Oid relid, int *nindices, Relation **Irel)
2130 HeapScanDesc pgiscan;
2139 ioid = (Oid *) palloc(10*sizeof(Oid));
2141 /* prepare a heap scan on the pg_index relation */
2142 pgindex = heap_openr(IndexRelationName);
2143 pgidesc = RelationGetTupleDescriptor(pgindex);
2145 ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2146 ObjectIdEqualRegProcedure,
2147 ObjectIdGetDatum(relid));
2149 pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
2151 while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
2152 d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
2156 ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
2157 ioid[i-1] = DatumGetObjectId(d);
2160 heap_endscan(pgiscan);
2161 heap_close(pgindex);
2163 if ( i == 0 ) { /* No one index found */
2168 if ( Irel != (Relation **) NULL )
2169 *Irel = (Relation *) palloc(i * sizeof(Relation));
/* i counts down over the collected oids; k is the second counter of this loop */
2171 for (k = 0; i > 0; )
2173 irel = index_open(ioid[--i]);
2174 if ( irel != (Relation) NULL )
2176 if ( Irel != (Relation **) NULL )
2183 elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2188 if ( Irel != (Relation **) NULL && *nindices == 0 )
2191 *Irel = (Relation *) NULL;
2194 } /* vc_getindices */
/*
 * vc_clsindices: close the index relations in Irel[0 .. nindices-1].
 *
 * A NULL Irel is checked for before the loop.  The relations are closed
 * with index_close() from the highest array slot down to slot 0.
 */
2198 vc_clsindices (int nindices, Relation *Irel)
2201 if ( Irel == (Relation*) NULL )
2204 while (nindices--) {
2205 index_close (Irel[nindices]);
2209 } /* vc_clsindices */
/*
 * vc_mkindesc: build one IndDesc per open index.
 *
 * onerel   - heap relation the indices belong to
 * nindices - number of entries in Irel
 * Irel     - open index relations
 * Idesc    - out: receives a palloc'd array of nindices descriptors
 *
 * For each index the pg_index tuple is fetched from the syscache by the
 * index oid, tform is pointed at its struct, and natts is counted as the
 * number of indkey[] entries before the first InvalidAttrNumber, capped
 * at INDEX_MAX_KEYS.  finfoP is set to &finfo when indproc is a valid oid
 * and to NULL otherwise.
 *
 * NOTE(review): one statement between the FIgetnArgs and FIgetProcOid
 * assignments is not visible here, so the natts value finally stored for
 * an index with indproc set cannot be confirmed from this view.
 */
2213 vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2216 HeapTuple pgIndexTup;
2217 AttrNumber *attnumP;
2221 *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
2223 for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
2225 SearchSysCacheTuple(INDEXRELID,
2226 ObjectIdGetDatum(Irel[i]->rd_id),
2229 idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
/* empty-bodied loop: it only counts the key attributes */
2230 for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2231 *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2232 attnumP++, natts++);
2233 if (idcur->tform->indproc != InvalidOid) {
2234 idcur->finfoP = &(idcur->finfo);
2235 FIgetnArgs(idcur->finfoP) = natts;
2237 FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2238 *(FIgetname(idcur->finfoP)) = '\0';
2240 idcur->finfoP = (FuncIndexInfo *) NULL;
2242 idcur->natts = natts;
/*
 * vc_enough_space: decide whether a tuple of len bytes fits on the page
 * described by vpd.
 *
 * vpd - page descriptor; vpd_free, vpd_nusd and vpd_noff are read
 * len - tuple length; rounded up with DOUBLEALIGN before the checks
 *
 * true is returned when len fits in vpd_free and vpd_nusd < vpd_noff.
 * Otherwise a second test also reserves sizeof(ItemIdData) bytes out of
 * vpd_free.
 *
 * NOTE(review): the results of the first and last tests are not visible
 * here.  Also, sizeof yields an unsigned value, so the subtraction below
 * may wrap if vpd_free is smaller than sizeof(ItemIdData) - confirm that
 * cannot occur.
 */
2249 vc_enough_space (VPageDescr vpd, Size len)
2252 len = DOUBLEALIGN(len);
2254 if ( len > vpd->vpd_free )
2257 if ( vpd->vpd_nusd < vpd->vpd_noff ) /* there are free itemid(s) */
2258 return (true); /* and len <= free_space */
2260 /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2261 if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
2266 } /* vc_enough_space */