]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Rename pg_attribute.attnvals to attdisbursion.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *    the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.41 1997/08/21 03:01:36 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/types.h>
15 #include <sys/file.h>
16 #include <string.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <fmgr.h>
24 #include <utils/portal.h>
25 #include <access/genam.h>
26 #include <access/heapam.h>
27 #include <access/xact.h>
28 #include <storage/bufmgr.h>
29 #include <access/transam.h>
30 #include <catalog/pg_index.h>
31 #include <catalog/index.h>
32 #include <catalog/catname.h>
33 #include <catalog/catalog.h>
34 #include <catalog/pg_class.h>
35 #include <catalog/pg_proc.h>
36 #include <catalog/pg_statistic.h>
37 #include <catalog/pg_type.h>
38 #include <catalog/pg_operator.h>
39 #include <storage/smgr.h>
40 #include <storage/lmgr.h>
41 #include <utils/inval.h>
42 #include <utils/mcxt.h>
43 #include <utils/inval.h>
44 #include <utils/syscache.h>
45 #include <utils/builtins.h>
46 #include <commands/vacuum.h>
47 #include <parser/catalog_utils.h>
48 #include <storage/bufpage.h>
49 #include "storage/shmem.h"
50 #ifndef HAVE_GETRUSAGE
51 # include <rusagestub.h>
52 #else 
53 # include <sys/time.h>
54 # include <sys/resource.h>
55 #endif 
56
57 #include <port-protos.h>
58
59 bool VacuumRunning =    false;
60
61 static Portal vc_portal;
62
63 static int MESSAGE_LEVEL;       /* message level */
64
65 #define swapLong(a,b)   {long tmp; tmp=a; a=b; b=tmp;}
66 #define swapInt(a,b)    {int tmp; tmp=a; a=b; b=tmp;}
67 #define swapDatum(a,b)  {Datum tmp; tmp=a; a=b; b=tmp;}
68 #define VacAttrStatsEqValid(stats) ( stats->f_cmpeq != NULL )
69 #define VacAttrStatsLtGtValid(stats) ( stats->f_cmplt != NULL && \
70                                    stats->f_cmpgt != NULL && \
71                                    RegProcedureIsValid(stats->outfunc) )
72   
73
74 /* non-export function prototypes */
75 static void vc_init(void);
76 static void vc_shutdown(void);
77 static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
78 static VRelList vc_getrels(NameData *VacRelP);
79 static void vc_vacone (Oid relid, bool analyze, List *va_cols);
80 static void vc_scanheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
81 static void vc_rpfheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
82 static void vc_vacheap (VRelStats *vacrelstats, Relation onerel, VPageList vpl);
83 static void vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
84 static void vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
85 static void vc_scanoneind (Relation indrel, int nhtups);
86 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup);
87 static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
88 static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats);
89 static void vc_delhilowstats (Oid relid, int attcnt, int *attnums);
90 static void vc_setpagelock(Relation rel, BlockNumber blkno);
91 static VPageDescr vc_tidreapped (ItemPointer itemptr, VPageList vpl);
92 static void vc_reappage (VPageList vpl, VPageDescr vpc);
93 static void vc_vpinsert (VPageList vpl, VPageDescr vpnew);
94 static void vc_free(VRelList vrl);
95 static void vc_getindices (Oid relid, int *nindices, Relation **Irel);
96 static void vc_clsindices (int nindices, Relation *Irel);
97 static Relation vc_getarchrel(Relation heaprel);
98 static void vc_archive(Relation archrel, HeapTuple htup);
99 static bool vc_isarchrel(char *rname);
100 static void vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
101 static char * vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
102 static int vc_cmp_blk (char *left, char *right);
103 static int vc_cmp_offno (char *left, char *right);
104 static bool vc_enough_space (VPageDescr vpd, Size len);
105
106 void
107 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
108 {
109     char *pname;
110     MemoryContext old;
111     PortalVariableMemory pmem;
112     NameData VacRel;
113     List *le;
114     List *va_cols = NIL;
115
116     /*
117      *  Create a portal for safe memory across transctions.  We need to
118      *  palloc the name space for it because our hash function expects
119      *  the name to be on a longword boundary.  CreatePortal copies the
120      *  name to safe storage for us.
121      */
122     pname = (char *) palloc(strlen(VACPNAME) + 1);
123     strcpy(pname, VACPNAME);
124     vc_portal = CreatePortal(pname);
125     pfree(pname);
126
127     if (verbose)
128         MESSAGE_LEVEL = NOTICE;
129     else
130         MESSAGE_LEVEL = DEBUG;
131
132     /* vacrel gets de-allocated on transaction commit */
133     if (vacrel)
134         strcpy(VacRel.data,vacrel);
135
136     pmem = PortalGetVariableMemory(vc_portal);
137     old = MemoryContextSwitchTo((MemoryContext)pmem);
138
139     Assert ( va_spec == NIL || analyze );
140     foreach (le, va_spec)
141     {
142         char *col = (char*)lfirst(le);
143         char *dest;
144         
145         dest = (char*) palloc (strlen (col) + 1);
146         strcpy (dest, col);
147         va_cols = lappend (va_cols, dest);
148     }
149     MemoryContextSwitchTo(old);
150         
151     /* initialize vacuum cleaner */
152     vc_init();
153
154     /* vacuum the database */
155     if (vacrel)
156         vc_vacuum (&VacRel, analyze, va_cols);
157     else
158         vc_vacuum (NULL, analyze, NIL);
159
160     PortalDestroy (&vc_portal);
161
162     /* clean up */
163     vc_shutdown();
164 }
165
166 /*
167  *  vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
168  *
169  *      We run exactly one vacuum cleaner at a time.  We use the file system
170  *      to guarantee an exclusive lock on vacuuming, since a single vacuum
171  *      cleaner instantiation crosses transaction boundaries, and we'd lose
172  *      postgres-style locks at the end of every transaction.
173  *
174  *      The strangeness with committing and starting transactions in the
175  *      init and shutdown routines is due to the fact that the vacuum cleaner
176  *      is invoked via a sql command, and so is already executing inside
177  *      a transaction.  We need to leave ourselves in a predictable state
178  *      on entry and exit to the vacuum cleaner.  We commit the transaction
179  *      started in PostgresMain() inside vc_init(), and start one in
180  *      vc_shutdown() to match the commit waiting for us back in
181  *      PostgresMain().
182  */
183 static void
184 vc_init()
185 {
186     int fd;
187
188     if ((fd = open("pg_vlock", O_CREAT|O_EXCL, 0600)) < 0)
189         elog(WARN, "can't create lock file -- another vacuum cleaner running?");
190
191     close(fd);
192
193     /*
194      *  By here, exclusive open on the lock file succeeded.  If we abort
195      *  for any reason during vacuuming, we need to remove the lock file.
196      *  This global variable is checked in the transaction manager on xact
197      *  abort, and the routine vc_abort() is called if necessary.
198      */
199
200     VacuumRunning = true;
201
202     /* matches the StartTransaction in PostgresMain() */
203     CommitTransactionCommand();
204 }
205
206 static void
207 vc_shutdown()
208 {
209     /* on entry, not in a transaction */
210     if (unlink("pg_vlock") < 0)
211         elog(WARN, "vacuum: can't destroy lock file!");
212
213     /* okay, we're done */
214     VacuumRunning = false;
215
216     /* matches the CommitTransaction in PostgresMain() */
217     StartTransactionCommand();
218
219 }
220
221 void
222 vc_abort()
223 {
224     /* on abort, remove the vacuum cleaner lock file */
225     unlink("pg_vlock");
226
227     VacuumRunning = false;
228 }
229
230 /*
231  *  vc_vacuum() -- vacuum the database.
232  *
233  *      This routine builds a list of relations to vacuum, and then calls
234  *      code that vacuums them one at a time.  We are careful to vacuum each
235  *      relation in a separate transaction in order to avoid holding too many
236  *      locks at one time.
237  */
238 static void
239 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
240 {
241     VRelList vrl, cur;
242
243     /* get list of relations */
244     vrl = vc_getrels(VacRelP);
245
246     if ( analyze && VacRelP == NULL && vrl != NULL )
247         vc_delhilowstats (InvalidOid, 0, NULL);
248         
249     /* vacuum each heap relation */
250     for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
251         vc_vacone (cur->vrl_relid, analyze, va_cols);
252
253     vc_free(vrl);
254 }
255
256 static VRelList
257 vc_getrels(NameData *VacRelP)
258 {
259     Relation pgclass;
260     TupleDesc pgcdesc;
261     HeapScanDesc pgcscan;
262     HeapTuple pgctup;
263     Buffer buf;
264     PortalVariableMemory portalmem;
265     MemoryContext old;
266     VRelList vrl, cur;
267     Datum d;
268     char *rname;
269     char rkind;
270     int16 smgrno;
271     bool n;
272     ScanKeyData  pgckey;
273     bool found = false;
274
275     StartTransactionCommand();
276
277     if (VacRelP->data) {
278         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
279                                NameEqualRegProcedure, 
280                                PointerGetDatum(VacRelP->data));
281     } else {
282         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
283                                CharacterEqualRegProcedure, CharGetDatum('r'));
284     }
285  
286     portalmem = PortalGetVariableMemory(vc_portal);
287     vrl = cur = (VRelList) NULL;
288
289     pgclass = heap_openr(RelationRelationName);
290     pgcdesc = RelationGetTupleDescriptor(pgclass);
291
292     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
293
294     while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf))) {
295
296         found = true;
297         
298         /*
299          *  We have to be careful not to vacuum the archive (since it
300          *  already contains vacuumed tuples), and not to vacuum
301          *  relations on write-once storage managers like the Sony
302          *  jukebox at Berkeley.
303          */
304
305         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relname,
306                                  pgcdesc, &n);
307         rname = (char*)d;
308
309         /* skip archive relations */
310         if (vc_isarchrel(rname)) {
311             ReleaseBuffer(buf);
312             continue;
313         }
314
315         /* don't vacuum large objects for now - something breaks when we do */
316         if ( (strlen(rname) >= 5) && rname[0] == 'x' &&
317                 rname[1] == 'i' && rname[2] == 'n' &&
318                 (rname[3] == 'v' || rname[3] == 'x') &&
319                 rname[4] >= '0' && rname[4] <= '9')
320         {
321             elog (NOTICE, "Rel %s: can't vacuum LargeObjects now", 
322                         rname);
323             ReleaseBuffer(buf);
324             continue;
325         }
326
327         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
328                                  pgcdesc, &n);
329         smgrno = DatumGetInt16(d);
330
331         /* skip write-once storage managers */
332         if (smgriswo(smgrno)) {
333             ReleaseBuffer(buf);
334             continue;
335         }
336
337         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relkind,
338                                  pgcdesc, &n);
339
340         rkind = DatumGetChar(d);
341
342         /* skip system relations */
343         if (rkind != 'r') {
344             ReleaseBuffer(buf);
345             elog(NOTICE, "Vacuum: can not process index and certain system tables" );
346             continue;
347         }
348                                  
349         /* get a relation list entry for this guy */
350         old = MemoryContextSwitchTo((MemoryContext)portalmem);
351         if (vrl == (VRelList) NULL) {
352             vrl = cur = (VRelList) palloc(sizeof(VRelListData));
353         } else {
354             cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
355             cur = cur->vrl_next;
356         }
357         MemoryContextSwitchTo(old);
358
359         cur->vrl_relid = pgctup->t_oid;
360         cur->vrl_next = (VRelList) NULL;
361
362         /* wei hates it if you forget to do this */
363         ReleaseBuffer(buf);
364     }
365     if (found == false)
366         elog(NOTICE, "Vacuum: table not found" );
367
368     
369     heap_endscan(pgcscan);
370     heap_close(pgclass);
371
372     CommitTransactionCommand();
373
374     return (vrl);
375 }
376
377 /*
378  *  vc_vacone() -- vacuum one heap relation
379  *
380  *      This routine vacuums a single heap, cleans out its indices, and
381  *      updates its statistics npages and ntups statistics.
382  *
383  *      Doing one heap at a time incurs extra overhead, since we need to
384  *      check that the heap exists again just before we vacuum it.  The
385  *      reason that we do this is so that vacuuming can be spread across
386  *      many small transactions.  Otherwise, two-phase locking would require
387  *      us to lock the entire database during one pass of the vacuum cleaner.
388  */
389 static void
390 vc_vacone (Oid relid, bool analyze, List *va_cols)
391 {
392     Relation pgclass;
393     TupleDesc pgcdesc;
394     HeapTuple pgctup, pgttup;
395     Buffer pgcbuf;
396     HeapScanDesc pgcscan;
397     Relation onerel;
398     ScanKeyData pgckey;
399     VPageListData Vvpl; /* List of pages to vacuum and/or clean indices */
400     VPageListData Fvpl; /* List of pages with space enough for re-using */
401     VPageDescr *vpp;
402     Relation *Irel;
403     int32 nindices, i;
404     VRelStats *vacrelstats;
405     
406     StartTransactionCommand();
407
408     ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
409                            ObjectIdEqualRegProcedure,
410                            ObjectIdGetDatum(relid));
411
412     pgclass = heap_openr(RelationRelationName);
413     pgcdesc = RelationGetTupleDescriptor(pgclass);
414     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
415
416     /*
417      *  Race condition -- if the pg_class tuple has gone away since the
418      *  last time we saw it, we don't need to vacuum it.
419      */
420
421     if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf))) {
422         heap_endscan(pgcscan);
423         heap_close(pgclass);
424         CommitTransactionCommand();
425         return;
426     }
427
428     /* now open the class and vacuum it */
429     onerel = heap_open(relid);
430
431     vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
432     vacrelstats->relid = relid;
433     vacrelstats->npages = vacrelstats->ntups = 0;
434     vacrelstats->hasindex = false;
435     if ( analyze && !IsSystemRelationName ((RelationGetRelationName (onerel))->data) )
436     {
437         int attr_cnt, *attnums = NULL;
438         AttributeTupleForm *attr;
439         
440         attr_cnt = onerel->rd_att->natts;
441         attr = onerel->rd_att->attrs;
442         
443         if ( va_cols != NIL )
444         {
445             int tcnt = 0;
446             List *le;
447             
448             if ( length (va_cols) > attr_cnt )
449                 elog (WARN, "vacuum: too many attributes specified for relation %s",
450                         (RelationGetRelationName(onerel))->data);
451             attnums = (int*) palloc (attr_cnt * sizeof (int));
452             foreach (le, va_cols)
453             {
454                 char *col = (char*) lfirst(le);
455                     
456                 for (i = 0; i < attr_cnt; i++)
457                 {
458                     if ( namestrcmp (&(attr[i]->attname), col) == 0 )
459                         break;
460                 }
461                 if ( i < attr_cnt )             /* found */
462                     attnums[tcnt++] = i;
463                 else
464                 {
465                     elog (WARN, "vacuum: there is no attribute %s in %s",
466                         col, (RelationGetRelationName(onerel))->data);
467                 }
468             }
469             attr_cnt = tcnt;
470         }
471                 
472         vacrelstats->vacattrstats = 
473                 (VacAttrStats *) palloc (attr_cnt * sizeof(VacAttrStats));
474     
475         for (i = 0; i < attr_cnt; i++)
476         {
477             Operator func_operator;
478             OperatorTupleForm pgopform;
479             VacAttrStats *stats;
480
481             stats = &vacrelstats->vacattrstats[i];
482             stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
483             memmove (stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
484             stats->best = stats->guess1 = stats->guess2 = 0;
485             stats->max = stats->min = 0;
486             stats->best_len = stats->guess1_len = stats->guess2_len = 0;
487             stats->max_len = stats->min_len = 0;
488             stats->initialized = false;
489             stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
490             stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
491         
492             func_operator = oper("=",stats->attr->atttypid,stats->attr->atttypid,true);
493             if (func_operator != NULL)
494             {
495                 int nargs;
496             
497                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
498                 fmgr_info (pgopform->oprcode, &(stats->f_cmpeq), &nargs);
499             }
500             else
501                 stats->f_cmpeq = NULL;
502         
503             func_operator = oper("<",stats->attr->atttypid,stats->attr->atttypid,true);
504             if (func_operator != NULL)
505             {
506                 int nargs;
507             
508                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
509                 fmgr_info (pgopform->oprcode, &(stats->f_cmplt), &nargs);
510             }
511             else
512                 stats->f_cmplt = NULL;
513         
514             func_operator = oper(">",stats->attr->atttypid,stats->attr->atttypid,true);
515             if (func_operator != NULL)
516             {
517                 int nargs;
518             
519                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
520                 fmgr_info (pgopform->oprcode, &(stats->f_cmpgt), &nargs);
521             }
522             else
523                 stats->f_cmpgt = NULL;
524         
525             pgttup = SearchSysCacheTuple(TYPOID,
526                                     ObjectIdGetDatum(stats->attr->atttypid),
527                                     0,0,0);
528             if (HeapTupleIsValid(pgttup))
529                 stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
530             else
531                 stats->outfunc = InvalidOid;
532         }
533         vacrelstats->va_natts = attr_cnt;
534         vc_delhilowstats (relid, ((attnums) ? attr_cnt : 0), attnums);
535         if ( attnums )
536             pfree (attnums);
537     }
538     else
539     {
540         vacrelstats->va_natts = 0;
541         vacrelstats->vacattrstats = (VacAttrStats *) NULL;
542     }
543
544     /* we require the relation to be locked until the indices are cleaned */
545     RelationSetLockForWrite(onerel);
546
547     /* scan it */
548     Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
549     vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
550
551     /* Now open indices */
552     Irel = (Relation *) NULL;
553     vc_getindices(vacrelstats->relid, &nindices, &Irel);
554     
555     if ( nindices > 0 )
556         vacrelstats->hasindex = true;
557     else
558         vacrelstats->hasindex = false;
559
560     /* Clean/scan index relation(s) */
561     if ( Irel != (Relation*) NULL )
562     {
563         if ( Vvpl.vpl_npages > 0 )
564         {
565             for (i = 0; i < nindices; i++)
566                 vc_vaconeind (&Vvpl, Irel[i], vacrelstats->ntups);
567         }
568         else    /* just scan indices to update statistic */
569         {
570             for (i = 0; i < nindices; i++)
571                 vc_scanoneind (Irel[i], vacrelstats->ntups);
572         }
573     }
574
575     if ( Fvpl.vpl_npages > 0 )          /* Try to shrink heap */
576         vc_rpfheap (vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
577     else 
578     {
579         if ( Irel != (Relation*) NULL )
580             vc_clsindices (nindices, Irel);
581         if ( Vvpl.vpl_npages > 0 )      /* Clean pages from Vvpl list */
582             vc_vacheap (vacrelstats, onerel, &Vvpl);
583     }
584
585     /* ok - free Vvpl list of reapped pages */
586     if ( Vvpl.vpl_npages > 0 )
587     {
588         vpp = Vvpl.vpl_pgdesc;
589         for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
590             pfree(*vpp);
591         pfree (Vvpl.vpl_pgdesc);
592         if ( Fvpl.vpl_npages > 0 )
593             pfree (Fvpl.vpl_pgdesc);
594     }
595
596     /* all done with this class */
597     heap_close(onerel);
598     heap_endscan(pgcscan);
599     heap_close(pgclass);
600
601     /* update statistics in pg_class */
602     vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
603                          vacrelstats->hasindex, vacrelstats);
604
605     /* next command frees attribute stats */
606
607     CommitTransactionCommand();
608 }
609
610 /*
611  *  vc_scanheap() -- scan an open heap relation
612  *
613  *      This routine sets commit times, constructs Vvpl list of 
614  *      empty/uninitialized pages and pages with dead tuples and
615  *      ~LP_USED line pointers, constructs Fvpl list of pages
616  *      appropriate for purposes of shrinking and maintains statistics 
617  *      on the number of live tuples in a heap.
618  */
619 static void
620 vc_scanheap (VRelStats *vacrelstats, Relation onerel, 
621                         VPageList Vvpl, VPageList Fvpl)
622 {
623     int nblocks, blkno;
624     ItemId itemid;
625     ItemPointer itemptr;
626     HeapTuple htup;
627     Buffer buf;
628     Page page, tempPage = NULL;
629     OffsetNumber offnum, maxoff;
630     bool pgchanged, tupgone, dobufrel, notup;
631     char *relname;
632     VPageDescr vpc, vp;
633     uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
634     Size frsize, frsusf;
635     Size min_tlen = MAXTUPLEN;
636     Size max_tlen = 0;
637     int32 i/*, attr_cnt*/;
638     struct rusage ru0, ru1;
639     bool do_shrinking = true;
640
641     getrusage(RUSAGE_SELF, &ru0);
642
643     nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
644     frsize = frsusf = 0;
645
646     relname = (RelationGetRelationName(onerel))->data;
647
648     nblocks = RelationGetNumberOfBlocks(onerel);
649
650     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
651     vpc->vpd_nusd = 0;
652             
653     for (blkno = 0; blkno < nblocks; blkno++) {
654         buf = ReadBuffer(onerel, blkno);
655         page = BufferGetPage(buf);
656         vpc->vpd_blkno = blkno;
657         vpc->vpd_noff = 0;
658
659         if (PageIsNew(page)) {
660             elog (NOTICE, "Rel %s: Uninitialized page %u - fixing",
661                 relname, blkno);
662             PageInit (page, BufferGetPageSize (buf), 0);
663             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
664             frsize += (vpc->vpd_free - sizeof (ItemIdData));
665             nnepg++;
666             nemend++;
667             vc_reappage (Vvpl, vpc);
668             WriteBuffer(buf);
669             continue;
670         }
671
672         if (PageIsEmpty(page)) {
673             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
674             frsize += (vpc->vpd_free - sizeof (ItemIdData));
675             nempg++;
676             nemend++;
677             vc_reappage (Vvpl, vpc);
678             ReleaseBuffer(buf);
679             continue;
680         }
681
682         pgchanged = false;
683         notup = true;
684         maxoff = PageGetMaxOffsetNumber(page);
685         for (offnum = FirstOffsetNumber;
686              offnum <= maxoff;
687              offnum = OffsetNumberNext(offnum)) {
688             itemid = PageGetItemId(page, offnum);
689
690             /*
691              * Collect un-used items too - it's possible to have
692              * indices pointing here after crash.
693              */
694             if (!ItemIdIsUsed(itemid)) {
695                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
696                 nunused++;
697                 continue;
698             }
699
700             htup = (HeapTuple) PageGetItem(page, itemid);
701             tupgone = false;
702
703             if (!AbsoluteTimeIsBackwardCompatiblyValid(htup->t_tmin) && 
704                 TransactionIdIsValid((TransactionId)htup->t_xmin)) {
705
706                 if (TransactionIdDidAbort(htup->t_xmin)) {
707                     tupgone = true;
708                 } else if (TransactionIdDidCommit(htup->t_xmin)) {
709                     htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
710                     pgchanged = true;
711                 } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
712                     /* 
713                      * Not Aborted, Not Committed, Not in Progress -
714                      * so it from crashed process. - vadim 11/26/96
715                      */
716                     ncrash++;
717                     tupgone = true;
718                 }
719                 else
720                 {
721                     elog (NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
722                         relname, blkno, offnum, htup->t_xmin);
723                     do_shrinking = false;
724                 }
725             }
726
727             if (TransactionIdIsValid((TransactionId)htup->t_xmax))
728             {
729                 if (TransactionIdDidAbort(htup->t_xmax))
730                 {
731                     StoreInvalidTransactionId(&(htup->t_xmax));
732                     pgchanged = true;
733                 }
734                 else if (TransactionIdDidCommit(htup->t_xmax))
735                     tupgone = true;
736                 else if ( !TransactionIdIsInProgress (htup->t_xmax) ) {
737                     /* 
738                      * Not Aborted, Not Committed, Not in Progress -
739                      * so it from crashed process. - vadim 06/02/97
740                      */
741                     StoreInvalidTransactionId(&(htup->t_xmax));
742                     pgchanged = true;
743                 }
744                 else
745                 {
746                     elog (NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
747                         relname, blkno, offnum, htup->t_xmax);
748                     do_shrinking = false;
749                 }
750             }
751
752             /*
753              * Is it possible at all ? - vadim 11/26/96
754              */
755             if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
756             {
757                 elog (NOTICE, "Rel %s: TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
758 DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.", 
759                         relname, blkno, offnum, 
760                         TransactionIdIsValid((TransactionId)htup->t_xmax),
761                         tupgone);
762             }
763             
764             /*
765              * It's possibly! But from where it comes ?
766              * And should we fix it ?  - vadim 11/28/96
767              */
768             itemptr = &(htup->t_ctid);
769             if ( !ItemPointerIsValid (itemptr) || 
770                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno )
771             {
772                 elog (NOTICE, "Rel %s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.", 
773                         relname, blkno, offnum,
774                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)), 
775                         itemptr->ip_posid, tupgone);
776             }
777
778             /*
779              * Other checks...
780              */
781             if ( htup->t_len != itemid->lp_len )
782             {
783                 elog (NOTICE, "Rel %s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.", 
784                         relname, blkno, offnum, 
785                         itemid->lp_len, htup->t_len, tupgone);
786             }
787             if ( !OidIsValid(htup->t_oid) )
788             {
789                 elog (NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.", 
790                         relname, blkno, offnum, tupgone);
791             }
792             
793             if (tupgone) {
794                 ItemId lpp;
795                                                     
796                 if ( tempPage == (Page) NULL )
797                 {
798                     Size pageSize;
799                     
800                     pageSize = PageGetPageSize(page);
801                     tempPage = (Page) palloc(pageSize);
802                     memmove (tempPage, page, pageSize);
803                 }
804                 
805                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
806
807                 /* mark it unused */
808                 lpp->lp_flags &= ~LP_USED;
809
810                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
811                 nvac++;
812
813             } else {
814                 ntups++;
815                 notup = false;
816                 if ( htup->t_len < min_tlen )
817                     min_tlen = htup->t_len;
818                 if ( htup->t_len > max_tlen )
819                     max_tlen = htup->t_len;
820                 vc_attrstats(onerel, vacrelstats, htup);
821             }
822         }
823
824         if (pgchanged) {
825             WriteBuffer(buf);
826             dobufrel = false;
827             nchpg++;
828         }
829         else
830             dobufrel = true;
831         if ( tempPage != (Page) NULL )
832         { /* Some tuples are gone */
833             PageRepairFragmentation(tempPage);
834             vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
835             frsize += vpc->vpd_free;
836             vc_reappage (Vvpl, vpc);
837             pfree (tempPage);
838             tempPage = (Page) NULL;
839         }
840         else if ( vpc->vpd_noff > 0 )
841         { /* there are only ~LP_USED line pointers */
842             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
843             frsize += vpc->vpd_free;
844             vc_reappage (Vvpl, vpc);
845         }
846         if ( dobufrel )
847             ReleaseBuffer(buf);
848         if ( notup )
849             nemend++;
850         else
851             nemend = 0;
852     }
853
854     pfree (vpc);
855
856     /* save stats in the rel list for use later */
857     vacrelstats->ntups = ntups;
858     vacrelstats->npages = nblocks;
859 /*    vacrelstats->natts = attr_cnt;*/
860     if ( ntups == 0 )
861         min_tlen = max_tlen = 0;
862     vacrelstats->min_tlen = min_tlen;
863     vacrelstats->max_tlen = max_tlen;
864     
865     Vvpl->vpl_nemend = nemend;
866     Fvpl->vpl_nemend = nemend;
867
868     /* 
869      * Try to make Fvpl keeping in mind that we can't use free space 
870      * of "empty" end-pages and last page if it reapped.
871      */
872     if ( do_shrinking && Vvpl->vpl_npages - nemend > 0 )
873     {
874         int nusf;               /* blocks usefull for re-using */
875         
876         nusf = Vvpl->vpl_npages - nemend;
877         if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
878             nusf--;
879     
880         for (i = 0; i < nusf; i++)
881         {
882             vp = Vvpl->vpl_pgdesc[i];
883             if ( vc_enough_space (vp, min_tlen) )
884             {
885                 vc_vpinsert (Fvpl, vp);
886                 frsusf += vp->vpd_free;
887             }
888         }
889     }
890
891     getrusage(RUSAGE_SELF, &ru1);
892     
893     elog (MESSAGE_LEVEL, "Rel %s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
894 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
895         relname, 
896         nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
897         ntups, nvac, ncrash, nunused, min_tlen, max_tlen, 
898         frsize, frsusf, nemend, Fvpl->vpl_npages,
899         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
900         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
901
902 } /* vc_scanheap */
903
904
905 /*
906  *  vc_rpfheap() -- try to repaire relation' fragmentation
907  *
908  *      This routine marks dead tuples as unused and tries re-use dead space
909  *      by moving tuples (and inserting indices if needed). It constructs 
910  *      Nvpl list of free-ed pages (moved tuples) and clean indices
911  *      for them after committing (in hack-manner - without losing locks
912  *      and freeing memory!) current transaction. It truncates relation
913  *      if some end-blocks are gone away.
914  */
915 static void
916 vc_rpfheap (VRelStats *vacrelstats, Relation onerel, 
917                 VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
918 {
919     TransactionId myXID;
920     CommandId myCID;
921     AbsoluteTime myCTM = 0;
922     Buffer buf, ToBuf;
923     int nblocks, blkno;
924     Page page, ToPage = NULL;
925     OffsetNumber offnum = 0, maxoff = 0, newoff, moff;
926     ItemId itemid, newitemid;
927     HeapTuple htup, newtup;
928     TupleDesc tupdesc = NULL;
929     Datum *idatum = NULL;
930     char *inulls = NULL;
931     InsertIndexResult iresult;
932     VPageListData Nvpl;
933     VPageDescr ToVpd = NULL, Fvplast, Vvplast, vpc, *vpp;
934     int ToVpI = 0;
935     IndDesc *Idesc, *idcur;
936     int Fblklast, Vblklast, i;
937     Size tlen;
938     int nmoved, Fnpages, Vnpages;
939     int nchkmvd, ntups;
940     bool isempty, dowrite;
941     Relation archrel;
942     struct rusage ru0, ru1;
943
944     getrusage(RUSAGE_SELF, &ru0);
945
946     myXID = GetCurrentTransactionId();
947     myCID = GetCurrentCommandId();
948         
949     if ( Irel != (Relation*) NULL )     /* preparation for index' inserts */
950     {
951         vc_mkindesc (onerel, nindices, Irel, &Idesc);
952         tupdesc = RelationGetTupleDescriptor(onerel);
953         idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
954         inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
955     }
956
957     /* if the relation has an archive, open it */
958     if (onerel->rd_rel->relarch != 'n')
959     {
960         archrel = vc_getarchrel(onerel);
961         /* Archive tuples from "empty" end-pages */
962         for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1, 
963                                 i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
964         {
965             if ( (*vpp)->vpd_noff > 0 )
966             {
967                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
968                 page = BufferGetPage(buf);
969                 Assert ( !PageIsEmpty(page) );
970                 vc_vacpage (page, *vpp, archrel);
971                 WriteBuffer (buf);
972             }
973         }
974     }
975     else
976         archrel = (Relation) NULL;
977
978     Nvpl.vpl_npages = 0;
979     Fnpages = Fvpl->vpl_npages;
980     Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
981     Fblklast = Fvplast->vpd_blkno;
982     Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
983     Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
984     Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
985     Vblklast = Vvplast->vpd_blkno;
986     Assert ( Vblklast >= Fblklast );
987     ToBuf = InvalidBuffer;
988     nmoved = 0;
989
990     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
991     vpc->vpd_nusd = vpc->vpd_noff = 0;
992         
993     nblocks = vacrelstats->npages;
994     for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
995     {
996         /* if it's reapped page and it was used by me - quit */
997         if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
998             break;
999
1000         buf = ReadBuffer(onerel, blkno);
1001         page = BufferGetPage(buf);
1002
1003         vpc->vpd_noff = 0;
1004
1005         isempty = PageIsEmpty(page);
1006
1007         dowrite = false;
1008         if ( blkno == Vblklast )                /* it's reapped page */
1009         {
1010             if ( Vvplast->vpd_noff > 0 )        /* there are dead tuples */
1011             {                                   /* on this page - clean */
1012                 Assert ( ! isempty );
1013                 vc_vacpage (page, Vvplast, archrel);
1014                 dowrite = true;
1015             }
1016             else
1017             {
1018                 Assert ( isempty );
1019             }
1020             --Vnpages;
1021             Assert ( Vnpages > 0 );
1022             /* get prev reapped page from Vvpl */
1023             Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1024             Vblklast = Vvplast->vpd_blkno;
1025             if ( blkno == Fblklast )    /* this page in Fvpl too */
1026             {
1027                 --Fnpages;
1028                 Assert ( Fnpages > 0 );
1029                 Assert ( Fvplast->vpd_nusd == 0 );
1030                 /* get prev reapped page from Fvpl */
1031                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1032                 Fblklast = Fvplast->vpd_blkno;
1033             }
1034             Assert ( Fblklast <= Vblklast );
1035             if ( isempty )
1036             {
1037                 ReleaseBuffer(buf);
1038                 continue;
1039             }
1040         }
1041         else
1042         {
1043             Assert ( ! isempty );
1044         }
1045
1046         vpc->vpd_blkno = blkno;
1047         maxoff = PageGetMaxOffsetNumber(page);
1048         for (offnum = FirstOffsetNumber;
1049                 offnum <= maxoff;
1050                 offnum = OffsetNumberNext(offnum))
1051         {
1052             itemid = PageGetItemId(page, offnum);
1053
1054             if (!ItemIdIsUsed(itemid))
1055                 continue;
1056
1057             htup = (HeapTuple) PageGetItem(page, itemid);
1058             tlen = htup->t_len;
1059                 
1060             /* try to find new page for this tuple */
1061             if ( ToBuf == InvalidBuffer ||
1062                 ! vc_enough_space (ToVpd, tlen) )
1063             {
1064                 if ( ToBuf != InvalidBuffer )
1065                 {
1066                     WriteBuffer(ToBuf);
1067                     ToBuf = InvalidBuffer;
1068                     /*
1069                      * If no one tuple can't be added to this page -
1070                      * remove page from Fvpl. - vadim 11/27/96
1071                      */ 
1072                     if ( !vc_enough_space (ToVpd, vacrelstats->min_tlen) )
1073                     {
1074                         if ( ToVpd != Fvplast )
1075                         {
1076                             Assert ( Fnpages > ToVpI + 1 );
1077                             memmove (Fvpl->vpl_pgdesc + ToVpI, 
1078                                 Fvpl->vpl_pgdesc + ToVpI + 1, 
1079                                 sizeof (VPageDescr*) * (Fnpages - ToVpI - 1));
1080                         }
1081                         Assert ( Fnpages >= 1 );
1082                         Fnpages--;
1083                         if ( Fnpages == 0 )
1084                             break;
1085                         /* get prev reapped page from Fvpl */
1086                         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1087                         Fblklast = Fvplast->vpd_blkno;
1088                     }
1089                 }
1090                 for (i=0; i < Fnpages; i++)
1091                 {
1092                     if ( vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
1093                         break;
1094                 }
1095                 if ( i == Fnpages )
1096                     break;                      /* can't move item anywhere */
1097                 ToVpI = i;
1098                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1099                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1100                 ToPage = BufferGetPage(ToBuf);
1101                 /* if this page was not used before - clean it */
1102                 if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
1103                     vc_vacpage (ToPage, ToVpd, archrel);
1104             }
1105                 
1106             /* copy tuple */
1107             newtup = (HeapTuple) palloc (tlen);
1108             memmove((char *) newtup, (char *) htup, tlen);
1109
1110             /* store transaction information */
1111             TransactionIdStore(myXID, &(newtup->t_xmin));
1112             newtup->t_cmin = myCID;
1113             StoreInvalidTransactionId(&(newtup->t_xmax));
1114             newtup->t_tmin = INVALID_ABSTIME;
1115             newtup->t_tmax = CURRENT_ABSTIME;
1116             ItemPointerSetInvalid(&newtup->t_chain);
1117
1118             /* add tuple to the page */
1119             newoff = PageAddItem (ToPage, (Item)newtup, tlen, 
1120                                 InvalidOffsetNumber, LP_USED);
1121             if ( newoff == InvalidOffsetNumber )
1122             {
1123                 elog (WARN, "\
1124 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1125                 tlen, ToVpd->vpd_blkno, ToVpd->vpd_free, 
1126                 ToVpd->vpd_nusd, ToVpd->vpd_noff);
1127             }
1128             newitemid = PageGetItemId(ToPage, newoff);
1129             pfree (newtup);
1130             newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1131             ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1132
1133             /* now logically delete end-tuple */
1134             TransactionIdStore(myXID, &(htup->t_xmax));
1135             htup->t_cmax = myCID;
1136             memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
1137
1138             ToVpd->vpd_nusd++;
1139             nmoved++;
1140             ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
1141             vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1142                 
1143             /* insert index' tuples if needed */
1144             if ( Irel != (Relation*) NULL )
1145             {
1146                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1147                 {
1148                     FormIndexDatum (
1149                                 idcur->natts,
1150                                 (AttrNumber *)&(idcur->tform->indkey[0]),
1151                                 newtup, 
1152                                 tupdesc,
1153                                 InvalidBuffer,
1154                                 idatum,
1155                                 inulls,
1156                                 idcur->finfoP);
1157                     iresult = index_insert (
1158                                 Irel[i],
1159                                 idatum,
1160                                 inulls,
1161                                 &(newtup->t_ctid),
1162                                 onerel);
1163                     if (iresult) pfree(iresult);
1164                 }
1165             }
1166                 
1167         } /* walk along page */
1168
1169         if ( vpc->vpd_noff > 0 )                /* some tuples were moved */
1170         {
1171             vc_reappage (&Nvpl, vpc);
1172             WriteBuffer(buf);
1173         }
1174         else if ( dowrite )
1175             WriteBuffer(buf);
1176         else
1177             ReleaseBuffer(buf);
1178             
1179         if ( offnum <= maxoff )
1180             break;                              /* some item(s) left */
1181             
1182     } /* walk along relation */
1183         
1184     blkno++;                            /* new number of blocks */
1185
1186     if ( ToBuf != InvalidBuffer )
1187     {
1188         Assert (nmoved > 0);
1189         WriteBuffer(ToBuf);
1190     }
1191
1192     if ( nmoved > 0 )
1193     {
1194         /* 
1195          * We have to commit our tuple' movings before we'll truncate 
1196          * relation, but we shouldn't lose our locks. And so - quick hack: 
1197          * flush buffers and record status of current transaction
1198          * as committed, and continue. - vadim 11/13/96
1199          */
1200         FlushBufferPool(!TransactionFlushEnabled());
1201         TransactionIdCommit(myXID);
1202         FlushBufferPool(!TransactionFlushEnabled());
1203         myCTM = TransactionIdGetCommitTime(myXID);
1204     }
1205         
1206     /* 
1207      * Clean uncleaned reapped pages from Vvpl list 
1208      * and set commit' times  for inserted tuples
1209      */
1210     nchkmvd = 0;
1211     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1212     {
1213         Assert ( (*vpp)->vpd_blkno < blkno );
1214         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1215         page = BufferGetPage(buf);
1216         if ( (*vpp)->vpd_nusd == 0 )    /* this page was not used */
1217         {
1218             /* noff == 0 in empty pages only - such pages should be re-used */
1219             Assert ( (*vpp)->vpd_noff > 0 );
1220             vc_vacpage (page, *vpp, archrel);
1221         }
1222         else                            /* this page was used */
1223         {
1224             ntups = 0;
1225             moff = PageGetMaxOffsetNumber(page);
1226             for (newoff = FirstOffsetNumber;
1227                         newoff <= moff;
1228                         newoff = OffsetNumberNext(newoff))
1229             {
1230                 itemid = PageGetItemId(page, newoff);
1231                 if (!ItemIdIsUsed(itemid))
1232                     continue;
1233                 htup = (HeapTuple) PageGetItem(page, itemid);
1234                 if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
1235                 {
1236                     htup->t_tmin = myCTM;
1237                     ntups++;
1238                 }
1239             }
1240             Assert ( (*vpp)->vpd_nusd == ntups );
1241             nchkmvd += ntups;
1242         }
1243         WriteBuffer (buf);
1244     }
1245     Assert ( nmoved == nchkmvd );
1246
1247     getrusage(RUSAGE_SELF, &ru1);
1248     
1249     elog (MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. \
1250 Elapsed %u/%u sec.",
1251                 (RelationGetRelationName(onerel))->data, 
1252                 nblocks, blkno, nmoved,
1253                 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1254                 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1255
1256     if ( Nvpl.vpl_npages > 0 )
1257     {
1258         /* vacuum indices again if needed */
1259         if ( Irel != (Relation*) NULL )
1260         {
1261             VPageDescr *vpleft, *vpright, vpsave;
1262                 
1263             /* re-sort Nvpl.vpl_pgdesc */
1264             for (vpleft = Nvpl.vpl_pgdesc, 
1265                 vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1266                 vpleft < vpright; vpleft++, vpright--)
1267             {
1268                 vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
1269             }
1270             for (i = 0; i < nindices; i++)
1271                 vc_vaconeind (&Nvpl, Irel[i], vacrelstats->ntups);
1272         }
1273
1274         /* 
1275          * clean moved tuples from last page in Nvpl list
1276          * if some tuples left there
1277          */
1278         if ( vpc->vpd_noff > 0 && offnum <= maxoff )
1279         {
1280             Assert (vpc->vpd_blkno == blkno - 1);
1281             buf = ReadBuffer(onerel, vpc->vpd_blkno);
1282             page = BufferGetPage (buf);
1283             ntups = 0;
1284             maxoff = offnum;
1285             for (offnum = FirstOffsetNumber;
1286                         offnum < maxoff;
1287                         offnum = OffsetNumberNext(offnum))
1288             {
1289                 itemid = PageGetItemId(page, offnum);
1290                 if (!ItemIdIsUsed(itemid))
1291                     continue;
1292                 htup = (HeapTuple) PageGetItem(page, itemid);
1293                 Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
1294                 itemid->lp_flags &= ~LP_USED;
1295                 ntups++;
1296             }
1297             Assert ( vpc->vpd_noff == ntups );
1298             PageRepairFragmentation(page);
1299             WriteBuffer (buf);
1300         }
1301
1302         /* now - free new list of reapped pages */
1303         vpp = Nvpl.vpl_pgdesc;
1304         for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1305             pfree(*vpp);
1306         pfree (Nvpl.vpl_pgdesc);
1307     }
1308         
1309     /* truncate relation */
1310     if ( blkno < nblocks )
1311     {
1312         blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
1313         Assert ( blkno >= 0 );
1314         vacrelstats->npages = blkno;    /* set new number of blocks */
1315     }
1316
1317     if ( archrel != (Relation) NULL )
1318         heap_close(archrel);
1319
1320     if ( Irel != (Relation*) NULL )     /* pfree index' allocations */
1321     {
1322         pfree (Idesc);
1323         pfree (idatum);
1324         pfree (inulls);
1325         vc_clsindices (nindices, Irel);
1326     }
1327
1328     pfree (vpc);
1329
1330 } /* vc_rpfheap */
1331
1332 /*
1333  *  vc_vacheap() -- free dead tuples
1334  *
1335  *      This routine marks dead tuples as unused and truncates relation
1336  *      if there are "empty" end-blocks.
1337  */
1338 static void
1339 vc_vacheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1340 {
1341     Buffer buf;
1342     Page page;
1343     VPageDescr *vpp;
1344     Relation archrel;
1345     int nblocks;
1346     int i;
1347
1348     nblocks = Vvpl->vpl_npages;
1349     /* if the relation has an archive, open it */
1350     if (onerel->rd_rel->relarch != 'n')
1351         archrel = vc_getarchrel(onerel);
1352     else
1353     {
1354         archrel = (Relation) NULL;
1355         nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1356     }
1357         
1358     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1359     {
1360         if ( (*vpp)->vpd_noff > 0 )
1361         {
1362             buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1363             page = BufferGetPage (buf);
1364             vc_vacpage (page, *vpp, archrel);
1365             WriteBuffer (buf);
1366         }
1367     }
1368
1369     /* truncate relation if there are some empty end-pages */
1370     if ( Vvpl->vpl_nemend > 0 )
1371     {
1372         Assert ( vacrelstats->npages >= Vvpl->vpl_nemend );
1373         nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1374         elog (MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1375                 (RelationGetRelationName(onerel))->data, 
1376                 vacrelstats->npages, nblocks);
1377
1378         /* 
1379          * we have to flush "empty" end-pages (if changed, but who knows it)
1380          * before truncation 
1381          */
1382         FlushBufferPool(!TransactionFlushEnabled());
1383
1384         nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
1385         Assert ( nblocks >= 0 );
1386         vacrelstats->npages = nblocks;  /* set new number of blocks */
1387     }
1388
1389     if ( archrel != (Relation) NULL )
1390         heap_close(archrel);
1391
1392 } /* vc_vacheap */
1393
1394 /*
1395  *  vc_vacpage() -- free (and archive if needed) dead tuples on a page
1396  *                   and repaire its fragmentation.
1397  */
1398 static void
1399 vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
1400 {
1401     ItemId itemid;
1402     HeapTuple htup;
1403     int i;
1404     
1405     Assert ( vpd->vpd_nusd == 0 );
1406     for (i=0; i < vpd->vpd_noff; i++)
1407     {
1408         itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1409         if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
1410         {
1411             htup = (HeapTuple) PageGetItem (page, itemid);
1412             vc_archive (archrel, htup);
1413         }
1414         itemid->lp_flags &= ~LP_USED;
1415     }
1416     PageRepairFragmentation(page);
1417
1418 } /* vc_vacpage */
1419
1420 /*
1421  *  _vc_scanoneind() -- scan one index relation to update statistic.
1422  *
1423  */
1424 static void
1425 vc_scanoneind (Relation indrel, int nhtups)
1426 {
1427     RetrieveIndexResult res;
1428     IndexScanDesc iscan;
1429     int nitups;
1430     int nipages;
1431     struct rusage ru0, ru1;
1432
1433     getrusage(RUSAGE_SELF, &ru0);
1434
1435     /* walk through the entire index */
1436     iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1437     nitups = 0;
1438
1439     while ((res = index_getnext(iscan, ForwardScanDirection))
1440                                         != (RetrieveIndexResult) NULL)
1441     {
1442         nitups++;
1443         pfree(res);
1444     }
1445
1446     index_endscan(iscan);
1447
1448     /* now update statistics in pg_class */
1449     nipages = RelationGetNumberOfBlocks(indrel);
1450     vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1451
1452     getrusage(RUSAGE_SELF, &ru1);
1453
1454     elog (MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1455         indrel->rd_rel->relname.data, nipages, nitups, 
1456         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1457         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1458
1459     if ( nitups != nhtups )
1460         elog (NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1461                 indrel->rd_rel->relname.data, nitups, nhtups);
1462
1463 } /* vc_scanoneind */
1464
1465 /*
1466  *  vc_vaconeind() -- vacuum one index relation.
1467  *
1468  *      Vpl is the VPageList of the heap we're currently vacuuming.
1469  *      It's locked. Indrel is an index relation on the vacuumed heap. 
1470  *      We don't set locks on the index relation here, since the indexed 
1471  *      access methods support locking at different granularities. 
1472  *      We let them handle it.
1473  *
1474  *      Finally, we arrange to update the index relation's statistics in
1475  *      pg_class.
1476  */
1477 static void
1478 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1479 {
1480     RetrieveIndexResult res;
1481     IndexScanDesc iscan;
1482     ItemPointer heapptr;
1483     int nvac;
1484     int nitups;
1485     int nipages;
1486     VPageDescr vp;
1487     struct rusage ru0, ru1;
1488
1489     getrusage(RUSAGE_SELF, &ru0);
1490
1491     /* walk through the entire index */
1492     iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1493     nvac = 0;
1494     nitups = 0;
1495
1496     while ((res = index_getnext(iscan, ForwardScanDirection))
1497            != (RetrieveIndexResult) NULL) {
1498         heapptr = &res->heap_iptr;
1499
1500         if ( (vp = vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
1501         {
1502 #if 0
1503             elog(DEBUG, "<%x,%x> -> <%x,%x>",
1504                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1505                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1506                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1507                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1508 #endif
1509             if ( vp->vpd_noff == 0 ) 
1510             {                           /* this is EmptyPage !!! */
1511                 elog (NOTICE, "Ind %s: pointer to EmptyPage (blk %u off %u) - fixing",
1512                         indrel->rd_rel->relname.data,
1513                         vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1514             }
1515             ++nvac;
1516             index_delete(indrel, &res->index_iptr);
1517         } else {
1518             nitups++;
1519         }
1520
1521         /* be tidy */
1522         pfree(res);
1523     }
1524
1525     index_endscan(iscan);
1526
1527     /* now update statistics in pg_class */
1528     nipages = RelationGetNumberOfBlocks(indrel);
1529     vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1530
1531     getrusage(RUSAGE_SELF, &ru1);
1532
1533     elog (MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1534         indrel->rd_rel->relname.data, nipages, nitups, nvac,
1535         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1536         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1537
1538     if ( nitups != nhtups )
1539         elog (NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1540                 indrel->rd_rel->relname.data, nitups, nhtups);
1541
1542 } /* vc_vaconeind */
1543
1544 /*
1545  *  vc_tidreapped() -- is a particular tid reapped?
1546  *
1547  *      vpl->VPageDescr_array is sorted in right order.
1548  */
1549 static VPageDescr
1550 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1551 {
1552     OffsetNumber ioffno;
1553     OffsetNumber *voff;
1554     VPageDescr vp, *vpp;
1555     VPageDescrData vpd;
1556
1557     vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1558     ioffno = ItemPointerGetOffsetNumber(itemptr);
1559         
1560     vp = &vpd;
1561     vpp = (VPageDescr*) vc_find_eq ((char*)(vpl->vpl_pgdesc), 
1562                 vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp, 
1563                 vc_cmp_blk);
1564
1565     if ( vpp == (VPageDescr*) NULL )
1566         return ((VPageDescr)NULL);
1567     vp = *vpp;
1568
1569     /* ok - we are on true page */
1570
1571     if ( vp->vpd_noff == 0 ) {          /* this is EmptyPage !!! */
1572         return (vp);
1573     }
1574     
1575     voff = (OffsetNumber*) vc_find_eq ((char*)(vp->vpd_voff), 
1576                 vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
1577                 vc_cmp_offno);
1578
1579     if ( voff == (OffsetNumber*) NULL )
1580         return ((VPageDescr)NULL);
1581
1582     return (vp);
1583
1584 } /* vc_tidreapped */
1585
1586 /*
1587  *  vc_attrstats() -- compute column statistics used by the optimzer
1588  *
1589  *  We compute the column min, max, null and non-null counts.
1590  *  Plus we attempt to find the count of the value that occurs most
1591  *  frequently in each column
1592  *  These figures are used to compute the selectivity of the column
1593  *
1594  *  We use a three-bucked cache to get the most frequent item
1595  *  The 'guess' buckets count hits.  A cache miss causes guess1
1596  *  to get the most hit 'guess' item in the most recent cycle, and
1597  *  the new item goes into guess2.  Whenever the total count of hits
1598  *  of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1599  *
1600  *  This method works perfectly for columns with unique values, and columns
1601  *  with only two unique values, plus nulls.
1602  *
1603  *  It becomes less perfect as the number of unique values increases and
1604  *  their distribution in the table becomes more random.
1605  *
1606  */
1607 static void
1608 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup)
1609 {
1610     int i, attr_cnt = vacrelstats->va_natts;
1611     VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1612     TupleDesc tupDesc = onerel->rd_att;
1613     Datum value;
1614     bool isnull;
1615
1616     for (i = 0; i < attr_cnt; i++) {
1617         VacAttrStats *stats = &vacattrstats[i];
1618         bool value_hit = true;
1619
1620         value = (Datum) heap_getattr (htup, InvalidBuffer, 
1621                                         stats->attr->attnum, tupDesc, &isnull);
1622
1623         if (!VacAttrStatsEqValid(stats))
1624                 continue;
1625
1626         if (isnull)
1627             stats->null_cnt++;
1628         else {
1629             stats->nonnull_cnt++;
1630             if (stats->initialized == false) {
1631                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1632                 /* best_cnt gets incremented later */
1633                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1634                 stats->guess1_cnt = stats->guess1_hits = 1;
1635                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1636                 stats->guess2_hits = 1;
1637                 if (VacAttrStatsLtGtValid(stats)) {
1638                     vc_bucketcpy(stats->attr, value, &stats->max , &stats->max_len);
1639                     vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1640                 }
1641                 stats->initialized = true;
1642             }
1643             if (VacAttrStatsLtGtValid(stats)) {
1644                 if ( (*(stats->f_cmplt)) (value,stats->min) ) {
1645                     vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1646                     stats->min_cnt = 0;
1647                 }
1648                 if ( (*(stats->f_cmpgt)) (value,stats->max) ) {
1649                     vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1650                     stats->max_cnt = 0;
1651                 }
1652                 if ( (*(stats->f_cmpeq)) (value,stats->min) )
1653                     stats->min_cnt++;
1654                 else if ( (*(stats->f_cmpeq)) (value,stats->max) )
1655                     stats->max_cnt++;
1656             }
1657             if ( (*(stats->f_cmpeq)) (value,stats->best) )
1658                 stats->best_cnt++;
1659             else if ( (*(stats->f_cmpeq)) (value,stats->guess1) ) {
1660                 stats->guess1_cnt++;
1661                 stats->guess1_hits++;
1662             }
1663             else if ( (*(stats->f_cmpeq)) (value,stats->guess2) )
1664                 stats->guess2_hits++;
1665             else value_hit = false;
1666
1667             if (stats->guess2_hits > stats->guess1_hits) {
1668                 swapDatum(stats->guess1,stats->guess2);
1669                 swapInt(stats->guess1_len,stats->guess2_len);
1670                 stats->guess1_cnt = stats->guess2_hits;
1671                 swapLong(stats->guess1_hits, stats->guess2_hits);
1672             }
1673             if (stats->guess1_cnt > stats->best_cnt) {
1674                 swapDatum(stats->best,stats->guess1);
1675                 swapInt(stats->best_len,stats->guess1_len);
1676                 swapLong(stats->best_cnt,stats->guess1_cnt);
1677                 stats->guess1_hits = 1;
1678                 stats->guess2_hits = 1;
1679             }
1680             if (!value_hit) {
1681                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1682                 stats->guess1_hits = 1;
1683                 stats->guess2_hits = 1;
1684             }
1685         }
1686     }
1687     return;
1688 }
1689
1690 /*
1691  *  vc_bucketcpy() -- update pg_class statistics for one relation
1692  *
1693  */
1694 static void
1695 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1696 {
1697     if (attr->attbyval && attr->attlen != -1)
1698         *bucket = value;
1699     else {
1700         int len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1701
1702         if (len > *bucket_len)
1703         {
1704             if (*bucket_len != 0)
1705                 pfree(DatumGetPointer(*bucket));
1706             *bucket = PointerGetDatum(palloc(len));
1707             *bucket_len = len;
1708         }
1709         memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1710     }
1711 }
1712
1713 /*
1714  *  vc_updstats() -- update pg_class statistics for one relation
1715  *
1716  *      This routine works for both index and heap relation entries in
1717  *      pg_class.  We violate no-overwrite semantics here by storing new
1718  *      values for ntups, npages, and hasindex directly in the pg_class
1719  *      tuple that's already on the page.  The reason for this is that if
1720  *      we updated these tuples in the usual way, then every tuple in pg_class
1721  *      would be replaced every day.  This would make planning and executing
1722  *      historical queries very expensive.
1723  */
1724 static void
1725 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats)
1726 {
1727     Relation rd, ad, sd;
1728     HeapScanDesc rsdesc, asdesc;
1729     TupleDesc sdesc;
1730     HeapTuple rtup, atup, stup;
1731     Buffer rbuf, abuf;
1732     Form_pg_class pgcform;
1733     ScanKeyData rskey, askey;
1734     AttributeTupleForm attp;
1735
1736     /*
1737      * update number of tuples and number of pages in pg_class
1738      */
1739     ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1740                            ObjectIdEqualRegProcedure,
1741                            ObjectIdGetDatum(relid));
1742
1743     rd = heap_openr(RelationRelationName);
1744     rsdesc = heap_beginscan(rd, false, NowTimeQual, 1, &rskey);
1745
1746     if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1747         elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1748                    relid);
1749
1750     /* overwrite the existing statistics in the tuple */
1751     vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1752     pgcform = (Form_pg_class) GETSTRUCT(rtup);
1753     pgcform->reltuples = ntups;
1754     pgcform->relpages = npages;
1755     pgcform->relhasindex = hasindex;
1756
1757     if ( vacrelstats != NULL && vacrelstats->va_natts > 0 )
1758     {
1759         VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1760         int natts = vacrelstats->va_natts;
1761         
1762         ad = heap_openr(AttributeRelationName);
1763         sd = heap_openr(StatisticRelationName);
1764         ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1765                    F_INT4EQ, relid);
1766
1767         asdesc = heap_beginscan(ad, false, NowTimeQual, 1, &askey);
1768
1769         while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1770         {
1771             int i;
1772             double selratio;  /* average ratio of rows selected for a random constant */
1773             VacAttrStats *stats;
1774             Datum values[ Natts_pg_statistic ];
1775             char nulls[ Natts_pg_statistic ];
1776
1777             attp = (AttributeTupleForm) GETSTRUCT(atup);
1778             if ( attp->attnum <= 0)     /* skip system attributes for now, */
1779                                         /* they are unique anyway */
1780                 continue;
1781             
1782             for (i = 0; i < natts; i++)
1783             {
1784                 if ( attp->attnum == vacattrstats[i].attr->attnum )
1785                     break;
1786             }
1787             if ( i >= natts )
1788                 continue;
1789             stats = &(vacattrstats[i]);
1790
1791             /* overwrite the existing statistics in the tuple */
1792             if (VacAttrStatsEqValid(stats)) {
1793
1794                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1795
1796                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1797                     (stats->null_cnt <= 1 && stats->best_cnt == 1))
1798                     selratio = 0;
1799                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1800                 {
1801                     double min_cnt_d = stats->min_cnt, 
1802                            max_cnt_d = stats->max_cnt, 
1803                            null_cnt_d = stats->null_cnt,
1804                            nonnullcnt_d = stats->nonnull_cnt; /* prevent overflow */
1805                     selratio = (min_cnt_d*min_cnt_d+max_cnt_d*max_cnt_d+null_cnt_d*null_cnt_d)/
1806                                 (nonnullcnt_d+null_cnt_d)/(nonnullcnt_d+null_cnt_d);
1807                 }
1808                 else {
1809                     double most = (double)(stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1810                     double total = ((double)stats->nonnull_cnt)+((double)stats->null_cnt);
1811                     /* we assume count of other values are 20% 
1812                        of best count in table */
1813                     selratio = (most*most + 0.20*most*(total-most))/total/total;
1814                 }
1815                 if (selratio > 1.0)
1816                     selratio = 1.0;
1817                 attp->attdisbursion = selratio;
1818                 WriteNoReleaseBuffer(abuf);
1819
1820                 /* DO PG_STATISTIC INSERTS */
1821
1822                 /* doing system relations, especially pg_statistic is a problem */
1823                 if (VacAttrStatsLtGtValid(stats) && stats->initialized /* &&
1824                         !IsSystemRelationName(pgcform->relname.data)*/) {
1825                     func_ptr out_function;
1826                     char *out_string;
1827                     int dummy;
1828                 
1829                     for (i = 0; i < Natts_pg_statistic; ++i) nulls[i] = ' ';
1830
1831                     /* ----------------
1832                      *  initialize values[]
1833                      * ----------------
1834                      */
1835                     i = 0;
1836                     values[i++] = (Datum) relid;        /* 1 */
1837                     values[i++] = (Datum) attp->attnum; /* 2 */
1838                     values[i++] = (Datum) InvalidOid;   /* 3 */
1839                     fmgr_info(stats->outfunc, &out_function, &dummy);
1840                     out_string = (*out_function)(stats->min, stats->attr->atttypid);
1841                     values[i++] = (Datum) fmgr(TextInRegProcedure,out_string);
1842                     pfree(out_string);
1843                     out_string = (char *)(*out_function)(stats->max, stats->attr->atttypid);
1844                     values[i++] = (Datum) fmgr(TextInRegProcedure,out_string);
1845                     pfree(out_string);
1846
1847                     sdesc = sd->rd_att;
1848         
1849                     stup = heap_formtuple(sdesc, values, nulls);
1850
1851                     /* ----------------
1852                      *  insert the tuple in the relation and get the tuple's oid.
1853                      * ----------------
1854                      */
1855                     heap_insert(sd, stup);
1856                     pfree(DatumGetPointer(values[3]));
1857                     pfree(DatumGetPointer(values[4]));
1858                     pfree(stup);
1859                 }
1860             }
1861         }
1862         heap_endscan(asdesc);
1863         heap_close(ad);
1864         heap_close(sd);
1865     }
1866  
1867     /* XXX -- after write, should invalidate relcache in other backends */
1868     WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1869
1870     /* invalidating system relations confuses the function cache
1871        of pg_operator and pg_opclass */
1872     if ( !IsSystemRelationName(pgcform->relname.data))
1873         RelationInvalidateHeapTuple(rd, rtup);
1874
1875     /* that's all, folks */
1876     heap_endscan(rsdesc);
1877     heap_close(rd);
1878 }
1879
1880 /*
1881  *  vc_delhilowstats() -- delete pg_statistics rows
1882  *
1883  */
1884 static void
1885 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
1886 {
1887     Relation pgstatistic;
1888     HeapScanDesc pgsscan;
1889     HeapTuple pgstup;
1890     ScanKeyData  pgskey;
1891
1892     pgstatistic = heap_openr(StatisticRelationName);
1893
1894     if (relid != InvalidOid ) {
1895         ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
1896                                    ObjectIdEqualRegProcedure,
1897                                    ObjectIdGetDatum(relid));
1898         pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 1, &pgskey);
1899     }
1900     else
1901         pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 0, NULL);
1902
1903     while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
1904     {
1905         if ( attcnt > 0 )
1906         {
1907             Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT (pgstup);
1908             int i;
1909             
1910             for (i = 0; i < attcnt; i++)
1911             {
1912                 if ( pgs->staattnum == attnums[i] + 1 )
1913                     break;
1914             }
1915             if ( i >= attcnt )
1916                 continue;               /* don't delete it */
1917         }
1918         heap_delete(pgstatistic, &pgstup->t_ctid);
1919     }
1920
1921     heap_endscan(pgsscan);
1922     heap_close(pgstatistic);
1923 }
1924
1925 static void vc_setpagelock(Relation rel, BlockNumber blkno)
1926 {
1927     ItemPointerData itm;
1928
1929     ItemPointerSet(&itm, blkno, 1);
1930
1931     RelationSetLockForWritePage(rel, &itm);
1932 }
1933
1934 /*
1935  *  vc_reappage() -- save a page on the array of reapped pages.
1936  *
1937  *      As a side effect of the way that the vacuuming loop for a given
1938  *      relation works, higher pages come after lower pages in the array
1939  *      (and highest tid on a page is last).
1940  */
1941 static void 
1942 vc_reappage(VPageList vpl, VPageDescr vpc)
1943 {
1944     VPageDescr newvpd;
1945
1946     /* allocate a VPageDescrData entry */
1947     newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
1948
1949     /* fill it in */
1950     if ( vpc->vpd_noff > 0 )
1951         memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
1952     newvpd->vpd_blkno = vpc->vpd_blkno;
1953     newvpd->vpd_free = vpc->vpd_free;
1954     newvpd->vpd_nusd = vpc->vpd_nusd;
1955     newvpd->vpd_noff = vpc->vpd_noff;
1956
1957     /* insert this page into vpl list */
1958     vc_vpinsert (vpl, newvpd);
1959     
1960 } /* vc_reappage */
1961
1962 static void
1963 vc_vpinsert (VPageList vpl, VPageDescr vpnew)
1964 {
1965
1966     /* allocate a VPageDescr entry if needed */
1967     if ( vpl->vpl_npages == 0 )
1968         vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
1969     else if ( vpl->vpl_npages % 100 == 0 )
1970         vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
1971     vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
1972     (vpl->vpl_npages)++;
1973     
1974 }
1975
1976 static void
1977 vc_free(VRelList vrl)
1978 {
1979     VRelList p_vrl;
1980     MemoryContext old;
1981     PortalVariableMemory pmem;
1982
1983     pmem = PortalGetVariableMemory(vc_portal);
1984     old = MemoryContextSwitchTo((MemoryContext)pmem);
1985
1986     while (vrl != (VRelList) NULL) {
1987
1988         /* free rel list entry */
1989         p_vrl = vrl;
1990         vrl = vrl->vrl_next;
1991         pfree(p_vrl);
1992     }
1993
1994     MemoryContextSwitchTo(old);
1995 }
1996
1997 /*
1998  *  vc_getarchrel() -- open the archive relation for a heap relation
1999  *
2000  *      The archive relation is named 'a,XXXXX' for the heap relation
2001  *      whose relid is XXXXX.
2002  */
2003
2004 #define ARCHIVE_PREFIX  "a,"
2005
2006 static Relation
2007 vc_getarchrel(Relation heaprel)
2008 {
2009     Relation archrel;
2010     char *archrelname;
2011
2012     archrelname = palloc(sizeof(ARCHIVE_PREFIX) + NAMEDATALEN); /* bogus */
2013     sprintf(archrelname, "%s%d", ARCHIVE_PREFIX, heaprel->rd_id);
2014
2015     archrel = heap_openr(archrelname);
2016
2017     pfree(archrelname);
2018     return (archrel);
2019 }
2020
2021 /*
2022  *  vc_archive() -- write a tuple to an archive relation
2023  *
2024  *      In the future, this will invoke the archived accessd method.  For
2025  *      now, archive relations are on mag disk.
2026  */
2027 static void
2028 vc_archive(Relation archrel, HeapTuple htup)
2029 {
2030     doinsert(archrel, htup);
2031 }
2032
2033 static bool
2034 vc_isarchrel(char *rname)
2035 {
2036     if (strncmp(ARCHIVE_PREFIX, rname,strlen(ARCHIVE_PREFIX)) == 0)
2037         return (true);
2038
2039     return (false);
2040 }
2041
2042 static char *
2043 vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *))
2044 {
2045     int res;
2046     int last = nelem - 1;
2047     int celm = nelem / 2;
2048     bool last_move, first_move;
2049     
2050     last_move = first_move = true;
2051     for ( ; ; )
2052     {
2053         if ( first_move == true )
2054         {
2055             res = compar (bot, elm);
2056             if ( res > 0 )
2057                 return (NULL);
2058             if ( res == 0 )
2059                 return (bot);
2060             first_move = false;
2061         }
2062         if ( last_move == true )
2063         {
2064             res = compar (elm, bot + last*size);
2065             if ( res > 0 )
2066                 return (NULL);
2067             if ( res == 0 )
2068                 return (bot + last*size);
2069             last_move = false;
2070         }
2071         res = compar (elm, bot + celm*size);
2072         if ( res == 0 )
2073             return (bot + celm*size);
2074         if ( res < 0 )
2075         {
2076             if ( celm == 0 )
2077                 return (NULL);
2078             last = celm - 1;
2079             celm = celm / 2;
2080             last_move = true;
2081             continue;
2082         }
2083         
2084         if ( celm == last )
2085             return (NULL);
2086             
2087         last = last - celm - 1;
2088         bot = bot + (celm+1)*size;
2089         celm = (last + 1) / 2;
2090         first_move = true;
2091     }
2092
2093 } /* vc_find_eq */
2094
2095 static int 
2096 vc_cmp_blk (char *left, char *right)
2097 {
2098     BlockNumber lblk, rblk;
2099
2100     lblk = (*((VPageDescr*)left))->vpd_blkno;
2101     rblk = (*((VPageDescr*)right))->vpd_blkno;
2102
2103     if ( lblk < rblk )
2104         return (-1);
2105     if ( lblk == rblk )
2106         return (0);
2107     return (1);
2108
2109 } /* vc_cmp_blk */
2110
2111 static int 
2112 vc_cmp_offno (char *left, char *right)
2113 {
2114
2115     if ( *(OffsetNumber*)left < *(OffsetNumber*)right )
2116         return (-1);
2117     if ( *(OffsetNumber*)left == *(OffsetNumber*)right )
2118         return (0);
2119     return (1);
2120
2121 } /* vc_cmp_offno */
2122
2123
2124 static void
2125 vc_getindices (Oid relid, int *nindices, Relation **Irel)
2126 {
2127     Relation pgindex;
2128     Relation irel;
2129     TupleDesc pgidesc;
2130     HeapTuple pgitup;
2131     HeapScanDesc pgiscan;
2132     Datum d;
2133     int i, k;
2134     bool n;
2135     ScanKeyData pgikey;
2136     Oid *ioid;
2137
2138     *nindices = i = 0;
2139     
2140     ioid = (Oid *) palloc(10*sizeof(Oid));
2141
2142     /* prepare a heap scan on the pg_index relation */
2143     pgindex = heap_openr(IndexRelationName);
2144     pgidesc = RelationGetTupleDescriptor(pgindex);
2145
2146     ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2147                            ObjectIdEqualRegProcedure,
2148                            ObjectIdGetDatum(relid));
2149
2150     pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
2151
2152     while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
2153         d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
2154                                  pgidesc, &n);
2155         i++;
2156         if ( i % 10 == 0 )
2157             ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
2158         ioid[i-1] = DatumGetObjectId(d);
2159     }
2160
2161     heap_endscan(pgiscan);
2162     heap_close(pgindex);
2163
2164     if ( i == 0 ) {     /* No one index found */
2165         pfree(ioid);
2166         return;
2167     }
2168
2169     if ( Irel != (Relation **) NULL )
2170         *Irel = (Relation *) palloc(i * sizeof(Relation));
2171     
2172     for (k = 0; i > 0; )
2173     {
2174         irel = index_open(ioid[--i]);
2175         if ( irel != (Relation) NULL )
2176         {
2177             if ( Irel != (Relation **) NULL )
2178                 (*Irel)[k] = irel;
2179             else
2180                 index_close (irel);
2181             k++;
2182         }
2183         else
2184             elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2185     }
2186     *nindices = k;
2187     pfree(ioid);
2188
2189     if ( Irel != (Relation **) NULL && *nindices == 0 )
2190     {
2191         pfree (*Irel);
2192         *Irel = (Relation *) NULL;
2193     }
2194
2195 } /* vc_getindices */
2196
2197
2198 static void
2199 vc_clsindices (int nindices, Relation *Irel)
2200 {
2201
2202     if ( Irel == (Relation*) NULL )
2203         return;
2204
2205     while (nindices--) {
2206         index_close (Irel[nindices]);
2207     }
2208     pfree (Irel);
2209
2210 } /* vc_clsindices */
2211
2212
2213 static void
2214 vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2215 {
2216     IndDesc *idcur;
2217     HeapTuple pgIndexTup;
2218     AttrNumber *attnumP;
2219     int natts;
2220     int i;
2221
2222     *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
2223     
2224     for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
2225         pgIndexTup =
2226                 SearchSysCacheTuple(INDEXRELID,
2227                                 ObjectIdGetDatum(Irel[i]->rd_id),
2228                                 0,0,0);
2229         Assert(pgIndexTup);
2230         idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
2231         for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2232                 *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2233                 attnumP++, natts++);
2234         if (idcur->tform->indproc != InvalidOid) {
2235             idcur->finfoP = &(idcur->finfo);
2236             FIgetnArgs(idcur->finfoP) = natts;
2237             natts = 1;
2238             FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2239             *(FIgetname(idcur->finfoP)) = '\0';
2240         } else
2241             idcur->finfoP = (FuncIndexInfo *) NULL;
2242         
2243         idcur->natts = natts;
2244     }
2245     
2246 } /* vc_mkindesc */
2247
2248
2249 static bool
2250 vc_enough_space (VPageDescr vpd, Size len)
2251 {
2252
2253     len = DOUBLEALIGN(len);
2254
2255     if ( len > vpd->vpd_free )
2256         return (false);
2257     
2258     if ( vpd->vpd_nusd < vpd->vpd_noff )        /* there are free itemid(s) */
2259         return (true);                          /* and len <= free_space */
2260     
2261     /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2262     if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
2263         return (true);
2264     
2265     return (false);
2266     
2267 } /* vc_enough_space */