]> granicus.if.org Git - postgresql/commitdiff
Shrinking and other things.
authorVadim B. Mikheev <vadim4o@yahoo.com>
Wed, 27 Nov 1996 07:27:20 +0000 (07:27 +0000)
committerVadim B. Mikheev <vadim4o@yahoo.com>
Wed, 27 Nov 1996 07:27:20 +0000 (07:27 +0000)
src/backend/commands/vacuum.c

index 7fe8648e1ec974fb7f37171e54f973dd9ee39473..06140f7c277e9f3dfe902c5445d39853812696e3 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.8 1996/11/06 08:21:40 scrappy Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.9 1996/11/27 07:27:20 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include <storage/bufmgr.h>
 #include <access/transam.h>
 #include <catalog/pg_index.h>
+#include <catalog/index.h>
 #include <catalog/catname.h>
 #include <catalog/pg_class.h>
 #include <catalog/pg_proc.h>
 #include <storage/smgr.h>
 #include <storage/lmgr.h>
 #include <utils/mcxt.h>
+#include <utils/syscache.h>
 #include <commands/vacuum.h>
 #include <storage/bufpage.h>
+#include "storage/shmem.h"
 #ifdef NEED_RUSAGE
 # include <rusagestub.h>
 #else /* NEED_RUSAGE */
 
 bool VacuumRunning =   false;
 
+#ifdef VACUUM_QUIET
+static int MESSLEV = DEBUG;
+#else
+static int MESSLEV = NOTICE;
+#endif
+
+typedef struct {
+    FuncIndexInfo      finfo;
+    FuncIndexInfo      *finfoP;
+    IndexTupleForm     tform;
+    int                        natts;
+} IndDesc;
+
 /* non-export function prototypes */
 static void _vc_init(void);
 static void _vc_shutdown(void);
 static void _vc_vacuum(NameData *VacRelP);
 static VRelList _vc_getrels(Portal p, NameData *VacRelP);
-static void _vc_vacone(Portal p, VRelList curvrl);
-static void _vc_vacheap(Portal p, VRelList curvrl, Relation onerel);
-static void _vc_vacindices(VRelList curvrl, Relation onerel);
-static void _vc_vaconeind(VRelList curvrl, Relation indrel);
+static void _vc_vacone (VRelList curvrl);
+static void _vc_scanheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl);
+static void _vc_rpfheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
+static void _vc_vacheap (VRelList curvrl, Relation onerel, VPageList vpl);
+static void _vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
+static void _vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
 static void _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex);
 static void _vc_setpagelock(Relation rel, BlockNumber blkno);
-static bool _vc_tidreapped(ItemPointer itemptr, VRelList curvrl, Relation indrel);
-static void _vc_reappage(Portal p, VRelList curvrl, VPageDescr vpc);
+static VPageDescr _vc_tidreapped (ItemPointer itemptr, VPageList curvrl);
+static void _vc_reappage (VPageList vpl, VPageDescr vpc);
+static void _vc_vpinsert (VPageList vpl, VPageDescr vpnew);
 static void _vc_free(Portal p, VRelList vrl);
+static void _vc_getindices (Oid relid, int *nindices, Relation **Irel);
+static void _vc_clsindices (int nindices, Relation *Irel);
 static Relation _vc_getarchrel(Relation heaprel);
 static void _vc_archive(Relation archrel, HeapTuple htup);
 static bool _vc_isarchrel(char *rname);
+static void _vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
 static char * _vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
 static int _vc_cmp_blk (char *left, char *right);
 static int _vc_cmp_offno (char *left, char *right);
+static bool _vc_enough_space (VPageDescr vpd, Size len);
+
 
 void
 vacuum(char *vacrel)
@@ -183,7 +207,7 @@ _vc_vacuum(NameData *VacRelP)
 
     /* vacuum each heap relation */
     for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
-       _vc_vacone(p, cur);
+       _vc_vacone (cur);
 
     _vc_free(p, vrl);
 
@@ -200,7 +224,7 @@ _vc_getrels(Portal p, NameData *VacRelP)
     Buffer buf;
     PortalVariableMemory portalmem;
     MemoryContext old;
-    VRelList vrl, cur = NULL;
+    VRelList vrl, cur;
     Datum d;
     char *rname;
     char rkind;
@@ -249,6 +273,17 @@ _vc_getrels(Portal p, NameData *VacRelP)
            continue;
        }
 
+       /* don't vacuum large objects for now - something breaks when we do */
+       if ( (strlen(rname) > 4) && rname[0] == 'X' &&
+               rname[1] == 'i' && rname[2] == 'n' &&
+               (rname[3] == 'v' || rname[3] == 'x'))
+       {
+           elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now", 
+                       NAMEDATALEN, rname);
+           ReleaseBuffer(buf);
+           continue;
+       }
+
        d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
                                 pgcdesc, &n);
        smgrno = DatumGetInt16(d);
@@ -283,8 +318,6 @@ _vc_getrels(Portal p, NameData *VacRelP)
 
        cur->vrl_relid = pgctup->t_oid;
        cur->vrl_attlist = (VAttList) NULL;
-       cur->vrl_pgdsc = (VPageDescr*) NULL;
-       cur->vrl_nrepg = 0;
        cur->vrl_npages = cur->vrl_ntups = 0;
        cur->vrl_hasindex = false;
        cur->vrl_next = (VRelList) NULL;
@@ -317,7 +350,7 @@ _vc_getrels(Portal p, NameData *VacRelP)
  *     us to lock the entire database during one pass of the vacuum cleaner.
  */
 static void
-_vc_vacone(Portal p, VRelList curvrl)
+_vc_vacone (VRelList curvrl)
 {
     Relation pgclass;
     TupleDesc pgcdesc;
@@ -326,6 +359,12 @@ _vc_vacone(Portal p, VRelList curvrl)
     HeapScanDesc pgcscan;
     Relation onerel;
     ScanKeyData pgckey;
+    VPageListData Vvpl;        /* List of pages to vacuum and/or clean indices */
+    VPageListData Fvpl;        /* List of pages with space enough for re-using */
+    VPageDescr *vpp;
+    Relation *Irel;
+    int nindices;
+    int i;
 
     StartTransactionCommand();
 
@@ -355,14 +394,46 @@ _vc_vacone(Portal p, VRelList curvrl)
     /* we require the relation to be locked until the indices are cleaned */
     RelationSetLockForWrite(onerel);
 
-    /* vacuum it */
-    _vc_vacheap(p, curvrl, onerel);
+    /* scan it */
+    Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
+    _vc_scanheap(curvrl, onerel, &Vvpl, &Fvpl);
 
-    /* if we vacuumed any heap tuples, vacuum the indices too */
-    if (curvrl->vrl_nrepg > 0)
-       _vc_vacindices(curvrl, onerel);
+    /* Now open/count indices */
+    Irel = (Relation *) NULL;
+    if ( Vvpl.vpl_npages > 0 )
+                   /* Open all indices of this relation */
+       _vc_getindices(curvrl->vrl_relid, &nindices, &Irel);
     else
-       curvrl->vrl_hasindex = onerel->rd_rel->relhasindex;
+                   /* Count indices only */
+       _vc_getindices(curvrl->vrl_relid, &nindices, NULL);
+    
+    if ( nindices > 0 )
+       curvrl->vrl_hasindex = true;
+    else
+       curvrl->vrl_hasindex = false;
+
+    /* Clean index' relation(s) */
+    if ( Irel != (Relation*) NULL )
+    {
+       for (i = 0; i < nindices; i++)
+           _vc_vaconeind (&Vvpl, Irel[i], curvrl->vrl_ntups);
+    }
+
+    if ( Fvpl.vpl_npages > 0 )         /* Try to shrink heap */
+       _vc_rpfheap (curvrl, onerel, &Vvpl, &Fvpl, nindices, Irel);
+    else if ( Vvpl.vpl_npages > 0 )    /* Clean pages from Vvpl list */
+       _vc_vacheap (curvrl, onerel, &Vvpl);
+
+    /* ok - free Vvpl list of reapped pages */
+    if ( Vvpl.vpl_npages > 0 )
+    {
+       vpp = Vvpl.vpl_pgdesc;
+       for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
+           pfree(*vpp);
+       pfree (Vvpl.vpl_pgdesc);
+       if ( Fvpl.vpl_npages > 0 )
+           pfree (Fvpl.vpl_pgdesc);
+    }
 
     /* all done with this class */
     heap_close(onerel);
@@ -376,69 +447,46 @@ _vc_vacone(Portal p, VRelList curvrl)
     CommitTransactionCommand();
 }
 
-# define       ADJUST_FREE_SPACE(S)\
-       ((DOUBLEALIGN(S) == (S)) ? (S) : (DOUBLEALIGN(S) - sizeof(double)))
-
 /*
- *  _vc_vacheap() -- vacuum an open heap relation
+ *  _vc_scanheap() -- scan an open heap relation
  *
- *     This routine sets commit times, vacuums dead tuples, cleans up
- *     wasted space on the page, and maintains statistics on the number
- *     of live tuples in a heap.  In addition, it records the tids of
- *     all tuples removed from the heap for any reason.  These tids are
- *     used in a scan of indices on the relation to get rid of dead
- *     index tuples.
+ *     This routine sets commit times, constructs Vvpl list of 
+ *     empty/uninitialized pages and pages with dead tuples and
+ *     ~LP_USED line pointers, constructs Fvpl list of pages
+ *     appropriate for purposes of shrinking and maintains statistics 
+ *     on the number of live tuples in a heap.
  */
 static void
-_vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
+_vc_scanheap (VRelList curvrl, Relation onerel, 
+                       VPageList Vvpl, VPageList Fvpl)
 {
     int nblocks, blkno;
     ItemId itemid;
     HeapTuple htup;
     Buffer buf;
-    Page page;
+    Page page, tempPage = NULL;
     OffsetNumber offnum, maxoff;
-    Relation archrel = NULL;
-    bool isarchived;
-    int nvac;
-    int ntups;
-    bool pgchanged, tupgone;
+    bool pgchanged, tupgone, dobufrel, notup;
     AbsoluteTime purgetime, expiretime;
     RelativeTime preservetime;
     char *relname;
-    VPageDescr vpc;
-    uint32 nunused, nempg, nnepg, nchpg;
-    Size frsize;
+    VPageDescr vpc, vp;
+    uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
+    Size frsize, frsusf;
+    Size min_tlen = MAXTUPLEN;
+    Size max_tlen = 0;
+    int i;
     struct rusage ru0, ru1;
 
     getrusage(RUSAGE_SELF, &ru0);
 
+    nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
+    frsize = frsusf = 0;
+    
     relname = (RelationGetRelationName(onerel))->data;
 
-    nvac = 0;
-    ntups = 0;
-    nunused = nempg = nnepg = nchpg = 0;
-    frsize = 0;
-    curvrl->vrl_nrepg = 0;
-    
     nblocks = RelationGetNumberOfBlocks(onerel);
 
-    /* if the relation has an archive, open it */
-    if (onerel->rd_rel->relarch != 'n') {
-       isarchived = true;
-       archrel = _vc_getarchrel(onerel);
-    } else
-       isarchived = false;
-
-    /* don't vacuum large objects for now.
-       something breaks when we do*/
-    if ( (strlen(relname) > 4) && 
-       relname[0] == 'X' &&
-       relname[1] == 'i' &&
-       relname[2] == 'n' &&
-       (relname[3] == 'v' || relname[3] == 'x'))
-           return;
-
     /* calculate the purge time: tuples that expired before this time
        will be archived or deleted */
     purgetime = GetCurrentTransactionStartTime();
@@ -456,46 +504,53 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
        purgetime = expiretime;
 
     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
+    vpc->vpd_nusd = 0;
            
     for (blkno = 0; blkno < nblocks; blkno++) {
        buf = ReadBuffer(onerel, blkno);
        page = BufferGetPage(buf);
        vpc->vpd_blkno = blkno;
        vpc->vpd_noff = 0;
+       vpc->vpd_noff = 0;
 
        if (PageIsNew(page)) {
            elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing",
                NAMEDATALEN, relname, blkno);
            PageInit (page, BufferGetPageSize (buf), 0);
-           vpc->vpd_free = PageGetFreeSpace (page);
-           vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
-           frsize += vpc->vpd_free;
+           vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+           frsize += (vpc->vpd_free - sizeof (ItemIdData));
            nnepg++;
-           _vc_reappage(p, curvrl, vpc);       /* No one index should point here */
+           nemend++;
+           _vc_reappage (Vvpl, vpc);
            WriteBuffer(buf);
            continue;
        }
 
        if (PageIsEmpty(page)) {
-           vpc->vpd_free = PageGetFreeSpace (page);
-           vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
-           frsize += vpc->vpd_free;
+           vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+           frsize += (vpc->vpd_free - sizeof (ItemIdData));
            nempg++;
-           _vc_reappage(p, curvrl, vpc);       /* No one index should point here */
+           nemend++;
+           _vc_reappage (Vvpl, vpc);
            ReleaseBuffer(buf);
            continue;
        }
 
        pgchanged = false;
+       notup = true;
        maxoff = PageGetMaxOffsetNumber(page);
        for (offnum = FirstOffsetNumber;
             offnum <= maxoff;
             offnum = OffsetNumberNext(offnum)) {
            itemid = PageGetItemId(page, offnum);
 
+           /*
+            * Collect un-used items too - it's possible to have
+            * indices pointing here after crash.
+            */
            if (!ItemIdIsUsed(itemid)) {
                vpc->vpd_voff[vpc->vpd_noff++] = offnum;
-               nunused++;
+               nunused++;
                continue;
            }
 
@@ -506,11 +561,21 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
                TransactionIdIsValid((TransactionId)htup->t_xmin)) {
 
                if (TransactionIdDidAbort(htup->t_xmin)) {
-                   pgchanged = true;
                    tupgone = true;
                } else if (TransactionIdDidCommit(htup->t_xmin)) {
                    htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
                    pgchanged = true;
+               } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
+                   /* 
+                    * Not Aborted, Not Committed, Not in Progress -
+                    * so it from crashed process. - vadim 11/26/96
+                    */
+                   ncrash++;
+                   tupgone = true;
+               }
+               else {
+                   elog (MESSLEV, "Rel %.*s: InsertTransactionInProgress %u for TID %u/%u",
+                       NAMEDATALEN, relname, htup->t_xmin, blkno, offnum);
                }
            }
 
@@ -532,140 +597,618 @@ _vc_vacheap(Portal p, VRelList curvrl, Relation onerel)
 
                    if (htup->t_tmax < purgetime) {
                        tupgone = true;
-                       pgchanged = true;
                    }
                }
            }
 
+           /*
+            * Is it possible at all ? - vadim 11/26/96
+            */
+           if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
+           {
+               elog (NOTICE, "TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
+DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.", 
+                       TransactionIdIsValid((TransactionId)htup->t_xmax),
+                       tupgone);
+           }
+           
            if (tupgone) {
-               ItemId lpp = &(((PageHeader) page)->pd_linp[offnum - 1]);
-
-               /* write the tuple to the archive, if necessary */
-               if (isarchived)
-                   _vc_archive(archrel, htup);
+               ItemId lpp;
+                                                    
+               if ( tempPage == (Page) NULL )
+               {
+                   Size pageSize;
+                   
+                   pageSize = PageGetPageSize(page);
+                   tempPage = (Page) palloc(pageSize);
+                   memmove (tempPage, page, pageSize);
+               }
+               
+               lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
 
                /* mark it unused */
                lpp->lp_flags &= ~LP_USED;
 
                vpc->vpd_voff[vpc->vpd_noff++] = offnum;
+               nvac++;
 
-               ++nvac;
            } else {
                ntups++;
+               notup = false;
+               if ( htup->t_len < min_tlen )
+                   min_tlen = htup->t_len;
+               if ( htup->t_len > max_tlen )
+                   max_tlen = htup->t_len;
            }
        }
 
        if (pgchanged) {
-           PageRepairFragmentation(page);
-           if ( vpc->vpd_noff > 0 ) {  /* there are some new unused tids here */
-               vpc->vpd_free = PageGetFreeSpace (page);
-               vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
-               frsize += vpc->vpd_free;
-               _vc_reappage(p, curvrl, vpc);
-           }
            WriteBuffer(buf);
+           dobufrel = false;
            nchpg++;
-       } else {
-           if ( vpc->vpd_noff > 0 ) {  /* there are only old unused tids here */
-               vpc->vpd_free = PageGetFreeSpace (page);
-               vpc->vpd_free = ADJUST_FREE_SPACE (vpc->vpd_free);
-               frsize += vpc->vpd_free;
-               _vc_reappage(p, curvrl, vpc);
-           }
-           ReleaseBuffer(buf);
        }
+       else
+           dobufrel = true;
+       if ( tempPage != (Page) NULL )
+       { /* Some tuples are gone */
+           PageRepairFragmentation(tempPage);
+           vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
+           frsize += vpc->vpd_free;
+           _vc_reappage (Vvpl, vpc);
+           pfree (tempPage);
+           tempPage = (Page) NULL;
+       }
+       else if ( vpc->vpd_noff > 0 )
+       { /* there are only ~LP_USED line pointers */
+           vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
+           frsize += vpc->vpd_free;
+           _vc_reappage (Vvpl, vpc);
+       }
+       if ( dobufrel )
+           ReleaseBuffer(buf);
+       if ( notup )
+           nemend++;
+       else
+           nemend = 0;
     }
 
-    if (isarchived)
-       heap_close(archrel);
+    pfree (vpc);
 
     /* save stats in the rel list for use later */
     curvrl->vrl_ntups = ntups;
     curvrl->vrl_npages = nblocks;
+    
+    Vvpl->vpl_nemend = nemend;
+    Fvpl->vpl_nemend = nemend;
+
+    /* 
+     * Try to make Fvpl keeping in mind that we can't use free space 
+     * of "empty" end-pages and last page if it reapped.
+     */
+    if ( Vvpl->vpl_npages - nemend > 0 )
+    {
+       int nusf;               /* blocks usefull for re-using */
+       
+       nusf = Vvpl->vpl_npages - nemend;
+       if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
+           nusf--;
+    
+       for (i = 0; i < nusf; i++)
+       {
+           vp = Vvpl->vpl_pgdesc[i];
+           if ( _vc_enough_space (vp, min_tlen) )
+           {
+               _vc_vpinsert (Fvpl, vp);
+               frsusf += vp->vpd_free;
+           }
+       }
+    }
 
     getrusage(RUSAGE_SELF, &ru1);
     
-    elog (NOTICE, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
-Tuples %u: Vac %u, UnUsed %u; FreeSpace %u. Elapsed %u/%u sec.",
+    elog (MESSLEV, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
+Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
        NAMEDATALEN, relname, 
-       nblocks, nchpg, curvrl->vrl_nrepg, nempg, nnepg, 
-       ntups, nvac, nunused, frsize, 
+       nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
+       ntups, nvac, ncrash, nunused, min_tlen, max_tlen, 
+       frsize, frsusf, nemend, Fvpl->vpl_npages,
        ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
        ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
 
-    pfree (vpc);
-}
+} /* _vc_scanheap */
+
 
 /*
- *  _vc_vacindices() -- vacuum all the indices for a particular heap relation.
- *
- *     On entry, curvrl points at the relation currently being vacuumed.
- *     We already have a write lock on the relation, so we don't need to
- *     worry about anyone building an index on it while we're doing the
- *     vacuuming.  The tid list for curvrl is sorted in reverse tid order:
- *     that is, tids on higher page numbers are before those on lower page
- *     numbers, and tids high on the page are before those low on the page.
- *     We use this ordering to cut down the search cost when we look at an
- *     index entry.
+ *  _vc_rpfheap() -- try to repaire relation' fragmentation
  *
- *     We're executing inside the transaction that vacuumed the heap.
+ *     This routine marks dead tuples as unused and tries re-use dead space
+ *     by moving tuples (and inserting indices if needed). It constructs 
+ *     Nvpl list of free-ed pages (moved tuples) and clean indices
+ *     for them after committing (in hack-manner - without losing locks
+ *     and freeing memory!) current transaction. It truncates relation
+ *     if some end-blocks are gone away.
  */
 static void
-_vc_vacindices(VRelList curvrl, Relation onerel)
+_vc_rpfheap (VRelList curvrl, Relation onerel, 
+               VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
 {
-    Relation pgindex;
-    TupleDesc pgidesc;
-    HeapTuple pgitup;
-    HeapScanDesc pgiscan;
-    Buffer buf;
-    Relation indrel;
-    Oid indoid;
-    Datum d;
-    bool n;
-    int nindices;
-    ScanKeyData pgikey;
+    TransactionId myXID;
+    CommandId myCID;
+    AbsoluteTime myCTM;
+    Buffer buf, ToBuf;
+    int nblocks, blkno;
+    Page page, ToPage;
+    OffsetNumber offnum, maxoff, newoff, moff;
+    ItemId itemid, newitemid;
+    HeapTuple htup, newtup;
+    TupleDesc tupdesc;
+    Datum *idatum;
+    char *inulls;
+    InsertIndexResult iresult;
+    VPageListData Nvpl;
+    VPageDescr ToVpd, Fvplast, Vvplast, vpc, *vpp;
+    IndDesc *Idesc, *idcur;
+    int Fblklast, Vblklast, i;
+    Size tlen;
+    int nmoved, Fnpages, Vnpages;
+    int nchkmvd, ntups;
+    bool isempty, dowrite;
+    Relation archrel;
+    struct rusage ru0, ru1;
 
-    /* see if we can dodge doing any work at all */
-    if (!(onerel->rd_rel->relhasindex))
-       return;
+    getrusage(RUSAGE_SELF, &ru0);
 
-    nindices = 0;
+    myXID = GetCurrentTransactionId();
+    myCID = GetCurrentCommandId();
+       
+    if ( Irel != (Relation*) NULL )    /* preparation for index' inserts */
+    {
+       _vc_mkindesc (onerel, nindices, Irel, &Idesc);
+       tupdesc = RelationGetTupleDescriptor(onerel);
+       idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
+       inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
+    }
 
-    /* prepare a heap scan on the pg_index relation */
-    pgindex = heap_openr(IndexRelationName);
-    pgidesc = RelationGetTupleDescriptor(pgindex);
+    /* if the relation has an archive, open it */
+    if (onerel->rd_rel->relarch != 'n')
+    {
+       archrel = _vc_getarchrel(onerel);
+       /* Archive tuples from "empty" end-pages */
+       for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1, 
+                               i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
+       {
+           if ( (*vpp)->vpd_noff > 0 )
+           {
+               buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+               page = BufferGetPage(buf);
+               Assert ( !PageIsEmpty(page) );
+               _vc_vacpage (page, *vpp, archrel);
+               WriteBuffer (buf);
+           }
+       }
+    }
+    else
+       archrel = (Relation) NULL;
+
+    Nvpl.vpl_npages = 0;
+    Fnpages = Fvpl->vpl_npages;
+    Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
+    Fblklast = Fvplast->vpd_blkno;
+    Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
+    Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
+    Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
+    Vblklast = Vvplast->vpd_blkno;
+    Assert ( Vblklast >= Fblklast );
+    ToBuf = InvalidBuffer;
+    nmoved = 0;
 
-    ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
-                          ObjectIdEqualRegProcedure,
-                          ObjectIdGetDatum(curvrl->vrl_relid));
+    vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
+    vpc->vpd_nusd = 0;
+       
+    nblocks = curvrl->vrl_npages;
+    for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
+    {
+       /* if it's reapped page and it was used by me - quit */
+       if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
+           break;
 
-    pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
+       buf = ReadBuffer(onerel, blkno);
+       page = BufferGetPage(buf);
 
-    /* vacuum all the indices */
-    while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, &buf))) {
-       d = (Datum) heap_getattr(pgitup, buf, Anum_pg_index_indexrelid,
-                                pgidesc, &n);
-       indoid = DatumGetObjectId(d);
-       indrel = index_open(indoid);
-       _vc_vaconeind(curvrl, indrel);
-       heap_close(indrel);
-       nindices++;
+       vpc->vpd_noff = 0;
+
+       isempty = PageIsEmpty(page);
+
+       if ( blkno == Vblklast )                /* it's reapped page */
+       {
+           if ( Vvplast->vpd_noff > 0 )        /* there are dead tuples */
+           {                                   /* on this page - clean */
+               Assert ( ! isempty );
+               _vc_vacpage (page, Vvplast, archrel);
+               dowrite = true;
+           }
+           else
+               Assert ( isempty );
+           Assert ( --Vnpages > 0 );
+           /* get prev reapped page from Vvpl */
+           Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
+           Vblklast = Vvplast->vpd_blkno;
+           if ( blkno == Fblklast )    /* this page in Fvpl too */
+           {
+               Assert ( --Fnpages > 0 );
+               Assert ( Fvplast->vpd_nusd == 0 );
+               /* get prev reapped page from Fvpl */
+               Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
+               Fblklast = Fvplast->vpd_blkno;
+           }
+           Assert ( Fblklast <= Vblklast );
+           if ( isempty )
+           {
+               ReleaseBuffer(buf);
+               continue;
+           }
+       }
+       else
+       {
+           Assert ( ! isempty );
+           dowrite = false;
+       }
+
+       vpc->vpd_blkno = blkno;
+       maxoff = PageGetMaxOffsetNumber(page);
+       for (offnum = FirstOffsetNumber;
+               offnum <= maxoff;
+               offnum = OffsetNumberNext(offnum))
+       {
+           itemid = PageGetItemId(page, offnum);
+
+           if (!ItemIdIsUsed(itemid))
+               continue;
+
+           htup = (HeapTuple) PageGetItem(page, itemid);
+           tlen = htup->t_len;
+               
+           /* try to find new page for this tuple */
+           if ( ToBuf == InvalidBuffer ||
+               ! _vc_enough_space (ToVpd, tlen) )
+           {
+               if ( ToBuf != InvalidBuffer )
+                   WriteBuffer(ToBuf);
+               ToBuf = InvalidBuffer;
+               for (i=0; i < Fnpages; i++)
+               {
+                   if ( _vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
+                       break;
+               }
+               if ( i == Fnpages )
+                   break;                      /* can't move item anywhere */
+               ToVpd = Fvpl->vpl_pgdesc[i];
+               ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
+               ToPage = BufferGetPage(ToBuf);
+               /* if this page was not used before - clean it */
+               if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
+                   _vc_vacpage (ToPage, ToVpd, archrel);
+           }
+               
+           /* copy tuple */
+           newtup = (HeapTuple) palloc (tlen);
+           memmove((char *) newtup, (char *) htup, tlen);
+
+           /* store transaction information */
+           TransactionIdStore(myXID, &(newtup->t_xmin));
+           newtup->t_cmin = myCID;
+           StoreInvalidTransactionId(&(newtup->t_xmax));
+           newtup->t_tmin = INVALID_ABSTIME;
+           newtup->t_tmax = CURRENT_ABSTIME;
+           ItemPointerSetInvalid(&newtup->t_chain);
+
+           /* add tuple to the page */
+           newoff = PageAddItem (ToPage, (Item)newtup, tlen, 
+                               InvalidOffsetNumber, LP_USED);
+           if ( newoff == InvalidOffsetNumber )
+           {
+               elog (WARN, "\
+failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
+               tlen, ToVpd->vpd_blkno, ToVpd->vpd_free, 
+               ToVpd->vpd_nusd, ToVpd->vpd_noff);
+           }
+           newitemid = PageGetItemId(ToPage, newoff);
+           pfree (newtup);
+           newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
+           ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
+
+           /* now logically delete end-tuple */
+           TransactionIdStore(myXID, &(htup->t_xmax));
+           htup->t_cmax = myCID;
+           memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
+
+           ToVpd->vpd_nusd++;
+           nmoved++;
+           ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
+           vpc->vpd_voff[vpc->vpd_noff++] = offnum;
+               
+           /* insert index' tuples if needed */
+           if ( Irel != (Relation*) NULL )
+           {
+               for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
+               {
+                   FormIndexDatum (
+                               idcur->natts,
+                               (AttrNumber *)&(idcur->tform->indkey[0]),
+                               newtup, 
+                               tupdesc,
+                               InvalidBuffer,
+                               idatum,
+                               inulls,
+                               idcur->finfoP);
+                   iresult = index_insert (
+                               Irel[i],
+                               idatum,
+                               inulls,
+                               &(newtup->t_ctid),
+                               true);
+                   if (iresult) pfree(iresult);
+               }
+           }
+               
+       } /* walk along page */
+
+       if ( vpc->vpd_noff > 0 )                /* some tuples were moved */
+       {
+           _vc_reappage (&Nvpl, vpc);
+           WriteBuffer(buf);
+       }
+       else if ( dowrite )
+           WriteBuffer(buf);
+       else
+           ReleaseBuffer(buf);
+           
+       if ( offnum <= maxoff )
+           break;                              /* some item(s) left */
+           
+    } /* walk along relation */
+       
+    blkno++;                           /* new number of blocks */
+
+    if ( ToBuf != InvalidBuffer )
+    {
+       Assert (nmoved > 0);
+       WriteBuffer(ToBuf);
     }
 
-    heap_endscan(pgiscan);
-    heap_close(pgindex);
+    if ( nmoved > 0 )
+    {
+       /* 
+        * We have to commit our tuple' movings before we'll truncate 
+        * relation, but we shouldn't lose our locks. And so - quick hack: 
+        * flush buffers and record status of current transaction
+        * as committed, and continue. - vadim 11/13/96
+        */
+       FlushBufferPool(!TransactionFlushEnabled());
+       TransactionIdCommit(myXID);
+       FlushBufferPool(!TransactionFlushEnabled());
+       myCTM = TransactionIdGetCommitTime(myXID);
+    }
+       
+    /* 
+     * Clean uncleaned reapped pages from Vvpl list 
+     * and set commit' times  for inserted tuples
+     */
+    nchkmvd = 0;
+    for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
+    {
+       Assert ( (*vpp)->vpd_blkno < blkno );
+       buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+       page = BufferGetPage(buf);
+       if ( (*vpp)->vpd_nusd == 0 )    /* this page was not used */
+       {
+           /* noff == 0 in empty pages only - such pages should be re-used */
+           Assert ( (*vpp)->vpd_noff > 0 );
+           _vc_vacpage (page, *vpp, archrel);
+       }
+       else                            /* this page was used */
+       {
+           ntups = 0;
+           moff = PageGetMaxOffsetNumber(page);
+           for (newoff = FirstOffsetNumber;
+                       newoff <= moff;
+                       newoff = OffsetNumberNext(newoff))
+           {
+               itemid = PageGetItemId(page, newoff);
+               if (!ItemIdIsUsed(itemid))
+                   continue;
+               htup = (HeapTuple) PageGetItem(page, itemid);
+               if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
+               {
+                   htup->t_tmin = myCTM;
+                   ntups++;
+               }
+           }
+           Assert ( (*vpp)->vpd_nusd == ntups );
+           nchkmvd += ntups;
+       }
+       WriteBuffer (buf);
+    }
+    Assert ( nmoved == nchkmvd );
 
-    if (nindices > 0)
-       curvrl->vrl_hasindex = true;
+    getrusage(RUSAGE_SELF, &ru1);
+    
+    elog (MESSLEV, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \
+Elapsed %u/%u sec.",
+               NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
+               nblocks, blkno, nmoved,
+               ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
+               ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
+
+    if ( Nvpl.vpl_npages > 0 )
+    {
+       /* vacuum indices again if needed */
+       if ( Irel != (Relation*) NULL )
+       {
+           VPageDescr *vpleft, *vpright, vpsave;
+               
+           /* re-sort Nvpl.vpl_pgdesc */
+           for (vpleft = Nvpl.vpl_pgdesc, 
+               vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
+               vpleft < vpright; vpleft++, vpright--)
+           {
+               vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
+           }
+           for (i = 0; i < nindices; i++)
+               _vc_vaconeind (&Nvpl, Irel[i], curvrl->vrl_ntups);
+       }
+
+       /* 
+        * clean moved tuples from last page in Nvpl list
+        * if some tuples left there
+        */
+       if ( vpc->vpd_noff > 0 && offnum <= maxoff )
+       {
+           Assert (vpc->vpd_blkno == blkno - 1);
+           buf = ReadBuffer(onerel, vpc->vpd_blkno);
+           page = BufferGetPage (buf);
+           ntups = 0;
+           maxoff = offnum;
+           for (offnum = FirstOffsetNumber;
+                       offnum < maxoff;
+                       offnum = OffsetNumberNext(offnum))
+           {
+               itemid = PageGetItemId(page, offnum);
+               if (!ItemIdIsUsed(itemid))
+                   continue;
+               htup = (HeapTuple) PageGetItem(page, itemid);
+               Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
+               itemid->lp_flags &= ~LP_USED;
+               ntups++;
+           }
+           Assert ( vpc->vpd_noff == ntups );
+           PageRepairFragmentation(page);
+           WriteBuffer (buf);
+       }
+
+       /* now - free new list of reapped pages */
+       vpp = Nvpl.vpl_pgdesc;
+       for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
+           pfree(*vpp);
+       pfree (Nvpl.vpl_pgdesc);
+    }
+       
+    /* truncate relation */
+    if ( blkno < nblocks )
+    {
+       blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
+       Assert ( blkno >= 0 );
+       curvrl->vrl_npages = blkno;     /* set new number of blocks */
+    }
+
+    if ( archrel != (Relation) NULL )
+       heap_close(archrel);
+
+    if ( Irel != (Relation*) NULL )    /* pfree index' allocations */
+    {
+       pfree (Idesc);
+       pfree (idatum);
+       pfree (inulls);
+       _vc_clsindices (nindices, Irel);
+    }
+
+    pfree (vpc);
+
+} /* _vc_rpfheap */
+
+/*
+ *  _vc_vacheap() -- free dead tuples
+ *
+ *     This routine marks dead tuples as unused and truncates relation
+ *     if there are "empty" end-blocks.
+ */
+static void
+_vc_vacheap (VRelList curvrl, Relation onerel, VPageList Vvpl)
+{
+    Buffer buf;
+    Page page;
+    VPageDescr *vpp;
+    Relation archrel;
+    int nblocks;
+    int i;
+
+    nblocks = Vvpl->vpl_npages;
+    /* if the relation has an archive, open it */
+    if (onerel->rd_rel->relarch != 'n')
+       archrel = _vc_getarchrel(onerel);
     else
-       curvrl->vrl_hasindex = false;
-}
+    {
+       archrel = (Relation) NULL;
+       nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
+    }
+       
+    for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
+    {
+       if ( (*vpp)->vpd_noff > 0 )
+       {
+           buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
+           page = BufferGetPage (buf);
+           _vc_vacpage (page, *vpp, archrel);
+           WriteBuffer (buf);
+       }
+    }
+
+    /* truncate relation if there are some empty end-pages */
+    if ( Vvpl->vpl_nemend > 0 )
+    {
+       Assert ( curvrl->vrl_npages >= Vvpl->vpl_nemend );
+       nblocks = curvrl->vrl_npages - Vvpl->vpl_nemend;
+       elog (MESSLEV, "Rel %.*s: Pages: %u --> %u.",
+               NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
+               curvrl->vrl_npages, nblocks);
+
+       /* 
+        * we have to flush "empty" end-pages (if changed, but who knows it)
+        * before truncation 
+        */
+       FlushBufferPool(!TransactionFlushEnabled());
+
+       nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
+       Assert ( nblocks >= 0 );
+       curvrl->vrl_npages = nblocks;   /* set new number of blocks */
+    }
+
+    if ( archrel != (Relation) NULL )
+       heap_close(archrel);
+
+} /* _vc_vacheap */
+
+/*
+ *  _vc_vacpage() -- free (and archive if needed) dead tuples on a page
+ *                  and repaire its fragmentation.
+ */
+static void
+_vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
+{
+    ItemId itemid;
+    HeapTuple htup;
+    int i;
+    
+    Assert ( vpd->vpd_nusd == 0 );
+    for (i=0; i < vpd->vpd_noff; i++)
+    {
+       itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
+       if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
+       {
+           htup = (HeapTuple) PageGetItem (page, itemid);
+           _vc_archive (archrel, htup);
+       }
+       itemid->lp_flags &= ~LP_USED;
+    }
+    PageRepairFragmentation(page);
+
+} /* _vc_vacpage */
 
 /*
  *  _vc_vaconeind() -- vacuum one index relation.
  *
- *     Curvrl is the VRelList entry for the heap we're currently vacuuming.
- *     It's locked. Onerel is an index relation on the vacuumed heap. 
+ *     Vpl is the VPageList of the heap we're currently vacuuming.
+ *     It's locked. Indrel is an index relation on the vacuumed heap. 
  *     We don't set locks on the index relation here, since the indexed 
  *     access methods support locking at different granularities. 
  *     We let them handle it.
@@ -674,7 +1217,7 @@ _vc_vacindices(VRelList curvrl, Relation onerel)
  *     pg_class.
  */
 static void
-_vc_vaconeind(VRelList curvrl, Relation indrel)
+_vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
 {
     RetrieveIndexResult res;
     IndexScanDesc iscan;
@@ -682,6 +1225,7 @@ _vc_vaconeind(VRelList curvrl, Relation indrel)
     int nvac;
     int nitups;
     int nipages;
+    VPageDescr vp;
     struct rusage ru0, ru1;
 
     getrusage(RUSAGE_SELF, &ru0);
@@ -695,7 +1239,8 @@ _vc_vaconeind(VRelList curvrl, Relation indrel)
           != (RetrieveIndexResult) NULL) {
        heapptr = &res->heap_iptr;
 
-       if (_vc_tidreapped(heapptr, curvrl, indrel)) {
+       if ( (vp = _vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
+       {
 #if 0
            elog(DEBUG, "<%x,%x> -> <%x,%x>",
                 ItemPointerGetBlockNumber(&(res->index_iptr)),
@@ -703,6 +1248,12 @@ _vc_vaconeind(VRelList curvrl, Relation indrel)
                 ItemPointerGetBlockNumber(&(res->heap_iptr)),
                 ItemPointerGetOffsetNumber(&(res->heap_iptr)));
 #endif
+           if ( vp->vpd_noff == 0 ) 
+           {                           /* this is EmptyPage !!! */
+               elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
+                       NAMEDATALEN, indrel->rd_rel->relname.data,
+                       vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
+           }
            ++nvac;
            index_delete(indrel, &res->index_iptr);
        } else {
@@ -721,12 +1272,58 @@ _vc_vaconeind(VRelList curvrl, Relation indrel)
 
     getrusage(RUSAGE_SELF, &ru1);
 
-    elog (NOTICE, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
+    elog (MESSLEV, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
        NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac,
        ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
        ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
 
-}
+    if ( nitups != nhtups )
+       elog (NOTICE, "NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
+               nitups, nhtups);
+
+} /* _vc_vaconeind */
+
+/*
+ *  _vc_tidreapped() -- is a particular tid reapped?
+ *
+ *     vpl->VPageDescr_array is sorted in right order.
+ */
+static VPageDescr
+_vc_tidreapped(ItemPointer itemptr, VPageList vpl)
+{
+    OffsetNumber ioffno;
+    OffsetNumber *voff;
+    VPageDescr vp, *vpp;
+    VPageDescrData vpd;
+
+    vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
+    ioffno = ItemPointerGetOffsetNumber(itemptr);
+       
+    vp = &vpd;
+    vpp = (VPageDescr*) _vc_find_eq ((char*)(vpl->vpl_pgdesc), 
+               vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp, 
+               _vc_cmp_blk);
+
+    if ( vpp == (VPageDescr*) NULL )
+       return ((VPageDescr)NULL);
+    vp = *vpp;
+
+    /* ok - we are on true page */
+
+    if ( vp->vpd_noff == 0 ) {         /* this is EmptyPage !!! */
+       return (vp);
+    }
+    
+    voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff), 
+               vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
+               _vc_cmp_offno);
+
+    if ( voff == (OffsetNumber*) NULL )
+       return ((VPageDescr)NULL);
+
+    return (vp);
+
+} /* _vc_tidreapped */
 
 /*
  *  _vc_updstats() -- update pg_class statistics for one relation
@@ -771,7 +1368,7 @@ _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex)
     pgcform->relhasindex = hasindex;
  
     /* XXX -- after write, should invalidate relcache in other backends */
-    WriteNoReleaseBuffer(buf);
+    WriteNoReleaseBuffer(buf); /* heap_endscan release scan' buffers ? */
 
     /* that's all, folks */
     heap_endscan(sdesc);
@@ -788,88 +1385,47 @@ static void _vc_setpagelock(Relation rel, BlockNumber blkno)
     RelationSetLockForWritePage(rel, &itm);
 }
 
-/*
- *  _vc_tidreapped() -- is a particular tid reapped?
- *
- *     VPageDescr array is sorted in right order.
- */
-static bool
-_vc_tidreapped(ItemPointer itemptr, VRelList curvrl, Relation indrel)
-{
-    OffsetNumber ioffno;
-    OffsetNumber *voff;
-    VPageDescr vp, *vpp;
-    VPageDescrData vpd;
-
-    vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
-    ioffno = ItemPointerGetOffsetNumber(itemptr);
-       
-    vp = &vpd;
-    vpp = (VPageDescr*) _vc_find_eq ((char*)(curvrl->vrl_pgdsc), 
-               curvrl->vrl_nrepg, sizeof (VPageDescr), (char*)&vp, 
-               _vc_cmp_blk);
-
-    if ( vpp == (VPageDescr*) NULL )
-       return (false);
-    vp = *vpp;
-
-    /* ok - we are on true page */
-
-    if ( vp->vpd_noff == 0 ) {         /* this is EmptyPage !!! */
-       elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
-               NAMEDATALEN, indrel->rd_rel->relname.data,
-               vpd.vpd_blkno, ioffno);
-       return (true);
-    }
-    
-    voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff), 
-               vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
-               _vc_cmp_offno);
-
-    if ( voff == (OffsetNumber*) NULL )
-       return (false);
-
-    return (true);
-
-} /* _vc_tidreapped */
-
 
 /*
- *  _vc_reappage() -- save a page on the array of reaped pages for the current
- *                  entry on the vacuum relation list.
+ *  _vc_reappage() -- save a page on the array of reapped pages.
  *
  *     As a side effect of the way that the vacuuming loop for a given
  *     relation works, higher pages come after lower pages in the array
  *     (and highest tid on a page is last).
  */
-static void
-_vc_reappage(Portal p,
-           VRelList curvrl,
-           VPageDescr vpc)
+static void 
+_vc_reappage(VPageList vpl, VPageDescr vpc)
 {
-    PortalVariableMemory pmem;
-    MemoryContext old;
     VPageDescr newvpd;
 
-    /* allocate a VPageDescrData entry in the portal memory context */
-    pmem = PortalGetVariableMemory(p);
-    old = MemoryContextSwitchTo((MemoryContext) pmem);
-    if ( curvrl->vrl_nrepg == 0 )
-       curvrl->vrl_pgdsc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
-    else if ( curvrl->vrl_nrepg % 100 == 0 )
-       curvrl->vrl_pgdsc = (VPageDescr*) repalloc(curvrl->vrl_pgdsc, (curvrl->vrl_nrepg+100)*sizeof(VPageDescr));
+    /* allocate a VPageDescrData entry */
     newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
-    MemoryContextSwitchTo(old);
 
     /* fill it in */
     if ( vpc->vpd_noff > 0 )
        memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
     newvpd->vpd_blkno = vpc->vpd_blkno;
     newvpd->vpd_free = vpc->vpd_free;
+    newvpd->vpd_nusd = vpc->vpd_nusd;
     newvpd->vpd_noff = vpc->vpd_noff;
 
-    curvrl->vrl_pgdsc[curvrl->vrl_nrepg] = newvpd;
-    (curvrl->vrl_nrepg)++;
+    /* insert this page into vpl list */
+    _vc_vpinsert (vpl, newvpd);
+    
+} /* _vc_reappage */
+
+static void
+_vc_vpinsert (VPageList vpl, VPageDescr vpnew)
+{
+
+    /* allocate a VPageDescr entry if needed */
+    if ( vpl->vpl_npages == 0 )
+       vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
+    else if ( vpl->vpl_npages % 100 == 0 )
+       vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
+    vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
+    (vpl->vpl_npages)++;
+    
 }
 
 static void
@@ -877,8 +1433,6 @@ _vc_free(Portal p, VRelList vrl)
 {
     VRelList p_vrl;
     VAttList p_val, val;
-    VPageDescr *vpd;
-    int i;
     MemoryContext old;
     PortalVariableMemory pmem;
 
@@ -895,15 +1449,6 @@ _vc_free(Portal p, VRelList vrl)
            pfree(p_val);
        }
 
-       /* free page' descriptions */
-       if ( vrl->vrl_nrepg > 0 ) {
-           vpd = vrl->vrl_pgdsc;
-           for (i = 0; i < vrl->vrl_nrepg; i++) {
-               pfree(vpd[i]);
-           }
-           pfree (vpd);
-       }
-
        /* free rel list entry */
        p_vrl = vrl;
        vrl = vrl->vrl_next;
@@ -1038,3 +1583,149 @@ _vc_cmp_offno (char *left, char *right)
     return (1);
 
 } /* _vc_cmp_offno */
+
+
+static void
+_vc_getindices (Oid relid, int *nindices, Relation **Irel)
+{
+    Relation pgindex;
+    Relation irel;
+    TupleDesc pgidesc;
+    HeapTuple pgitup;
+    HeapScanDesc pgiscan;
+    Datum d;
+    int i, k;
+    bool n;
+    ScanKeyData pgikey;
+    Oid *ioid;
+
+    *nindices = i = 0;
+    
+    ioid = (Oid *) palloc(10*sizeof(Oid));
+
+    /* prepare a heap scan on the pg_index relation */
+    pgindex = heap_openr(IndexRelationName);
+    pgidesc = RelationGetTupleDescriptor(pgindex);
+
+    ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
+                          ObjectIdEqualRegProcedure,
+                          ObjectIdGetDatum(relid));
+
+    pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
+
+    while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
+       d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
+                                pgidesc, &n);
+       i++;
+       if ( i % 10 == 0 )
+           ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
+       ioid[i-1] = DatumGetObjectId(d);
+    }
+
+    heap_endscan(pgiscan);
+    heap_close(pgindex);
+
+    if ( i == 0 ) {    /* No one index found */
+       pfree(ioid);
+       return;
+    }
+
+    if ( Irel != (Relation **) NULL )
+       *Irel = (Relation *) palloc(i * sizeof(Relation));
+    
+    for (k = 0; i > 0; )
+    {
+       irel = index_open(ioid[--i]);
+       if ( irel != (Relation) NULL )
+       {
+           if ( Irel != (Relation **) NULL )
+               (*Irel)[k] = irel;
+           else
+               index_close (irel);
+           k++;
+       }
+       else
+           elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
+    }
+    *nindices = k;
+    pfree(ioid);
+
+    if ( Irel != (Relation **) NULL && *nindices == 0 )
+    {
+       pfree (*Irel);
+       *Irel = (Relation *) NULL;
+    }
+
+} /* _vc_getindices */
+
+
+static void
+_vc_clsindices (int nindices, Relation *Irel)
+{
+
+    if ( Irel == (Relation*) NULL )
+       return;
+
+    while (nindices--) {
+       index_close (Irel[nindices]);
+    }
+    pfree (Irel);
+
+} /* _vc_clsindices */
+
+
+static void
+_vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
+{
+    IndDesc *idcur;
+    HeapTuple pgIndexTup;
+    AttrNumber *attnumP;
+    int natts;
+    int i;
+
+    *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
+    
+    for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
+       pgIndexTup =
+               SearchSysCacheTuple(INDEXRELID,
+                               ObjectIdGetDatum(Irel[i]->rd_id),
+                               0,0,0);
+       Assert(pgIndexTup);
+       idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
+       for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
+               *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
+               attnumP++, natts++);
+       if (idcur->tform->indproc != InvalidOid) {
+           idcur->finfoP = &(idcur->finfo);
+           FIgetnArgs(idcur->finfoP) = natts;
+           natts = 1;
+           FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
+           *(FIgetname(idcur->finfoP)) = '\0';
+       } else
+           idcur->finfoP = (FuncIndexInfo *) NULL;
+       
+       idcur->natts = natts;
+    }
+    
+} /* _vc_mkindesc */
+
+
+static bool
+_vc_enough_space (VPageDescr vpd, Size len)
+{
+
+    len = DOUBLEALIGN(len);
+
+    if ( len > vpd->vpd_free )
+       return (false);
+    
+    if ( vpd->vpd_nusd < vpd->vpd_noff )       /* there are free itemid(s) */
+       return (true);                          /* and len <= free_space */
+    
+    /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
+    if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
+       return (true);
+    
+    return (false);
+    
+} /* _vc_enough_space */