]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Cast constants to the type of the other binary operand.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *    the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.15 1997/01/22 01:42:16 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/file.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <utils/portal.h>
24 #include <access/genam.h>
25 #include <access/heapam.h>
26 #include <access/xact.h>
27 #include <storage/bufmgr.h>
28 #include <access/transam.h>
29 #include <catalog/pg_index.h>
30 #include <catalog/index.h>
31 #include <catalog/catname.h>
32 #include <catalog/catalog.h>
33 #include <catalog/pg_class.h>
34 #include <catalog/pg_proc.h>
35 #include <storage/smgr.h>
36 #include <storage/lmgr.h>
37 #include <utils/inval.h>
38 #include <utils/mcxt.h>
39 #include <utils/syscache.h>
40 #include <commands/vacuum.h>
41 #include <storage/bufpage.h>
42 #include "storage/shmem.h"
43 #ifdef NEED_RUSAGE
44 # include <rusagestub.h>
45 #else /* NEED_RUSAGE */
46 # include <sys/time.h>
47 # include <sys/resource.h>
48 #endif /* NEED_RUSAGE */
49
50 bool VacuumRunning =    false;
51 static int MESSLEV;     /* message level */
52
53 typedef struct {
54     FuncIndexInfo       finfo;
55     FuncIndexInfo       *finfoP;
56     IndexTupleForm      tform;
57     int                 natts;
58 } IndDesc;
59
60 /* non-export function prototypes */
61 static void _vc_init(void);
62 static void _vc_shutdown(void);
63 static void _vc_vacuum(NameData *VacRelP);
64 static VRelList _vc_getrels(Portal p, NameData *VacRelP);
65 static void _vc_vacone (VRelList curvrl);
66 static void _vc_scanheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl);
67 static void _vc_rpfheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
68 static void _vc_vacheap (VRelList curvrl, Relation onerel, VPageList vpl);
69 static void _vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
70 static void _vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
71 static void _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex);
72 static void _vc_setpagelock(Relation rel, BlockNumber blkno);
73 static VPageDescr _vc_tidreapped (ItemPointer itemptr, VPageList curvrl);
74 static void _vc_reappage (VPageList vpl, VPageDescr vpc);
75 static void _vc_vpinsert (VPageList vpl, VPageDescr vpnew);
76 static void _vc_free(Portal p, VRelList vrl);
77 static void _vc_getindices (Oid relid, int *nindices, Relation **Irel);
78 static void _vc_clsindices (int nindices, Relation *Irel);
79 static Relation _vc_getarchrel(Relation heaprel);
80 static void _vc_archive(Relation archrel, HeapTuple htup);
81 static bool _vc_isarchrel(char *rname);
82 static void _vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
83 static char * _vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
84 static int _vc_cmp_blk (char *left, char *right);
85 static int _vc_cmp_offno (char *left, char *right);
86 static bool _vc_enough_space (VPageDescr vpd, Size len);
87
88
89 void
90 vacuum(char *vacrel, bool verbose)
91 {
92     NameData VacRel;
93
94     if (verbose)
95         MESSLEV = NOTICE;
96     else
97         MESSLEV = DEBUG;
98
99     /* vacrel gets de-allocated on transaction commit */
100         
101     /* initialize vacuum cleaner */
102     _vc_init();
103
104     /* vacuum the database */
105     if (vacrel)
106     {
107         strcpy(VacRel.data,vacrel);
108         _vc_vacuum(&VacRel);
109     }
110     else
111         _vc_vacuum(NULL);
112
113     /* clean up */
114     _vc_shutdown();
115 }
116
117 /*
118  *  _vc_init(), _vc_shutdown() -- start up and shut down the vacuum cleaner.
119  *
120  *      We run exactly one vacuum cleaner at a time.  We use the file system
121  *      to guarantee an exclusive lock on vacuuming, since a single vacuum
122  *      cleaner instantiation crosses transaction boundaries, and we'd lose
123  *      postgres-style locks at the end of every transaction.
124  *
125  *      The strangeness with committing and starting transactions in the
126  *      init and shutdown routines is due to the fact that the vacuum cleaner
127  *      is invoked via a sql command, and so is already executing inside
128  *      a transaction.  We need to leave ourselves in a predictable state
129  *      on entry and exit to the vacuum cleaner.  We commit the transaction
130  *      started in PostgresMain() inside _vc_init(), and start one in
131  *      _vc_shutdown() to match the commit waiting for us back in
132  *      PostgresMain().
133  */
134 static void
135 _vc_init()
136 {
137     int fd;
138
139     if ((fd = open("pg_vlock", O_CREAT|O_EXCL, 0600)) < 0)
140         elog(WARN, "can't create lock file -- another vacuum cleaner running?");
141
142     close(fd);
143
144     /*
145      *  By here, exclusive open on the lock file succeeded.  If we abort
146      *  for any reason during vacuuming, we need to remove the lock file.
147      *  This global variable is checked in the transaction manager on xact
148      *  abort, and the routine vc_abort() is called if necessary.
149      */
150
151     VacuumRunning = true;
152
153     /* matches the StartTransaction in PostgresMain() */
154     CommitTransactionCommand();
155 }
156
157 static void
158 _vc_shutdown()
159 {
160     /* on entry, not in a transaction */
161     if (unlink("pg_vlock") < 0)
162         elog(WARN, "vacuum: can't destroy lock file!");
163
164     /* okay, we're done */
165     VacuumRunning = false;
166
167     /* matches the CommitTransaction in PostgresMain() */
168     StartTransactionCommand();
169 }
170
171 void
172 vc_abort()
173 {
174     /* on abort, remove the vacuum cleaner lock file */
175     (void) unlink("pg_vlock");
176
177     VacuumRunning = false;
178 }
179
180 /*
181  *  _vc_vacuum() -- vacuum the database.
182  *
183  *      This routine builds a list of relations to vacuum, and then calls
184  *      code that vacuums them one at a time.  We are careful to vacuum each
185  *      relation in a separate transaction in order to avoid holding too many
186  *      locks at one time.
187  */
188 static void
189 _vc_vacuum(NameData *VacRelP)
190 {
191     VRelList vrl, cur;
192     char *pname;
193     Portal p;
194
195     /*
196      *  Create a portal for safe memory across transctions.  We need to
197      *  palloc the name space for it because our hash function expects
198      *  the name to be on a longword boundary.  CreatePortal copies the
199      *  name to safe storage for us.
200      */
201
202     pname = (char *) palloc(strlen(VACPNAME) + 1);
203     strcpy(pname, VACPNAME);
204     p = CreatePortal(pname);
205     pfree(pname);
206
207     /* get list of relations */
208     vrl = _vc_getrels(p, VacRelP);
209
210     /* vacuum each heap relation */
211     for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
212         _vc_vacone (cur);
213
214     _vc_free(p, vrl);
215
216     PortalDestroy(&p);
217 }
218
219 static VRelList
220 _vc_getrels(Portal p, NameData *VacRelP)
221 {
222     Relation pgclass;
223     TupleDesc pgcdesc;
224     HeapScanDesc pgcscan;
225     HeapTuple pgctup;
226     Buffer buf;
227     PortalVariableMemory portalmem;
228     MemoryContext old;
229     VRelList vrl, cur;
230     Datum d;
231     char *rname;
232     char rkind;
233     int16 smgrno;
234     bool n;
235     ScanKeyData  pgckey;
236     bool found = false;
237
238     StartTransactionCommand();
239
240     if (VacRelP->data) {
241         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
242                                NameEqualRegProcedure, 
243                                PointerGetDatum(VacRelP->data));
244     } else {
245         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
246                                CharacterEqualRegProcedure, CharGetDatum('r'));
247     }
248
249     portalmem = PortalGetVariableMemory(p);
250     vrl = cur = (VRelList) NULL;
251
252     pgclass = heap_openr(RelationRelationName);
253     pgcdesc = RelationGetTupleDescriptor(pgclass);
254
255     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
256
257     while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf))) {
258
259         found = true;
260         
261         /*
262          *  We have to be careful not to vacuum the archive (since it
263          *  already contains vacuumed tuples), and not to vacuum
264          *  relations on write-once storage managers like the Sony
265          *  jukebox at Berkeley.
266          */
267
268         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relname,
269                                  pgcdesc, &n);
270         rname = (char*)d;
271
272         /* skip archive relations */
273         if (_vc_isarchrel(rname)) {
274             ReleaseBuffer(buf);
275             continue;
276         }
277
278         /* don't vacuum large objects for now - something breaks when we do */
279         if ( (strlen(rname) > 4) && rname[0] == 'X' &&
280                 rname[1] == 'i' && rname[2] == 'n' &&
281                 (rname[3] == 'v' || rname[3] == 'x'))
282         {
283             elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now", 
284                         NAMEDATALEN, rname);
285             ReleaseBuffer(buf);
286             continue;
287         }
288
289         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
290                                  pgcdesc, &n);
291         smgrno = DatumGetInt16(d);
292
293         /* skip write-once storage managers */
294         if (smgriswo(smgrno)) {
295             ReleaseBuffer(buf);
296             continue;
297         }
298
299         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relkind,
300                                  pgcdesc, &n);
301
302         rkind = DatumGetChar(d);
303
304         /* skip system relations */
305         if (rkind != 'r') {
306             ReleaseBuffer(buf);
307             elog(NOTICE, "Vacuum: can not process index and certain system tables" );
308             continue;
309         }
310                                  
311         /* get a relation list entry for this guy */
312         old = MemoryContextSwitchTo((MemoryContext)portalmem);
313         if (vrl == (VRelList) NULL) {
314             vrl = cur = (VRelList) palloc(sizeof(VRelListData));
315         } else {
316             cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
317             cur = cur->vrl_next;
318         }
319         (void) MemoryContextSwitchTo(old);
320
321         cur->vrl_relid = pgctup->t_oid;
322         cur->vrl_attlist = (VAttList) NULL;
323         cur->vrl_npages = cur->vrl_ntups = 0;
324         cur->vrl_hasindex = false;
325         cur->vrl_next = (VRelList) NULL;
326
327         /* wei hates it if you forget to do this */
328         ReleaseBuffer(buf);
329     }
330     if (found == false)
331         elog(NOTICE, "Vacuum: table not found" );
332
333     
334     heap_close(pgclass);
335     heap_endscan(pgcscan);
336
337     CommitTransactionCommand();
338
339     return (vrl);
340 }
341
342 /*
343  *  _vc_vacone() -- vacuum one heap relation
344  *
345  *      This routine vacuums a single heap, cleans out its indices, and
346  *      updates its statistics npages and ntuples statistics.
347  *
348  *      Doing one heap at a time incurs extra overhead, since we need to
349  *      check that the heap exists again just before we vacuum it.  The
350  *      reason that we do this is so that vacuuming can be spread across
351  *      many small transactions.  Otherwise, two-phase locking would require
352  *      us to lock the entire database during one pass of the vacuum cleaner.
353  */
354 static void
355 _vc_vacone (VRelList curvrl)
356 {
357     Relation pgclass;
358     TupleDesc pgcdesc;
359     HeapTuple pgctup;
360     Buffer pgcbuf;
361     HeapScanDesc pgcscan;
362     Relation onerel;
363     ScanKeyData pgckey;
364     VPageListData Vvpl; /* List of pages to vacuum and/or clean indices */
365     VPageListData Fvpl; /* List of pages with space enough for re-using */
366     VPageDescr *vpp;
367     Relation *Irel;
368     int nindices;
369     int i;
370
371     StartTransactionCommand();
372
373     ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
374                            ObjectIdEqualRegProcedure,
375                            ObjectIdGetDatum(curvrl->vrl_relid));
376
377     pgclass = heap_openr(RelationRelationName);
378     pgcdesc = RelationGetTupleDescriptor(pgclass);
379     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
380
381     /*
382      *  Race condition -- if the pg_class tuple has gone away since the
383      *  last time we saw it, we don't need to vacuum it.
384      */
385
386     if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf))) {
387         heap_endscan(pgcscan);
388         heap_close(pgclass);
389         CommitTransactionCommand();
390         return;
391     }
392
393     /* now open the class and vacuum it */
394     onerel = heap_open(curvrl->vrl_relid);
395
396     /* we require the relation to be locked until the indices are cleaned */
397     RelationSetLockForWrite(onerel);
398
399     /* scan it */
400     Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
401     _vc_scanheap(curvrl, onerel, &Vvpl, &Fvpl);
402
403     /* Now open/count indices */
404     Irel = (Relation *) NULL;
405     if ( Vvpl.vpl_npages > 0 )
406                     /* Open all indices of this relation */
407         _vc_getindices(curvrl->vrl_relid, &nindices, &Irel);
408     else
409                     /* Count indices only */
410         _vc_getindices(curvrl->vrl_relid, &nindices, NULL);
411     
412     if ( nindices > 0 )
413         curvrl->vrl_hasindex = true;
414     else
415         curvrl->vrl_hasindex = false;
416
417     /* Clean index' relation(s) */
418     if ( Irel != (Relation*) NULL )
419     {
420         for (i = 0; i < nindices; i++)
421             _vc_vaconeind (&Vvpl, Irel[i], curvrl->vrl_ntups);
422     }
423
424     if ( Fvpl.vpl_npages > 0 )          /* Try to shrink heap */
425         _vc_rpfheap (curvrl, onerel, &Vvpl, &Fvpl, nindices, Irel);
426     else if ( Vvpl.vpl_npages > 0 )     /* Clean pages from Vvpl list */
427     {
428         if ( Irel != (Relation*) NULL )
429             _vc_clsindices (nindices, Irel);
430         _vc_vacheap (curvrl, onerel, &Vvpl);
431     }
432
433     /* ok - free Vvpl list of reapped pages */
434     if ( Vvpl.vpl_npages > 0 )
435     {
436         vpp = Vvpl.vpl_pgdesc;
437         for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
438             pfree(*vpp);
439         pfree (Vvpl.vpl_pgdesc);
440         if ( Fvpl.vpl_npages > 0 )
441             pfree (Fvpl.vpl_pgdesc);
442     }
443
444     /* all done with this class */
445     heap_close(onerel);
446     heap_endscan(pgcscan);
447     heap_close(pgclass);
448
449     /* update statistics in pg_class */
450     _vc_updstats(curvrl->vrl_relid, curvrl->vrl_npages, curvrl->vrl_ntups,
451                  curvrl->vrl_hasindex);
452
453     CommitTransactionCommand();
454 }
455
456 /*
457  *  _vc_scanheap() -- scan an open heap relation
458  *
459  *      This routine sets commit times, constructs Vvpl list of 
460  *      empty/uninitialized pages and pages with dead tuples and
461  *      ~LP_USED line pointers, constructs Fvpl list of pages
462  *      appropriate for purposes of shrinking and maintains statistics 
463  *      on the number of live tuples in a heap.
464  */
465 static void
466 _vc_scanheap (VRelList curvrl, Relation onerel, 
467                         VPageList Vvpl, VPageList Fvpl)
468 {
469     int nblocks, blkno;
470     ItemId itemid;
471     ItemPointer itemptr;
472     HeapTuple htup;
473     Buffer buf;
474     Page page, tempPage = NULL;
475     OffsetNumber offnum, maxoff;
476     bool pgchanged, tupgone, dobufrel, notup;
477     AbsoluteTime purgetime, expiretime;
478     RelativeTime preservetime;
479     char *relname;
480     VPageDescr vpc, vp;
481     uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
482     Size frsize, frsusf;
483     Size min_tlen = MAXTUPLEN;
484     Size max_tlen = 0;
485     int i;
486     struct rusage ru0, ru1;
487
488     getrusage(RUSAGE_SELF, &ru0);
489
490     nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
491     frsize = frsusf = 0;
492     
493     relname = (RelationGetRelationName(onerel))->data;
494
495     nblocks = RelationGetNumberOfBlocks(onerel);
496
497     /* calculate the purge time: tuples that expired before this time
498        will be archived or deleted */
499     purgetime = GetCurrentTransactionStartTime();
500     expiretime = (AbsoluteTime)onerel->rd_rel->relexpires;
501     preservetime = (RelativeTime)onerel->rd_rel->relpreserved;
502
503     if (RelativeTimeIsValid(preservetime) && (preservetime)) {
504         purgetime -= preservetime;
505         if (AbsoluteTimeIsBackwardCompatiblyValid(expiretime) &&
506             expiretime > purgetime)
507             purgetime = expiretime;
508     }
509
510     else if (AbsoluteTimeIsBackwardCompatiblyValid(expiretime))
511         purgetime = expiretime;
512
513     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
514     vpc->vpd_nusd = 0;
515             
516     for (blkno = 0; blkno < nblocks; blkno++) {
517         buf = ReadBuffer(onerel, blkno);
518         page = BufferGetPage(buf);
519         vpc->vpd_blkno = blkno;
520         vpc->vpd_noff = 0;
521
522         if (PageIsNew(page)) {
523             elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing",
524                 NAMEDATALEN, relname, blkno);
525             PageInit (page, BufferGetPageSize (buf), 0);
526             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
527             frsize += (vpc->vpd_free - sizeof (ItemIdData));
528             nnepg++;
529             nemend++;
530             _vc_reappage (Vvpl, vpc);
531             WriteBuffer(buf);
532             continue;
533         }
534
535         if (PageIsEmpty(page)) {
536             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
537             frsize += (vpc->vpd_free - sizeof (ItemIdData));
538             nempg++;
539             nemend++;
540             _vc_reappage (Vvpl, vpc);
541             ReleaseBuffer(buf);
542             continue;
543         }
544
545         pgchanged = false;
546         notup = true;
547         maxoff = PageGetMaxOffsetNumber(page);
548         for (offnum = FirstOffsetNumber;
549              offnum <= maxoff;
550              offnum = OffsetNumberNext(offnum)) {
551             itemid = PageGetItemId(page, offnum);
552
553             /*
554              * Collect un-used items too - it's possible to have
555              * indices pointing here after crash.
556              */
557             if (!ItemIdIsUsed(itemid)) {
558                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
559                 nunused++;
560                 continue;
561             }
562
563             htup = (HeapTuple) PageGetItem(page, itemid);
564             tupgone = false;
565
566             if (!AbsoluteTimeIsBackwardCompatiblyValid(htup->t_tmin) && 
567                 TransactionIdIsValid((TransactionId)htup->t_xmin)) {
568
569                 if (TransactionIdDidAbort(htup->t_xmin)) {
570                     tupgone = true;
571                 } else if (TransactionIdDidCommit(htup->t_xmin)) {
572                     htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
573                     pgchanged = true;
574                 } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
575                     /* 
576                      * Not Aborted, Not Committed, Not in Progress -
577                      * so it from crashed process. - vadim 11/26/96
578                      */
579                     ncrash++;
580                     tupgone = true;
581                 }
582                 else {
583                     elog (MESSLEV, "Rel %.*s: InsertTransactionInProgress %u for TID %u/%u",
584                         NAMEDATALEN, relname, htup->t_xmin, blkno, offnum);
585                 }
586             }
587
588             if (TransactionIdIsValid((TransactionId)htup->t_xmax)) {
589                 if (TransactionIdDidAbort(htup->t_xmax)) {
590                     StoreInvalidTransactionId(&(htup->t_xmax));
591                     pgchanged = true;
592                 } else if (TransactionIdDidCommit(htup->t_xmax)) {
593                     if (!AbsoluteTimeIsBackwardCompatiblyReal(htup->t_tmax)) {
594
595                         htup->t_tmax = TransactionIdGetCommitTime(htup->t_xmax);  
596                         pgchanged = true;
597                     }
598
599                     /*
600                      *  Reap the dead tuple if its expiration time is
601                      *  before purgetime.
602                      */
603
604                     if (htup->t_tmax < purgetime) {
605                         tupgone = true;
606                     }
607                 }
608             }
609
610             /*
611              * Is it possible at all ? - vadim 11/26/96
612              */
613             if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
614             {
615                 elog (NOTICE, "TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
616 DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.", 
617                         TransactionIdIsValid((TransactionId)htup->t_xmax),
618                         tupgone);
619             }
620             
621             /*
622              * It's possibly! But from where it comes ?
623              * And should we fix it ?  - vadim 11/28/96
624              */
625             itemptr = &(htup->t_ctid);
626             if ( !ItemPointerIsValid (itemptr) || 
627                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno )
628             {
629                 elog (NOTICE, "ITEM POINTER IS INVALID: %u/%u FOR %u/%u. TUPGONE %d.", 
630                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)), 
631                         itemptr->ip_posid, blkno, offnum, tupgone);
632             }
633
634             /*
635              * Other checks...
636              */
637             if ( htup->t_len != itemid->lp_len )
638             {
639                 elog (NOTICE, "PAGEHEADER' LEN %u IS NOT THE SAME AS HTUP' %u FOR %u/%u.TUPGONE %d.", 
640                         itemid->lp_len, htup->t_len, blkno, offnum, tupgone);
641             }
642             if ( !OidIsValid(htup->t_oid) )
643             {
644                 elog (NOTICE, "OID IS INVALID FOR %u/%u.TUPGONE %d.", 
645                         blkno, offnum, tupgone);
646             }
647             
648             if (tupgone) {
649                 ItemId lpp;
650                                                     
651                 if ( tempPage == (Page) NULL )
652                 {
653                     Size pageSize;
654                     
655                     pageSize = PageGetPageSize(page);
656                     tempPage = (Page) palloc(pageSize);
657                     memmove (tempPage, page, pageSize);
658                 }
659                 
660                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
661
662                 /* mark it unused */
663                 lpp->lp_flags &= ~LP_USED;
664
665                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
666                 nvac++;
667
668             } else {
669                 ntups++;
670                 notup = false;
671                 if ( htup->t_len < min_tlen )
672                     min_tlen = htup->t_len;
673                 if ( htup->t_len > max_tlen )
674                     max_tlen = htup->t_len;
675             }
676         }
677
678         if (pgchanged) {
679             WriteBuffer(buf);
680             dobufrel = false;
681             nchpg++;
682         }
683         else
684             dobufrel = true;
685         if ( tempPage != (Page) NULL )
686         { /* Some tuples are gone */
687             PageRepairFragmentation(tempPage);
688             vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
689             frsize += vpc->vpd_free;
690             _vc_reappage (Vvpl, vpc);
691             pfree (tempPage);
692             tempPage = (Page) NULL;
693         }
694         else if ( vpc->vpd_noff > 0 )
695         { /* there are only ~LP_USED line pointers */
696             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
697             frsize += vpc->vpd_free;
698             _vc_reappage (Vvpl, vpc);
699         }
700         if ( dobufrel )
701             ReleaseBuffer(buf);
702         if ( notup )
703             nemend++;
704         else
705             nemend = 0;
706     }
707
708     pfree (vpc);
709
710     /* save stats in the rel list for use later */
711     curvrl->vrl_ntups = ntups;
712     curvrl->vrl_npages = nblocks;
713     if ( ntups == 0 )
714         min_tlen = max_tlen = 0;
715     curvrl->vrl_min_tlen = min_tlen;
716     curvrl->vrl_max_tlen = max_tlen;
717     
718     Vvpl->vpl_nemend = nemend;
719     Fvpl->vpl_nemend = nemend;
720
721     /* 
722      * Try to make Fvpl keeping in mind that we can't use free space 
723      * of "empty" end-pages and last page if it reapped.
724      */
725     if ( Vvpl->vpl_npages - nemend > 0 )
726     {
727         int nusf;               /* blocks usefull for re-using */
728         
729         nusf = Vvpl->vpl_npages - nemend;
730         if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
731             nusf--;
732     
733         for (i = 0; i < nusf; i++)
734         {
735             vp = Vvpl->vpl_pgdesc[i];
736             if ( _vc_enough_space (vp, min_tlen) )
737             {
738                 _vc_vpinsert (Fvpl, vp);
739                 frsusf += vp->vpd_free;
740             }
741         }
742     }
743
744     getrusage(RUSAGE_SELF, &ru1);
745     
746     elog (MESSLEV, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
747 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
748         NAMEDATALEN, relname, 
749         nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
750         ntups, nvac, ncrash, nunused, min_tlen, max_tlen, 
751         frsize, frsusf, nemend, Fvpl->vpl_npages,
752         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
753         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
754
755 } /* _vc_scanheap */
756
757
758 /*
759  *  _vc_rpfheap() -- try to repaire relation' fragmentation
760  *
761  *      This routine marks dead tuples as unused and tries re-use dead space
762  *      by moving tuples (and inserting indices if needed). It constructs 
763  *      Nvpl list of free-ed pages (moved tuples) and clean indices
764  *      for them after committing (in hack-manner - without losing locks
765  *      and freeing memory!) current transaction. It truncates relation
766  *      if some end-blocks are gone away.
767  */
768 static void
769 _vc_rpfheap (VRelList curvrl, Relation onerel, 
770                 VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
771 {
772     TransactionId myXID;
773     CommandId myCID;
774     AbsoluteTime myCTM = 0;
775     Buffer buf, ToBuf;
776     int nblocks, blkno;
777     Page page, ToPage = NULL;
778     OffsetNumber offnum = 0, maxoff = 0, newoff, moff;
779     ItemId itemid, newitemid;
780     HeapTuple htup, newtup;
781     TupleDesc tupdesc = NULL;
782     Datum *idatum = NULL;
783     char *inulls = NULL;
784     InsertIndexResult iresult;
785     VPageListData Nvpl;
786     VPageDescr ToVpd = NULL, Fvplast, Vvplast, vpc, *vpp;
787     int ToVpI = 0;
788     IndDesc *Idesc, *idcur;
789     int Fblklast, Vblklast, i;
790     Size tlen;
791     int nmoved, Fnpages, Vnpages;
792     int nchkmvd, ntups;
793     bool isempty, dowrite;
794     Relation archrel;
795     struct rusage ru0, ru1;
796
797     getrusage(RUSAGE_SELF, &ru0);
798
799     myXID = GetCurrentTransactionId();
800     myCID = GetCurrentCommandId();
801         
802     if ( Irel != (Relation*) NULL )     /* preparation for index' inserts */
803     {
804         _vc_mkindesc (onerel, nindices, Irel, &Idesc);
805         tupdesc = RelationGetTupleDescriptor(onerel);
806         idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
807         inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
808     }
809
810     /* if the relation has an archive, open it */
811     if (onerel->rd_rel->relarch != 'n')
812     {
813         archrel = _vc_getarchrel(onerel);
814         /* Archive tuples from "empty" end-pages */
815         for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1, 
816                                 i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
817         {
818             if ( (*vpp)->vpd_noff > 0 )
819             {
820                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
821                 page = BufferGetPage(buf);
822                 Assert ( !PageIsEmpty(page) );
823                 _vc_vacpage (page, *vpp, archrel);
824                 WriteBuffer (buf);
825             }
826         }
827     }
828     else
829         archrel = (Relation) NULL;
830
831     Nvpl.vpl_npages = 0;
832     Fnpages = Fvpl->vpl_npages;
833     Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
834     Fblklast = Fvplast->vpd_blkno;
835     Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
836     Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
837     Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
838     Vblklast = Vvplast->vpd_blkno;
839     Assert ( Vblklast >= Fblklast );
840     ToBuf = InvalidBuffer;
841     nmoved = 0;
842
843     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
844     vpc->vpd_nusd = vpc->vpd_noff = 0;
845         
846     nblocks = curvrl->vrl_npages;
847     for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
848     {
849         /* if it's reapped page and it was used by me - quit */
850         if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
851             break;
852
853         buf = ReadBuffer(onerel, blkno);
854         page = BufferGetPage(buf);
855
856         vpc->vpd_noff = 0;
857
858         isempty = PageIsEmpty(page);
859
860         dowrite = false;
861         if ( blkno == Vblklast )                /* it's reapped page */
862         {
863             if ( Vvplast->vpd_noff > 0 )        /* there are dead tuples */
864             {                                   /* on this page - clean */
865                 Assert ( ! isempty );
866                 _vc_vacpage (page, Vvplast, archrel);
867                 dowrite = true;
868             }
869             else
870                 Assert ( isempty );
871             Assert ( --Vnpages > 0 );
872             /* get prev reapped page from Vvpl */
873             Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
874             Vblklast = Vvplast->vpd_blkno;
875             if ( blkno == Fblklast )    /* this page in Fvpl too */
876             {
877                 Assert ( --Fnpages > 0 );
878                 Assert ( Fvplast->vpd_nusd == 0 );
879                 /* get prev reapped page from Fvpl */
880                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
881                 Fblklast = Fvplast->vpd_blkno;
882             }
883             Assert ( Fblklast <= Vblklast );
884             if ( isempty )
885             {
886                 ReleaseBuffer(buf);
887                 continue;
888             }
889         }
890         else
891         {
892             Assert ( ! isempty );
893         }
894
895         vpc->vpd_blkno = blkno;
896         maxoff = PageGetMaxOffsetNumber(page);
897         for (offnum = FirstOffsetNumber;
898                 offnum <= maxoff;
899                 offnum = OffsetNumberNext(offnum))
900         {
901             itemid = PageGetItemId(page, offnum);
902
903             if (!ItemIdIsUsed(itemid))
904                 continue;
905
906             htup = (HeapTuple) PageGetItem(page, itemid);
907             tlen = htup->t_len;
908                 
909             /* try to find new page for this tuple */
910             if ( ToBuf == InvalidBuffer ||
911                 ! _vc_enough_space (ToVpd, tlen) )
912             {
913                 if ( ToBuf != InvalidBuffer )
914                 {
915                     WriteBuffer(ToBuf);
916                     ToBuf = InvalidBuffer;
917                     /*
918                      * If no one tuple can't be added to this page -
919                      * remove page from Fvpl. - vadim 11/27/96
920                      */ 
921                     if ( !_vc_enough_space (ToVpd, curvrl->vrl_min_tlen) )
922                     {
923                         if ( ToVpd != Fvplast )
924                         {
925                             Assert ( Fnpages > ToVpI + 1 );
926                             memmove (Fvpl->vpl_pgdesc + ToVpI, 
927                                 Fvpl->vpl_pgdesc + ToVpI + 1, 
928                                 sizeof (VPageDescr*) * (Fnpages - ToVpI - 1));
929                         }
930                         Assert ( Fnpages >= 1 );
931                         Fnpages--;
932                         if ( Fnpages == 0 )
933                             break;
934                         /* get prev reapped page from Fvpl */
935                         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
936                         Fblklast = Fvplast->vpd_blkno;
937                     }
938                 }
939                 for (i=0; i < Fnpages; i++)
940                 {
941                     if ( _vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
942                         break;
943                 }
944                 if ( i == Fnpages )
945                     break;                      /* can't move item anywhere */
946                 ToVpI = i;
947                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
948                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
949                 ToPage = BufferGetPage(ToBuf);
950                 /* if this page was not used before - clean it */
951                 if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
952                     _vc_vacpage (ToPage, ToVpd, archrel);
953             }
954                 
955             /* copy tuple */
956             newtup = (HeapTuple) palloc (tlen);
957             memmove((char *) newtup, (char *) htup, tlen);
958
959             /* store transaction information */
960             TransactionIdStore(myXID, &(newtup->t_xmin));
961             newtup->t_cmin = myCID;
962             StoreInvalidTransactionId(&(newtup->t_xmax));
963             newtup->t_tmin = INVALID_ABSTIME;
964             newtup->t_tmax = CURRENT_ABSTIME;
965             ItemPointerSetInvalid(&newtup->t_chain);
966
967             /* add tuple to the page */
968             newoff = PageAddItem (ToPage, (Item)newtup, tlen, 
969                                 InvalidOffsetNumber, LP_USED);
970             if ( newoff == InvalidOffsetNumber )
971             {
972                 elog (WARN, "\
973 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
974                 tlen, ToVpd->vpd_blkno, ToVpd->vpd_free, 
975                 ToVpd->vpd_nusd, ToVpd->vpd_noff);
976             }
977             newitemid = PageGetItemId(ToPage, newoff);
978             pfree (newtup);
979             newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
980             ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
981
982             /* now logically delete end-tuple */
983             TransactionIdStore(myXID, &(htup->t_xmax));
984             htup->t_cmax = myCID;
985             memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
986
987             ToVpd->vpd_nusd++;
988             nmoved++;
989             ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
990             vpc->vpd_voff[vpc->vpd_noff++] = offnum;
991                 
992             /* insert index' tuples if needed */
993             if ( Irel != (Relation*) NULL )
994             {
995                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
996                 {
997                     FormIndexDatum (
998                                 idcur->natts,
999                                 (AttrNumber *)&(idcur->tform->indkey[0]),
1000                                 newtup, 
1001                                 tupdesc,
1002                                 InvalidBuffer,
1003                                 idatum,
1004                                 inulls,
1005                                 idcur->finfoP);
1006                     iresult = index_insert (
1007                                 Irel[i],
1008                                 idatum,
1009                                 inulls,
1010                                 &(newtup->t_ctid),
1011                                 onerel);
1012                     if (iresult) pfree(iresult);
1013                 }
1014             }
1015                 
1016         } /* walk along page */
1017
1018         if ( vpc->vpd_noff > 0 )                /* some tuples were moved */
1019         {
1020             _vc_reappage (&Nvpl, vpc);
1021             WriteBuffer(buf);
1022         }
1023         else if ( dowrite )
1024             WriteBuffer(buf);
1025         else
1026             ReleaseBuffer(buf);
1027             
1028         if ( offnum <= maxoff )
1029             break;                              /* some item(s) left */
1030             
1031     } /* walk along relation */
1032         
1033     blkno++;                            /* new number of blocks */
1034
1035     if ( ToBuf != InvalidBuffer )
1036     {
1037         Assert (nmoved > 0);
1038         WriteBuffer(ToBuf);
1039     }
1040
1041     if ( nmoved > 0 )
1042     {
1043         /* 
1044          * We have to commit our tuple' movings before we'll truncate 
1045          * relation, but we shouldn't lose our locks. And so - quick hack: 
1046          * flush buffers and record status of current transaction
1047          * as committed, and continue. - vadim 11/13/96
1048          */
1049         FlushBufferPool(!TransactionFlushEnabled());
1050         TransactionIdCommit(myXID);
1051         FlushBufferPool(!TransactionFlushEnabled());
1052         myCTM = TransactionIdGetCommitTime(myXID);
1053     }
1054         
1055     /* 
1056      * Clean uncleaned reapped pages from Vvpl list 
1057      * and set commit' times  for inserted tuples
1058      */
1059     nchkmvd = 0;
1060     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1061     {
1062         Assert ( (*vpp)->vpd_blkno < blkno );
1063         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1064         page = BufferGetPage(buf);
1065         if ( (*vpp)->vpd_nusd == 0 )    /* this page was not used */
1066         {
1067             /* noff == 0 in empty pages only - such pages should be re-used */
1068             Assert ( (*vpp)->vpd_noff > 0 );
1069             _vc_vacpage (page, *vpp, archrel);
1070         }
1071         else                            /* this page was used */
1072         {
1073             ntups = 0;
1074             moff = PageGetMaxOffsetNumber(page);
1075             for (newoff = FirstOffsetNumber;
1076                         newoff <= moff;
1077                         newoff = OffsetNumberNext(newoff))
1078             {
1079                 itemid = PageGetItemId(page, newoff);
1080                 if (!ItemIdIsUsed(itemid))
1081                     continue;
1082                 htup = (HeapTuple) PageGetItem(page, itemid);
1083                 if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
1084                 {
1085                     htup->t_tmin = myCTM;
1086                     ntups++;
1087                 }
1088             }
1089             Assert ( (*vpp)->vpd_nusd == ntups );
1090             nchkmvd += ntups;
1091         }
1092         WriteBuffer (buf);
1093     }
1094     Assert ( nmoved == nchkmvd );
1095
1096     getrusage(RUSAGE_SELF, &ru1);
1097     
1098     elog (MESSLEV, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \
1099 Elapsed %u/%u sec.",
1100                 NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
1101                 nblocks, blkno, nmoved,
1102                 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1103                 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1104
1105     if ( Nvpl.vpl_npages > 0 )
1106     {
1107         /* vacuum indices again if needed */
1108         if ( Irel != (Relation*) NULL )
1109         {
1110             VPageDescr *vpleft, *vpright, vpsave;
1111                 
1112             /* re-sort Nvpl.vpl_pgdesc */
1113             for (vpleft = Nvpl.vpl_pgdesc, 
1114                 vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1115                 vpleft < vpright; vpleft++, vpright--)
1116             {
1117                 vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
1118             }
1119             for (i = 0; i < nindices; i++)
1120                 _vc_vaconeind (&Nvpl, Irel[i], curvrl->vrl_ntups);
1121         }
1122
1123         /* 
1124          * clean moved tuples from last page in Nvpl list
1125          * if some tuples left there
1126          */
1127         if ( vpc->vpd_noff > 0 && offnum <= maxoff )
1128         {
1129             Assert (vpc->vpd_blkno == blkno - 1);
1130             buf = ReadBuffer(onerel, vpc->vpd_blkno);
1131             page = BufferGetPage (buf);
1132             ntups = 0;
1133             maxoff = offnum;
1134             for (offnum = FirstOffsetNumber;
1135                         offnum < maxoff;
1136                         offnum = OffsetNumberNext(offnum))
1137             {
1138                 itemid = PageGetItemId(page, offnum);
1139                 if (!ItemIdIsUsed(itemid))
1140                     continue;
1141                 htup = (HeapTuple) PageGetItem(page, itemid);
1142                 Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
1143                 itemid->lp_flags &= ~LP_USED;
1144                 ntups++;
1145             }
1146             Assert ( vpc->vpd_noff == ntups );
1147             PageRepairFragmentation(page);
1148             WriteBuffer (buf);
1149         }
1150
1151         /* now - free new list of reapped pages */
1152         vpp = Nvpl.vpl_pgdesc;
1153         for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1154             pfree(*vpp);
1155         pfree (Nvpl.vpl_pgdesc);
1156     }
1157         
1158     /* truncate relation */
1159     if ( blkno < nblocks )
1160     {
1161         blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
1162         Assert ( blkno >= 0 );
1163         curvrl->vrl_npages = blkno;     /* set new number of blocks */
1164     }
1165
1166     if ( archrel != (Relation) NULL )
1167         heap_close(archrel);
1168
1169     if ( Irel != (Relation*) NULL )     /* pfree index' allocations */
1170     {
1171         pfree (Idesc);
1172         pfree (idatum);
1173         pfree (inulls);
1174         _vc_clsindices (nindices, Irel);
1175     }
1176
1177     pfree (vpc);
1178
1179 } /* _vc_rpfheap */
1180
1181 /*
1182  *  _vc_vacheap() -- free dead tuples
1183  *
1184  *      This routine marks dead tuples as unused and truncates relation
1185  *      if there are "empty" end-blocks.
1186  */
1187 static void
1188 _vc_vacheap (VRelList curvrl, Relation onerel, VPageList Vvpl)
1189 {
1190     Buffer buf;
1191     Page page;
1192     VPageDescr *vpp;
1193     Relation archrel;
1194     int nblocks;
1195     int i;
1196
1197     nblocks = Vvpl->vpl_npages;
1198     /* if the relation has an archive, open it */
1199     if (onerel->rd_rel->relarch != 'n')
1200         archrel = _vc_getarchrel(onerel);
1201     else
1202     {
1203         archrel = (Relation) NULL;
1204         nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1205     }
1206         
1207     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1208     {
1209         if ( (*vpp)->vpd_noff > 0 )
1210         {
1211             buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1212             page = BufferGetPage (buf);
1213             _vc_vacpage (page, *vpp, archrel);
1214             WriteBuffer (buf);
1215         }
1216     }
1217
1218     /* truncate relation if there are some empty end-pages */
1219     if ( Vvpl->vpl_nemend > 0 )
1220     {
1221         Assert ( curvrl->vrl_npages >= Vvpl->vpl_nemend );
1222         nblocks = curvrl->vrl_npages - Vvpl->vpl_nemend;
1223         elog (MESSLEV, "Rel %.*s: Pages: %u --> %u.",
1224                 NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
1225                 curvrl->vrl_npages, nblocks);
1226
1227         /* 
1228          * we have to flush "empty" end-pages (if changed, but who knows it)
1229          * before truncation 
1230          */
1231         FlushBufferPool(!TransactionFlushEnabled());
1232
1233         nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
1234         Assert ( nblocks >= 0 );
1235         curvrl->vrl_npages = nblocks;   /* set new number of blocks */
1236     }
1237
1238     if ( archrel != (Relation) NULL )
1239         heap_close(archrel);
1240
1241 } /* _vc_vacheap */
1242
1243 /*
1244  *  _vc_vacpage() -- free (and archive if needed) dead tuples on a page
1245  *                   and repaire its fragmentation.
1246  */
1247 static void
1248 _vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
1249 {
1250     ItemId itemid;
1251     HeapTuple htup;
1252     int i;
1253     
1254     Assert ( vpd->vpd_nusd == 0 );
1255     for (i=0; i < vpd->vpd_noff; i++)
1256     {
1257         itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1258         if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
1259         {
1260             htup = (HeapTuple) PageGetItem (page, itemid);
1261             _vc_archive (archrel, htup);
1262         }
1263         itemid->lp_flags &= ~LP_USED;
1264     }
1265     PageRepairFragmentation(page);
1266
1267 } /* _vc_vacpage */
1268
1269 /*
1270  *  _vc_vaconeind() -- vacuum one index relation.
1271  *
1272  *      Vpl is the VPageList of the heap we're currently vacuuming.
1273  *      It's locked. Indrel is an index relation on the vacuumed heap. 
1274  *      We don't set locks on the index relation here, since the indexed 
1275  *      access methods support locking at different granularities. 
1276  *      We let them handle it.
1277  *
1278  *      Finally, we arrange to update the index relation's statistics in
1279  *      pg_class.
1280  */
1281 static void
1282 _vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1283 {
1284     RetrieveIndexResult res;
1285     IndexScanDesc iscan;
1286     ItemPointer heapptr;
1287     int nvac;
1288     int nitups;
1289     int nipages;
1290     VPageDescr vp;
1291     struct rusage ru0, ru1;
1292
1293     getrusage(RUSAGE_SELF, &ru0);
1294
1295     /* walk through the entire index */
1296     iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1297     nvac = 0;
1298     nitups = 0;
1299
1300     while ((res = index_getnext(iscan, ForwardScanDirection))
1301            != (RetrieveIndexResult) NULL) {
1302         heapptr = &res->heap_iptr;
1303
1304         if ( (vp = _vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
1305         {
1306 #if 0
1307             elog(DEBUG, "<%x,%x> -> <%x,%x>",
1308                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1309                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1310                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1311                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1312 #endif
1313             if ( vp->vpd_noff == 0 ) 
1314             {                           /* this is EmptyPage !!! */
1315                 elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
1316                         NAMEDATALEN, indrel->rd_rel->relname.data,
1317                         vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1318             }
1319             ++nvac;
1320             index_delete(indrel, &res->index_iptr);
1321         } else {
1322             nitups++;
1323         }
1324
1325         /* be tidy */
1326         pfree(res);
1327     }
1328
1329     index_endscan(iscan);
1330
1331     /* now update statistics in pg_class */
1332     nipages = RelationGetNumberOfBlocks(indrel);
1333     _vc_updstats(indrel->rd_id, nipages, nitups, false);
1334
1335     getrusage(RUSAGE_SELF, &ru1);
1336
1337     elog (MESSLEV, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1338         NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac,
1339         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1340         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1341
1342     if ( nitups != nhtups )
1343         elog (NOTICE, "NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1344                 nitups, nhtups);
1345
1346 } /* _vc_vaconeind */
1347
1348 /*
1349  *  _vc_tidreapped() -- is a particular tid reapped?
1350  *
1351  *      vpl->VPageDescr_array is sorted in right order.
1352  */
1353 static VPageDescr
1354 _vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1355 {
1356     OffsetNumber ioffno;
1357     OffsetNumber *voff;
1358     VPageDescr vp, *vpp;
1359     VPageDescrData vpd;
1360
1361     vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1362     ioffno = ItemPointerGetOffsetNumber(itemptr);
1363         
1364     vp = &vpd;
1365     vpp = (VPageDescr*) _vc_find_eq ((char*)(vpl->vpl_pgdesc), 
1366                 vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp, 
1367                 _vc_cmp_blk);
1368
1369     if ( vpp == (VPageDescr*) NULL )
1370         return ((VPageDescr)NULL);
1371     vp = *vpp;
1372
1373     /* ok - we are on true page */
1374
1375     if ( vp->vpd_noff == 0 ) {          /* this is EmptyPage !!! */
1376         return (vp);
1377     }
1378     
1379     voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff), 
1380                 vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
1381                 _vc_cmp_offno);
1382
1383     if ( voff == (OffsetNumber*) NULL )
1384         return ((VPageDescr)NULL);
1385
1386     return (vp);
1387
1388 } /* _vc_tidreapped */
1389
1390 /*
1391  *  _vc_updstats() -- update pg_class statistics for one relation
1392  *
1393  *      This routine works for both index and heap relation entries in
1394  *      pg_class.  We violate no-overwrite semantics here by storing new
1395  *      values for ntuples, npages, and hasindex directly in the pg_class
1396  *      tuple that's already on the page.  The reason for this is that if
1397  *      we updated these tuples in the usual way, then every tuple in pg_class
1398  *      would be replaced every day.  This would make planning and executing
1399  *      historical queries very expensive.
1400  */
1401 static void
1402 _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex)
1403 {
1404     Relation rd;
1405     HeapScanDesc sdesc;
1406     HeapTuple tup;
1407     Buffer buf;
1408     Form_pg_class pgcform;
1409     ScanKeyData skey;
1410
1411     /*
1412      * update number of tuples and number of pages in pg_class
1413      */
1414     ScanKeyEntryInitialize(&skey, 0x0, ObjectIdAttributeNumber,
1415                            ObjectIdEqualRegProcedure,
1416                            ObjectIdGetDatum(relid));
1417
1418     rd = heap_openr(RelationRelationName);
1419     sdesc = heap_beginscan(rd, false, NowTimeQual, 1, &skey);
1420
1421     if (!HeapTupleIsValid(tup = heap_getnext(sdesc, 0, &buf)))
1422         elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1423                    relid);
1424
1425     /* overwrite the existing statistics in the tuple */
1426     _vc_setpagelock(rd, BufferGetBlockNumber(buf));
1427     pgcform = (Form_pg_class) GETSTRUCT(tup);
1428     pgcform->reltuples = ntuples;
1429     pgcform->relpages = npages;
1430     pgcform->relhasindex = hasindex;
1431  
1432     /* XXX -- after write, should invalidate relcache in other backends */
1433     WriteNoReleaseBuffer(buf);  /* heap_endscan release scan' buffers ? */
1434
1435     /* invalidating system relations confuses the function cache
1436        of pg_operator and pg_opclass */
1437     if ( !IsSystemRelationName(pgcform->relname.data))
1438         RelationInvalidateHeapTuple(rd, tup);
1439
1440     /* that's all, folks */
1441     heap_endscan(sdesc);
1442     heap_close(rd);
1443
1444 }
1445
1446 static void _vc_setpagelock(Relation rel, BlockNumber blkno)
1447 {
1448     ItemPointerData itm;
1449
1450     ItemPointerSet(&itm, blkno, 1);
1451
1452     RelationSetLockForWritePage(rel, &itm);
1453 }
1454
1455
1456 /*
1457  *  _vc_reappage() -- save a page on the array of reapped pages.
1458  *
1459  *      As a side effect of the way that the vacuuming loop for a given
1460  *      relation works, higher pages come after lower pages in the array
1461  *      (and highest tid on a page is last).
1462  */
1463 static void 
1464 _vc_reappage(VPageList vpl, VPageDescr vpc)
1465 {
1466     VPageDescr newvpd;
1467
1468     /* allocate a VPageDescrData entry */
1469     newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
1470
1471     /* fill it in */
1472     if ( vpc->vpd_noff > 0 )
1473         memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
1474     newvpd->vpd_blkno = vpc->vpd_blkno;
1475     newvpd->vpd_free = vpc->vpd_free;
1476     newvpd->vpd_nusd = vpc->vpd_nusd;
1477     newvpd->vpd_noff = vpc->vpd_noff;
1478
1479     /* insert this page into vpl list */
1480     _vc_vpinsert (vpl, newvpd);
1481     
1482 } /* _vc_reappage */
1483
1484 static void
1485 _vc_vpinsert (VPageList vpl, VPageDescr vpnew)
1486 {
1487
1488     /* allocate a VPageDescr entry if needed */
1489     if ( vpl->vpl_npages == 0 )
1490         vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
1491     else if ( vpl->vpl_npages % 100 == 0 )
1492         vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
1493     vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
1494     (vpl->vpl_npages)++;
1495     
1496 }
1497
1498 static void
1499 _vc_free(Portal p, VRelList vrl)
1500 {
1501     VRelList p_vrl;
1502     VAttList p_val, val;
1503     MemoryContext old;
1504     PortalVariableMemory pmem;
1505
1506     pmem = PortalGetVariableMemory(p);
1507     old = MemoryContextSwitchTo((MemoryContext)pmem);
1508
1509     while (vrl != (VRelList) NULL) {
1510
1511         /* free attribute list */
1512         val = vrl->vrl_attlist;
1513         while (val != (VAttList) NULL) {
1514             p_val = val;
1515             val = val->val_next;
1516             pfree(p_val);
1517         }
1518
1519         /* free rel list entry */
1520         p_vrl = vrl;
1521         vrl = vrl->vrl_next;
1522         pfree(p_vrl);
1523     }
1524
1525     (void) MemoryContextSwitchTo(old);
1526 }
1527
1528 /*
1529  *  _vc_getarchrel() -- open the archive relation for a heap relation
1530  *
1531  *      The archive relation is named 'a,XXXXX' for the heap relation
1532  *      whose relid is XXXXX.
1533  */
1534
1535 #define ARCHIVE_PREFIX  "a,"
1536
1537 static Relation
1538 _vc_getarchrel(Relation heaprel)
1539 {
1540     Relation archrel;
1541     char *archrelname;
1542
1543     archrelname = palloc(sizeof(ARCHIVE_PREFIX) + NAMEDATALEN); /* bogus */
1544     sprintf(archrelname, "%s%d", ARCHIVE_PREFIX, heaprel->rd_id);
1545
1546     archrel = heap_openr(archrelname);
1547
1548     pfree(archrelname);
1549     return (archrel);
1550 }
1551
1552 /*
1553  *  _vc_archive() -- write a tuple to an archive relation
1554  *
1555  *      In the future, this will invoke the archived accessd method.  For
1556  *      now, archive relations are on mag disk.
1557  */
1558 static void
1559 _vc_archive(Relation archrel, HeapTuple htup)
1560 {
1561     doinsert(archrel, htup);
1562 }
1563
1564 static bool
1565 _vc_isarchrel(char *rname)
1566 {
1567     if (strncmp(ARCHIVE_PREFIX, rname,strlen(ARCHIVE_PREFIX)) == 0)
1568         return (true);
1569
1570     return (false);
1571 }
1572
1573 static char *
1574 _vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *))
1575 {
1576     int res;
1577     int last = nelem - 1;
1578     int celm = nelem / 2;
1579     bool last_move, first_move;
1580     
1581     last_move = first_move = true;
1582     for ( ; ; )
1583     {
1584         if ( first_move == true )
1585         {
1586             res = compar (bot, elm);
1587             if ( res > 0 )
1588                 return (NULL);
1589             if ( res == 0 )
1590                 return (bot);
1591             first_move = false;
1592         }
1593         if ( last_move == true )
1594         {
1595             res = compar (elm, bot + last*size);
1596             if ( res > 0 )
1597                 return (NULL);
1598             if ( res == 0 )
1599                 return (bot + last*size);
1600             last_move = false;
1601         }
1602         res = compar (elm, bot + celm*size);
1603         if ( res == 0 )
1604             return (bot + celm*size);
1605         if ( res < 0 )
1606         {
1607             if ( celm == 0 )
1608                 return (NULL);
1609             last = celm - 1;
1610             celm = celm / 2;
1611             last_move = true;
1612             continue;
1613         }
1614         
1615         if ( celm == last )
1616             return (NULL);
1617             
1618         last = last - celm - 1;
1619         bot = bot + (celm+1)*size;
1620         celm = (last + 1) / 2;
1621         first_move = true;
1622     }
1623
1624 } /* _vc_find_eq */
1625
1626 static int 
1627 _vc_cmp_blk (char *left, char *right)
1628 {
1629     BlockNumber lblk, rblk;
1630
1631     lblk = (*((VPageDescr*)left))->vpd_blkno;
1632     rblk = (*((VPageDescr*)right))->vpd_blkno;
1633
1634     if ( lblk < rblk )
1635         return (-1);
1636     if ( lblk == rblk )
1637         return (0);
1638     return (1);
1639
1640 } /* _vc_cmp_blk */
1641
1642 static int 
1643 _vc_cmp_offno (char *left, char *right)
1644 {
1645
1646     if ( *(OffsetNumber*)left < *(OffsetNumber*)right )
1647         return (-1);
1648     if ( *(OffsetNumber*)left == *(OffsetNumber*)right )
1649         return (0);
1650     return (1);
1651
1652 } /* _vc_cmp_offno */
1653
1654
1655 static void
1656 _vc_getindices (Oid relid, int *nindices, Relation **Irel)
1657 {
1658     Relation pgindex;
1659     Relation irel;
1660     TupleDesc pgidesc;
1661     HeapTuple pgitup;
1662     HeapScanDesc pgiscan;
1663     Datum d;
1664     int i, k;
1665     bool n;
1666     ScanKeyData pgikey;
1667     Oid *ioid;
1668
1669     *nindices = i = 0;
1670     
1671     ioid = (Oid *) palloc(10*sizeof(Oid));
1672
1673     /* prepare a heap scan on the pg_index relation */
1674     pgindex = heap_openr(IndexRelationName);
1675     pgidesc = RelationGetTupleDescriptor(pgindex);
1676
1677     ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
1678                            ObjectIdEqualRegProcedure,
1679                            ObjectIdGetDatum(relid));
1680
1681     pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
1682
1683     while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
1684         d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
1685                                  pgidesc, &n);
1686         i++;
1687         if ( i % 10 == 0 )
1688             ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
1689         ioid[i-1] = DatumGetObjectId(d);
1690     }
1691
1692     heap_endscan(pgiscan);
1693     heap_close(pgindex);
1694
1695     if ( i == 0 ) {     /* No one index found */
1696         pfree(ioid);
1697         return;
1698     }
1699
1700     if ( Irel != (Relation **) NULL )
1701         *Irel = (Relation *) palloc(i * sizeof(Relation));
1702     
1703     for (k = 0; i > 0; )
1704     {
1705         irel = index_open(ioid[--i]);
1706         if ( irel != (Relation) NULL )
1707         {
1708             if ( Irel != (Relation **) NULL )
1709                 (*Irel)[k] = irel;
1710             else
1711                 index_close (irel);
1712             k++;
1713         }
1714         else
1715             elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
1716     }
1717     *nindices = k;
1718     pfree(ioid);
1719
1720     if ( Irel != (Relation **) NULL && *nindices == 0 )
1721     {
1722         pfree (*Irel);
1723         *Irel = (Relation *) NULL;
1724     }
1725
1726 } /* _vc_getindices */
1727
1728
1729 static void
1730 _vc_clsindices (int nindices, Relation *Irel)
1731 {
1732
1733     if ( Irel == (Relation*) NULL )
1734         return;
1735
1736     while (nindices--) {
1737         index_close (Irel[nindices]);
1738     }
1739     pfree (Irel);
1740
1741 } /* _vc_clsindices */
1742
1743
1744 static void
1745 _vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
1746 {
1747     IndDesc *idcur;
1748     HeapTuple pgIndexTup;
1749     AttrNumber *attnumP;
1750     int natts;
1751     int i;
1752
1753     *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
1754     
1755     for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
1756         pgIndexTup =
1757                 SearchSysCacheTuple(INDEXRELID,
1758                                 ObjectIdGetDatum(Irel[i]->rd_id),
1759                                 0,0,0);
1760         Assert(pgIndexTup);
1761         idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
1762         for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
1763                 *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
1764                 attnumP++, natts++);
1765         if (idcur->tform->indproc != InvalidOid) {
1766             idcur->finfoP = &(idcur->finfo);
1767             FIgetnArgs(idcur->finfoP) = natts;
1768             natts = 1;
1769             FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
1770             *(FIgetname(idcur->finfoP)) = '\0';
1771         } else
1772             idcur->finfoP = (FuncIndexInfo *) NULL;
1773         
1774         idcur->natts = natts;
1775     }
1776     
1777 } /* _vc_mkindesc */
1778
1779
1780 static bool
1781 _vc_enough_space (VPageDescr vpd, Size len)
1782 {
1783
1784     len = DOUBLEALIGN(len);
1785
1786     if ( len > vpd->vpd_free )
1787         return (false);
1788     
1789     if ( vpd->vpd_nusd < vpd->vpd_noff )        /* there are free itemid(s) */
1790         return (true);                          /* and len <= free_space */
1791     
1792     /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
1793     if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
1794         return (true);
1795     
1796     return (false);
1797     
1798 } /* _vc_enough_space */