*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.8 1996/11/06 08:21:40 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.9 1996/11/27 07:27:20 vadim Exp $
*
*-------------------------------------------------------------------------
*/
#include <storage/bufmgr.h>
#include <access/transam.h>
#include <catalog/pg_index.h>
+#include <catalog/index.h>
#include <catalog/catname.h>
#include <catalog/pg_class.h>
#include <catalog/pg_proc.h>
#include <storage/smgr.h>
#include <storage/lmgr.h>
#include <utils/mcxt.h>
+#include <utils/syscache.h>
#include <commands/vacuum.h>
#include <storage/bufpage.h>
+#include "storage/shmem.h"
#ifdef NEED_RUSAGE
# include <rusagestub.h>
#else /* NEED_RUSAGE */
bool VacuumRunning = false;
+#ifdef VACUUM_QUIET
+static int MESSLEV = DEBUG;
+#else
+static int MESSLEV = NOTICE;
+#endif
+
+typedef struct {
+ FuncIndexInfo finfo;
+ FuncIndexInfo *finfoP;
+ IndexTupleForm tform;
+ int natts;
+} IndDesc;
+
/* non-export function prototypes */
static void _vc_init(void);
static void _vc_shutdown(void);
static void _vc_vacuum(NameData *VacRelP);
static VRelList _vc_getrels(Portal p, NameData *VacRelP);
-static void _vc_vacone(Portal p, VRelList curvrl);
-static void _vc_vacheap(Portal p, VRelList curvrl, Relation onerel);
-static void _vc_vacindices(VRelList curvrl, Relation onerel);
-static void _vc_vaconeind(VRelList curvrl, Relation indrel);
+static void _vc_vacone (VRelList curvrl);
+static void _vc_scanheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl);
+static void _vc_rpfheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
+static void _vc_vacheap (VRelList curvrl, Relation onerel, VPageList vpl);
+static void _vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
+static void _vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
static void _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex);
static void _vc_setpagelock(Relation rel, BlockNumber blkno);
-static bool _vc_tidreapped(ItemPointer itemptr, VRelList curvrl, Relation indrel);
-static void _vc_reappage(Portal p, VRelList curvrl, VPageDescr vpc);
+static VPageDescr _vc_tidreapped (ItemPointer itemptr, VPageList curvrl);
+static void _vc_reappage (VPageList vpl, VPageDescr vpc);
+static void _vc_vpinsert (VPageList vpl, VPageDescr vpnew);
static void _vc_free(Portal p, VRelList vrl);
+static void _vc_getindices (Oid relid, int *nindices, Relation **Irel);
+static void _vc_clsindices (int nindices, Relation *Irel);
static Relation _vc_getarchrel(Relation heaprel);
static void _vc_archive(Relation archrel, HeapTuple htup);
static bool _vc_isarchrel(char *rname);
+static void _vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
static char * _vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
static int _vc_cmp_blk (char *left, char *right);
static int _vc_cmp_offno (char *left, char *right);
+static bool _vc_enough_space (VPageDescr vpd, Size len);
+
void
vacuum(char *vacrel)
/* vacuum each heap relation */
for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
- _vc_vacone(p, cur);
+ _vc_vacone (cur);
_vc_free(p, vrl);
Buffer buf;
PortalVariableMemory portalmem;
MemoryContext old;
- VRelList vrl, cur = NULL;
+ VRelList vrl, cur;
Datum d;
char *rname;
char rkind;
continue;
}
+ /* don't vacuum large objects for now - something breaks when we do */
+ if ( (strlen(rname) > 4) && rname[0] == 'X' &&
+ rname[1] == 'i' && rname[2] == 'n' &&
+ (rname[3] == 'v' || rname[3] == 'x'))
+ {
+ elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now",
+ NAMEDATALEN, rname);
+ ReleaseBuffer(buf);
+ continue;
+ }
+
d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
pgcdesc, &n);
smgrno = DatumGetInt16(d);
cur->vrl_relid = pgctup->t_oid;
cur->vrl_attlist = (VAttList) NULL;
- cur->vrl_pgdsc = (VPageDescr*) NULL;
- cur->vrl_nrepg = 0;
cur->vrl_npages = cur->vrl_ntups = 0;
cur->vrl_hasindex = false;
cur->vrl_next = (VRelList) NULL;
* us to lock the entire database during one pass of the vacuum cleaner.
*/
static void
-_vc_vacone(Portal p, VRelList curvrl)
+_vc_vacone (VRelList curvrl)
{
Relation pgclass;
TupleDesc pgcdesc;
HeapScanDesc pgcscan;
Relation onerel;
ScanKeyData pgckey;
+ VPageListData Vvpl; /* List of pages to vacuum and/or clean indices */
+ VPageListData Fvpl; /* List of pages with space enough for re-using */
+ VPageDescr *vpp;
+ Relation *Irel;
+ int nindices;
+ int i;
StartTransactionCommand();
/* we require the relation to be locked until the indices are cleaned */
RelationSetLockForWrite(onerel);
- /* vacuum it */
- _vc_vacheap(p, curvrl, onerel);
+ /* scan it */
+ Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
+ _vc_scanheap(curvrl, onerel, &Vvpl, &Fvpl);
- /* if we vacuumed any heap tuples, vacuum the indices too */
- if (curvrl->vrl_nrepg > 0)
- _vc_vacindices(curvrl, onerel);
+ /* Now open/count indices */
+ Irel = (Relation *) NULL;
+ if ( Vvpl.vpl_npages > 0 )
+ /* Open all indices of this relation */
+ _vc_getindices(curvrl->vrl_relid, &nindices, &Irel);
else
- curvrl->vrl_hasindex = onerel->rd_rel->relhasindex;
+ /* Count indices only */
+ _vc_getindices(curvrl->vrl_relid, &nindices, NULL);
+
+ if ( nindices > 0 )
+ curvrl->vrl_hasindex = true;
+ else
+ curvrl->vrl_hasindex = false;
+
+ /* Clean index' relation(s) */
+ if ( Irel != (Relation*) NULL )
+ {
+ for (i = 0; i < nindices; i++)
+ _vc_vaconeind (&Vvpl, Irel[i], curvrl->vrl_ntups);
+ }
+
+ if ( Fvpl.vpl_npages > 0 ) /* Try to shrink heap */
+ _vc_rpfheap (curvrl, onerel, &Vvpl, &Fvpl, nindices, Irel);
+ else if ( Vvpl.vpl_npages > 0 ) /* Clean pages from Vvpl list */
+ _vc_vacheap (curvrl, onerel, &Vvpl);
+
+ /* ok - free Vvpl list of reapped pages */
+ if ( Vvpl.vpl_npages > 0 )
+ {
+ vpp = Vvpl.vpl_pgdesc;
+ for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
+ pfree(*vpp);
+ pfree (Vvpl.vpl_pgdesc);
+ if ( Fvpl.vpl_npages > 0 )
+ pfree (Fvpl.vpl_pgdesc);
+ }
/* all done with this class */
heap_close(onerel);
CommitTransactionCommand();
}
-# define ADJUST_FREE_SPACE(S)\
- ((DOUBLEALIGN(S) == (S)) ? (S) : (DOUBLEALIGN(S) - sizeof(double)))
-
/*
- * _vc_vacheap() -- vacuum an open heap relation
+ * _vc_scanheap() -- scan an open heap relation
*
- * This routine sets commit times, vacuums dead tuples, cleans up
- * wasted space on the page, and maintains statistics on the number
- * of live tuples in a heap. In addition, it records the tids of
- * all tuples removed from the heap for any reason. These tids are
- * used in a scan of indices on the relation to get rid of dead
- * index tuples.
+ * This routine sets commit times, constructs Vvpl list of
+ * empty/uninitialized pages and pages with dead tuples and
+ * ~LP_USED line pointers, constructs Fvpl list of pages
+ * appropriate for purposes of shrinking and maintains statistics
+ * on the number of live tuples in a heap.
*/
static void
-_vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
+_vc_scanheap (VRelList curvrl, Relation onerel,
+ VPageList Vvpl, VPageList Fvpl)
{
int nblocks, blkno;
ItemId itemid;
HeapTuple htup;
Buffer buf;
- Page page;
+ Page page, tempPage = NULL;
OffsetNumber offnum, maxoff;
- Relation archrel = NULL;
- bool isarchived;
- int nvac;
- int ntups;
- bool pgchanged, tupgone;
+ bool pgchanged, tupgone, dobufrel, notup;
AbsoluteTime purgetime, expiretime;
RelativeTime preservetime;
char *relname;
- VPageDescr vpc;
- uint32 nunused, nempg, nnepg, nchpg;
- Size frsize;
+ VPageDescr vpc, vp;
+ uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
+ Size frsize, frsusf;
+ Size min_tlen = MAXTUPLEN;
+ Size max_tlen = 0;
+ int i;
struct rusage ru0, ru1;
getrusage(RUSAGE_SELF, &ru0);
+ nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
+ frsize = frsusf = 0;
+
relname = (RelationGetRelationName(onerel))->data;
- nvac = 0;
- ntups = 0;
- nunused = nempg = nnepg = nchpg = 0;
- frsize = 0;
- curvrl->vrl_nrepg = 0;
-
nblocks = RelationGetNumberOfBlocks(onerel);
- /* if the relation has an archive, open it */
- if (onerel->rd_rel->relarch != 'n') {
- isarchived = true;
- archrel = _vc_getarchrel(onerel);
- } else
- isarchived = false;
-
- /* don't vacuum large objects for now.
- something breaks when we do*/
- if ( (strlen(relname) > 4) &&
- relname[0] == 'X' &&
- relname[1] == 'i' &&
- relname[2] == 'n' &&
- (relname[3] == 'v' || relname[3] == 'x'))
- return;
-
/* calculate the purge time: tuples that expired before this time
will be archived or deleted */
purgetime = GetCurrentTransactionStartTime();
purgetime = expiretime;
vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
+ vpc->vpd_nusd = 0;
for (blkno = 0; blkno < nblocks; blkno++) {
buf = ReadBuffer(onerel, blkno);
page = BufferGetPage(buf);
vpc->vpd_blkno = blkno;
vpc->vpd_noff = 0;
+ vpc->vpd_noff = 0;
if (PageIsNew(page)) {
elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing",
NAMEDATALEN, relname, blkno);
PageInit (page, BufferGetPageSize (buf), 0);
- vpc->vpd_free = PageGetFreeSpace (page);
- vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
- frsize += vpc->vpd_free;
+ vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+ frsize += (vpc->vpd_free - sizeof (ItemIdData));
nnepg++;
- _vc_reappage(p, curvrl, vpc); /* No one index should point here */
+ nemend++;
+ _vc_reappage (Vvpl, vpc);
WriteBuffer(buf);
continue;
}
if (PageIsEmpty(page)) {
- vpc->vpd_free = PageGetFreeSpace (page);
- vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
- frsize += vpc->vpd_free;
+ vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+ frsize += (vpc->vpd_free - sizeof (ItemIdData));
nempg++;
- _vc_reappage(p, curvrl, vpc); /* No one index should point here */
+ nemend++;
+ _vc_reappage (Vvpl, vpc);
ReleaseBuffer(buf);
continue;
}
pgchanged = false;
+ notup = true;
maxoff = PageGetMaxOffsetNumber(page);
for (offnum = FirstOffsetNumber;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum)) {
itemid = PageGetItemId(page, offnum);
+ /*
+ * Collect un-used items too - it's possible to have
+ * indices pointing here after crash.
+ */
if (!ItemIdIsUsed(itemid)) {
vpc->vpd_voff[vpc->vpd_noff++] = offnum;
- nunused++;
+ nunused++;
continue;
}
TransactionIdIsValid((TransactionId)htup->t_xmin)) {
if (TransactionIdDidAbort(htup->t_xmin)) {
- pgchanged = true;
tupgone = true;
} else if (TransactionIdDidCommit(htup->t_xmin)) {
htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
pgchanged = true;
+ } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
+ /*
+ * Not Aborted, Not Committed, Not in Progress -
+ * so it from crashed process. - vadim 11/26/96
+ */
+ ncrash++;
+ tupgone = true;
+ }
+ else {
+ elog (MESSLEV, "Rel %.*s: InsertTransactionInProgress %u for TID %u/%u",
+ NAMEDATALEN, relname, htup->t_xmin, blkno, offnum);
}
}
if (htup->t_tmax < purgetime) {
tupgone = true;
- pgchanged = true;
}
}
}
+ /*
+ * Is it possible at all ? - vadim 11/26/96
+ */
+ if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
+ {
+ elog (NOTICE, "TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
+DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.",
+ TransactionIdIsValid((TransactionId)htup->t_xmax),
+ tupgone);
+ }
+
if (tupgone) {
- ItemId lpp = &(((PageHeader) page)->pd_linp[offnum - 1]);
-
- /* write the tuple to the archive, if necessary */
- if (isarchived)
- _vc_archive(archrel, htup);
+ ItemId lpp;
+
+ if ( tempPage == (Page) NULL )
+ {
+ Size pageSize;
+
+ pageSize = PageGetPageSize(page);
+ tempPage = (Page) palloc(pageSize);
+ memmove (tempPage, page, pageSize);
+ }
+
+ lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
/* mark it unused */
lpp->lp_flags &= ~LP_USED;
vpc->vpd_voff[vpc->vpd_noff++] = offnum;
+ nvac++;
- ++nvac;
} else {
ntups++;
+ notup = false;
+ if ( htup->t_len < min_tlen )
+ min_tlen = htup->t_len;
+ if ( htup->t_len > max_tlen )
+ max_tlen = htup->t_len;
}
}
if (pgchanged) {
- PageRepairFragmentation(page);
- if ( vpc->vpd_noff > 0 ) { /* there are some new unused tids here */
- vpc->vpd_free = PageGetFreeSpace (page);
- vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
- frsize += vpc->vpd_free;
- _vc_reappage(p, curvrl, vpc);
- }
WriteBuffer(buf);
+ dobufrel = false;
nchpg++;
- } else {
- if ( vpc->vpd_noff > 0 ) { /* there are only old unused tids here */
- vpc->vpd_free = PageGetFreeSpace (page);
- vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
- frsize += vpc->vpd_free;
- _vc_reappage(p, curvrl, vpc);
- }
- ReleaseBuffer(buf);
}
+ else
+ dobufrel = true;
+ if ( tempPage != (Page) NULL )
+ { /* Some tuples are gone */
+ PageRepairFragmentation(tempPage);
+ vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
+ frsize += vpc->vpd_free;
+ _vc_reappage (Vvpl, vpc);
+ pfree (tempPage);
+ tempPage = (Page) NULL;
+ }
+ else if ( vpc->vpd_noff > 0 )
+ { /* there are only ~LP_USED line pointers */
+ vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+ frsize += vpc->vpd_free;
+ _vc_reappage (Vvpl, vpc);
+ }
+ if ( dobufrel )
+ ReleaseBuffer(buf);
+ if ( notup )
+ nemend++;
+ else
+ nemend = 0;
}
- if (isarchived)
- heap_close(archrel);
+ pfree (vpc);
/* save stats in the rel list for use later */
curvrl->vrl_ntups = ntups;
curvrl->vrl_npages = nblocks;
+
+ Vvpl->vpl_nemend = nemend;
+ Fvpl->vpl_nemend = nemend;
+
+ /*
+ * Try to make Fvpl keeping in mind that we can't use free space
+ * of "empty" end-pages and last page if it reapped.
+ */
+ if ( Vvpl->vpl_npages - nemend > 0 )
+ {
+ int nusf; /* blocks usefull for re-using */
+
+ nusf = Vvpl->vpl_npages - nemend;
+ if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
+ nusf--;
+
+ for (i = 0; i < nusf; i++)
+ {
+ vp = Vvpl->vpl_pgdesc[i];
+ if ( _vc_enough_space (vp, min_tlen) )
+ {
+ _vc_vpinsert (Fvpl, vp);
+ frsusf += vp->vpd_free;
+ }
+ }
+ }
getrusage(RUSAGE_SELF, &ru1);
- elog (NOTICE, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
-Tuples %u: Vac %u, UnUsed %u; FreeSpace %u. Elapsed %u/%u sec.",
+ elog (MESSLEV, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
+Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
NAMEDATALEN, relname,
- nblocks, nchpg, curvrl->vrl_nrepg, nempg, nnepg,
- ntups, nvac, nunused, frsize,
+ nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
+ ntups, nvac, ncrash, nunused, min_tlen, max_tlen,
+ frsize, frsusf, nemend, Fvpl->vpl_npages,
ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
- pfree (vpc);
-}
+} /* _vc_scanheap */
+
/*
- * _vc_vacindices() -- vacuum all the indices for a particular heap relation.
- *
- * On entry, curvrl points at the relation currently being vacuumed.
- * We already have a write lock on the relation, so we don't need to
- * worry about anyone building an index on it while we're doing the
- * vacuuming. The tid list for curvrl is sorted in reverse tid order:
- * that is, tids on higher page numbers are before those on lower page
- * numbers, and tids high on the page are before those low on the page.
- * We use this ordering to cut down the search cost when we look at an
- * index entry.
+ * _vc_rpfheap() -- try to repaire relation' fragmentation
*
- * We're executing inside the transaction that vacuumed the heap.
+ * This routine marks dead tuples as unused and tries re-use dead space
+ * by moving tuples (and inserting indices if needed). It constructs
+ * Nvpl list of free-ed pages (moved tuples) and clean indices
+ * for them after committing (in hack-manner - without losing locks
+ * and freeing memory!) current transaction. It truncates relation
+ * if some end-blocks are gone away.
*/
static void
-_vc_vacindices(VRelList curvrl, Relation onerel)
+_vc_rpfheap (VRelList curvrl, Relation onerel,
+ VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
{
- Relation pgindex;
- TupleDesc pgidesc;
- HeapTuple pgitup;
- HeapScanDesc pgiscan;
- Buffer buf;
- Relation indrel;
- Oid indoid;
- Datum d;
- bool n;
- int nindices;
- ScanKeyData pgikey;
+ TransactionId myXID;
+ CommandId myCID;
+ AbsoluteTime myCTM;
+ Buffer buf, ToBuf;
+ int nblocks, blkno;
+ Page page, ToPage;
+ OffsetNumber offnum, maxoff, newoff, moff;
+ ItemId itemid, newitemid;
+ HeapTuple htup, newtup;
+ TupleDesc tupdesc;
+ Datum *idatum;
+ char *inulls;
+ InsertIndexResult iresult;
+ VPageListData Nvpl;
+ VPageDescr ToVpd, Fvplast, Vvplast, vpc, *vpp;
+ IndDesc *Idesc, *idcur;
+ int Fblklast, Vblklast, i;
+ Size tlen;
+ int nmoved, Fnpages, Vnpages;
+ int nchkmvd, ntups;
+ bool isempty, dowrite;
+ Relation archrel;
+ struct rusage ru0, ru1;
- /* see if we can dodge doing any work at all */
- if (!(onerel->rd_rel->relhasindex))
- return;
+ getrusage(RUSAGE_SELF, &ru0);
- nindices = 0;
+ myXID = GetCurrentTransactionId();
+ myCID = GetCurrentCommandId();
+
+ if ( Irel != (Relation*) NULL ) /* preparation for index' inserts */
+ {
+ _vc_mkindesc (onerel, nindices, Irel, &Idesc);
+ tupdesc = RelationGetTupleDescriptor(onerel);
+ idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
+ inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
+ }
- /* prepare a heap scan on the pg_index relation */
- pgindex = heap_openr(IndexRelationName);
- pgidesc = RelationGetTupleDescriptor(pgindex);
+ /* if the relation has an archive, open it */
+ if (onerel->rd_rel->relarch != 'n')
+ {
+ archrel = _vc_getarchrel(onerel);
+ /* Archive tuples from "empty" end-pages */
+ for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1,
+ i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
+ {
+ if ( (*vpp)->vpd_noff > 0 )
+ {
+ buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+ page = BufferGetPage(buf);
+ Assert ( !PageIsEmpty(page) );
+ _vc_vacpage (page, *vpp, archrel);
+ WriteBuffer (buf);
+ }
+ }
+ }
+ else
+ archrel = (Relation) NULL;
+
+ Nvpl.vpl_npages = 0;
+ Fnpages = Fvpl->vpl_npages;
+ Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
+ Fblklast = Fvplast->vpd_blkno;
+ Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
+ Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
+ Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
+ Vblklast = Vvplast->vpd_blkno;
+ Assert ( Vblklast >= Fblklast );
+ ToBuf = InvalidBuffer;
+ nmoved = 0;
- ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
- ObjectIdEqualRegProcedure,
- ObjectIdGetDatum(curvrl->vrl_relid));
+ vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
+ vpc->vpd_nusd = 0;
+
+ nblocks = curvrl->vrl_npages;
+ for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
+ {
+ /* if it's reapped page and it was used by me - quit */
+ if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
+ break;
- pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
+ buf = ReadBuffer(onerel, blkno);
+ page = BufferGetPage(buf);
- /* vacuum all the indices */
- while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, &buf))) {
- d = (Datum) heap_getattr(pgitup, buf, Anum_pg_index_indexrelid,
- pgidesc, &n);
- indoid = DatumGetObjectId(d);
- indrel = index_open(indoid);
- _vc_vaconeind(curvrl, indrel);
- heap_close(indrel);
- nindices++;
+ vpc->vpd_noff = 0;
+
+ isempty = PageIsEmpty(page);
+
+ if ( blkno == Vblklast ) /* it's reapped page */
+ {
+ if ( Vvplast->vpd_noff > 0 ) /* there are dead tuples */
+ { /* on this page - clean */
+ Assert ( ! isempty );
+ _vc_vacpage (page, Vvplast, archrel);
+ dowrite = true;
+ }
+ else
+ Assert ( isempty );
+ Assert ( --Vnpages > 0 );
+ /* get prev reapped page from Vvpl */
+ Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
+ Vblklast = Vvplast->vpd_blkno;
+ if ( blkno == Fblklast ) /* this page in Fvpl too */
+ {
+ Assert ( --Fnpages > 0 );
+ Assert ( Fvplast->vpd_nusd == 0 );
+ /* get prev reapped page from Fvpl */
+ Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
+ Fblklast = Fvplast->vpd_blkno;
+ }
+ Assert ( Fblklast <= Vblklast );
+ if ( isempty )
+ {
+ ReleaseBuffer(buf);
+ continue;
+ }
+ }
+ else
+ {
+ Assert ( ! isempty );
+ dowrite = false;
+ }
+
+ vpc->vpd_blkno = blkno;
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (offnum = FirstOffsetNumber;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ itemid = PageGetItemId(page, offnum);
+
+ if (!ItemIdIsUsed(itemid))
+ continue;
+
+ htup = (HeapTuple) PageGetItem(page, itemid);
+ tlen = htup->t_len;
+
+ /* try to find new page for this tuple */
+ if ( ToBuf == InvalidBuffer ||
+ ! _vc_enough_space (ToVpd, tlen) )
+ {
+ if ( ToBuf != InvalidBuffer )
+ WriteBuffer(ToBuf);
+ ToBuf = InvalidBuffer;
+ for (i=0; i < Fnpages; i++)
+ {
+ if ( _vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
+ break;
+ }
+ if ( i == Fnpages )
+ break; /* can't move item anywhere */
+ ToVpd = Fvpl->vpl_pgdesc[i];
+ ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
+ ToPage = BufferGetPage(ToBuf);
+ /* if this page was not used before - clean it */
+ if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
+ _vc_vacpage (ToPage, ToVpd, archrel);
+ }
+
+ /* copy tuple */
+ newtup = (HeapTuple) palloc (tlen);
+ memmove((char *) newtup, (char *) htup, tlen);
+
+ /* store transaction information */
+ TransactionIdStore(myXID, &(newtup->t_xmin));
+ newtup->t_cmin = myCID;
+ StoreInvalidTransactionId(&(newtup->t_xmax));
+ newtup->t_tmin = INVALID_ABSTIME;
+ newtup->t_tmax = CURRENT_ABSTIME;
+ ItemPointerSetInvalid(&newtup->t_chain);
+
+ /* add tuple to the page */
+ newoff = PageAddItem (ToPage, (Item)newtup, tlen,
+ InvalidOffsetNumber, LP_USED);
+ if ( newoff == InvalidOffsetNumber )
+ {
+ elog (WARN, "\
+failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
+ tlen, ToVpd->vpd_blkno, ToVpd->vpd_free,
+ ToVpd->vpd_nusd, ToVpd->vpd_noff);
+ }
+ newitemid = PageGetItemId(ToPage, newoff);
+ pfree (newtup);
+ newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
+ ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
+
+ /* now logically delete end-tuple */
+ TransactionIdStore(myXID, &(htup->t_xmax));
+ htup->t_cmax = myCID;
+ memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
+
+ ToVpd->vpd_nusd++;
+ nmoved++;
+ ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
+ vpc->vpd_voff[vpc->vpd_noff++] = offnum;
+
+ /* insert index' tuples if needed */
+ if ( Irel != (Relation*) NULL )
+ {
+ for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
+ {
+ FormIndexDatum (
+ idcur->natts,
+ (AttrNumber *)&(idcur->tform->indkey[0]),
+ newtup,
+ tupdesc,
+ InvalidBuffer,
+ idatum,
+ inulls,
+ idcur->finfoP);
+ iresult = index_insert (
+ Irel[i],
+ idatum,
+ inulls,
+ &(newtup->t_ctid),
+ true);
+ if (iresult) pfree(iresult);
+ }
+ }
+
+ } /* walk along page */
+
+ if ( vpc->vpd_noff > 0 ) /* some tuples were moved */
+ {
+ _vc_reappage (&Nvpl, vpc);
+ WriteBuffer(buf);
+ }
+ else if ( dowrite )
+ WriteBuffer(buf);
+ else
+ ReleaseBuffer(buf);
+
+ if ( offnum <= maxoff )
+ break; /* some item(s) left */
+
+ } /* walk along relation */
+
+ blkno++; /* new number of blocks */
+
+ if ( ToBuf != InvalidBuffer )
+ {
+ Assert (nmoved > 0);
+ WriteBuffer(ToBuf);
}
- heap_endscan(pgiscan);
- heap_close(pgindex);
+ if ( nmoved > 0 )
+ {
+ /*
+ * We have to commit our tuple' movings before we'll truncate
+ * relation, but we shouldn't lose our locks. And so - quick hack:
+ * flush buffers and record status of current transaction
+ * as committed, and continue. - vadim 11/13/96
+ */
+ FlushBufferPool(!TransactionFlushEnabled());
+ TransactionIdCommit(myXID);
+ FlushBufferPool(!TransactionFlushEnabled());
+ myCTM = TransactionIdGetCommitTime(myXID);
+ }
+
+ /*
+ * Clean uncleaned reapped pages from Vvpl list
+ * and set commit' times for inserted tuples
+ */
+ nchkmvd = 0;
+ for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
+ {
+ Assert ( (*vpp)->vpd_blkno < blkno );
+ buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+ page = BufferGetPage(buf);
+ if ( (*vpp)->vpd_nusd == 0 ) /* this page was not used */
+ {
+ /* noff == 0 in empty pages only - such pages should be re-used */
+ Assert ( (*vpp)->vpd_noff > 0 );
+ _vc_vacpage (page, *vpp, archrel);
+ }
+ else /* this page was used */
+ {
+ ntups = 0;
+ moff = PageGetMaxOffsetNumber(page);
+ for (newoff = FirstOffsetNumber;
+ newoff <= moff;
+ newoff = OffsetNumberNext(newoff))
+ {
+ itemid = PageGetItemId(page, newoff);
+ if (!ItemIdIsUsed(itemid))
+ continue;
+ htup = (HeapTuple) PageGetItem(page, itemid);
+ if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
+ {
+ htup->t_tmin = myCTM;
+ ntups++;
+ }
+ }
+ Assert ( (*vpp)->vpd_nusd == ntups );
+ nchkmvd += ntups;
+ }
+ WriteBuffer (buf);
+ }
+ Assert ( nmoved == nchkmvd );
- if (nindices > 0)
- curvrl->vrl_hasindex = true;
+ getrusage(RUSAGE_SELF, &ru1);
+
+ elog (MESSLEV, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \
+Elapsed %u/%u sec.",
+ NAMEDATALEN, (RelationGetRelationName(onerel))->data,
+ nblocks, blkno, nmoved,
+ ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
+ ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
+
+ if ( Nvpl.vpl_npages > 0 )
+ {
+ /* vacuum indices again if needed */
+ if ( Irel != (Relation*) NULL )
+ {
+ VPageDescr *vpleft, *vpright, vpsave;
+
+ /* re-sort Nvpl.vpl_pgdesc */
+ for (vpleft = Nvpl.vpl_pgdesc,
+ vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
+ vpleft < vpright; vpleft++, vpright--)
+ {
+ vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
+ }
+ for (i = 0; i < nindices; i++)
+ _vc_vaconeind (&Nvpl, Irel[i], curvrl->vrl_ntups);
+ }
+
+ /*
+ * clean moved tuples from last page in Nvpl list
+ * if some tuples left there
+ */
+ if ( vpc->vpd_noff > 0 && offnum <= maxoff )
+ {
+ Assert (vpc->vpd_blkno == blkno - 1);
+ buf = ReadBuffer(onerel, vpc->vpd_blkno);
+ page = BufferGetPage (buf);
+ ntups = 0;
+ maxoff = offnum;
+ for (offnum = FirstOffsetNumber;
+ offnum < maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ itemid = PageGetItemId(page, offnum);
+ if (!ItemIdIsUsed(itemid))
+ continue;
+ htup = (HeapTuple) PageGetItem(page, itemid);
+ Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
+ itemid->lp_flags &= ~LP_USED;
+ ntups++;
+ }
+ Assert ( vpc->vpd_noff == ntups );
+ PageRepairFragmentation(page);
+ WriteBuffer (buf);
+ }
+
+ /* now - free new list of reapped pages */
+ vpp = Nvpl.vpl_pgdesc;
+ for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
+ pfree(*vpp);
+ pfree (Nvpl.vpl_pgdesc);
+ }
+
+ /* truncate relation */
+ if ( blkno < nblocks )
+ {
+ blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
+ Assert ( blkno >= 0 );
+ curvrl->vrl_npages = blkno; /* set new number of blocks */
+ }
+
+ if ( archrel != (Relation) NULL )
+ heap_close(archrel);
+
+ if ( Irel != (Relation*) NULL ) /* pfree index' allocations */
+ {
+ pfree (Idesc);
+ pfree (idatum);
+ pfree (inulls);
+ _vc_clsindices (nindices, Irel);
+ }
+
+ pfree (vpc);
+
+} /* _vc_rpfheap */
+
+/*
+ * _vc_vacheap() -- free dead tuples
+ *
+ * This routine marks dead tuples as unused and truncates relation
+ * if there are "empty" end-blocks.
+ */
+static void
+_vc_vacheap (VRelList curvrl, Relation onerel, VPageList Vvpl)
+{
+ Buffer buf;
+ Page page;
+ VPageDescr *vpp;
+ Relation archrel;
+ int nblocks;
+ int i;
+
+ nblocks = Vvpl->vpl_npages;
+ /* if the relation has an archive, open it */
+ if (onerel->rd_rel->relarch != 'n')
+ archrel = _vc_getarchrel(onerel);
else
- curvrl->vrl_hasindex = false;
-}
+ {
+ archrel = (Relation) NULL;
+ nblocks -= Vvpl->vpl_nemend; /* nothing to do with them */
+ }
+
+ for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
+ {
+ if ( (*vpp)->vpd_noff > 0 )
+ {
+ buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+ page = BufferGetPage (buf);
+ _vc_vacpage (page, *vpp, archrel);
+ WriteBuffer (buf);
+ }
+ }
+
+ /* truncate relation if there are some empty end-pages */
+ if ( Vvpl->vpl_nemend > 0 )
+ {
+ Assert ( curvrl->vrl_npages >= Vvpl->vpl_nemend );
+ nblocks = curvrl->vrl_npages - Vvpl->vpl_nemend;
+ elog (MESSLEV, "Rel %.*s: Pages: %u --> %u.",
+ NAMEDATALEN, (RelationGetRelationName(onerel))->data,
+ curvrl->vrl_npages, nblocks);
+
+ /*
+ * we have to flush "empty" end-pages (if changed, but who knows it)
+ * before truncation
+ */
+ FlushBufferPool(!TransactionFlushEnabled());
+
+ nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
+ Assert ( nblocks >= 0 );
+ curvrl->vrl_npages = nblocks; /* set new number of blocks */
+ }
+
+ if ( archrel != (Relation) NULL )
+ heap_close(archrel);
+
+} /* _vc_vacheap */
+
+/*
+ * _vc_vacpage() -- free (and archive if needed) dead tuples on a page
+ * and repaire its fragmentation.
+ */
+static void
+_vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
+{
+ ItemId itemid;
+ HeapTuple htup;
+ int i;
+
+ Assert ( vpd->vpd_nusd == 0 );
+ for (i=0; i < vpd->vpd_noff; i++)
+ {
+ itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
+ if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
+ {
+ htup = (HeapTuple) PageGetItem (page, itemid);
+ _vc_archive (archrel, htup);
+ }
+ itemid->lp_flags &= ~LP_USED;
+ }
+ PageRepairFragmentation(page);
+
+} /* _vc_vacpage */
/*
* _vc_vaconeind() -- vacuum one index relation.
*
- * Curvrl is the VRelList entry for the heap we're currently vacuuming.
- * It's locked. Onerel is an index relation on the vacuumed heap.
+ * Vpl is the VPageList of the heap we're currently vacuuming.
+ * It's locked. Indrel is an index relation on the vacuumed heap.
* We don't set locks on the index relation here, since the indexed
* access methods support locking at different granularities.
* We let them handle it.
* pg_class.
*/
static void
-_vc_vaconeind(VRelList curvrl, Relation indrel)
+_vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
{
RetrieveIndexResult res;
IndexScanDesc iscan;
int nvac;
int nitups;
int nipages;
+ VPageDescr vp;
struct rusage ru0, ru1;
getrusage(RUSAGE_SELF, &ru0);
!= (RetrieveIndexResult) NULL) {
heapptr = &res->heap_iptr;
- if (_vc_tidreapped(heapptr, curvrl, indrel)) {
+ if ( (vp = _vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
+ {
#if 0
elog(DEBUG, "<%x,%x> -> <%x,%x>",
ItemPointerGetBlockNumber(&(res->index_iptr)),
ItemPointerGetBlockNumber(&(res->heap_iptr)),
ItemPointerGetOffsetNumber(&(res->heap_iptr)));
#endif
+ if ( vp->vpd_noff == 0 )
+ { /* this is EmptyPage !!! */
+ elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
+ NAMEDATALEN, indrel->rd_rel->relname.data,
+ vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
+ }
++nvac;
index_delete(indrel, &res->index_iptr);
} else {
getrusage(RUSAGE_SELF, &ru1);
- elog (NOTICE, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
+ elog (MESSLEV, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac,
ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
-}
+ if ( nitups != nhtups )
+ elog (NOTICE, "NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
+ nitups, nhtups);
+
+} /* _vc_vaconeind */
+
+/*
+ * _vc_tidreapped() -- is a particular tid reapped?
+ *
+ * vpl->VPageDescr_array is sorted in right order.
+ */
+static VPageDescr
+_vc_tidreapped(ItemPointer itemptr, VPageList vpl)
+{
+ OffsetNumber ioffno;
+ OffsetNumber *voff;
+ VPageDescr vp, *vpp;
+ VPageDescrData vpd;
+
+ vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
+ ioffno = ItemPointerGetOffsetNumber(itemptr);
+
+ vp = &vpd;
+ vpp = (VPageDescr*) _vc_find_eq ((char*)(vpl->vpl_pgdesc),
+ vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp,
+ _vc_cmp_blk);
+
+ if ( vpp == (VPageDescr*) NULL )
+ return ((VPageDescr)NULL);
+ vp = *vpp;
+
+ /* ok - we are on true page */
+
+ if ( vp->vpd_noff == 0 ) { /* this is EmptyPage !!! */
+ return (vp);
+ }
+
+ voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff),
+ vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno,
+ _vc_cmp_offno);
+
+ if ( voff == (OffsetNumber*) NULL )
+ return ((VPageDescr)NULL);
+
+ return (vp);
+
+} /* _vc_tidreapped */
/*
* _vc_updstats() -- update pg_class statistics for one relation
pgcform->relhasindex = hasindex;
/* XXX -- after write, should invalidate relcache in other backends */
- WriteNoReleaseBuffer(buf);
+ WriteNoReleaseBuffer(buf); /* heap_endscan release scan' buffers ? */
/* that's all, folks */
heap_endscan(sdesc);
RelationSetLockForWritePage(rel, &itm);
}
-/*
- * _vc_tidreapped() -- is a particular tid reapped?
- *
- * VPageDescr array is sorted in right order.
- */
-static bool
-_vc_tidreapped(ItemPointer itemptr, VRelList curvrl, Relation indrel)
-{
- OffsetNumber ioffno;
- OffsetNumber *voff;
- VPageDescr vp, *vpp;
- VPageDescrData vpd;
-
- vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
- ioffno = ItemPointerGetOffsetNumber(itemptr);
-
- vp = &vpd;
- vpp = (VPageDescr*) _vc_find_eq ((char*)(curvrl->vrl_pgdsc),
- curvrl->vrl_nrepg, sizeof (VPageDescr), (char*)&vp,
- _vc_cmp_blk);
-
- if ( vpp == (VPageDescr*) NULL )
- return (false);
- vp = *vpp;
-
- /* ok - we are on true page */
-
- if ( vp->vpd_noff == 0 ) { /* this is EmptyPage !!! */
- elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
- NAMEDATALEN, indrel->rd_rel->relname.data,
- vpd.vpd_blkno, ioffno);
- return (true);
- }
-
- voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff),
- vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno,
- _vc_cmp_offno);
-
- if ( voff == (OffsetNumber*) NULL )
- return (false);
-
- return (true);
-
-} /* _vc_tidreapped */
-
/*
- * _vc_reappage() -- save a page on the array of reaped pages for the current
- * entry on the vacuum relation list.
+ * _vc_reappage() -- save a page on the array of reapped pages.
*
* As a side effect of the way that the vacuuming loop for a given
* relation works, higher pages come after lower pages in the array
* (and highest tid on a page is last).
*/
-static void
-_vc_reappage(Portal p,
- VRelList curvrl,
- VPageDescr vpc)
+static void
+_vc_reappage(VPageList vpl, VPageDescr vpc)
{
- PortalVariableMemory pmem;
- MemoryContext old;
VPageDescr newvpd;
- /* allocate a VPageDescrData entry in the portal memory context */
- pmem = PortalGetVariableMemory(p);
- old = MemoryContextSwitchTo((MemoryContext) pmem);
- if ( curvrl->vrl_nrepg == 0 )
- curvrl->vrl_pgdsc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
- else if ( curvrl->vrl_nrepg % 100 == 0 )
- curvrl->vrl_pgdsc = (VPageDescr*) repalloc(curvrl->vrl_pgdsc, (curvrl->vrl_nrepg+100)*sizeof(VPageDescr));
+ /* allocate a VPageDescrData entry */
newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
- MemoryContextSwitchTo(old);
/* fill it in */
if ( vpc->vpd_noff > 0 )
memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
newvpd->vpd_blkno = vpc->vpd_blkno;
newvpd->vpd_free = vpc->vpd_free;
+ newvpd->vpd_nusd = vpc->vpd_nusd;
newvpd->vpd_noff = vpc->vpd_noff;
- curvrl->vrl_pgdsc[curvrl->vrl_nrepg] = newvpd;
- (curvrl->vrl_nrepg)++;
+ /* insert this page into vpl list */
+ _vc_vpinsert (vpl, newvpd);
+
+} /* _vc_reappage */
+
+static void
+_vc_vpinsert (VPageList vpl, VPageDescr vpnew)
+{
+
+ /* allocate a VPageDescr entry if needed */
+ if ( vpl->vpl_npages == 0 )
+ vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
+ else if ( vpl->vpl_npages % 100 == 0 )
+ vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
+ vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
+ (vpl->vpl_npages)++;
+
}
static void
{
VRelList p_vrl;
VAttList p_val, val;
- VPageDescr *vpd;
- int i;
MemoryContext old;
PortalVariableMemory pmem;
pfree(p_val);
}
- /* free page' descriptions */
- if ( vrl->vrl_nrepg > 0 ) {
- vpd = vrl->vrl_pgdsc;
- for (i = 0; i < vrl->vrl_nrepg; i++) {
- pfree(vpd[i]);
- }
- pfree (vpd);
- }
-
/* free rel list entry */
p_vrl = vrl;
vrl = vrl->vrl_next;
return (1);
} /* _vc_cmp_offno */
+
+
+static void
+_vc_getindices (Oid relid, int *nindices, Relation **Irel)
+{
+ Relation pgindex;
+ Relation irel;
+ TupleDesc pgidesc;
+ HeapTuple pgitup;
+ HeapScanDesc pgiscan;
+ Datum d;
+ int i, k;
+ bool n;
+ ScanKeyData pgikey;
+ Oid *ioid;
+
+ *nindices = i = 0;
+
+ ioid = (Oid *) palloc(10*sizeof(Oid));
+
+ /* prepare a heap scan on the pg_index relation */
+ pgindex = heap_openr(IndexRelationName);
+ pgidesc = RelationGetTupleDescriptor(pgindex);
+
+ ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
+ ObjectIdEqualRegProcedure,
+ ObjectIdGetDatum(relid));
+
+ pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
+
+ while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
+ d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
+ pgidesc, &n);
+ i++;
+ if ( i % 10 == 0 )
+ ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
+ ioid[i-1] = DatumGetObjectId(d);
+ }
+
+ heap_endscan(pgiscan);
+ heap_close(pgindex);
+
+ if ( i == 0 ) { /* No one index found */
+ pfree(ioid);
+ return;
+ }
+
+ if ( Irel != (Relation **) NULL )
+ *Irel = (Relation *) palloc(i * sizeof(Relation));
+
+ for (k = 0; i > 0; )
+ {
+ irel = index_open(ioid[--i]);
+ if ( irel != (Relation) NULL )
+ {
+ if ( Irel != (Relation **) NULL )
+ (*Irel)[k] = irel;
+ else
+ index_close (irel);
+ k++;
+ }
+ else
+ elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
+ }
+ *nindices = k;
+ pfree(ioid);
+
+ if ( Irel != (Relation **) NULL && *nindices == 0 )
+ {
+ pfree (*Irel);
+ *Irel = (Relation *) NULL;
+ }
+
+} /* _vc_getindices */
+
+
+static void
+_vc_clsindices (int nindices, Relation *Irel)
+{
+
+ if ( Irel == (Relation*) NULL )
+ return;
+
+ while (nindices--) {
+ index_close (Irel[nindices]);
+ }
+ pfree (Irel);
+
+} /* _vc_clsindices */
+
+
+static void
+_vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
+{
+ IndDesc *idcur;
+ HeapTuple pgIndexTup;
+ AttrNumber *attnumP;
+ int natts;
+ int i;
+
+ *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
+
+ for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
+ pgIndexTup =
+ SearchSysCacheTuple(INDEXRELID,
+ ObjectIdGetDatum(Irel[i]->rd_id),
+ 0,0,0);
+ Assert(pgIndexTup);
+ idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
+ for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
+ *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
+ attnumP++, natts++);
+ if (idcur->tform->indproc != InvalidOid) {
+ idcur->finfoP = &(idcur->finfo);
+ FIgetnArgs(idcur->finfoP) = natts;
+ natts = 1;
+ FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
+ *(FIgetname(idcur->finfoP)) = '\0';
+ } else
+ idcur->finfoP = (FuncIndexInfo *) NULL;
+
+ idcur->natts = natts;
+ }
+
+} /* _vc_mkindesc */
+
+
+static bool
+_vc_enough_space (VPageDescr vpd, Size len)
+{
+
+ len = DOUBLEALIGN(len);
+
+ if ( len > vpd->vpd_free )
+ return (false);
+
+ if ( vpd->vpd_nusd < vpd->vpd_noff ) /* there are free itemid(s) */
+ return (true); /* and len <= free_space */
+
+ /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
+ if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
+ return (true);
+
+ return (false);
+
+} /* _vc_enough_space */