]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
lowercase large object table name fix.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *    the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.34 1997/06/06 03:41:16 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/file.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <fmgr.h>
24 #include <utils/portal.h>
25 #include <access/genam.h>
26 #include <access/heapam.h>
27 #include <access/xact.h>
28 #include <storage/bufmgr.h>
29 #include <access/transam.h>
30 #include <catalog/pg_index.h>
31 #include <catalog/index.h>
32 #include <catalog/catname.h>
33 #include <catalog/catalog.h>
34 #include <catalog/pg_class.h>
35 #include <catalog/pg_proc.h>
36 #include <catalog/pg_statistic.h>
37 #include <catalog/pg_type.h>
38 #include <catalog/pg_operator.h>
39 #include <storage/smgr.h>
40 #include <storage/lmgr.h>
41 #include <utils/inval.h>
42 #include <utils/mcxt.h>
43 #include <utils/inval.h>
44 #include <utils/syscache.h>
45 #include <utils/builtins.h>
46 #include <commands/vacuum.h>
47 #include <parser/catalog_utils.h>
48 #include <storage/bufpage.h>
49 #include "storage/shmem.h"
50 #ifndef HAVE_GETRUSAGE
51 # include <rusagestub.h>
52 #else 
53 # include <sys/time.h>
54 # include <sys/resource.h>
55 #endif 
56
57 #include <port-protos.h>
58
59 bool VacuumRunning =    false;
60
61 static Portal vc_portal;
62
63 static int MESSAGE_LEVEL;       /* message level */
64
65 #define swapLong(a,b)   {long tmp; tmp=a; a=b; b=tmp;}
66 #define swapInt(a,b)    {int tmp; tmp=a; a=b; b=tmp;}
67 #define swapDatum(a,b)  {Datum tmp; tmp=a; a=b; b=tmp;}
68 #define VacAttrStatsEqValid(stats) ( stats->f_cmpeq != NULL )
69 #define VacAttrStatsLtGtValid(stats) ( stats->f_cmplt != NULL && \
70                                    stats->f_cmpgt != NULL && \
71                                    RegProcedureIsValid(stats->outfunc) )
72   
73
74 /* non-export function prototypes */
75 static void vc_init(void);
76 static void vc_shutdown(void);
77 static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
78 static VRelList vc_getrels(NameData *VacRelP);
79 static void vc_vacone (Oid relid, bool analyze, List *va_cols);
80 static void vc_scanheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
81 static void vc_rpfheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
82 static void vc_vacheap (VRelStats *vacrelstats, Relation onerel, VPageList vpl);
83 static void vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
84 static void vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
85 static void vc_scanoneind (Relation indrel, int nhtups);
86 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup);
87 static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
88 static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats);
89 static void vc_delhilowstats (Oid relid, int attcnt, int *attnums);
90 static void vc_setpagelock(Relation rel, BlockNumber blkno);
91 static VPageDescr vc_tidreapped (ItemPointer itemptr, VPageList vpl);
92 static void vc_reappage (VPageList vpl, VPageDescr vpc);
93 static void vc_vpinsert (VPageList vpl, VPageDescr vpnew);
94 static void vc_free(VRelList vrl);
95 static void vc_getindices (Oid relid, int *nindices, Relation **Irel);
96 static void vc_clsindices (int nindices, Relation *Irel);
97 static Relation vc_getarchrel(Relation heaprel);
98 static void vc_archive(Relation archrel, HeapTuple htup);
99 static bool vc_isarchrel(char *rname);
100 static void vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
101 static char * vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
102 static int vc_cmp_blk (char *left, char *right);
103 static int vc_cmp_offno (char *left, char *right);
104 static bool vc_enough_space (VPageDescr vpd, Size len);
105
106 void
107 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
108 {
109     char *pname;
110     MemoryContext old;
111     PortalVariableMemory pmem;
112     NameData VacRel;
113     List *le;
114     List *va_cols = NIL;
115
116     /*
117      *  Create a portal for safe memory across transctions.  We need to
118      *  palloc the name space for it because our hash function expects
119      *  the name to be on a longword boundary.  CreatePortal copies the
120      *  name to safe storage for us.
121      */
122     pname = (char *) palloc(strlen(VACPNAME) + 1);
123     strcpy(pname, VACPNAME);
124     vc_portal = CreatePortal(pname);
125     pfree(pname);
126
127     if (verbose)
128         MESSAGE_LEVEL = NOTICE;
129     else
130         MESSAGE_LEVEL = DEBUG;
131
132     /* vacrel gets de-allocated on transaction commit */
133     if (vacrel)
134         strcpy(VacRel.data,vacrel);
135
136     pmem = PortalGetVariableMemory(vc_portal);
137     old = MemoryContextSwitchTo((MemoryContext)pmem);
138
139     Assert ( va_spec == NIL || analyze );
140     foreach (le, va_spec)
141     {
142         char *col = (char*)lfirst(le);
143         char *dest;
144         
145         dest = (char*) palloc (strlen (col) + 1);
146         strcpy (dest, col);
147         va_cols = lappend (va_cols, dest);
148     }
149     (void) MemoryContextSwitchTo(old);
150         
151     /* initialize vacuum cleaner */
152     vc_init();
153
154     /* vacuum the database */
155     if (vacrel)
156         vc_vacuum (&VacRel, analyze, va_cols);
157     else
158         vc_vacuum (NULL, analyze, NIL);
159
160     PortalDestroy (&vc_portal);
161
162     /* clean up */
163     vc_shutdown();
164 }
165
166 /*
167  *  vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
168  *
169  *      We run exactly one vacuum cleaner at a time.  We use the file system
170  *      to guarantee an exclusive lock on vacuuming, since a single vacuum
171  *      cleaner instantiation crosses transaction boundaries, and we'd lose
172  *      postgres-style locks at the end of every transaction.
173  *
174  *      The strangeness with committing and starting transactions in the
175  *      init and shutdown routines is due to the fact that the vacuum cleaner
176  *      is invoked via a sql command, and so is already executing inside
177  *      a transaction.  We need to leave ourselves in a predictable state
178  *      on entry and exit to the vacuum cleaner.  We commit the transaction
179  *      started in PostgresMain() inside vc_init(), and start one in
180  *      vc_shutdown() to match the commit waiting for us back in
181  *      PostgresMain().
182  */
183 static void
184 vc_init()
185 {
186     int fd;
187
188     if ((fd = open("pg_vlock", O_CREAT|O_EXCL, 0600)) < 0)
189         elog(WARN, "can't create lock file -- another vacuum cleaner running?");
190
191     close(fd);
192
193     /*
194      *  By here, exclusive open on the lock file succeeded.  If we abort
195      *  for any reason during vacuuming, we need to remove the lock file.
196      *  This global variable is checked in the transaction manager on xact
197      *  abort, and the routine vc_abort() is called if necessary.
198      */
199
200     VacuumRunning = true;
201
202     /* matches the StartTransaction in PostgresMain() */
203     CommitTransactionCommand();
204 }
205
206 static void
207 vc_shutdown()
208 {
209     /* on entry, not in a transaction */
210     if (unlink("pg_vlock") < 0)
211         elog(WARN, "vacuum: can't destroy lock file!");
212
213     /* okay, we're done */
214     VacuumRunning = false;
215
216     /* matches the CommitTransaction in PostgresMain() */
217     StartTransactionCommand();
218
219 }
220
221 void
222 vc_abort()
223 {
224     /* on abort, remove the vacuum cleaner lock file */
225     (void) unlink("pg_vlock");
226
227     VacuumRunning = false;
228 }
229
230 /*
231  *  vc_vacuum() -- vacuum the database.
232  *
233  *      This routine builds a list of relations to vacuum, and then calls
234  *      code that vacuums them one at a time.  We are careful to vacuum each
235  *      relation in a separate transaction in order to avoid holding too many
236  *      locks at one time.
237  */
238 static void
239 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
240 {
241     VRelList vrl, cur;
242
243     /* get list of relations */
244     vrl = vc_getrels(VacRelP);
245
246     if ( analyze && VacRelP == NULL && vrl != NULL )
247         vc_delhilowstats (InvalidOid, 0, NULL);
248         
249     /* vacuum each heap relation */
250     for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
251         vc_vacone (cur->vrl_relid, analyze, va_cols);
252
253     vc_free(vrl);
254 }
255
256 static VRelList
257 vc_getrels(NameData *VacRelP)
258 {
259     Relation pgclass;
260     TupleDesc pgcdesc;
261     HeapScanDesc pgcscan;
262     HeapTuple pgctup;
263     Buffer buf;
264     PortalVariableMemory portalmem;
265     MemoryContext old;
266     VRelList vrl, cur;
267     Datum d;
268     char *rname;
269     char rkind;
270     int16 smgrno;
271     bool n;
272     ScanKeyData  pgckey;
273     bool found = false;
274
275     StartTransactionCommand();
276
277     if (VacRelP->data) {
278         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
279                                NameEqualRegProcedure, 
280                                PointerGetDatum(VacRelP->data));
281     } else {
282         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
283                                CharacterEqualRegProcedure, CharGetDatum('r'));
284     }
285  
286     portalmem = PortalGetVariableMemory(vc_portal);
287     vrl = cur = (VRelList) NULL;
288
289     pgclass = heap_openr(RelationRelationName);
290     pgcdesc = RelationGetTupleDescriptor(pgclass);
291
292     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
293
294     while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf))) {
295
296         found = true;
297         
298         /*
299          *  We have to be careful not to vacuum the archive (since it
300          *  already contains vacuumed tuples), and not to vacuum
301          *  relations on write-once storage managers like the Sony
302          *  jukebox at Berkeley.
303          */
304
305         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relname,
306                                  pgcdesc, &n);
307         rname = (char*)d;
308
309         /* skip archive relations */
310         if (vc_isarchrel(rname)) {
311             ReleaseBuffer(buf);
312             continue;
313         }
314
315         /* don't vacuum large objects for now - something breaks when we do */
316         if ( (strlen(rname) > 4) && rname[0] == 'x' &&
317                 rname[1] == 'i' && rname[2] == 'n' &&
318                 (rname[3] == 'v' || rname[3] == 'x'))
319         {
320             elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now", 
321                         NAMEDATALEN, rname);
322             ReleaseBuffer(buf);
323             continue;
324         }
325
326         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
327                                  pgcdesc, &n);
328         smgrno = DatumGetInt16(d);
329
330         /* skip write-once storage managers */
331         if (smgriswo(smgrno)) {
332             ReleaseBuffer(buf);
333             continue;
334         }
335
336         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relkind,
337                                  pgcdesc, &n);
338
339         rkind = DatumGetChar(d);
340
341         /* skip system relations */
342         if (rkind != 'r') {
343             ReleaseBuffer(buf);
344             elog(NOTICE, "Vacuum: can not process index and certain system tables" );
345             continue;
346         }
347                                  
348         /* get a relation list entry for this guy */
349         old = MemoryContextSwitchTo((MemoryContext)portalmem);
350         if (vrl == (VRelList) NULL) {
351             vrl = cur = (VRelList) palloc(sizeof(VRelListData));
352         } else {
353             cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
354             cur = cur->vrl_next;
355         }
356         (void) MemoryContextSwitchTo(old);
357
358         cur->vrl_relid = pgctup->t_oid;
359         cur->vrl_next = (VRelList) NULL;
360
361         /* wei hates it if you forget to do this */
362         ReleaseBuffer(buf);
363     }
364     if (found == false)
365         elog(NOTICE, "Vacuum: table not found" );
366
367     
368     heap_endscan(pgcscan);
369     heap_close(pgclass);
370
371     CommitTransactionCommand();
372
373     return (vrl);
374 }
375
376 /*
377  *  vc_vacone() -- vacuum one heap relation
378  *
379  *      This routine vacuums a single heap, cleans out its indices, and
380  *      updates its statistics npages and ntups statistics.
381  *
382  *      Doing one heap at a time incurs extra overhead, since we need to
383  *      check that the heap exists again just before we vacuum it.  The
384  *      reason that we do this is so that vacuuming can be spread across
385  *      many small transactions.  Otherwise, two-phase locking would require
386  *      us to lock the entire database during one pass of the vacuum cleaner.
387  */
388 static void
389 vc_vacone (Oid relid, bool analyze, List *va_cols)
390 {
391     Relation pgclass;
392     TupleDesc pgcdesc;
393     HeapTuple pgctup, pgttup;
394     Buffer pgcbuf;
395     HeapScanDesc pgcscan;
396     Relation onerel;
397     ScanKeyData pgckey;
398     VPageListData Vvpl; /* List of pages to vacuum and/or clean indices */
399     VPageListData Fvpl; /* List of pages with space enough for re-using */
400     VPageDescr *vpp;
401     Relation *Irel;
402     int32 nindices, i;
403     VRelStats *vacrelstats;
404     
405     StartTransactionCommand();
406
407     ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
408                            ObjectIdEqualRegProcedure,
409                            ObjectIdGetDatum(relid));
410
411     pgclass = heap_openr(RelationRelationName);
412     pgcdesc = RelationGetTupleDescriptor(pgclass);
413     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
414
415     /*
416      *  Race condition -- if the pg_class tuple has gone away since the
417      *  last time we saw it, we don't need to vacuum it.
418      */
419
420     if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf))) {
421         heap_endscan(pgcscan);
422         heap_close(pgclass);
423         CommitTransactionCommand();
424         return;
425     }
426
427     /* now open the class and vacuum it */
428     onerel = heap_open(relid);
429
430     vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
431     vacrelstats->relid = relid;
432     vacrelstats->npages = vacrelstats->ntups = 0;
433     vacrelstats->hasindex = false;
434     if ( analyze && !IsSystemRelationName ((RelationGetRelationName (onerel))->data) )
435     {
436         int attr_cnt, *attnums = NULL;
437         AttributeTupleForm *attr;
438         
439         attr_cnt = onerel->rd_att->natts;
440         attr = onerel->rd_att->attrs;
441         
442         if ( va_cols != NIL )
443         {
444             int tcnt = 0;
445             List *le;
446             
447             if ( length (va_cols) > attr_cnt )
448                 elog (WARN, "vacuum: too many attributes specified for relation %.*s",
449                         NAMEDATALEN, (RelationGetRelationName(onerel))->data);
450             attnums = (int*) palloc (attr_cnt * sizeof (int));
451             foreach (le, va_cols)
452             {
453                 char *col = (char*) lfirst(le);
454                     
455                 for (i = 0; i < attr_cnt; i++)
456                 {
457                     if ( namestrcmp (&(attr[i]->attname), col) == 0 )
458                         break;
459                 }
460                 if ( i < attr_cnt )             /* found */
461                     attnums[tcnt++] = i;
462                 else
463                 {
464                     elog (WARN, "vacuum: there is no attribute %s in %.*s",
465                         col, NAMEDATALEN, (RelationGetRelationName(onerel))->data);
466                 }
467             }
468             attr_cnt = tcnt;
469         }
470                 
471         vacrelstats->vacattrstats = 
472                 (VacAttrStats *) palloc (attr_cnt * sizeof(VacAttrStats));
473     
474         for (i = 0; i < attr_cnt; i++)
475         {
476             Operator func_operator;
477             OperatorTupleForm pgopform;
478             VacAttrStats *stats;
479
480             stats = &vacrelstats->vacattrstats[i];
481             stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
482             memmove (stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
483             stats->best = stats->guess1 = stats->guess2 = 0;
484             stats->max = stats->min = 0;
485             stats->best_len = stats->guess1_len = stats->guess2_len = 0;
486             stats->max_len = stats->min_len = 0;
487             stats->initialized = false;
488             stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
489             stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
490         
491             func_operator = oper("=",stats->attr->atttypid,stats->attr->atttypid,true);
492             if (func_operator != NULL)
493             {
494                 int nargs;
495             
496                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
497                 fmgr_info (pgopform->oprcode, &(stats->f_cmpeq), &nargs);
498             }
499             else
500                 stats->f_cmpeq = NULL;
501         
502             func_operator = oper("<",stats->attr->atttypid,stats->attr->atttypid,true);
503             if (func_operator != NULL)
504             {
505                 int nargs;
506             
507                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
508                 fmgr_info (pgopform->oprcode, &(stats->f_cmplt), &nargs);
509             }
510             else
511                 stats->f_cmplt = NULL;
512         
513             func_operator = oper(">",stats->attr->atttypid,stats->attr->atttypid,true);
514             if (func_operator != NULL)
515             {
516                 int nargs;
517             
518                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
519                 fmgr_info (pgopform->oprcode, &(stats->f_cmpgt), &nargs);
520             }
521             else
522                 stats->f_cmpgt = NULL;
523         
524             pgttup = SearchSysCacheTuple(TYPOID,
525                                     ObjectIdGetDatum(stats->attr->atttypid),
526                                     0,0,0);
527             if (HeapTupleIsValid(pgttup))
528                 stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
529             else
530                 stats->outfunc = InvalidOid;
531         }
532         vacrelstats->va_natts = attr_cnt;
533         vc_delhilowstats (relid, ((attnums) ? attr_cnt : 0), attnums);
534         if ( attnums )
535             pfree (attnums);
536     }
537     else
538     {
539         vacrelstats->va_natts = 0;
540         vacrelstats->vacattrstats = (VacAttrStats *) NULL;
541     }
542
543     /* we require the relation to be locked until the indices are cleaned */
544     RelationSetLockForWrite(onerel);
545
546     /* scan it */
547     Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
548     vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
549
550     /* Now open indices */
551     Irel = (Relation *) NULL;
552     vc_getindices(vacrelstats->relid, &nindices, &Irel);
553     
554     if ( nindices > 0 )
555         vacrelstats->hasindex = true;
556     else
557         vacrelstats->hasindex = false;
558
559     /* Clean/scan index relation(s) */
560     if ( Irel != (Relation*) NULL )
561     {
562         if ( Vvpl.vpl_npages > 0 )
563         {
564             for (i = 0; i < nindices; i++)
565                 vc_vaconeind (&Vvpl, Irel[i], vacrelstats->ntups);
566         }
567         else    /* just scan indices to update statistic */
568         {
569             for (i = 0; i < nindices; i++)
570                 vc_scanoneind (Irel[i], vacrelstats->ntups);
571         }
572     }
573
574     if ( Fvpl.vpl_npages > 0 )          /* Try to shrink heap */
575         vc_rpfheap (vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
576     else 
577     {
578         if ( Irel != (Relation*) NULL )
579             vc_clsindices (nindices, Irel);
580         if ( Vvpl.vpl_npages > 0 )      /* Clean pages from Vvpl list */
581             vc_vacheap (vacrelstats, onerel, &Vvpl);
582     }
583
584     /* ok - free Vvpl list of reapped pages */
585     if ( Vvpl.vpl_npages > 0 )
586     {
587         vpp = Vvpl.vpl_pgdesc;
588         for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
589             pfree(*vpp);
590         pfree (Vvpl.vpl_pgdesc);
591         if ( Fvpl.vpl_npages > 0 )
592             pfree (Fvpl.vpl_pgdesc);
593     }
594
595     /* all done with this class */
596     heap_close(onerel);
597     heap_endscan(pgcscan);
598     heap_close(pgclass);
599
600     /* update statistics in pg_class */
601     vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
602                          vacrelstats->hasindex, vacrelstats);
603
604     /* next command frees attribute stats */
605
606     CommitTransactionCommand();
607 }
608
609 /*
610  *  vc_scanheap() -- scan an open heap relation
611  *
612  *      This routine sets commit times, constructs Vvpl list of 
613  *      empty/uninitialized pages and pages with dead tuples and
614  *      ~LP_USED line pointers, constructs Fvpl list of pages
615  *      appropriate for purposes of shrinking and maintains statistics 
616  *      on the number of live tuples in a heap.
617  */
618 static void
619 vc_scanheap (VRelStats *vacrelstats, Relation onerel, 
620                         VPageList Vvpl, VPageList Fvpl)
621 {
622     int nblocks, blkno;
623     ItemId itemid;
624     ItemPointer itemptr;
625     HeapTuple htup;
626     Buffer buf;
627     Page page, tempPage = NULL;
628     OffsetNumber offnum, maxoff;
629     bool pgchanged, tupgone, dobufrel, notup;
630     char *relname;
631     VPageDescr vpc, vp;
632     uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
633     Size frsize, frsusf;
634     Size min_tlen = MAXTUPLEN;
635     Size max_tlen = 0;
636     int32 i/*, attr_cnt*/;
637     struct rusage ru0, ru1;
638     bool do_shrinking = true;
639
640     getrusage(RUSAGE_SELF, &ru0);
641
642     nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
643     frsize = frsusf = 0;
644
645     relname = (RelationGetRelationName(onerel))->data;
646
647     nblocks = RelationGetNumberOfBlocks(onerel);
648
649     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
650     vpc->vpd_nusd = 0;
651             
652     for (blkno = 0; blkno < nblocks; blkno++) {
653         buf = ReadBuffer(onerel, blkno);
654         page = BufferGetPage(buf);
655         vpc->vpd_blkno = blkno;
656         vpc->vpd_noff = 0;
657
658         if (PageIsNew(page)) {
659             elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing",
660                 NAMEDATALEN, relname, blkno);
661             PageInit (page, BufferGetPageSize (buf), 0);
662             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
663             frsize += (vpc->vpd_free - sizeof (ItemIdData));
664             nnepg++;
665             nemend++;
666             vc_reappage (Vvpl, vpc);
667             WriteBuffer(buf);
668             continue;
669         }
670
671         if (PageIsEmpty(page)) {
672             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
673             frsize += (vpc->vpd_free - sizeof (ItemIdData));
674             nempg++;
675             nemend++;
676             vc_reappage (Vvpl, vpc);
677             ReleaseBuffer(buf);
678             continue;
679         }
680
681         pgchanged = false;
682         notup = true;
683         maxoff = PageGetMaxOffsetNumber(page);
684         for (offnum = FirstOffsetNumber;
685              offnum <= maxoff;
686              offnum = OffsetNumberNext(offnum)) {
687             itemid = PageGetItemId(page, offnum);
688
689             /*
690              * Collect un-used items too - it's possible to have
691              * indices pointing here after crash.
692              */
693             if (!ItemIdIsUsed(itemid)) {
694                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
695                 nunused++;
696                 continue;
697             }
698
699             htup = (HeapTuple) PageGetItem(page, itemid);
700             tupgone = false;
701
702             if (!AbsoluteTimeIsBackwardCompatiblyValid(htup->t_tmin) && 
703                 TransactionIdIsValid((TransactionId)htup->t_xmin)) {
704
705                 if (TransactionIdDidAbort(htup->t_xmin)) {
706                     tupgone = true;
707                 } else if (TransactionIdDidCommit(htup->t_xmin)) {
708                     htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
709                     pgchanged = true;
710                 } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
711                     /* 
712                      * Not Aborted, Not Committed, Not in Progress -
713                      * so it from crashed process. - vadim 11/26/96
714                      */
715                     ncrash++;
716                     tupgone = true;
717                 }
718                 else
719                 {
720                     elog (NOTICE, "Rel %.*s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
721                         NAMEDATALEN, relname, blkno, offnum, htup->t_xmin);
722                     do_shrinking = false;
723                 }
724             }
725
726             if (TransactionIdIsValid((TransactionId)htup->t_xmax))
727             {
728                 if (TransactionIdDidAbort(htup->t_xmax))
729                 {
730                     StoreInvalidTransactionId(&(htup->t_xmax));
731                     pgchanged = true;
732                 }
733                 else if (TransactionIdDidCommit(htup->t_xmax))
734                     tupgone = true;
735                 else if ( !TransactionIdIsInProgress (htup->t_xmax) ) {
736                     /* 
737                      * Not Aborted, Not Committed, Not in Progress -
738                      * so it from crashed process. - vadim 06/02/97
739                      */
740                     StoreInvalidTransactionId(&(htup->t_xmax));
741                     pgchanged = true;
742                 }
743                 else
744                 {
745                     elog (NOTICE, "Rel %.*s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
746                         NAMEDATALEN, relname, blkno, offnum, htup->t_xmax);
747                     do_shrinking = false;
748                 }
749             }
750
751             /*
752              * Is it possible at all ? - vadim 11/26/96
753              */
754             if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
755             {
756                 elog (NOTICE, "Rel %.*s: TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
757 DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.", 
758                         NAMEDATALEN, relname, blkno, offnum, 
759                         TransactionIdIsValid((TransactionId)htup->t_xmax),
760                         tupgone);
761             }
762             
763             /*
764              * It's possibly! But from where it comes ?
765              * And should we fix it ?  - vadim 11/28/96
766              */
767             itemptr = &(htup->t_ctid);
768             if ( !ItemPointerIsValid (itemptr) || 
769                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno )
770             {
771                 elog (NOTICE, "Rel %.*s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.", 
772                         NAMEDATALEN, relname, blkno, offnum,
773                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)), 
774                         itemptr->ip_posid, tupgone);
775             }
776
777             /*
778              * Other checks...
779              */
780             if ( htup->t_len != itemid->lp_len )
781             {
782                 elog (NOTICE, "Rel %.*s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.", 
783                         NAMEDATALEN, relname, blkno, offnum, 
784                         itemid->lp_len, htup->t_len, tupgone);
785             }
786             if ( !OidIsValid(htup->t_oid) )
787             {
788                 elog (NOTICE, "Rel %.*s: TID %u/%u: OID IS INVALID. TUPGONE %d.", 
789                         NAMEDATALEN, relname, blkno, offnum, tupgone);
790             }
791             
792             if (tupgone) {
793                 ItemId lpp;
794                                                     
795                 if ( tempPage == (Page) NULL )
796                 {
797                     Size pageSize;
798                     
799                     pageSize = PageGetPageSize(page);
800                     tempPage = (Page) palloc(pageSize);
801                     memmove (tempPage, page, pageSize);
802                 }
803                 
804                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
805
806                 /* mark it unused */
807                 lpp->lp_flags &= ~LP_USED;
808
809                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
810                 nvac++;
811
812             } else {
813                 ntups++;
814                 notup = false;
815                 if ( htup->t_len < min_tlen )
816                     min_tlen = htup->t_len;
817                 if ( htup->t_len > max_tlen )
818                     max_tlen = htup->t_len;
819                 vc_attrstats(onerel, vacrelstats, htup);
820             }
821         }
822
823         if (pgchanged) {
824             WriteBuffer(buf);
825             dobufrel = false;
826             nchpg++;
827         }
828         else
829             dobufrel = true;
830         if ( tempPage != (Page) NULL )
831         { /* Some tuples are gone */
832             PageRepairFragmentation(tempPage);
833             vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
834             frsize += vpc->vpd_free;
835             vc_reappage (Vvpl, vpc);
836             pfree (tempPage);
837             tempPage = (Page) NULL;
838         }
839         else if ( vpc->vpd_noff > 0 )
840         { /* there are only ~LP_USED line pointers */
841             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
842             frsize += vpc->vpd_free;
843             vc_reappage (Vvpl, vpc);
844         }
845         if ( dobufrel )
846             ReleaseBuffer(buf);
847         if ( notup )
848             nemend++;
849         else
850             nemend = 0;
851     }
852
853     pfree (vpc);
854
855     /* save stats in the rel list for use later */
856     vacrelstats->ntups = ntups;
857     vacrelstats->npages = nblocks;
858 /*    vacrelstats->natts = attr_cnt;*/
859     if ( ntups == 0 )
860         min_tlen = max_tlen = 0;
861     vacrelstats->min_tlen = min_tlen;
862     vacrelstats->max_tlen = max_tlen;
863     
864     Vvpl->vpl_nemend = nemend;
865     Fvpl->vpl_nemend = nemend;
866
867     /* 
868      * Try to make Fvpl keeping in mind that we can't use free space 
869      * of "empty" end-pages and last page if it reapped.
870      */
871     if ( do_shrinking && Vvpl->vpl_npages - nemend > 0 )
872     {
873         int nusf;               /* blocks usefull for re-using */
874         
875         nusf = Vvpl->vpl_npages - nemend;
876         if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
877             nusf--;
878     
879         for (i = 0; i < nusf; i++)
880         {
881             vp = Vvpl->vpl_pgdesc[i];
882             if ( vc_enough_space (vp, min_tlen) )
883             {
884                 vc_vpinsert (Fvpl, vp);
885                 frsusf += vp->vpd_free;
886             }
887         }
888     }
889
890     getrusage(RUSAGE_SELF, &ru1);
891     
892     elog (MESSAGE_LEVEL, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
893 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
894         NAMEDATALEN, relname, 
895         nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
896         ntups, nvac, ncrash, nunused, min_tlen, max_tlen, 
897         frsize, frsusf, nemend, Fvpl->vpl_npages,
898         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
899         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
900
901 } /* vc_scanheap */
902
903
904 /*
905  *  vc_rpfheap() -- try to repaire relation' fragmentation
906  *
907  *      This routine marks dead tuples as unused and tries re-use dead space
908  *      by moving tuples (and inserting indices if needed). It constructs 
909  *      Nvpl list of free-ed pages (moved tuples) and clean indices
910  *      for them after committing (in hack-manner - without losing locks
911  *      and freeing memory!) current transaction. It truncates relation
912  *      if some end-blocks are gone away.
913  */
914 static void
915 vc_rpfheap (VRelStats *vacrelstats, Relation onerel, 
916                 VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
917 {
918     TransactionId myXID;
919     CommandId myCID;
920     AbsoluteTime myCTM = 0;
921     Buffer buf, ToBuf;
922     int nblocks, blkno;
923     Page page, ToPage = NULL;
924     OffsetNumber offnum = 0, maxoff = 0, newoff, moff;
925     ItemId itemid, newitemid;
926     HeapTuple htup, newtup;
927     TupleDesc tupdesc = NULL;
928     Datum *idatum = NULL;
929     char *inulls = NULL;
930     InsertIndexResult iresult;
931     VPageListData Nvpl;
932     VPageDescr ToVpd = NULL, Fvplast, Vvplast, vpc, *vpp;
933     int ToVpI = 0;
934     IndDesc *Idesc, *idcur;
935     int Fblklast, Vblklast, i;
936     Size tlen;
937     int nmoved, Fnpages, Vnpages;
938     int nchkmvd, ntups;
939     bool isempty, dowrite;
940     Relation archrel;
941     struct rusage ru0, ru1;
942
943     getrusage(RUSAGE_SELF, &ru0);
944
945     myXID = GetCurrentTransactionId();
946     myCID = GetCurrentCommandId();
947         
948     if ( Irel != (Relation*) NULL )     /* preparation for index' inserts */
949     {
950         vc_mkindesc (onerel, nindices, Irel, &Idesc);
951         tupdesc = RelationGetTupleDescriptor(onerel);
952         idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
953         inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
954     }
955
956     /* if the relation has an archive, open it */
957     if (onerel->rd_rel->relarch != 'n')
958     {
959         archrel = vc_getarchrel(onerel);
960         /* Archive tuples from "empty" end-pages */
961         for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1, 
962                                 i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
963         {
964             if ( (*vpp)->vpd_noff > 0 )
965             {
966                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
967                 page = BufferGetPage(buf);
968                 Assert ( !PageIsEmpty(page) );
969                 vc_vacpage (page, *vpp, archrel);
970                 WriteBuffer (buf);
971             }
972         }
973     }
974     else
975         archrel = (Relation) NULL;
976
977     Nvpl.vpl_npages = 0;
978     Fnpages = Fvpl->vpl_npages;
979     Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
980     Fblklast = Fvplast->vpd_blkno;
981     Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
982     Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
983     Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
984     Vblklast = Vvplast->vpd_blkno;
985     Assert ( Vblklast >= Fblklast );
986     ToBuf = InvalidBuffer;
987     nmoved = 0;
988
989     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
990     vpc->vpd_nusd = vpc->vpd_noff = 0;
991         
992     nblocks = vacrelstats->npages;
993     for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
994     {
995         /* if it's reapped page and it was used by me - quit */
996         if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
997             break;
998
999         buf = ReadBuffer(onerel, blkno);
1000         page = BufferGetPage(buf);
1001
1002         vpc->vpd_noff = 0;
1003
1004         isempty = PageIsEmpty(page);
1005
1006         dowrite = false;
1007         if ( blkno == Vblklast )                /* it's reapped page */
1008         {
1009             if ( Vvplast->vpd_noff > 0 )        /* there are dead tuples */
1010             {                                   /* on this page - clean */
1011                 Assert ( ! isempty );
1012                 vc_vacpage (page, Vvplast, archrel);
1013                 dowrite = true;
1014             }
1015             else
1016             {
1017                 Assert ( isempty );
1018             }
1019             --Vnpages;
1020             Assert ( Vnpages > 0 );
1021             /* get prev reapped page from Vvpl */
1022             Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1023             Vblklast = Vvplast->vpd_blkno;
1024             if ( blkno == Fblklast )    /* this page in Fvpl too */
1025             {
1026                 --Fnpages;
1027                 Assert ( Fnpages > 0 );
1028                 Assert ( Fvplast->vpd_nusd == 0 );
1029                 /* get prev reapped page from Fvpl */
1030                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1031                 Fblklast = Fvplast->vpd_blkno;
1032             }
1033             Assert ( Fblklast <= Vblklast );
1034             if ( isempty )
1035             {
1036                 ReleaseBuffer(buf);
1037                 continue;
1038             }
1039         }
1040         else
1041         {
1042             Assert ( ! isempty );
1043         }
1044
1045         vpc->vpd_blkno = blkno;
1046         maxoff = PageGetMaxOffsetNumber(page);
1047         for (offnum = FirstOffsetNumber;
1048                 offnum <= maxoff;
1049                 offnum = OffsetNumberNext(offnum))
1050         {
1051             itemid = PageGetItemId(page, offnum);
1052
1053             if (!ItemIdIsUsed(itemid))
1054                 continue;
1055
1056             htup = (HeapTuple) PageGetItem(page, itemid);
1057             tlen = htup->t_len;
1058                 
1059             /* try to find new page for this tuple */
1060             if ( ToBuf == InvalidBuffer ||
1061                 ! vc_enough_space (ToVpd, tlen) )
1062             {
1063                 if ( ToBuf != InvalidBuffer )
1064                 {
1065                     WriteBuffer(ToBuf);
1066                     ToBuf = InvalidBuffer;
1067                     /*
1068                      * If no one tuple can't be added to this page -
1069                      * remove page from Fvpl. - vadim 11/27/96
1070                      */ 
1071                     if ( !vc_enough_space (ToVpd, vacrelstats->min_tlen) )
1072                     {
1073                         if ( ToVpd != Fvplast )
1074                         {
1075                             Assert ( Fnpages > ToVpI + 1 );
1076                             memmove (Fvpl->vpl_pgdesc + ToVpI, 
1077                                 Fvpl->vpl_pgdesc + ToVpI + 1, 
1078                                 sizeof (VPageDescr*) * (Fnpages - ToVpI - 1));
1079                         }
1080                         Assert ( Fnpages >= 1 );
1081                         Fnpages--;
1082                         if ( Fnpages == 0 )
1083                             break;
1084                         /* get prev reapped page from Fvpl */
1085                         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1086                         Fblklast = Fvplast->vpd_blkno;
1087                     }
1088                 }
1089                 for (i=0; i < Fnpages; i++)
1090                 {
1091                     if ( vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
1092                         break;
1093                 }
1094                 if ( i == Fnpages )
1095                     break;                      /* can't move item anywhere */
1096                 ToVpI = i;
1097                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1098                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1099                 ToPage = BufferGetPage(ToBuf);
1100                 /* if this page was not used before - clean it */
1101                 if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
1102                     vc_vacpage (ToPage, ToVpd, archrel);
1103             }
1104                 
1105             /* copy tuple */
1106             newtup = (HeapTuple) palloc (tlen);
1107             memmove((char *) newtup, (char *) htup, tlen);
1108
1109             /* store transaction information */
1110             TransactionIdStore(myXID, &(newtup->t_xmin));
1111             newtup->t_cmin = myCID;
1112             StoreInvalidTransactionId(&(newtup->t_xmax));
1113             newtup->t_tmin = INVALID_ABSTIME;
1114             newtup->t_tmax = CURRENT_ABSTIME;
1115             ItemPointerSetInvalid(&newtup->t_chain);
1116
1117             /* add tuple to the page */
1118             newoff = PageAddItem (ToPage, (Item)newtup, tlen, 
1119                                 InvalidOffsetNumber, LP_USED);
1120             if ( newoff == InvalidOffsetNumber )
1121             {
1122                 elog (WARN, "\
1123 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1124                 tlen, ToVpd->vpd_blkno, ToVpd->vpd_free, 
1125                 ToVpd->vpd_nusd, ToVpd->vpd_noff);
1126             }
1127             newitemid = PageGetItemId(ToPage, newoff);
1128             pfree (newtup);
1129             newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1130             ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1131
1132             /* now logically delete end-tuple */
1133             TransactionIdStore(myXID, &(htup->t_xmax));
1134             htup->t_cmax = myCID;
1135             memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
1136
1137             ToVpd->vpd_nusd++;
1138             nmoved++;
1139             ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
1140             vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1141                 
1142             /* insert index' tuples if needed */
1143             if ( Irel != (Relation*) NULL )
1144             {
1145                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1146                 {
1147                     FormIndexDatum (
1148                                 idcur->natts,
1149                                 (AttrNumber *)&(idcur->tform->indkey[0]),
1150                                 newtup, 
1151                                 tupdesc,
1152                                 InvalidBuffer,
1153                                 idatum,
1154                                 inulls,
1155                                 idcur->finfoP);
1156                     iresult = index_insert (
1157                                 Irel[i],
1158                                 idatum,
1159                                 inulls,
1160                                 &(newtup->t_ctid),
1161                                 onerel);
1162                     if (iresult) pfree(iresult);
1163                 }
1164             }
1165                 
1166         } /* walk along page */
1167
1168         if ( vpc->vpd_noff > 0 )                /* some tuples were moved */
1169         {
1170             vc_reappage (&Nvpl, vpc);
1171             WriteBuffer(buf);
1172         }
1173         else if ( dowrite )
1174             WriteBuffer(buf);
1175         else
1176             ReleaseBuffer(buf);
1177             
1178         if ( offnum <= maxoff )
1179             break;                              /* some item(s) left */
1180             
1181     } /* walk along relation */
1182         
1183     blkno++;                            /* new number of blocks */
1184
1185     if ( ToBuf != InvalidBuffer )
1186     {
1187         Assert (nmoved > 0);
1188         WriteBuffer(ToBuf);
1189     }
1190
1191     if ( nmoved > 0 )
1192     {
1193         /* 
1194          * We have to commit our tuple' movings before we'll truncate 
1195          * relation, but we shouldn't lose our locks. And so - quick hack: 
1196          * flush buffers and record status of current transaction
1197          * as committed, and continue. - vadim 11/13/96
1198          */
1199         FlushBufferPool(!TransactionFlushEnabled());
1200         TransactionIdCommit(myXID);
1201         FlushBufferPool(!TransactionFlushEnabled());
1202         myCTM = TransactionIdGetCommitTime(myXID);
1203     }
1204         
1205     /* 
1206      * Clean uncleaned reapped pages from Vvpl list 
1207      * and set commit' times  for inserted tuples
1208      */
1209     nchkmvd = 0;
1210     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1211     {
1212         Assert ( (*vpp)->vpd_blkno < blkno );
1213         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1214         page = BufferGetPage(buf);
1215         if ( (*vpp)->vpd_nusd == 0 )    /* this page was not used */
1216         {
1217             /* noff == 0 in empty pages only - such pages should be re-used */
1218             Assert ( (*vpp)->vpd_noff > 0 );
1219             vc_vacpage (page, *vpp, archrel);
1220         }
1221         else                            /* this page was used */
1222         {
1223             ntups = 0;
1224             moff = PageGetMaxOffsetNumber(page);
1225             for (newoff = FirstOffsetNumber;
1226                         newoff <= moff;
1227                         newoff = OffsetNumberNext(newoff))
1228             {
1229                 itemid = PageGetItemId(page, newoff);
1230                 if (!ItemIdIsUsed(itemid))
1231                     continue;
1232                 htup = (HeapTuple) PageGetItem(page, itemid);
1233                 if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
1234                 {
1235                     htup->t_tmin = myCTM;
1236                     ntups++;
1237                 }
1238             }
1239             Assert ( (*vpp)->vpd_nusd == ntups );
1240             nchkmvd += ntups;
1241         }
1242         WriteBuffer (buf);
1243     }
1244     Assert ( nmoved == nchkmvd );
1245
1246     getrusage(RUSAGE_SELF, &ru1);
1247     
1248     elog (MESSAGE_LEVEL, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \
1249 Elapsed %u/%u sec.",
1250                 NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
1251                 nblocks, blkno, nmoved,
1252                 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1253                 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1254
1255     if ( Nvpl.vpl_npages > 0 )
1256     {
1257         /* vacuum indices again if needed */
1258         if ( Irel != (Relation*) NULL )
1259         {
1260             VPageDescr *vpleft, *vpright, vpsave;
1261                 
1262             /* re-sort Nvpl.vpl_pgdesc */
1263             for (vpleft = Nvpl.vpl_pgdesc, 
1264                 vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1265                 vpleft < vpright; vpleft++, vpright--)
1266             {
1267                 vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
1268             }
1269             for (i = 0; i < nindices; i++)
1270                 vc_vaconeind (&Nvpl, Irel[i], vacrelstats->ntups);
1271         }
1272
1273         /* 
1274          * clean moved tuples from last page in Nvpl list
1275          * if some tuples left there
1276          */
1277         if ( vpc->vpd_noff > 0 && offnum <= maxoff )
1278         {
1279             Assert (vpc->vpd_blkno == blkno - 1);
1280             buf = ReadBuffer(onerel, vpc->vpd_blkno);
1281             page = BufferGetPage (buf);
1282             ntups = 0;
1283             maxoff = offnum;
1284             for (offnum = FirstOffsetNumber;
1285                         offnum < maxoff;
1286                         offnum = OffsetNumberNext(offnum))
1287             {
1288                 itemid = PageGetItemId(page, offnum);
1289                 if (!ItemIdIsUsed(itemid))
1290                     continue;
1291                 htup = (HeapTuple) PageGetItem(page, itemid);
1292                 Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
1293                 itemid->lp_flags &= ~LP_USED;
1294                 ntups++;
1295             }
1296             Assert ( vpc->vpd_noff == ntups );
1297             PageRepairFragmentation(page);
1298             WriteBuffer (buf);
1299         }
1300
1301         /* now - free new list of reapped pages */
1302         vpp = Nvpl.vpl_pgdesc;
1303         for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1304             pfree(*vpp);
1305         pfree (Nvpl.vpl_pgdesc);
1306     }
1307         
1308     /* truncate relation */
1309     if ( blkno < nblocks )
1310     {
1311         blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
1312         Assert ( blkno >= 0 );
1313         vacrelstats->npages = blkno;    /* set new number of blocks */
1314     }
1315
1316     if ( archrel != (Relation) NULL )
1317         heap_close(archrel);
1318
1319     if ( Irel != (Relation*) NULL )     /* pfree index' allocations */
1320     {
1321         pfree (Idesc);
1322         pfree (idatum);
1323         pfree (inulls);
1324         vc_clsindices (nindices, Irel);
1325     }
1326
1327     pfree (vpc);
1328
1329 } /* vc_rpfheap */
1330
1331 /*
1332  *  vc_vacheap() -- free dead tuples
1333  *
1334  *      This routine marks dead tuples as unused and truncates relation
1335  *      if there are "empty" end-blocks.
1336  */
1337 static void
1338 vc_vacheap (VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1339 {
1340     Buffer buf;
1341     Page page;
1342     VPageDescr *vpp;
1343     Relation archrel;
1344     int nblocks;
1345     int i;
1346
1347     nblocks = Vvpl->vpl_npages;
1348     /* if the relation has an archive, open it */
1349     if (onerel->rd_rel->relarch != 'n')
1350         archrel = vc_getarchrel(onerel);
1351     else
1352     {
1353         archrel = (Relation) NULL;
1354         nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1355     }
1356         
1357     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1358     {
1359         if ( (*vpp)->vpd_noff > 0 )
1360         {
1361             buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1362             page = BufferGetPage (buf);
1363             vc_vacpage (page, *vpp, archrel);
1364             WriteBuffer (buf);
1365         }
1366     }
1367
1368     /* truncate relation if there are some empty end-pages */
1369     if ( Vvpl->vpl_nemend > 0 )
1370     {
1371         Assert ( vacrelstats->npages >= Vvpl->vpl_nemend );
1372         nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1373         elog (MESSAGE_LEVEL, "Rel %.*s: Pages: %u --> %u.",
1374                 NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
1375                 vacrelstats->npages, nblocks);
1376
1377         /* 
1378          * we have to flush "empty" end-pages (if changed, but who knows it)
1379          * before truncation 
1380          */
1381         FlushBufferPool(!TransactionFlushEnabled());
1382
1383         nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
1384         Assert ( nblocks >= 0 );
1385         vacrelstats->npages = nblocks;  /* set new number of blocks */
1386     }
1387
1388     if ( archrel != (Relation) NULL )
1389         heap_close(archrel);
1390
1391 } /* vc_vacheap */
1392
1393 /*
1394  *  vc_vacpage() -- free (and archive if needed) dead tuples on a page
1395  *                   and repaire its fragmentation.
1396  */
1397 static void
1398 vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
1399 {
1400     ItemId itemid;
1401     HeapTuple htup;
1402     int i;
1403     
1404     Assert ( vpd->vpd_nusd == 0 );
1405     for (i=0; i < vpd->vpd_noff; i++)
1406     {
1407         itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1408         if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
1409         {
1410             htup = (HeapTuple) PageGetItem (page, itemid);
1411             vc_archive (archrel, htup);
1412         }
1413         itemid->lp_flags &= ~LP_USED;
1414     }
1415     PageRepairFragmentation(page);
1416
1417 } /* vc_vacpage */
1418
1419 /*
1420  *  _vc_scanoneind() -- scan one index relation to update statistic.
1421  *
1422  */
1423 static void
1424 vc_scanoneind (Relation indrel, int nhtups)
1425 {
1426     RetrieveIndexResult res;
1427     IndexScanDesc iscan;
1428     int nitups;
1429     int nipages;
1430     struct rusage ru0, ru1;
1431
1432     getrusage(RUSAGE_SELF, &ru0);
1433
1434     /* walk through the entire index */
1435     iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1436     nitups = 0;
1437
1438     while ((res = index_getnext(iscan, ForwardScanDirection))
1439                                         != (RetrieveIndexResult) NULL)
1440     {
1441         nitups++;
1442         pfree(res);
1443     }
1444
1445     index_endscan(iscan);
1446
1447     /* now update statistics in pg_class */
1448     nipages = RelationGetNumberOfBlocks(indrel);
1449     vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1450
1451     getrusage(RUSAGE_SELF, &ru1);
1452
1453     elog (MESSAGE_LEVEL, "Ind %.*s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1454         NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, 
1455         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1456         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1457
1458     if ( nitups != nhtups )
1459         elog (NOTICE, "Ind %.*s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1460                 NAMEDATALEN, indrel->rd_rel->relname.data, nitups, nhtups);
1461
1462 } /* vc_scanoneind */
1463
1464 /*
1465  *  vc_vaconeind() -- vacuum one index relation.
1466  *
1467  *      Vpl is the VPageList of the heap we're currently vacuuming.
1468  *      It's locked. Indrel is an index relation on the vacuumed heap. 
1469  *      We don't set locks on the index relation here, since the indexed 
1470  *      access methods support locking at different granularities. 
1471  *      We let them handle it.
1472  *
1473  *      Finally, we arrange to update the index relation's statistics in
1474  *      pg_class.
1475  */
1476 static void
1477 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1478 {
1479     RetrieveIndexResult res;
1480     IndexScanDesc iscan;
1481     ItemPointer heapptr;
1482     int nvac;
1483     int nitups;
1484     int nipages;
1485     VPageDescr vp;
1486     struct rusage ru0, ru1;
1487
1488     getrusage(RUSAGE_SELF, &ru0);
1489
1490     /* walk through the entire index */
1491     iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1492     nvac = 0;
1493     nitups = 0;
1494
1495     while ((res = index_getnext(iscan, ForwardScanDirection))
1496            != (RetrieveIndexResult) NULL) {
1497         heapptr = &res->heap_iptr;
1498
1499         if ( (vp = vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
1500         {
1501 #if 0
1502             elog(DEBUG, "<%x,%x> -> <%x,%x>",
1503                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1504                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1505                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1506                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1507 #endif
1508             if ( vp->vpd_noff == 0 ) 
1509             {                           /* this is EmptyPage !!! */
1510                 elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
1511                         NAMEDATALEN, indrel->rd_rel->relname.data,
1512                         vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1513             }
1514             ++nvac;
1515             index_delete(indrel, &res->index_iptr);
1516         } else {
1517             nitups++;
1518         }
1519
1520         /* be tidy */
1521         pfree(res);
1522     }
1523
1524     index_endscan(iscan);
1525
1526     /* now update statistics in pg_class */
1527     nipages = RelationGetNumberOfBlocks(indrel);
1528     vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1529
1530     getrusage(RUSAGE_SELF, &ru1);
1531
1532     elog (MESSAGE_LEVEL, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1533         NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac,
1534         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1535         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1536
1537     if ( nitups != nhtups )
1538         elog (NOTICE, "Ind %.*s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1539                 NAMEDATALEN, indrel->rd_rel->relname.data, nitups, nhtups);
1540
1541 } /* vc_vaconeind */
1542
1543 /*
1544  *  vc_tidreapped() -- is a particular tid reapped?
1545  *
1546  *      vpl->VPageDescr_array is sorted in right order.
1547  */
1548 static VPageDescr
1549 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1550 {
1551     OffsetNumber ioffno;
1552     OffsetNumber *voff;
1553     VPageDescr vp, *vpp;
1554     VPageDescrData vpd;
1555
1556     vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1557     ioffno = ItemPointerGetOffsetNumber(itemptr);
1558         
1559     vp = &vpd;
1560     vpp = (VPageDescr*) vc_find_eq ((char*)(vpl->vpl_pgdesc), 
1561                 vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp, 
1562                 vc_cmp_blk);
1563
1564     if ( vpp == (VPageDescr*) NULL )
1565         return ((VPageDescr)NULL);
1566     vp = *vpp;
1567
1568     /* ok - we are on true page */
1569
1570     if ( vp->vpd_noff == 0 ) {          /* this is EmptyPage !!! */
1571         return (vp);
1572     }
1573     
1574     voff = (OffsetNumber*) vc_find_eq ((char*)(vp->vpd_voff), 
1575                 vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
1576                 vc_cmp_offno);
1577
1578     if ( voff == (OffsetNumber*) NULL )
1579         return ((VPageDescr)NULL);
1580
1581     return (vp);
1582
1583 } /* vc_tidreapped */
1584
1585 /*
1586  *  vc_attrstats() -- compute column statistics used by the optimzer
1587  *
1588  *  We compute the column min, max, null and non-null counts.
1589  *  Plus we attempt to find the count of the value that occurs most
1590  *  frequently in each column
1591  *  These figures are used to compute the selectivity of the column
1592  *
1593  *  We use a three-bucked cache to get the most frequent item
1594  *  The 'guess' buckets count hits.  A cache miss causes guess1
1595  *  to get the most hit 'guess' item in the most recent cycle, and
1596  *  the new item goes into guess2.  Whenever the total count of hits
1597  *  of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1598  *
1599  *  This method works perfectly for columns with unique values, and columns
1600  *  with only two unique values, plus nulls.
1601  *
1602  *  It becomes less perfect as the number of unique values increases and
1603  *  their distribution in the table becomes more random.
1604  *
1605  */
1606 static void
1607 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup)
1608 {
1609     int i, attr_cnt = vacrelstats->va_natts;
1610     VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1611     TupleDesc tupDesc = onerel->rd_att;
1612     Datum value;
1613     bool isnull;
1614
1615     for (i = 0; i < attr_cnt; i++) {
1616         VacAttrStats *stats = &vacattrstats[i];
1617         bool value_hit = true;
1618
1619         value = (Datum) heap_getattr (htup, InvalidBuffer, 
1620                                         stats->attr->attnum, tupDesc, &isnull);
1621
1622         if (!VacAttrStatsEqValid(stats))
1623                 continue;
1624
1625         if (isnull)
1626             stats->null_cnt++;
1627         else {
1628             stats->nonnull_cnt++;
1629             if (stats->initialized == false) {
1630                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1631                 /* best_cnt gets incremented later */
1632                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1633                 stats->guess1_cnt = stats->guess1_hits = 1;
1634                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1635                 stats->guess2_hits = 1;
1636                 if (VacAttrStatsLtGtValid(stats)) {
1637                     vc_bucketcpy(stats->attr, value, &stats->max , &stats->max_len);
1638                     vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1639                 }
1640                 stats->initialized = true;
1641             }
1642             if (VacAttrStatsLtGtValid(stats)) {
1643                 if ( (*(stats->f_cmplt)) (value,stats->min) ) {
1644                     vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1645                     stats->min_cnt = 0;
1646                 }
1647                 if ( (*(stats->f_cmpgt)) (value,stats->max) ) {
1648                     vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1649                     stats->max_cnt = 0;
1650                 }
1651                 if ( (*(stats->f_cmpeq)) (value,stats->min) )
1652                     stats->min_cnt++;
1653                 else if ( (*(stats->f_cmpeq)) (value,stats->max) )
1654                     stats->max_cnt++;
1655             }
1656             if ( (*(stats->f_cmpeq)) (value,stats->best) )
1657                 stats->best_cnt++;
1658             else if ( (*(stats->f_cmpeq)) (value,stats->guess1) ) {
1659                 stats->guess1_cnt++;
1660                 stats->guess1_hits++;
1661             }
1662             else if ( (*(stats->f_cmpeq)) (value,stats->guess2) )
1663                 stats->guess2_hits++;
1664             else value_hit = false;
1665
1666             if (stats->guess2_hits > stats->guess1_hits) {
1667                 swapDatum(stats->guess1,stats->guess2);
1668                 swapInt(stats->guess1_len,stats->guess2_len);
1669                 stats->guess1_cnt = stats->guess2_hits;
1670                 swapLong(stats->guess1_hits, stats->guess2_hits);
1671             }
1672             if (stats->guess1_cnt > stats->best_cnt) {
1673                 swapDatum(stats->best,stats->guess1);
1674                 swapInt(stats->best_len,stats->guess1_len);
1675                 swapLong(stats->best_cnt,stats->guess1_cnt);
1676                 stats->guess1_hits = 1;
1677                 stats->guess2_hits = 1;
1678             }
1679             if (!value_hit) {
1680                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1681                 stats->guess1_hits = 1;
1682                 stats->guess2_hits = 1;
1683             }
1684         }
1685     }
1686     return;
1687 }
1688
1689 /*
1690  *  vc_bucketcpy() -- update pg_class statistics for one relation
1691  *
1692  */
1693 static void
1694 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1695 {
1696     if (attr->attbyval && attr->attlen != -1)
1697         *bucket = value;
1698     else {
1699         int len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1700
1701         if (len > *bucket_len)
1702         {
1703             if (*bucket_len != 0)
1704                 pfree(DatumGetPointer(*bucket));
1705             *bucket = PointerGetDatum(palloc(len));
1706             *bucket_len = len;
1707         }
1708         memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1709     }
1710 }
1711
1712 /*
1713  *  vc_updstats() -- update pg_class statistics for one relation
1714  *
1715  *      This routine works for both index and heap relation entries in
1716  *      pg_class.  We violate no-overwrite semantics here by storing new
1717  *      values for ntups, npages, and hasindex directly in the pg_class
1718  *      tuple that's already on the page.  The reason for this is that if
1719  *      we updated these tuples in the usual way, then every tuple in pg_class
1720  *      would be replaced every day.  This would make planning and executing
1721  *      historical queries very expensive.
1722  */
1723 static void
1724 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats)
1725 {
1726     Relation rd, ad, sd;
1727     HeapScanDesc rsdesc, asdesc;
1728     TupleDesc sdesc;
1729     HeapTuple rtup, atup, stup;
1730     Buffer rbuf, abuf;
1731     Form_pg_class pgcform;
1732     ScanKeyData rskey, askey;
1733     AttributeTupleForm attp;
1734
1735     /*
1736      * update number of tuples and number of pages in pg_class
1737      */
1738     ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1739                            ObjectIdEqualRegProcedure,
1740                            ObjectIdGetDatum(relid));
1741
1742     rd = heap_openr(RelationRelationName);
1743     rsdesc = heap_beginscan(rd, false, NowTimeQual, 1, &rskey);
1744
1745     if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1746         elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1747                    relid);
1748
1749     /* overwrite the existing statistics in the tuple */
1750     vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1751     pgcform = (Form_pg_class) GETSTRUCT(rtup);
1752     pgcform->reltuples = ntups;
1753     pgcform->relpages = npages;
1754     pgcform->relhasindex = hasindex;
1755
1756     if ( vacrelstats != NULL && vacrelstats->va_natts > 0 )
1757     {
1758         VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1759         int natts = vacrelstats->va_natts;
1760         
1761         ad = heap_openr(AttributeRelationName);
1762         sd = heap_openr(StatisticRelationName);
1763         ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1764                    F_INT4EQ, relid);
1765
1766         asdesc = heap_beginscan(ad, false, NowTimeQual, 1, &askey);
1767
1768         while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1769         {
1770             int i;
1771             double selratio;  /* average ratio of rows selected for a random constant */
1772             VacAttrStats *stats;
1773             Datum values[ Natts_pg_statistic ];
1774             char nulls[ Natts_pg_statistic ];
1775
1776             attp = (AttributeTupleForm) GETSTRUCT(atup);
1777             if ( attp->attnum <= 0)     /* skip system attributes for now, */
1778                                         /* they are unique anyway */
1779                 continue;
1780             
1781             for (i = 0; i < natts; i++)
1782             {
1783                 if ( attp->attnum == vacattrstats[i].attr->attnum )
1784                     break;
1785             }
1786             if ( i >= natts )
1787                 continue;
1788             stats = &(vacattrstats[i]);
1789
1790             /* overwrite the existing statistics in the tuple */
1791             if (VacAttrStatsEqValid(stats)) {
1792
1793                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1794
1795                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1796                     (stats->null_cnt <= 1 && stats->best_cnt == 1))
1797                     selratio = 0;
1798                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1799                 {
1800                     double min_cnt_d = stats->min_cnt, 
1801                            max_cnt_d = stats->max_cnt, 
1802                            null_cnt_d = stats->null_cnt,
1803                            nonnullcnt_d = stats->nonnull_cnt; /* prevent overflow */
1804                     selratio = (min_cnt_d*min_cnt_d+max_cnt_d*max_cnt_d+null_cnt_d*null_cnt_d)/
1805                                 (nonnullcnt_d+null_cnt_d)/(nonnullcnt_d+null_cnt_d);
1806                 }
1807                 else {
1808                     double most = (double)(stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1809                     double total = ((double)stats->nonnull_cnt)+((double)stats->null_cnt);
1810                     /* we assume count of other values are 20% 
1811                        of best count in table */
1812                     selratio = (most*most + 0.20*most*(total-most))/total/total;
1813                 }
1814                 if (selratio > 1.0)
1815                     selratio = 1.0;
1816                 attp->attnvals = (selratio ? (selratio * ATTNVALS_SCALE) : 0);
1817                 WriteNoReleaseBuffer(abuf);
1818
1819                 /* DO PG_STATISTIC INSERTS */
1820
1821                 /* doing system relations, especially pg_statistic is a problem */
1822                 if (VacAttrStatsLtGtValid(stats) && stats->initialized /* &&
1823                         !IsSystemRelationName(pgcform->relname.data)*/) {
1824                     func_ptr out_function;
1825                     char *out_string;
1826                     int dummy;
1827                 
1828                     for (i = 0; i < Natts_pg_statistic; ++i) nulls[i] = ' ';
1829
1830                     /* ----------------
1831                      *  initialize values[]
1832                      * ----------------
1833                      */
1834                     i = 0;
1835                     values[i++] = (Datum) relid;        /* 1 */
1836                     values[i++] = (Datum) attp->attnum; /* 2 */
1837                     values[i++] = (Datum) InvalidOid;   /* 3 */
1838                     fmgr_info(stats->outfunc, &out_function, &dummy);
1839                     out_string = (*out_function)(stats->min, stats->attr->atttypid);
1840                     values[i++] = (Datum) fmgr(TextInRegProcedure,out_string);
1841                     pfree(out_string);
1842                     out_string = (char *)(*out_function)(stats->max, stats->attr->atttypid);
1843                     values[i++] = (Datum) fmgr(TextInRegProcedure,out_string);
1844                     pfree(out_string);
1845
1846                     sdesc = sd->rd_att;
1847         
1848                     stup = heap_formtuple(sdesc, values, nulls);
1849
1850                     /* ----------------
1851                      *  insert the tuple in the relation and get the tuple's oid.
1852                      * ----------------
1853                      */
1854                     heap_insert(sd, stup);
1855                     pfree(DatumGetPointer(values[3]));
1856                     pfree(DatumGetPointer(values[4]));
1857                     pfree(stup);
1858                 }
1859             }
1860         }
1861         heap_endscan(asdesc);
1862         heap_close(ad);
1863         heap_close(sd);
1864     }
1865  
1866     /* XXX -- after write, should invalidate relcache in other backends */
1867     WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1868
1869     /* invalidating system relations confuses the function cache
1870        of pg_operator and pg_opclass */
1871     if ( !IsSystemRelationName(pgcform->relname.data))
1872         RelationInvalidateHeapTuple(rd, rtup);
1873
1874     /* that's all, folks */
1875     heap_endscan(rsdesc);
1876     heap_close(rd);
1877 }
1878
1879 /*
1880  *  vc_delhilowstats() -- delete pg_statistics rows
1881  *
1882  */
1883 static void
1884 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
1885 {
1886     Relation pgstatistic;
1887     HeapScanDesc pgsscan;
1888     HeapTuple pgstup;
1889     ScanKeyData  pgskey;
1890
1891     pgstatistic = heap_openr(StatisticRelationName);
1892
1893     if (relid != InvalidOid ) {
1894         ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
1895                                    ObjectIdEqualRegProcedure,
1896                                    ObjectIdGetDatum(relid));
1897         pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 1, &pgskey);
1898     }
1899     else
1900         pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 0, NULL);
1901
1902     while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
1903     {
1904         if ( attcnt > 0 )
1905         {
1906             Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT (pgstup);
1907             int i;
1908             
1909             for (i = 0; i < attcnt; i++)
1910             {
1911                 if ( pgs->staattnum == attnums[i] + 1 )
1912                     break;
1913             }
1914             if ( i >= attcnt )
1915                 continue;               /* don't delete it */
1916         }
1917         heap_delete(pgstatistic, &pgstup->t_ctid);
1918     }
1919
1920     heap_endscan(pgsscan);
1921     heap_close(pgstatistic);
1922 }
1923
1924 static void vc_setpagelock(Relation rel, BlockNumber blkno)
1925 {
1926     ItemPointerData itm;
1927
1928     ItemPointerSet(&itm, blkno, 1);
1929
1930     RelationSetLockForWritePage(rel, &itm);
1931 }
1932
1933 /*
1934  *  vc_reappage() -- save a page on the array of reapped pages.
1935  *
1936  *      As a side effect of the way that the vacuuming loop for a given
1937  *      relation works, higher pages come after lower pages in the array
1938  *      (and highest tid on a page is last).
1939  */
1940 static void 
1941 vc_reappage(VPageList vpl, VPageDescr vpc)
1942 {
1943     VPageDescr newvpd;
1944
1945     /* allocate a VPageDescrData entry */
1946     newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
1947
1948     /* fill it in */
1949     if ( vpc->vpd_noff > 0 )
1950         memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
1951     newvpd->vpd_blkno = vpc->vpd_blkno;
1952     newvpd->vpd_free = vpc->vpd_free;
1953     newvpd->vpd_nusd = vpc->vpd_nusd;
1954     newvpd->vpd_noff = vpc->vpd_noff;
1955
1956     /* insert this page into vpl list */
1957     vc_vpinsert (vpl, newvpd);
1958     
1959 } /* vc_reappage */
1960
1961 static void
1962 vc_vpinsert (VPageList vpl, VPageDescr vpnew)
1963 {
1964
1965     /* allocate a VPageDescr entry if needed */
1966     if ( vpl->vpl_npages == 0 )
1967         vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
1968     else if ( vpl->vpl_npages % 100 == 0 )
1969         vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
1970     vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
1971     (vpl->vpl_npages)++;
1972     
1973 }
1974
1975 static void
1976 vc_free(VRelList vrl)
1977 {
1978     VRelList p_vrl;
1979     MemoryContext old;
1980     PortalVariableMemory pmem;
1981
1982     pmem = PortalGetVariableMemory(vc_portal);
1983     old = MemoryContextSwitchTo((MemoryContext)pmem);
1984
1985     while (vrl != (VRelList) NULL) {
1986
1987         /* free rel list entry */
1988         p_vrl = vrl;
1989         vrl = vrl->vrl_next;
1990         pfree(p_vrl);
1991     }
1992
1993     (void) MemoryContextSwitchTo(old);
1994 }
1995
1996 /*
1997  *  vc_getarchrel() -- open the archive relation for a heap relation
1998  *
1999  *      The archive relation is named 'a,XXXXX' for the heap relation
2000  *      whose relid is XXXXX.
2001  */
2002
2003 #define ARCHIVE_PREFIX  "a,"
2004
2005 static Relation
2006 vc_getarchrel(Relation heaprel)
2007 {
2008     Relation archrel;
2009     char *archrelname;
2010
2011     archrelname = palloc(sizeof(ARCHIVE_PREFIX) + NAMEDATALEN); /* bogus */
2012     sprintf(archrelname, "%s%d", ARCHIVE_PREFIX, heaprel->rd_id);
2013
2014     archrel = heap_openr(archrelname);
2015
2016     pfree(archrelname);
2017     return (archrel);
2018 }
2019
2020 /*
2021  *  vc_archive() -- write a tuple to an archive relation
2022  *
2023  *      In the future, this will invoke the archived accessd method.  For
2024  *      now, archive relations are on mag disk.
2025  */
2026 static void
2027 vc_archive(Relation archrel, HeapTuple htup)
2028 {
2029     doinsert(archrel, htup);
2030 }
2031
2032 static bool
2033 vc_isarchrel(char *rname)
2034 {
2035     if (strncmp(ARCHIVE_PREFIX, rname,strlen(ARCHIVE_PREFIX)) == 0)
2036         return (true);
2037
2038     return (false);
2039 }
2040
2041 static char *
2042 vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *))
2043 {
2044     int res;
2045     int last = nelem - 1;
2046     int celm = nelem / 2;
2047     bool last_move, first_move;
2048     
2049     last_move = first_move = true;
2050     for ( ; ; )
2051     {
2052         if ( first_move == true )
2053         {
2054             res = compar (bot, elm);
2055             if ( res > 0 )
2056                 return (NULL);
2057             if ( res == 0 )
2058                 return (bot);
2059             first_move = false;
2060         }
2061         if ( last_move == true )
2062         {
2063             res = compar (elm, bot + last*size);
2064             if ( res > 0 )
2065                 return (NULL);
2066             if ( res == 0 )
2067                 return (bot + last*size);
2068             last_move = false;
2069         }
2070         res = compar (elm, bot + celm*size);
2071         if ( res == 0 )
2072             return (bot + celm*size);
2073         if ( res < 0 )
2074         {
2075             if ( celm == 0 )
2076                 return (NULL);
2077             last = celm - 1;
2078             celm = celm / 2;
2079             last_move = true;
2080             continue;
2081         }
2082         
2083         if ( celm == last )
2084             return (NULL);
2085             
2086         last = last - celm - 1;
2087         bot = bot + (celm+1)*size;
2088         celm = (last + 1) / 2;
2089         first_move = true;
2090     }
2091
2092 } /* vc_find_eq */
2093
2094 static int 
2095 vc_cmp_blk (char *left, char *right)
2096 {
2097     BlockNumber lblk, rblk;
2098
2099     lblk = (*((VPageDescr*)left))->vpd_blkno;
2100     rblk = (*((VPageDescr*)right))->vpd_blkno;
2101
2102     if ( lblk < rblk )
2103         return (-1);
2104     if ( lblk == rblk )
2105         return (0);
2106     return (1);
2107
2108 } /* vc_cmp_blk */
2109
2110 static int 
2111 vc_cmp_offno (char *left, char *right)
2112 {
2113
2114     if ( *(OffsetNumber*)left < *(OffsetNumber*)right )
2115         return (-1);
2116     if ( *(OffsetNumber*)left == *(OffsetNumber*)right )
2117         return (0);
2118     return (1);
2119
2120 } /* vc_cmp_offno */
2121
2122
2123 static void
2124 vc_getindices (Oid relid, int *nindices, Relation **Irel)
2125 {
2126     Relation pgindex;
2127     Relation irel;
2128     TupleDesc pgidesc;
2129     HeapTuple pgitup;
2130     HeapScanDesc pgiscan;
2131     Datum d;
2132     int i, k;
2133     bool n;
2134     ScanKeyData pgikey;
2135     Oid *ioid;
2136
2137     *nindices = i = 0;
2138     
2139     ioid = (Oid *) palloc(10*sizeof(Oid));
2140
2141     /* prepare a heap scan on the pg_index relation */
2142     pgindex = heap_openr(IndexRelationName);
2143     pgidesc = RelationGetTupleDescriptor(pgindex);
2144
2145     ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2146                            ObjectIdEqualRegProcedure,
2147                            ObjectIdGetDatum(relid));
2148
2149     pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
2150
2151     while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
2152         d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
2153                                  pgidesc, &n);
2154         i++;
2155         if ( i % 10 == 0 )
2156             ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
2157         ioid[i-1] = DatumGetObjectId(d);
2158     }
2159
2160     heap_endscan(pgiscan);
2161     heap_close(pgindex);
2162
2163     if ( i == 0 ) {     /* No one index found */
2164         pfree(ioid);
2165         return;
2166     }
2167
2168     if ( Irel != (Relation **) NULL )
2169         *Irel = (Relation *) palloc(i * sizeof(Relation));
2170     
2171     for (k = 0; i > 0; )
2172     {
2173         irel = index_open(ioid[--i]);
2174         if ( irel != (Relation) NULL )
2175         {
2176             if ( Irel != (Relation **) NULL )
2177                 (*Irel)[k] = irel;
2178             else
2179                 index_close (irel);
2180             k++;
2181         }
2182         else
2183             elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2184     }
2185     *nindices = k;
2186     pfree(ioid);
2187
2188     if ( Irel != (Relation **) NULL && *nindices == 0 )
2189     {
2190         pfree (*Irel);
2191         *Irel = (Relation *) NULL;
2192     }
2193
2194 } /* vc_getindices */
2195
2196
2197 static void
2198 vc_clsindices (int nindices, Relation *Irel)
2199 {
2200
2201     if ( Irel == (Relation*) NULL )
2202         return;
2203
2204     while (nindices--) {
2205         index_close (Irel[nindices]);
2206     }
2207     pfree (Irel);
2208
2209 } /* vc_clsindices */
2210
2211
2212 static void
2213 vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2214 {
2215     IndDesc *idcur;
2216     HeapTuple pgIndexTup;
2217     AttrNumber *attnumP;
2218     int natts;
2219     int i;
2220
2221     *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
2222     
2223     for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
2224         pgIndexTup =
2225                 SearchSysCacheTuple(INDEXRELID,
2226                                 ObjectIdGetDatum(Irel[i]->rd_id),
2227                                 0,0,0);
2228         Assert(pgIndexTup);
2229         idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
2230         for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2231                 *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2232                 attnumP++, natts++);
2233         if (idcur->tform->indproc != InvalidOid) {
2234             idcur->finfoP = &(idcur->finfo);
2235             FIgetnArgs(idcur->finfoP) = natts;
2236             natts = 1;
2237             FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2238             *(FIgetname(idcur->finfoP)) = '\0';
2239         } else
2240             idcur->finfoP = (FuncIndexInfo *) NULL;
2241         
2242         idcur->natts = natts;
2243     }
2244     
2245 } /* vc_mkindesc */
2246
2247
2248 static bool
2249 vc_enough_space (VPageDescr vpd, Size len)
2250 {
2251
2252     len = DOUBLEALIGN(len);
2253
2254     if ( len > vpd->vpd_free )
2255         return (false);
2256     
2257     if ( vpd->vpd_nusd < vpd->vpd_noff )        /* there are free itemid(s) */
2258         return (true);                          /* and len <= free_space */
2259     
2260     /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2261     if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
2262         return (true);
2263     
2264     return (false);
2265     
2266 } /* vc_enough_space */