]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Clarify clearing of attribute stats memory.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *    the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.20 1997/02/18 04:13:57 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/file.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <fmgr.h>
24 #include <utils/portal.h>
25 #include <access/genam.h>
26 #include <access/heapam.h>
27 #include <access/xact.h>
28 #include <storage/bufmgr.h>
29 #include <access/transam.h>
30 #include <catalog/pg_index.h>
31 #include <catalog/index.h>
32 #include <catalog/catname.h>
33 #include <catalog/catalog.h>
34 #include <catalog/pg_class.h>
35 #include <catalog/pg_proc.h>
36 #include <catalog/pg_statistic.h>
37 #include <catalog/pg_type.h>
38 #include <catalog/pg_operator.h>
39 #include <storage/smgr.h>
40 #include <storage/lmgr.h>
41 #include <utils/inval.h>
42 #include <utils/mcxt.h>
43 #include <utils/inval.h>
44 #include <utils/syscache.h>
45 #include <commands/vacuum.h>
46 #include <parser/catalog_utils.h>
47 #include <storage/bufpage.h>
48 #include "storage/shmem.h"
49 #ifndef HAVE_GETRUSAGE
50 # include <rusagestub.h>
51 #else 
52 # include <sys/time.h>
53 # include <sys/resource.h>
54 #endif 
55
56 bool VacuumRunning =    false;
57 static int MESSAGE_LEVEL;       /* message level */
58
59 #define swapLong(a,b)   {long tmp; tmp=a; a=b; b=tmp;}
60 #define swapInt(a,b)    {int tmp; tmp=a; a=b; b=tmp;}
61 #define swapDatum(a,b)  {Datum tmp; tmp=a; a=b; b=tmp;}
62 #define VacAttrStatsEqValid(stats) ( RegProcedureIsValid(stats->cmpeq))
63 #define VacAttrStatsLtGtValid(stats) ( RegProcedureIsValid(stats->cmplt) && \
64                                    RegProcedureIsValid(stats->cmpgt) && \
65                                    RegProcedureIsValid(stats->outfunc) )
66   
67
68 /* non-export function prototypes */
69 static void vc_init(void);
70 static void vc_shutdown(void);
71 static void vc_vacuum(NameData *VacRelP);
72 static VRelList vc_getrels(Portal p, NameData *VacRelP);
73 static void vc_vacone (Oid relid);
74 static void vc_scanheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
75 static void vc_rpfheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
76 static void vc_vacheap (VRelStats *vacrelstats, Relation onerel, VPageList vpl);
77 static void vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
78 static void vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
79 static void vc_scanoneind (Relation indrel, int nhtups);
80 static void vc_attrstats(Relation onerel, VacAttrStats *vacattrstats, HeapTuple htup);
81 static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
82 static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VacAttrStats *vacattrstats);
83 static void vc_delhilowstats(Oid relid);
84 static void vc_setpagelock(Relation rel, BlockNumber blkno);
85 static VPageDescr vc_tidreapped (ItemPointer itemptr, VPageList vpl);
86 static void vc_reappage (VPageList vpl, VPageDescr vpc);
87 static void vc_vpinsert (VPageList vpl, VPageDescr vpnew);
88 static void vc_free(Portal p, VRelList vrl);
89 static void vc_getindices (Oid relid, int *nindices, Relation **Irel);
90 static void vc_clsindices (int nindices, Relation *Irel);
91 static Relation vc_getarchrel(Relation heaprel);
92 static void vc_archive(Relation archrel, HeapTuple htup);
93 static bool vc_isarchrel(char *rname);
94 static void vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
95 static char * vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
96 static int vc_cmp_blk (char *left, char *right);
97 static int vc_cmp_offno (char *left, char *right);
98 static bool vc_enough_space (VPageDescr vpd, Size len);
99
100 void
101 vacuum(char *vacrel, bool verbose)
102 {
103     NameData VacRel;
104
105     if (verbose)
106         MESSAGE_LEVEL = NOTICE;
107     else
108         MESSAGE_LEVEL = DEBUG;
109
110     /* vacrel gets de-allocated on transaction commit */
111         
112     /* initialize vacuum cleaner */
113     vc_init();
114
115     /* vacuum the database */
116     if (vacrel)
117     {
118         strcpy(VacRel.data,vacrel);
119         vc_vacuum(&VacRel);
120     }
121     else
122         vc_vacuum(NULL);
123
124     /* clean up */
125     vc_shutdown();
126 }
127
128 /*
129  *  vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
130  *
131  *      We run exactly one vacuum cleaner at a time.  We use the file system
132  *      to guarantee an exclusive lock on vacuuming, since a single vacuum
133  *      cleaner instantiation crosses transaction boundaries, and we'd lose
134  *      postgres-style locks at the end of every transaction.
135  *
136  *      The strangeness with committing and starting transactions in the
137  *      init and shutdown routines is due to the fact that the vacuum cleaner
138  *      is invoked via a sql command, and so is already executing inside
139  *      a transaction.  We need to leave ourselves in a predictable state
140  *      on entry and exit to the vacuum cleaner.  We commit the transaction
141  *      started in PostgresMain() inside vc_init(), and start one in
142  *      vc_shutdown() to match the commit waiting for us back in
143  *      PostgresMain().
144  */
145 static void
146 vc_init()
147 {
148     int fd;
149
150     if ((fd = open("pg_vlock", O_CREAT|O_EXCL, 0600)) < 0)
151         elog(WARN, "can't create lock file -- another vacuum cleaner running?");
152
153     close(fd);
154
155     /*
156      *  By here, exclusive open on the lock file succeeded.  If we abort
157      *  for any reason during vacuuming, we need to remove the lock file.
158      *  This global variable is checked in the transaction manager on xact
159      *  abort, and the routine vc_abort() is called if necessary.
160      */
161
162     VacuumRunning = true;
163
164     /* matches the StartTransaction in PostgresMain() */
165     CommitTransactionCommand();
166 }
167
168 static void
169 vc_shutdown()
170 {
171     /* on entry, not in a transaction */
172     if (unlink("pg_vlock") < 0)
173         elog(WARN, "vacuum: can't destroy lock file!");
174
175     /* okay, we're done */
176     VacuumRunning = false;
177
178     /* matches the CommitTransaction in PostgresMain() */
179     StartTransactionCommand();
180
181 }
182
183 void
184 vc_abort()
185 {
186     /* on abort, remove the vacuum cleaner lock file */
187     (void) unlink("pg_vlock");
188
189     VacuumRunning = false;
190 }
191
192 /*
193  *  vc_vacuum() -- vacuum the database.
194  *
195  *      This routine builds a list of relations to vacuum, and then calls
196  *      code that vacuums them one at a time.  We are careful to vacuum each
197  *      relation in a separate transaction in order to avoid holding too many
198  *      locks at one time.
199  */
200 static void
201 vc_vacuum(NameData *VacRelP)
202 {
203     VRelList vrl, cur;
204     char *pname;
205     Portal p;
206
207     /*
208      *  Create a portal for safe memory across transctions.  We need to
209      *  palloc the name space for it because our hash function expects
210      *  the name to be on a longword boundary.  CreatePortal copies the
211      *  name to safe storage for us.
212      */
213
214     pname = (char *) palloc(strlen(VACPNAME) + 1);
215     strcpy(pname, VACPNAME);
216     p = CreatePortal(pname);
217     pfree(pname);
218
219     /* get list of relations */
220     vrl = vc_getrels(p, VacRelP);
221
222     if (VacRelP != NULL)
223         vc_delhilowstats(vrl->vrl_relid);
224     else
225         vc_delhilowstats(InvalidOid);
226         
227     /* vacuum each heap relation */
228     for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
229         vc_vacone (cur->vrl_relid);
230
231     vc_free(p, vrl);
232
233     PortalDestroy(&p);
234 }
235
236 static VRelList
237 vc_getrels(Portal p, NameData *VacRelP)
238 {
239     Relation pgclass;
240     TupleDesc pgcdesc;
241     HeapScanDesc pgcscan;
242     HeapTuple pgctup;
243     Buffer buf;
244     PortalVariableMemory portalmem;
245     MemoryContext old;
246     VRelList vrl, cur;
247     Datum d;
248     char *rname;
249     char rkind;
250     int16 smgrno;
251     bool n;
252     ScanKeyData  pgckey;
253     bool found = false;
254
255     StartTransactionCommand();
256
257     if (VacRelP->data) {
258         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
259                                NameEqualRegProcedure, 
260                                PointerGetDatum(VacRelP->data));
261     } else {
262         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
263                                CharacterEqualRegProcedure, CharGetDatum('r'));
264     }
265  
266     portalmem = PortalGetVariableMemory(p);
267     vrl = cur = (VRelList) NULL;
268
269     pgclass = heap_openr(RelationRelationName);
270     pgcdesc = RelationGetTupleDescriptor(pgclass);
271
272     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
273
274     while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf))) {
275
276         found = true;
277         
278         /*
279          *  We have to be careful not to vacuum the archive (since it
280          *  already contains vacuumed tuples), and not to vacuum
281          *  relations on write-once storage managers like the Sony
282          *  jukebox at Berkeley.
283          */
284
285         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relname,
286                                  pgcdesc, &n);
287         rname = (char*)d;
288
289         /* skip archive relations */
290         if (vc_isarchrel(rname)) {
291             ReleaseBuffer(buf);
292             continue;
293         }
294
295         /* don't vacuum large objects for now - something breaks when we do */
296         if ( (strlen(rname) > 4) && rname[0] == 'X' &&
297                 rname[1] == 'i' && rname[2] == 'n' &&
298                 (rname[3] == 'v' || rname[3] == 'x'))
299         {
300             elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now", 
301                         NAMEDATALEN, rname);
302             ReleaseBuffer(buf);
303             continue;
304         }
305
306         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
307                                  pgcdesc, &n);
308         smgrno = DatumGetInt16(d);
309
310         /* skip write-once storage managers */
311         if (smgriswo(smgrno)) {
312             ReleaseBuffer(buf);
313             continue;
314         }
315
316         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relkind,
317                                  pgcdesc, &n);
318
319         rkind = DatumGetChar(d);
320
321         /* skip system relations */
322         if (rkind != 'r') {
323             ReleaseBuffer(buf);
324             elog(NOTICE, "Vacuum: can not process index and certain system tables" );
325             continue;
326         }
327                                  
328         /* get a relation list entry for this guy */
329         old = MemoryContextSwitchTo((MemoryContext)portalmem);
330         if (vrl == (VRelList) NULL) {
331             vrl = cur = (VRelList) palloc(sizeof(VRelListData));
332         } else {
333             cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
334             cur = cur->vrl_next;
335         }
336         (void) MemoryContextSwitchTo(old);
337
338         cur->vrl_relid = pgctup->t_oid;
339         cur->vrl_next = (VRelList) NULL;
340
341         /* wei hates it if you forget to do this */
342         ReleaseBuffer(buf);
343     }
344     if (found == false)
345         elog(NOTICE, "Vacuum: table not found" );
346
347     
348     heap_endscan(pgcscan);
349     heap_close(pgclass);
350
351     CommitTransactionCommand();
352
353     return (vrl);
354 }
355
356 /*
357  *  vc_vacone() -- vacuum one heap relation
358  *
359  *      This routine vacuums a single heap, cleans out its indices, and
360  *      updates its statistics npages and ntups statistics.
361  *
362  *      Doing one heap at a time incurs extra overhead, since we need to
363  *      check that the heap exists again just before we vacuum it.  The
364  *      reason that we do this is so that vacuuming can be spread across
365  *      many small transactions.  Otherwise, two-phase locking would require
366  *      us to lock the entire database during one pass of the vacuum cleaner.
367  */
368 static void
369 vc_vacone (Oid relid)
370 {
371     Relation pgclass;
372     TupleDesc pgcdesc;
373     HeapTuple pgctup, pgttup;
374     Buffer pgcbuf;
375     HeapScanDesc pgcscan;
376     Relation onerel;
377     ScanKeyData pgckey;
378     VPageListData Vvpl; /* List of pages to vacuum and/or clean indices */
379     VPageListData Fvpl; /* List of pages with space enough for re-using */
380     VPageDescr *vpp;
381     Relation *Irel;
382     int32 nindices, i, attr_cnt;
383     AttributeTupleForm *attr;
384     VRelStats *vacrelstats;
385     
386     StartTransactionCommand();
387
388     ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
389                            ObjectIdEqualRegProcedure,
390                            ObjectIdGetDatum(relid));
391
392     pgclass = heap_openr(RelationRelationName);
393     pgcdesc = RelationGetTupleDescriptor(pgclass);
394     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
395
396     /*
397      *  Race condition -- if the pg_class tuple has gone away since the
398      *  last time we saw it, we don't need to vacuum it.
399      */
400
401     if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf))) {
402         heap_endscan(pgcscan);
403         heap_close(pgclass);
404         CommitTransactionCommand();
405         return;
406     }
407
408     /* now open the class and vacuum it */
409     onerel = heap_open(relid);
410
411     attr_cnt = onerel->rd_att->natts;
412     attr = onerel->rd_att->attrs;
413
414     vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
415     vacrelstats->relid = relid;
416     vacrelstats->npages = vacrelstats->ntups = 0;
417     vacrelstats->hasindex = false;
418     vacrelstats->vacattrstats =
419                 (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));
420     
421     for (i = 0; i < attr_cnt; i++) {
422         Operator func_operator;
423         OperatorTupleForm pgopform;
424         VacAttrStats *stats = &vacrelstats->vacattrstats[i];
425
426         stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
427         memmove(stats->attr,attr[i],ATTRIBUTE_TUPLE_SIZE);
428         stats->best = stats->guess1 = stats->guess2 = 0;
429         stats->max = stats->min = 0;
430         stats->best_len = stats->guess1_len = stats->guess2_len = 0;
431         stats->max_len = stats->min_len = 0;
432         stats->initialized = false;
433         stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
434         stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
435         
436         func_operator = oper("=",stats->attr->atttypid,stats->attr->atttypid,true);
437         if (func_operator != NULL) {
438             pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
439             stats->cmpeq = pgopform->oprcode;
440         }
441         else    stats->cmpeq = InvalidOid;
442         func_operator = oper("<",stats->attr->atttypid,stats->attr->atttypid,true);
443         if (func_operator != NULL) {
444             pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
445             stats->cmplt = pgopform->oprcode;
446         }
447         else    stats->cmplt = InvalidOid;
448         func_operator = oper(">",stats->attr->atttypid,stats->attr->atttypid,true);
449         if (func_operator != NULL) {
450             pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
451             stats->cmpgt = pgopform->oprcode;
452         }
453         else    stats->cmpgt = InvalidOid;
454         func_operator = oper(">",stats->attr->atttypid,stats->attr->atttypid,true);
455         if (func_operator != NULL) {
456             pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
457             stats->cmpgt = pgopform->oprcode;
458         }
459         else    stats->cmpgt = InvalidOid;
460         pgttup = SearchSysCacheTuple(TYPOID,
461                                     ObjectIdGetDatum(stats->attr->atttypid),
462                                     0,0,0);
463         if (HeapTupleIsValid(pgttup))
464             stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
465         else    stats->outfunc = InvalidOid;
466     }
467
468     /* we require the relation to be locked until the indices are cleaned */
469     RelationSetLockForWrite(onerel);
470
471     /* scan it */
472     Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
473     vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
474
475     /* Now open indices */
476     Irel = (Relation *) NULL;
477     vc_getindices(vacrelstats->relid, &nindices, &Irel);
478     
479     if ( nindices > 0 )
480         vacrelstats->hasindex = true;
481     else
482         vacrelstats->hasindex = false;
483
484     /* Clean/scan index relation(s) */
485     if ( Irel != (Relation*) NULL )
486     {
487         if ( Vvpl.vpl_npages > 0 )
488         {
489             for (i = 0; i < nindices; i++)
490                 vc_vaconeind (&Vvpl, Irel[i], vacrelstats->ntups);
491         }
492         else    /* just scan indices to update statistic */
493         {
494             for (i = 0; i < nindices; i++)
495                 vc_scanoneind (Irel[i], vacrelstats->ntups);
496         }
497     }
498
499     if ( Fvpl.vpl_npages > 0 )          /* Try to shrink heap */
500         vc_rpfheap (vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
501     else 
502     {
503         if ( Irel != (Relation*) NULL )
504             vc_clsindices (nindices, Irel);
505         if ( Vvpl.vpl_npages > 0 )      /* Clean pages from Vvpl list */
506             vc_vacheap (vacrelstats, onerel, &Vvpl);
507     }
508
509     /* ok - free Vvpl list of reapped pages */
510     if ( Vvpl.vpl_npages > 0 )
511     {
512         vpp = Vvpl.vpl_pgdesc;
513         for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
514             pfree(*vpp);
515         pfree (Vvpl.vpl_pgdesc);
516         if ( Fvpl.vpl_npages > 0 )
517             pfree (Fvpl.vpl_pgdesc);
518     }
519
520     /* all done with this class */
521     heap_close(onerel);
522     heap_endscan(pgcscan);
523     heap_close(pgclass);
524
525     /* update statistics in pg_class */
526     vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
527                          vacrelstats->hasindex, vacrelstats->vacattrstats);
528
529     /* next command frees attribute stats */
530
531     CommitTransactionCommand();
532 }
533
534 /*
535  *  vc_scanheap() -- scan an open heap relation
536  *
537  *      This routine sets commit times, constructs Vvpl list of 
538  *      empty/uninitialized pages and pages with dead tuples and
539  *      ~LP_USED line pointers, constructs Fvpl list of pages
540  *      appropriate for purposes of shrinking and maintains statistics 
541  *      on the number of live tuples in a heap.
542  */
543 static void
544 vc_scanheap (VRelStats *vacrelstats, Relation onerel, 
545                         VPageList Vvpl, VPageList Fvpl)
546 {
547     int nblocks, blkno;
548     ItemId itemid;
549     ItemPointer itemptr;
550     HeapTuple htup;
551     Buffer buf;
552     Page page, tempPage = NULL;
553     OffsetNumber offnum, maxoff;
554     bool pgchanged, tupgone, dobufrel, notup;
555     AbsoluteTime purgetime, expiretime;
556     RelativeTime preservetime;
557     char *relname;
558     VPageDescr vpc, vp;
559     uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
560     Size frsize, frsusf;
561     Size min_tlen = MAXTUPLEN;
562     Size max_tlen = 0;
563     int32 i/*, attr_cnt*/;
564     struct rusage ru0, ru1;
565
566     getrusage(RUSAGE_SELF, &ru0);
567
568     nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
569     frsize = frsusf = 0;
570
571     relname = (RelationGetRelationName(onerel))->data;
572
573     nblocks = RelationGetNumberOfBlocks(onerel);
574
575     /* calculate the purge time: tuples that expired before this time
576        will be archived or deleted */
577     purgetime = GetCurrentTransactionStartTime();
578     expiretime = (AbsoluteTime)onerel->rd_rel->relexpires;
579     preservetime = (RelativeTime)onerel->rd_rel->relpreserved;
580
581     if (RelativeTimeIsValid(preservetime) && (preservetime)) {
582         purgetime -= preservetime;
583         if (AbsoluteTimeIsBackwardCompatiblyValid(expiretime) &&
584             expiretime > purgetime)
585             purgetime = expiretime;
586     }
587
588     else if (AbsoluteTimeIsBackwardCompatiblyValid(expiretime))
589         purgetime = expiretime;
590
591     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
592     vpc->vpd_nusd = 0;
593             
594     for (blkno = 0; blkno < nblocks; blkno++) {
595         buf = ReadBuffer(onerel, blkno);
596         page = BufferGetPage(buf);
597         vpc->vpd_blkno = blkno;
598         vpc->vpd_noff = 0;
599
600         if (PageIsNew(page)) {
601             elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing",
602                 NAMEDATALEN, relname, blkno);
603             PageInit (page, BufferGetPageSize (buf), 0);
604             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
605             frsize += (vpc->vpd_free - sizeof (ItemIdData));
606             nnepg++;
607             nemend++;
608             vc_reappage (Vvpl, vpc);
609             WriteBuffer(buf);
610             continue;
611         }
612
613         if (PageIsEmpty(page)) {
614             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
615             frsize += (vpc->vpd_free - sizeof (ItemIdData));
616             nempg++;
617             nemend++;
618             vc_reappage (Vvpl, vpc);
619             ReleaseBuffer(buf);
620             continue;
621         }
622
623         pgchanged = false;
624         notup = true;
625         maxoff = PageGetMaxOffsetNumber(page);
626         for (offnum = FirstOffsetNumber;
627              offnum <= maxoff;
628              offnum = OffsetNumberNext(offnum)) {
629             itemid = PageGetItemId(page, offnum);
630
631             /*
632              * Collect un-used items too - it's possible to have
633              * indices pointing here after crash.
634              */
635             if (!ItemIdIsUsed(itemid)) {
636                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
637                 nunused++;
638                 continue;
639             }
640
641             htup = (HeapTuple) PageGetItem(page, itemid);
642             tupgone = false;
643
644             if (!AbsoluteTimeIsBackwardCompatiblyValid(htup->t_tmin) && 
645                 TransactionIdIsValid((TransactionId)htup->t_xmin)) {
646
647                 if (TransactionIdDidAbort(htup->t_xmin)) {
648                     tupgone = true;
649                 } else if (TransactionIdDidCommit(htup->t_xmin)) {
650                     htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
651                     pgchanged = true;
652                 } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
653                     /* 
654                      * Not Aborted, Not Committed, Not in Progress -
655                      * so it from crashed process. - vadim 11/26/96
656                      */
657                     ncrash++;
658                     tupgone = true;
659                 }
660                 else {
661                     elog (MESSAGE_LEVEL, "Rel %.*s: InsertTransactionInProgress %u for TID %u/%u",
662                         NAMEDATALEN, relname, htup->t_xmin, blkno, offnum);
663                 }
664             }
665
666             if (TransactionIdIsValid((TransactionId)htup->t_xmax)) {
667                 if (TransactionIdDidAbort(htup->t_xmax)) {
668                     StoreInvalidTransactionId(&(htup->t_xmax));
669                     pgchanged = true;
670                 } else if (TransactionIdDidCommit(htup->t_xmax)) {
671                     if (!AbsoluteTimeIsBackwardCompatiblyReal(htup->t_tmax)) {
672
673                         htup->t_tmax = TransactionIdGetCommitTime(htup->t_xmax);  
674                         pgchanged = true;
675                     }
676
677                     /*
678                      *  Reap the dead tuple if its expiration time is
679                      *  before purgetime.
680                      */
681
682                     if (htup->t_tmax < purgetime) {
683                         tupgone = true;
684                     }
685                 }
686             }
687
688             /*
689              * Is it possible at all ? - vadim 11/26/96
690              */
691             if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
692             {
693                 elog (NOTICE, "TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
694 DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.", 
695                         TransactionIdIsValid((TransactionId)htup->t_xmax),
696                         tupgone);
697             }
698             
699             /*
700              * It's possibly! But from where it comes ?
701              * And should we fix it ?  - vadim 11/28/96
702              */
703             itemptr = &(htup->t_ctid);
704             if ( !ItemPointerIsValid (itemptr) || 
705                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno )
706             {
707                 elog (NOTICE, "ITEM POINTER IS INVALID: %u/%u FOR %u/%u. TUPGONE %d.", 
708                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)), 
709                         itemptr->ip_posid, blkno, offnum, tupgone);
710             }
711
712             /*
713              * Other checks...
714              */
715             if ( htup->t_len != itemid->lp_len )
716             {
717                 elog (NOTICE, "PAGEHEADER' LEN %u IS NOT THE SAME AS HTUP' %u FOR %u/%u.TUPGONE %d.", 
718                         itemid->lp_len, htup->t_len, blkno, offnum, tupgone);
719             }
720             if ( !OidIsValid(htup->t_oid) )
721             {
722                 elog (NOTICE, "OID IS INVALID FOR %u/%u.TUPGONE %d.", 
723                         blkno, offnum, tupgone);
724             }
725             
726             if (tupgone) {
727                 ItemId lpp;
728                                                     
729                 if ( tempPage == (Page) NULL )
730                 {
731                     Size pageSize;
732                     
733                     pageSize = PageGetPageSize(page);
734                     tempPage = (Page) palloc(pageSize);
735                     memmove (tempPage, page, pageSize);
736                 }
737                 
738                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
739
740                 /* mark it unused */
741                 lpp->lp_flags &= ~LP_USED;
742
743                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
744                 nvac++;
745
746             } else {
747                 ntups++;
748                 notup = false;
749                 if ( htup->t_len < min_tlen )
750                     min_tlen = htup->t_len;
751                 if ( htup->t_len > max_tlen )
752                     max_tlen = htup->t_len;
753                 vc_attrstats(onerel, vacrelstats->vacattrstats, htup);
754             }
755         }
756
757         if (pgchanged) {
758             WriteBuffer(buf);
759             dobufrel = false;
760             nchpg++;
761         }
762         else
763             dobufrel = true;
764         if ( tempPage != (Page) NULL )
765         { /* Some tuples are gone */
766             PageRepairFragmentation(tempPage);
767             vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
768             frsize += vpc->vpd_free;
769             vc_reappage (Vvpl, vpc);
770             pfree (tempPage);
771             tempPage = (Page) NULL;
772         }
773         else if ( vpc->vpd_noff > 0 )
774         { /* there are only ~LP_USED line pointers */
775             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
776             frsize += vpc->vpd_free;
777             vc_reappage (Vvpl, vpc);
778         }
779         if ( dobufrel )
780             ReleaseBuffer(buf);
781         if ( notup )
782             nemend++;
783         else
784             nemend = 0;
785     }
786
787     pfree (vpc);
788
789     /* save stats in the rel list for use later */
790     vacrelstats->ntups = ntups;
791     vacrelstats->npages = nblocks;
792 /*    vacrelstats->natts = attr_cnt;*/
793     if ( ntups == 0 )
794         min_tlen = max_tlen = 0;
795     vacrelstats->min_tlen = min_tlen;
796     vacrelstats->max_tlen = max_tlen;
797     
798     Vvpl->vpl_nemend = nemend;
799     Fvpl->vpl_nemend = nemend;
800
801     /* 
802      * Try to make Fvpl keeping in mind that we can't use free space 
803      * of "empty" end-pages and last page if it reapped.
804      */
805     if ( Vvpl->vpl_npages - nemend > 0 )
806     {
807         int nusf;               /* blocks usefull for re-using */
808         
809         nusf = Vvpl->vpl_npages - nemend;
810         if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
811             nusf--;
812     
813         for (i = 0; i < nusf; i++)
814         {
815             vp = Vvpl->vpl_pgdesc[i];
816             if ( vc_enough_space (vp, min_tlen) )
817             {
818                 vc_vpinsert (Fvpl, vp);
819                 frsusf += vp->vpd_free;
820             }
821         }
822     }
823
824     getrusage(RUSAGE_SELF, &ru1);
825     
826     elog (MESSAGE_LEVEL, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
827 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
828         NAMEDATALEN, relname, 
829         nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
830         ntups, nvac, ncrash, nunused, min_tlen, max_tlen, 
831         frsize, frsusf, nemend, Fvpl->vpl_npages,
832         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
833         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
834
835 } /* vc_scanheap */
836
837
838 /*
839  *  vc_rpfheap() -- try to repaire relation' fragmentation
840  *
841  *      This routine marks dead tuples as unused and tries re-use dead space
842  *      by moving tuples (and inserting indices if needed). It constructs 
843  *      Nvpl list of free-ed pages (moved tuples) and clean indices
844  *      for them after committing (in hack-manner - without losing locks
845  *      and freeing memory!) current transaction. It truncates relation
846  *      if some end-blocks are gone away.
847  */
848 static void
849 vc_rpfheap (VRelStats *vacrelstats, Relation onerel, 
850                 VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
851 {
852     TransactionId myXID;
853     CommandId myCID;
854     AbsoluteTime myCTM = 0;
855     Buffer buf, ToBuf;
856     int nblocks, blkno;
857     Page page, ToPage = NULL;
858     OffsetNumber offnum = 0, maxoff = 0, newoff, moff;
859     ItemId itemid, newitemid;
860     HeapTuple htup, newtup;
861     TupleDesc tupdesc = NULL;
862     Datum *idatum = NULL;
863     char *inulls = NULL;
864     InsertIndexResult iresult;
865     VPageListData Nvpl;
866     VPageDescr ToVpd = NULL, Fvplast, Vvplast, vpc, *vpp;
867     int ToVpI = 0;
868     IndDesc *Idesc, *idcur;
869     int Fblklast, Vblklast, i;
870     Size tlen;
871     int nmoved, Fnpages, Vnpages;
872     int nchkmvd, ntups;
873     bool isempty, dowrite;
874     Relation archrel;
875     struct rusage ru0, ru1;
876
877     getrusage(RUSAGE_SELF, &ru0);
878
879     myXID = GetCurrentTransactionId();
880     myCID = GetCurrentCommandId();
881         
882     if ( Irel != (Relation*) NULL )     /* preparation for index' inserts */
883     {
884         vc_mkindesc (onerel, nindices, Irel, &Idesc);
885         tupdesc = RelationGetTupleDescriptor(onerel);
886         idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
887         inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
888     }
889
890     /* if the relation has an archive, open it */
891     if (onerel->rd_rel->relarch != 'n')
892     {
893         archrel = vc_getarchrel(onerel);
894         /* Archive tuples from "empty" end-pages */
895         for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1, 
896                                 i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
897         {
898             if ( (*vpp)->vpd_noff > 0 )
899             {
900                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
901                 page = BufferGetPage(buf);
902                 Assert ( !PageIsEmpty(page) );
903                 vc_vacpage (page, *vpp, archrel);
904                 WriteBuffer (buf);
905             }
906         }
907     }
908     else
909         archrel = (Relation) NULL;
910
911     Nvpl.vpl_npages = 0;
912     Fnpages = Fvpl->vpl_npages;
913     Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
914     Fblklast = Fvplast->vpd_blkno;
915     Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
916     Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
917     Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
918     Vblklast = Vvplast->vpd_blkno;
919     Assert ( Vblklast >= Fblklast );
920     ToBuf = InvalidBuffer;
921     nmoved = 0;
922
923     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
924     vpc->vpd_nusd = vpc->vpd_noff = 0;
925         
926     nblocks = vacrelstats->npages;
927     for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
928     {
929         /* if it's reapped page and it was used by me - quit */
930         if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
931             break;
932
933         buf = ReadBuffer(onerel, blkno);
934         page = BufferGetPage(buf);
935
936         vpc->vpd_noff = 0;
937
938         isempty = PageIsEmpty(page);
939
940         dowrite = false;
941         if ( blkno == Vblklast )                /* it's reapped page */
942         {
943             if ( Vvplast->vpd_noff > 0 )        /* there are dead tuples */
944             {                                   /* on this page - clean */
945                 Assert ( ! isempty );
946                 vc_vacpage (page, Vvplast, archrel);
947                 dowrite = true;
948             }
949             else
950                 Assert ( isempty );
951             Assert ( --Vnpages > 0 );
952             /* get prev reapped page from Vvpl */
953             Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
954             Vblklast = Vvplast->vpd_blkno;
955             if ( blkno == Fblklast )    /* this page in Fvpl too */
956             {
957                 Assert ( --Fnpages > 0 );
958                 Assert ( Fvplast->vpd_nusd == 0 );
959                 /* get prev reapped page from Fvpl */
960                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
961                 Fblklast = Fvplast->vpd_blkno;
962             }
963             Assert ( Fblklast <= Vblklast );
964             if ( isempty )
965             {
966                 ReleaseBuffer(buf);
967                 continue;
968             }
969         }
970         else
971         {
972             Assert ( ! isempty );
973         }
974
975         vpc->vpd_blkno = blkno;
976         maxoff = PageGetMaxOffsetNumber(page);
977         for (offnum = FirstOffsetNumber;
978                 offnum <= maxoff;
979                 offnum = OffsetNumberNext(offnum))
980         {
981             itemid = PageGetItemId(page, offnum);
982
983             if (!ItemIdIsUsed(itemid))
984                 continue;
985
986             htup = (HeapTuple) PageGetItem(page, itemid);
987             tlen = htup->t_len;
988                 
989             /* try to find new page for this tuple */
990             if ( ToBuf == InvalidBuffer ||
991                 ! vc_enough_space (ToVpd, tlen) )
992             {
993                 if ( ToBuf != InvalidBuffer )
994                 {
995                     WriteBuffer(ToBuf);
996                     ToBuf = InvalidBuffer;
997                     /*
998                      * If no one tuple can't be added to this page -
999                      * remove page from Fvpl. - vadim 11/27/96
1000                      */ 
1001                     if ( !vc_enough_space (ToVpd, vacrelstats->min_tlen) )
1002                     {
1003                         if ( ToVpd != Fvplast )
1004                         {
1005                             Assert ( Fnpages > ToVpI + 1 );
1006                             memmove (Fvpl->vpl_pgdesc + ToVpI, 
1007                                 Fvpl->vpl_pgdesc + ToVpI + 1, 
1008                                 sizeof (VPageDescr*) * (Fnpages - ToVpI - 1));
1009                         }
1010                         Assert ( Fnpages >= 1 );
1011                         Fnpages--;
1012                         if ( Fnpages == 0 )
1013                             break;
1014                         /* get prev reapped page from Fvpl */
1015                         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1016                         Fblklast = Fvplast->vpd_blkno;
1017                     }
1018                 }
1019                 for (i=0; i < Fnpages; i++)
1020                 {
1021                     if ( vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
1022                         break;
1023                 }
1024                 if ( i == Fnpages )
1025                     break;                      /* can't move item anywhere */
1026                 ToVpI = i;
1027                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1028                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1029                 ToPage = BufferGetPage(ToBuf);
1030                 /* if this page was not used before - clean it */
1031                 if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
1032                     vc_vacpage (ToPage, ToVpd, archrel);
1033             }
1034                 
1035             /* copy tuple */
1036             newtup = (HeapTuple) palloc (tlen);
1037             memmove((char *) newtup, (char *) htup, tlen);
1038
1039             /* store transaction information */
1040             TransactionIdStore(myXID, &(newtup->t_xmin));
1041             newtup->t_cmin = myCID;
1042             StoreInvalidTransactionId(&(newtup->t_xmax));
1043             newtup->t_tmin = INVALID_ABSTIME;
1044             newtup->t_tmax = CURRENT_ABSTIME;
1045             ItemPointerSetInvalid(&newtup->t_chain);
1046
1047             /* add tuple to the page */
1048             newoff = PageAddItem (ToPage, (Item)newtup, tlen, 
1049                                 InvalidOffsetNumber, LP_USED);
1050             if ( newoff == InvalidOffsetNumber )
1051             {
1052                 elog (WARN, "\
1053 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1054                 tlen, ToVpd->vpd_blkno, ToVpd->vpd_free, 
1055                 ToVpd->vpd_nusd, ToVpd->vpd_noff);
1056             }
1057             newitemid = PageGetItemId(ToPage, newoff);
1058             pfree (newtup);
1059             newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1060             ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1061
1062             /* now logically delete end-tuple */
1063             TransactionIdStore(myXID, &(htup->t_xmax));
1064             htup->t_cmax = myCID;
1065             memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
1066
1067             ToVpd->vpd_nusd++;
1068             nmoved++;
1069             ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
1070             vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1071                 
1072             /* insert index' tuples if needed */
1073             if ( Irel != (Relation*) NULL )
1074             {
1075                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1076                 {
1077                     FormIndexDatum (
1078                                 idcur->natts,
1079                                 (AttrNumber *)&(idcur->tform->indkey[0]),
1080                                 newtup, 
1081                                 tupdesc,
1082                                 InvalidBuffer,
1083                                 idatum,
1084                                 inulls,
1085                                 idcur->finfoP);
1086                     iresult = index_insert (
1087                                 Irel[i],
1088                                 idatum,
1089                                 inulls,
1090                                 &(newtup->t_ctid),
1091                                 onerel);
1092                     if (iresult) pfree(iresult);
1093                 }
1094             }
1095                 
1096         } /* walk along page */
1097
1098         if ( vpc->vpd_noff > 0 )                /* some tuples were moved */
1099         {
1100             vc_reappage (&Nvpl, vpc);
1101             WriteBuffer(buf);
1102         }
1103         else if ( dowrite )
1104             WriteBuffer(buf);
1105         else
1106             ReleaseBuffer(buf);
1107             
1108         if ( offnum <= maxoff )
1109             break;                              /* some item(s) left */
1110             
1111     } /* walk along relation */
1112         
1113     blkno++;                            /* new number of blocks */
1114
1115     if ( ToBuf != InvalidBuffer )
1116     {
1117         Assert (nmoved > 0);
1118         WriteBuffer(ToBuf);
1119     }
1120
1121     if ( nmoved > 0 )
1122     {
1123         /* 
1124          * We have to commit our tuple' movings before we'll truncate 
1125          * relation, but we shouldn't lose our locks. And so - quick hack: 
1126          * flush buffers and record status of current transaction
1127          * as committed, and continue. - vadim 11/13/96
1128          */
1129         FlushBufferPool(!TransactionFlushEnabled());
1130         TransactionIdCommit(myXID);
1131         FlushBufferPool(!TransactionFlushEnabled());
1132         myCTM = TransactionIdGetCommitTime(myXID);
1133     }
1134         
1135     /* 
1136      * Clean uncleaned reapped pages from Vvpl list 
1137      * and set commit' times  for inserted tuples
1138      */
1139     nchkmvd = 0;
1140     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1141     {
1142         Assert ( (*vpp)->vpd_blkno < blkno );
1143         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1144         page = BufferGetPage(buf);
1145         if ( (*vpp)->vpd_nusd == 0 )    /* this page was not used */
1146         {
1147             /* noff == 0 in empty pages only - such pages should be re-used */
1148             Assert ( (*vpp)->vpd_noff > 0 );
1149             vc_vacpage (page, *vpp, archrel);
1150         }
1151         else                            /* this page was used */
1152         {
1153             ntups = 0;
1154             moff = PageGetMaxOffsetNumber(page);
1155             for (newoff = FirstOffsetNumber;
1156                         newoff <= moff;
1157                         newoff = OffsetNumberNext(newoff))
1158             {
1159                 itemid = PageGetItemId(page, newoff);
1160                 if (!ItemIdIsUsed(itemid))
1161                     continue;
1162                 htup = (HeapTuple) PageGetItem(page, itemid);
1163                 if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
1164                 {
1165                     htup->t_tmin = myCTM;
1166                     ntups++;
1167                 }
1168             }
1169             Assert ( (*vpp)->vpd_nusd == ntups );
1170             nchkmvd += ntups;
1171         }
1172         WriteBuffer (buf);
1173     }
1174     Assert ( nmoved == nchkmvd );
1175
1176     getrusage(RUSAGE_SELF, &ru1);
1177     
1178     elog (MESSAGE_LEVEL, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \
1179 Elapsed %u/%u sec.",
1180                 NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
1181                 nblocks, blkno, nmoved,
1182                 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1183                 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1184
1185     if ( Nvpl.vpl_npages > 0 )
1186     {
1187         /* vacuum indices again if needed */
1188         if ( Irel != (Relation*) NULL )
1189         {
1190             VPageDescr *vpleft, *vpright, vpsave;
1191                 
1192             /* re-sort Nvpl.vpl_pgdesc */
1193             for (vpleft = Nvpl.vpl_pgdesc, 
1194                 vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1195                 vpleft < vpright; vpleft++, vpright--)
1196             {
1197                 vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
1198             }
1199             for (i = 0; i < nindices; i++)
1200                 vc_vaconeind (&Nvpl, Irel[i], vacrelstats->ntups);
1201         }
1202
1203         /* 
1204          * clean moved tuples from last page in Nvpl list
1205          * if some tuples left there
1206          */
1207         if ( vpc->vpd_noff > 0 && offnum <= maxoff )
1208         {
1209             Assert (vpc->vpd_blkno == blkno - 1);
1210             buf = ReadBuffer(onerel, vpc->vpd_blkno);
1211             page = BufferGetPage (buf);
1212             ntups = 0;
1213             maxoff = offnum;
1214             for (offnum = FirstOffsetNumber;
1215                         offnum < maxoff;
1216                         offnum = OffsetNumberNext(offnum))
1217             {
1218                 itemid = PageGetItemId(page, offnum);
1219                 if (!ItemIdIsUsed(itemid))
1220                     continue;
1221                 htup = (HeapTuple) PageGetItem(page, itemid);
1222                 Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
1223                 itemid->lp_flags &= ~LP_USED;
1224                 ntups++;
1225             }
1226             Assert ( vpc->vpd_noff == ntups );
1227             PageRepairFragmentation(page);
1228             WriteBuffer (buf);
1229         }
1230
1231         /* now - free new list of reapped pages */
1232         vpp = Nvpl.vpl_pgdesc;
1233         for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1234             pfree(*vpp);
1235         pfree (Nvpl.vpl_pgdesc);
1236     }
1237         
1238     /* truncate relation */
1239     if ( blkno < nblocks )
1240     {
1241         blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
1242         Assert ( blkno >= 0 );
1243         vacrelstats->npages = blkno;    /* set new number of blocks */
1244     }
1245
1246     if ( archrel != (Relation) NULL )
1247         heap_close(archrel);
1248
1249     if ( Irel != (Relation*) NULL )     /* pfree index' allocations */
1250     {
1251         pfree (Idesc);
1252         pfree (idatum);
1253         pfree (inulls);
1254         vc_clsindices (nindices, Irel);
1255     }
1256
1257     pfree (vpc);
1258
1259 } /* vc_rpfheap */
1260
1261 /*
1262  *  vc_vacheap() -- free dead tuples
1263  *
1264  *      This routine marks dead tuples as unused and truncates relation
1265  *      if there are "empty" end-blocks.
1266  */
1267 static void
1268 vc_vacheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1269 {
1270     Buffer buf;
1271     Page page;
1272     VPageDescr *vpp;
1273     Relation archrel;
1274     int nblocks;
1275     int i;
1276
1277     nblocks = Vvpl->vpl_npages;
1278     /* if the relation has an archive, open it */
1279     if (onerel->rd_rel->relarch != 'n')
1280         archrel = vc_getarchrel(onerel);
1281     else
1282     {
1283         archrel = (Relation) NULL;
1284         nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1285     }
1286         
1287     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1288     {
1289         if ( (*vpp)->vpd_noff > 0 )
1290         {
1291             buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1292             page = BufferGetPage (buf);
1293             vc_vacpage (page, *vpp, archrel);
1294             WriteBuffer (buf);
1295         }
1296     }
1297
1298     /* truncate relation if there are some empty end-pages */
1299     if ( Vvpl->vpl_nemend > 0 )
1300     {
1301         Assert ( vacrelstats->npages >= Vvpl->vpl_nemend );
1302         nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1303         elog (MESSAGE_LEVEL, "Rel %.*s: Pages: %u --> %u.",
1304                 NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
1305                 vacrelstats->npages, nblocks);
1306
1307         /* 
1308          * we have to flush "empty" end-pages (if changed, but who knows it)
1309          * before truncation 
1310          */
1311         FlushBufferPool(!TransactionFlushEnabled());
1312
1313         nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
1314         Assert ( nblocks >= 0 );
1315         vacrelstats->npages = nblocks;  /* set new number of blocks */
1316     }
1317
1318     if ( archrel != (Relation) NULL )
1319         heap_close(archrel);
1320
1321 } /* vc_vacheap */
1322
1323 /*
1324  *  vc_vacpage() -- free (and archive if needed) dead tuples on a page
1325  *                   and repaire its fragmentation.
1326  */
1327 static void
1328 vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
1329 {
1330     ItemId itemid;
1331     HeapTuple htup;
1332     int i;
1333     
1334     Assert ( vpd->vpd_nusd == 0 );
1335     for (i=0; i < vpd->vpd_noff; i++)
1336     {
1337         itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1338         if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
1339         {
1340             htup = (HeapTuple) PageGetItem (page, itemid);
1341             vc_archive (archrel, htup);
1342         }
1343         itemid->lp_flags &= ~LP_USED;
1344     }
1345     PageRepairFragmentation(page);
1346
1347 } /* vc_vacpage */
1348
1349 /*
1350  *  _vc_scanoneind() -- scan one index relation to update statistic.
1351  *
1352  */
1353 static void
1354 vc_scanoneind (Relation indrel, int nhtups)
1355 {
1356     RetrieveIndexResult res;
1357     IndexScanDesc iscan;
1358     int nitups;
1359     int nipages;
1360     struct rusage ru0, ru1;
1361
1362     getrusage(RUSAGE_SELF, &ru0);
1363
1364     /* walk through the entire index */
1365     iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1366     nitups = 0;
1367
1368     while ((res = index_getnext(iscan, ForwardScanDirection))
1369                                         != (RetrieveIndexResult) NULL)
1370     {
1371         nitups++;
1372         pfree(res);
1373     }
1374
1375     index_endscan(iscan);
1376
1377     /* now update statistics in pg_class */
1378     nipages = RelationGetNumberOfBlocks(indrel);
1379     vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1380
1381     getrusage(RUSAGE_SELF, &ru1);
1382
1383     elog (MESSAGE_LEVEL, "Ind %.*s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1384         NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, 
1385         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1386         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1387
1388     if ( nitups != nhtups )
1389         elog (NOTICE, "NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1390                 nitups, nhtups);
1391
1392 } /* vc_scanoneind */
1393
1394 /*
1395  *  vc_vaconeind() -- vacuum one index relation.
1396  *
1397  *      Vpl is the VPageList of the heap we're currently vacuuming.
1398  *      It's locked. Indrel is an index relation on the vacuumed heap. 
1399  *      We don't set locks on the index relation here, since the indexed 
1400  *      access methods support locking at different granularities. 
1401  *      We let them handle it.
1402  *
1403  *      Finally, we arrange to update the index relation's statistics in
1404  *      pg_class.
1405  */
1406 static void
1407 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1408 {
1409     RetrieveIndexResult res;
1410     IndexScanDesc iscan;
1411     ItemPointer heapptr;
1412     int nvac;
1413     int nitups;
1414     int nipages;
1415     VPageDescr vp;
1416     struct rusage ru0, ru1;
1417
1418     getrusage(RUSAGE_SELF, &ru0);
1419
1420     /* walk through the entire index */
1421     iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1422     nvac = 0;
1423     nitups = 0;
1424
1425     while ((res = index_getnext(iscan, ForwardScanDirection))
1426            != (RetrieveIndexResult) NULL) {
1427         heapptr = &res->heap_iptr;
1428
1429         if ( (vp = vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
1430         {
1431 #if 0
1432             elog(DEBUG, "<%x,%x> -> <%x,%x>",
1433                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1434                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1435                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1436                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1437 #endif
1438             if ( vp->vpd_noff == 0 ) 
1439             {                           /* this is EmptyPage !!! */
1440                 elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
1441                         NAMEDATALEN, indrel->rd_rel->relname.data,
1442                         vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1443             }
1444             ++nvac;
1445             index_delete(indrel, &res->index_iptr);
1446         } else {
1447             nitups++;
1448         }
1449
1450         /* be tidy */
1451         pfree(res);
1452     }
1453
1454     index_endscan(iscan);
1455
1456     /* now update statistics in pg_class */
1457     nipages = RelationGetNumberOfBlocks(indrel);
1458     vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1459
1460     getrusage(RUSAGE_SELF, &ru1);
1461
1462     elog (MESSAGE_LEVEL, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1463         NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac,
1464         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1465         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1466
1467     if ( nitups != nhtups )
1468         elog (NOTICE, "NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1469                 nitups, nhtups);
1470
1471 } /* vc_vaconeind */
1472
1473 /*
1474  *  vc_tidreapped() -- is a particular tid reapped?
1475  *
1476  *      vpl->VPageDescr_array is sorted in right order.
1477  */
1478 static VPageDescr
1479 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1480 {
1481     OffsetNumber ioffno;
1482     OffsetNumber *voff;
1483     VPageDescr vp, *vpp;
1484     VPageDescrData vpd;
1485
1486     vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1487     ioffno = ItemPointerGetOffsetNumber(itemptr);
1488         
1489     vp = &vpd;
1490     vpp = (VPageDescr*) vc_find_eq ((char*)(vpl->vpl_pgdesc), 
1491                 vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp, 
1492                 vc_cmp_blk);
1493
1494     if ( vpp == (VPageDescr*) NULL )
1495         return ((VPageDescr)NULL);
1496     vp = *vpp;
1497
1498     /* ok - we are on true page */
1499
1500     if ( vp->vpd_noff == 0 ) {          /* this is EmptyPage !!! */
1501         return (vp);
1502     }
1503     
1504     voff = (OffsetNumber*) vc_find_eq ((char*)(vp->vpd_voff), 
1505                 vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
1506                 vc_cmp_offno);
1507
1508     if ( voff == (OffsetNumber*) NULL )
1509         return ((VPageDescr)NULL);
1510
1511     return (vp);
1512
1513 } /* vc_tidreapped */
1514
1515 /*
1516  *  vc_attrstats() -- compute column statistics used by the optimzer
1517  *
1518  *  We compute the column min, max, null and non-null counts.
1519  *  Plus we attempt to find the count of the value that occurs most
1520  *  frequently in each column
1521  *  These figures are used to compute the selectivity of the column
1522  *
1523  *  We use a three-bucked cache to get the most frequent item
1524  *  The 'guess' buckets count hits.  A cache miss causes guess1
1525  *  to get the most hit 'guess' item in the most recent cycle, and
1526  *  the new item goes into guess2.  Whenever the total count of hits
1527  *  of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1528  *
1529  *  This method works perfectly for columns with unique values, and columns
1530  *  with only two unique values, plus nulls.
1531  *
1532  *  It becomes less perfect as the number of unique values increases and
1533  *  their distribution in the table becomes more random.
1534  *
1535  */
1536 static void
1537 vc_attrstats(Relation onerel, VacAttrStats *vacattrstats, HeapTuple htup)
1538 {
1539     int i, attr_cnt;
1540     AttributeTupleForm *attr;
1541     TupleDesc tupDesc;
1542     Datum value;
1543     bool isnull;
1544
1545     attr_cnt = onerel->rd_att->natts;
1546     attr = onerel->rd_att->attrs;
1547     tupDesc = onerel->rd_att;
1548
1549     for (i = 0; i < attr_cnt; i++) {
1550         VacAttrStats *stats = &vacattrstats[i];
1551         bool value_hit = true;
1552
1553         value = (Datum) heap_getattr(htup, InvalidBuffer, i+1, tupDesc, &isnull);
1554
1555         if (!VacAttrStatsEqValid(stats))
1556                 continue;
1557
1558         if (isnull)
1559             stats->null_cnt++;
1560         else {
1561             stats->nonnull_cnt++;
1562             if (stats->initialized == false) {
1563                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1564                 /* best_cnt gets incremented later */
1565                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1566                 stats->guess1_cnt = stats->guess1_hits = 1;
1567                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1568                 stats->guess2_hits = 1;
1569                 if (VacAttrStatsLtGtValid(stats)) {
1570                     vc_bucketcpy(stats->attr, value, &stats->max , &stats->max_len);
1571                     vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1572                 }
1573                 stats->initialized = true;
1574             }
1575             if (VacAttrStatsLtGtValid(stats) && fmgr(stats->cmplt,value,stats->min)) {
1576                 vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1577                 stats->min_cnt = 0;
1578             }
1579             if (VacAttrStatsLtGtValid(stats) && fmgr(stats->cmpgt,value,stats->max)) {
1580                 vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1581                 stats->max_cnt = 0;
1582             }
1583             if (VacAttrStatsLtGtValid(stats) && fmgr(stats->cmpeq,value,stats->min))
1584                 stats->min_cnt++;
1585             else if (VacAttrStatsLtGtValid(stats) && fmgr(stats->cmpeq,value,stats->max))
1586                 stats-> max_cnt++;
1587             if (fmgr(stats->cmpeq,value,stats->best))
1588                 stats-> best_cnt++;
1589             else if (fmgr(stats->cmpeq,value,stats->guess1)) {
1590                 stats->guess1_cnt++;
1591                 stats-> guess1_hits++;
1592             }
1593             else if (fmgr(stats->cmpeq,value,stats->guess2))
1594                 stats->guess2_hits++;
1595             else value_hit = false;
1596
1597             if (stats->guess2_hits > stats->guess1_hits) {
1598                 swapDatum(stats->guess1,stats->guess2);
1599                 swapInt(stats->guess1_len,stats->guess2_len);
1600                 stats->guess1_cnt = stats->guess2_hits;
1601                 swapLong(stats->guess1_hits, stats->guess2_hits);
1602             }
1603             if (stats->guess1_cnt > stats->best_cnt) {
1604                 swapDatum(stats->best,stats->guess1);
1605                 swapLong(stats->best_cnt,stats->guess1_cnt);
1606                 stats->guess1_hits = 1;
1607                 stats-> guess2_hits = 1;
1608             }
1609             if (!value_hit) {
1610                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1611                 stats->guess1_hits = 1;
1612                 stats-> guess2_hits = 1;
1613             }
1614         }
1615     }
1616     return;
1617 }
1618
1619 /*
1620  *  vc_bucketcpy() -- update pg_class statistics for one relation
1621  *
1622  */
1623 static void
1624 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1625 {
1626     if (attr->attbyval && attr->attlen != -1)
1627         *bucket = value;
1628     else {
1629         int len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1630
1631         if (len > *bucket_len)
1632         {
1633             if (*bucket_len != 0)
1634                 pfree(DatumGetPointer(*bucket));
1635             *bucket = PointerGetDatum(palloc(len));
1636             *bucket_len = len;
1637         }
1638         memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1639     }
1640 }
1641
1642 /*
1643  *  vc_updstats() -- update pg_class statistics for one relation
1644  *
1645  *      This routine works for both index and heap relation entries in
1646  *      pg_class.  We violate no-overwrite semantics here by storing new
1647  *      values for ntups, npages, and hasindex directly in the pg_class
1648  *      tuple that's already on the page.  The reason for this is that if
1649  *      we updated these tuples in the usual way, then every tuple in pg_class
1650  *      would be replaced every day.  This would make planning and executing
1651  *      historical queries very expensive.
1652  */
1653 static void
1654 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VacAttrStats *vacattrstats)
1655 {
1656     Relation rd, ad, sd;
1657     HeapScanDesc rsdesc, asdesc;
1658     TupleDesc sdesc;
1659     HeapTuple rtup, atup, stup;
1660     Buffer rbuf, abuf;
1661     Form_pg_class pgcform;
1662     ScanKeyData rskey, askey;
1663     AttributeTupleForm attp;
1664
1665     /*
1666      * update number of tuples and number of pages in pg_class
1667      */
1668     ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1669                            ObjectIdEqualRegProcedure,
1670                            ObjectIdGetDatum(relid));
1671
1672     rd = heap_openr(RelationRelationName);
1673     rsdesc = heap_beginscan(rd, false, NowTimeQual, 1, &rskey);
1674
1675     if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1676         elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1677                    relid);
1678
1679     /* overwrite the existing statistics in the tuple */
1680     vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1681     pgcform = (Form_pg_class) GETSTRUCT(rtup);
1682     pgcform->reltuples = ntups;
1683     pgcform->relpages = npages;
1684     pgcform->relhasindex = hasindex;
1685
1686     if (vacattrstats != NULL)
1687     {
1688         ad = heap_openr(AttributeRelationName);
1689         sd = heap_openr(StatisticRelationName);
1690         ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1691                    F_INT4EQ, relid);
1692
1693         asdesc = heap_beginscan(ad, false, NowTimeQual, 1, &askey);
1694
1695         while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf))) {
1696             int slot, i;
1697             double selratio;  /* average ratio of rows selected for a random constant */
1698             VacAttrStats *stats;
1699             Datum values[ Natts_pg_statistic ];
1700             char nulls[ Natts_pg_statistic ];
1701
1702             attp = (AttributeTupleForm) GETSTRUCT(atup);
1703             slot = attp->attnum - 1;
1704             if (slot < 0) /* skip system attributes for now,
1705                            they are unique anyway */
1706                 continue;            
1707             stats = &vacattrstats[slot];
1708
1709             /* overwrite the existing statistics in the tuple */
1710             if (VacAttrStatsEqValid(stats)) {
1711
1712                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1713
1714                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1715                     (stats->null_cnt <= 1 && stats->best_cnt == 1))
1716                     selratio = 0;
1717                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1718                 {
1719                     double min_cnt_d = stats->min_cnt, 
1720                            max_cnt_d = stats->max_cnt, 
1721                            null_cnt_d = stats->null_cnt,
1722                            nonnullcnt_d = stats->nonnull_cnt; /* prevent overflow */
1723                     selratio = (min_cnt_d*min_cnt_d+max_cnt_d*max_cnt_d+null_cnt_d*null_cnt_d)/
1724                                 (nonnullcnt_d+null_cnt_d)/(nonnullcnt_d+null_cnt_d);
1725                 }
1726                 else {
1727                     double most = (double)(stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1728                     double total = ((double)stats->nonnull_cnt)+((double)stats->null_cnt);
1729                     /* we assume count of other values are 20% 
1730                        of best count in table */
1731                     selratio = (most*most + 0.20*most*(total-most))/total/total;
1732                 }
1733                 if (selratio > 1.0)
1734                     selratio = 1.0;
1735                 attp->attnvals = (selratio ? (selratio * ATTNVALS_SCALE) : 0);
1736                 WriteNoReleaseBuffer(abuf);
1737
1738                 /* DO PG_STATISTIC INSERTS */
1739
1740                 /* doing system relations, especially pg_statistic is a problem */
1741                 if (VacAttrStatsLtGtValid(stats) && stats->initialized /* &&
1742                         !IsSystemRelationName(pgcform->relname.data)*/) {
1743                     func_ptr out_function;
1744                     char *out_string;
1745                     int dummy;
1746                 
1747                     for (i = 0; i < Natts_pg_statistic; ++i) nulls[i] = ' ';
1748
1749                     /* ----------------
1750                      *  initialize values[]
1751                      * ----------------
1752                      */
1753                     i = 0;
1754                     values[i++] = (Datum) relid;        /* 1 */
1755                     values[i++] = (Datum) attp->attnum; /* 2 */
1756                     values[i++] = (Datum) InvalidOid;   /* 3 */
1757                     fmgr_info(stats->outfunc, &out_function, &dummy);
1758                     out_string = (*out_function)(stats->min, stats->attr->atttypid);
1759                     values[i++] = (Datum) fmgr(TextInRegProcedure,out_string);
1760                     pfree(out_string);
1761                     out_string = (char *)(*out_function)(stats->max, stats->attr->atttypid);
1762                     values[i++] = (Datum) fmgr(TextInRegProcedure,out_string);
1763                     pfree(out_string);
1764
1765                     sdesc = sd->rd_att;
1766         
1767                     stup = heap_formtuple(sdesc, values, nulls);
1768
1769                     /* ----------------
1770                      *  insert the tuple in the relation and get the tuple's oid.
1771                      * ----------------
1772                      */
1773                     heap_insert(sd, stup);
1774                     pfree(DatumGetPointer(values[3]));
1775                     pfree(DatumGetPointer(values[4]));
1776                     pfree(stup);
1777                 }
1778             }
1779         }
1780         heap_endscan(asdesc);
1781         heap_close(ad);
1782         heap_close(sd);
1783     }
1784  
1785     /* XXX -- after write, should invalidate relcache in other backends */
1786     WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1787
1788     /* invalidating system relations confuses the function cache
1789        of pg_operator and pg_opclass */
1790     if ( !IsSystemRelationName(pgcform->relname.data))
1791         RelationInvalidateHeapTuple(rd, rtup);
1792
1793     /* that's all, folks */
1794     heap_endscan(rsdesc);
1795     heap_close(rd);
1796
1797 }
1798
1799 /*
1800  *  vc_delhilowstats() -- delete pg_statistics rows
1801  *
1802  */
1803 static void
1804 vc_delhilowstats(Oid relid)
1805 {
1806     Relation pgstatistic;
1807     HeapScanDesc pgsscan;
1808     HeapTuple pgstup;
1809     ScanKeyData  pgskey;
1810
1811     pgstatistic = heap_openr(StatisticRelationName);
1812
1813     if (relid != InvalidOid ) {
1814         ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
1815                                    ObjectIdEqualRegProcedure,
1816                                    ObjectIdGetDatum(relid));
1817         pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 1, &pgskey);
1818     }
1819     else
1820         pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 0, NULL);
1821
1822     while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL))) {
1823         heap_delete(pgstatistic, &pgstup->t_ctid);
1824     }
1825
1826     heap_endscan(pgsscan);
1827     heap_close(pgstatistic);
1828 }
1829
1830 static void vc_setpagelock(Relation rel, BlockNumber blkno)
1831 {
1832     ItemPointerData itm;
1833
1834     ItemPointerSet(&itm, blkno, 1);
1835
1836     RelationSetLockForWritePage(rel, &itm);
1837 }
1838
1839
1840 /*
1841  *  vc_reappage() -- save a page on the array of reapped pages.
1842  *
1843  *      As a side effect of the way that the vacuuming loop for a given
1844  *      relation works, higher pages come after lower pages in the array
1845  *      (and highest tid on a page is last).
1846  */
1847 static void 
1848 vc_reappage(VPageList vpl, VPageDescr vpc)
1849 {
1850     VPageDescr newvpd;
1851
1852     /* allocate a VPageDescrData entry */
1853     newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
1854
1855     /* fill it in */
1856     if ( vpc->vpd_noff > 0 )
1857         memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
1858     newvpd->vpd_blkno = vpc->vpd_blkno;
1859     newvpd->vpd_free = vpc->vpd_free;
1860     newvpd->vpd_nusd = vpc->vpd_nusd;
1861     newvpd->vpd_noff = vpc->vpd_noff;
1862
1863     /* insert this page into vpl list */
1864     vc_vpinsert (vpl, newvpd);
1865     
1866 } /* vc_reappage */
1867
1868 static void
1869 vc_vpinsert (VPageList vpl, VPageDescr vpnew)
1870 {
1871
1872     /* allocate a VPageDescr entry if needed */
1873     if ( vpl->vpl_npages == 0 )
1874         vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
1875     else if ( vpl->vpl_npages % 100 == 0 )
1876         vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
1877     vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
1878     (vpl->vpl_npages)++;
1879     
1880 }
1881
1882 static void
1883 vc_free(Portal p, VRelList vrl)
1884 {
1885     VRelList p_vrl;
1886     MemoryContext old;
1887     PortalVariableMemory pmem;
1888
1889     pmem = PortalGetVariableMemory(p);
1890     old = MemoryContextSwitchTo((MemoryContext)pmem);
1891
1892     while (vrl != (VRelList) NULL) {
1893
1894         /* free rel list entry */
1895         p_vrl = vrl;
1896         vrl = vrl->vrl_next;
1897         pfree(p_vrl);
1898     }
1899
1900     (void) MemoryContextSwitchTo(old);
1901 }
1902
1903 /*
1904  *  vc_getarchrel() -- open the archive relation for a heap relation
1905  *
1906  *      The archive relation is named 'a,XXXXX' for the heap relation
1907  *      whose relid is XXXXX.
1908  */
1909
1910 #define ARCHIVE_PREFIX  "a,"
1911
1912 static Relation
1913 vc_getarchrel(Relation heaprel)
1914 {
1915     Relation archrel;
1916     char *archrelname;
1917
1918     archrelname = palloc(sizeof(ARCHIVE_PREFIX) + NAMEDATALEN); /* bogus */
1919     sprintf(archrelname, "%s%d", ARCHIVE_PREFIX, heaprel->rd_id);
1920
1921     archrel = heap_openr(archrelname);
1922
1923     pfree(archrelname);
1924     return (archrel);
1925 }
1926
1927 /*
1928  *  vc_archive() -- write a tuple to an archive relation
1929  *
1930  *      In the future, this will invoke the archived accessd method.  For
1931  *      now, archive relations are on mag disk.
1932  */
1933 static void
1934 vc_archive(Relation archrel, HeapTuple htup)
1935 {
1936     doinsert(archrel, htup);
1937 }
1938
1939 static bool
1940 vc_isarchrel(char *rname)
1941 {
1942     if (strncmp(ARCHIVE_PREFIX, rname,strlen(ARCHIVE_PREFIX)) == 0)
1943         return (true);
1944
1945     return (false);
1946 }
1947
1948 static char *
1949 vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *))
1950 {
1951     int res;
1952     int last = nelem - 1;
1953     int celm = nelem / 2;
1954     bool last_move, first_move;
1955     
1956     last_move = first_move = true;
1957     for ( ; ; )
1958     {
1959         if ( first_move == true )
1960         {
1961             res = compar (bot, elm);
1962             if ( res > 0 )
1963                 return (NULL);
1964             if ( res == 0 )
1965                 return (bot);
1966             first_move = false;
1967         }
1968         if ( last_move == true )
1969         {
1970             res = compar (elm, bot + last*size);
1971             if ( res > 0 )
1972                 return (NULL);
1973             if ( res == 0 )
1974                 return (bot + last*size);
1975             last_move = false;
1976         }
1977         res = compar (elm, bot + celm*size);
1978         if ( res == 0 )
1979             return (bot + celm*size);
1980         if ( res < 0 )
1981         {
1982             if ( celm == 0 )
1983                 return (NULL);
1984             last = celm - 1;
1985             celm = celm / 2;
1986             last_move = true;
1987             continue;
1988         }
1989         
1990         if ( celm == last )
1991             return (NULL);
1992             
1993         last = last - celm - 1;
1994         bot = bot + (celm+1)*size;
1995         celm = (last + 1) / 2;
1996         first_move = true;
1997     }
1998
1999 } /* vc_find_eq */
2000
2001 static int 
2002 vc_cmp_blk (char *left, char *right)
2003 {
2004     BlockNumber lblk, rblk;
2005
2006     lblk = (*((VPageDescr*)left))->vpd_blkno;
2007     rblk = (*((VPageDescr*)right))->vpd_blkno;
2008
2009     if ( lblk < rblk )
2010         return (-1);
2011     if ( lblk == rblk )
2012         return (0);
2013     return (1);
2014
2015 } /* vc_cmp_blk */
2016
2017 static int 
2018 vc_cmp_offno (char *left, char *right)
2019 {
2020
2021     if ( *(OffsetNumber*)left < *(OffsetNumber*)right )
2022         return (-1);
2023     if ( *(OffsetNumber*)left == *(OffsetNumber*)right )
2024         return (0);
2025     return (1);
2026
2027 } /* vc_cmp_offno */
2028
2029
2030 static void
2031 vc_getindices (Oid relid, int *nindices, Relation **Irel)
2032 {
2033     Relation pgindex;
2034     Relation irel;
2035     TupleDesc pgidesc;
2036     HeapTuple pgitup;
2037     HeapScanDesc pgiscan;
2038     Datum d;
2039     int i, k;
2040     bool n;
2041     ScanKeyData pgikey;
2042     Oid *ioid;
2043
2044     *nindices = i = 0;
2045     
2046     ioid = (Oid *) palloc(10*sizeof(Oid));
2047
2048     /* prepare a heap scan on the pg_index relation */
2049     pgindex = heap_openr(IndexRelationName);
2050     pgidesc = RelationGetTupleDescriptor(pgindex);
2051
2052     ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2053                            ObjectIdEqualRegProcedure,
2054                            ObjectIdGetDatum(relid));
2055
2056     pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
2057
2058     while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
2059         d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
2060                                  pgidesc, &n);
2061         i++;
2062         if ( i % 10 == 0 )
2063             ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
2064         ioid[i-1] = DatumGetObjectId(d);
2065     }
2066
2067     heap_endscan(pgiscan);
2068     heap_close(pgindex);
2069
2070     if ( i == 0 ) {     /* No one index found */
2071         pfree(ioid);
2072         return;
2073     }
2074
2075     if ( Irel != (Relation **) NULL )
2076         *Irel = (Relation *) palloc(i * sizeof(Relation));
2077     
2078     for (k = 0; i > 0; )
2079     {
2080         irel = index_open(ioid[--i]);
2081         if ( irel != (Relation) NULL )
2082         {
2083             if ( Irel != (Relation **) NULL )
2084                 (*Irel)[k] = irel;
2085             else
2086                 index_close (irel);
2087             k++;
2088         }
2089         else
2090             elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2091     }
2092     *nindices = k;
2093     pfree(ioid);
2094
2095     if ( Irel != (Relation **) NULL && *nindices == 0 )
2096     {
2097         pfree (*Irel);
2098         *Irel = (Relation *) NULL;
2099     }
2100
2101 } /* vc_getindices */
2102
2103
2104 static void
2105 vc_clsindices (int nindices, Relation *Irel)
2106 {
2107
2108     if ( Irel == (Relation*) NULL )
2109         return;
2110
2111     while (nindices--) {
2112         index_close (Irel[nindices]);
2113     }
2114     pfree (Irel);
2115
2116 } /* vc_clsindices */
2117
2118
2119 static void
2120 vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2121 {
2122     IndDesc *idcur;
2123     HeapTuple pgIndexTup;
2124     AttrNumber *attnumP;
2125     int natts;
2126     int i;
2127
2128     *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
2129     
2130     for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
2131         pgIndexTup =
2132                 SearchSysCacheTuple(INDEXRELID,
2133                                 ObjectIdGetDatum(Irel[i]->rd_id),
2134                                 0,0,0);
2135         Assert(pgIndexTup);
2136         idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
2137         for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2138                 *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2139                 attnumP++, natts++);
2140         if (idcur->tform->indproc != InvalidOid) {
2141             idcur->finfoP = &(idcur->finfo);
2142             FIgetnArgs(idcur->finfoP) = natts;
2143             natts = 1;
2144             FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2145             *(FIgetname(idcur->finfoP)) = '\0';
2146         } else
2147             idcur->finfoP = (FuncIndexInfo *) NULL;
2148         
2149         idcur->natts = natts;
2150     }
2151     
2152 } /* vc_mkindesc */
2153
2154
2155 static bool
2156 vc_enough_space (VPageDescr vpd, Size len)
2157 {
2158
2159     len = DOUBLEALIGN(len);
2160
2161     if ( len > vpd->vpd_free )
2162         return (false);
2163     
2164     if ( vpd->vpd_nusd < vpd->vpd_noff )        /* there are free itemid(s) */
2165         return (true);                          /* and len <= free_space */
2166     
2167     /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2168     if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
2169         return (true);
2170     
2171     return (false);
2172     
2173 } /* vc_enough_space */