]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Closing opened indices.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *    the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.12 1997/01/05 10:58:15 vadim Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/file.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <utils/portal.h>
24 #include <access/genam.h>
25 #include <access/heapam.h>
26 #include <access/xact.h>
27 #include <storage/bufmgr.h>
28 #include <access/transam.h>
29 #include <catalog/pg_index.h>
30 #include <catalog/index.h>
31 #include <catalog/catname.h>
32 #include <catalog/pg_class.h>
33 #include <catalog/pg_proc.h>
34 #include <storage/smgr.h>
35 #include <storage/lmgr.h>
36 #include <utils/mcxt.h>
37 #include <utils/syscache.h>
38 #include <commands/vacuum.h>
39 #include <storage/bufpage.h>
40 #include "storage/shmem.h"
41 #ifdef NEED_RUSAGE
42 # include <rusagestub.h>
43 #else /* NEED_RUSAGE */
44 # include <sys/time.h>
45 # include <sys/resource.h>
46 #endif /* NEED_RUSAGE */
47
48 bool VacuumRunning =    false;
49
50 #ifdef VACUUM_QUIET
51 static int MESSLEV = DEBUG;
52 #else
53 static int MESSLEV = NOTICE;
54 #endif
55
56 typedef struct {
57     FuncIndexInfo       finfo;
58     FuncIndexInfo       *finfoP;
59     IndexTupleForm      tform;
60     int                 natts;
61 } IndDesc;
62
63 /* non-export function prototypes */
64 static void _vc_init(void);
65 static void _vc_shutdown(void);
66 static void _vc_vacuum(NameData *VacRelP);
67 static VRelList _vc_getrels(Portal p, NameData *VacRelP);
68 static void _vc_vacone (VRelList curvrl);
69 static void _vc_scanheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl);
70 static void _vc_rpfheap (VRelList curvrl, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
71 static void _vc_vacheap (VRelList curvrl, Relation onerel, VPageList vpl);
72 static void _vc_vacpage (Page page, VPageDescr vpd, Relation archrel);
73 static void _vc_vaconeind (VPageList vpl, Relation indrel, int nhtups);
74 static void _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex);
75 static void _vc_setpagelock(Relation rel, BlockNumber blkno);
76 static VPageDescr _vc_tidreapped (ItemPointer itemptr, VPageList curvrl);
77 static void _vc_reappage (VPageList vpl, VPageDescr vpc);
78 static void _vc_vpinsert (VPageList vpl, VPageDescr vpnew);
79 static void _vc_free(Portal p, VRelList vrl);
80 static void _vc_getindices (Oid relid, int *nindices, Relation **Irel);
81 static void _vc_clsindices (int nindices, Relation *Irel);
82 static Relation _vc_getarchrel(Relation heaprel);
83 static void _vc_archive(Relation archrel, HeapTuple htup);
84 static bool _vc_isarchrel(char *rname);
85 static void _vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
86 static char * _vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *));
87 static int _vc_cmp_blk (char *left, char *right);
88 static int _vc_cmp_offno (char *left, char *right);
89 static bool _vc_enough_space (VPageDescr vpd, Size len);
90
91
92 void
93 vacuum(char *vacrel)
94 {
95     NameData VacRel;
96
97     /* vacrel gets de-allocated on transaction commit */
98         
99     /* initialize vacuum cleaner */
100     _vc_init();
101
102     /* vacuum the database */
103     if (vacrel)
104     {
105         strcpy(VacRel.data,vacrel);
106         _vc_vacuum(&VacRel);
107     }
108     else
109         _vc_vacuum(NULL);
110
111     /* clean up */
112     _vc_shutdown();
113 }
114
115 /*
116  *  _vc_init(), _vc_shutdown() -- start up and shut down the vacuum cleaner.
117  *
118  *      We run exactly one vacuum cleaner at a time.  We use the file system
119  *      to guarantee an exclusive lock on vacuuming, since a single vacuum
120  *      cleaner instantiation crosses transaction boundaries, and we'd lose
121  *      postgres-style locks at the end of every transaction.
122  *
123  *      The strangeness with committing and starting transactions in the
124  *      init and shutdown routines is due to the fact that the vacuum cleaner
125  *      is invoked via a sql command, and so is already executing inside
126  *      a transaction.  We need to leave ourselves in a predictable state
127  *      on entry and exit to the vacuum cleaner.  We commit the transaction
128  *      started in PostgresMain() inside _vc_init(), and start one in
129  *      _vc_shutdown() to match the commit waiting for us back in
130  *      PostgresMain().
131  */
132 static void
133 _vc_init()
134 {
135     int fd;
136
137     if ((fd = open("pg_vlock", O_CREAT|O_EXCL, 0600)) < 0)
138         elog(WARN, "can't create lock file -- another vacuum cleaner running?");
139
140     close(fd);
141
142     /*
143      *  By here, exclusive open on the lock file succeeded.  If we abort
144      *  for any reason during vacuuming, we need to remove the lock file.
145      *  This global variable is checked in the transaction manager on xact
146      *  abort, and the routine vc_abort() is called if necessary.
147      */
148
149     VacuumRunning = true;
150
151     /* matches the StartTransaction in PostgresMain() */
152     CommitTransactionCommand();
153 }
154
155 static void
156 _vc_shutdown()
157 {
158     /* on entry, not in a transaction */
159     if (unlink("pg_vlock") < 0)
160         elog(WARN, "vacuum: can't destroy lock file!");
161
162     /* okay, we're done */
163     VacuumRunning = false;
164
165     /* matches the CommitTransaction in PostgresMain() */
166     StartTransactionCommand();
167 }
168
169 void
170 vc_abort()
171 {
172     /* on abort, remove the vacuum cleaner lock file */
173     (void) unlink("pg_vlock");
174
175     VacuumRunning = false;
176 }
177
178 /*
179  *  _vc_vacuum() -- vacuum the database.
180  *
181  *      This routine builds a list of relations to vacuum, and then calls
182  *      code that vacuums them one at a time.  We are careful to vacuum each
183  *      relation in a separate transaction in order to avoid holding too many
184  *      locks at one time.
185  */
186 static void
187 _vc_vacuum(NameData *VacRelP)
188 {
189     VRelList vrl, cur;
190     char *pname;
191     Portal p;
192
193     /*
194      *  Create a portal for safe memory across transctions.  We need to
195      *  palloc the name space for it because our hash function expects
196      *  the name to be on a longword boundary.  CreatePortal copies the
197      *  name to safe storage for us.
198      */
199
200     pname = (char *) palloc(strlen(VACPNAME) + 1);
201     strcpy(pname, VACPNAME);
202     p = CreatePortal(pname);
203     pfree(pname);
204
205     /* get list of relations */
206     vrl = _vc_getrels(p, VacRelP);
207
208     /* vacuum each heap relation */
209     for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
210         _vc_vacone (cur);
211
212     _vc_free(p, vrl);
213
214     PortalDestroy(&p);
215 }
216
217 static VRelList
218 _vc_getrels(Portal p, NameData *VacRelP)
219 {
220     Relation pgclass;
221     TupleDesc pgcdesc;
222     HeapScanDesc pgcscan;
223     HeapTuple pgctup;
224     Buffer buf;
225     PortalVariableMemory portalmem;
226     MemoryContext old;
227     VRelList vrl, cur;
228     Datum d;
229     char *rname;
230     char rkind;
231     int16 smgrno;
232     bool n;
233     ScanKeyData  pgckey;
234     bool found = false;
235
236     StartTransactionCommand();
237
238     if (VacRelP->data) {
239         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
240                                NameEqualRegProcedure, 
241                                PointerGetDatum(VacRelP->data));
242     } else {
243         ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
244                                CharacterEqualRegProcedure, CharGetDatum('r'));
245     }
246
247     portalmem = PortalGetVariableMemory(p);
248     vrl = cur = (VRelList) NULL;
249
250     pgclass = heap_openr(RelationRelationName);
251     pgcdesc = RelationGetTupleDescriptor(pgclass);
252
253     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
254
255     while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf))) {
256
257         found = true;
258         
259         /*
260          *  We have to be careful not to vacuum the archive (since it
261          *  already contains vacuumed tuples), and not to vacuum
262          *  relations on write-once storage managers like the Sony
263          *  jukebox at Berkeley.
264          */
265
266         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relname,
267                                  pgcdesc, &n);
268         rname = (char*)d;
269
270         /* skip archive relations */
271         if (_vc_isarchrel(rname)) {
272             ReleaseBuffer(buf);
273             continue;
274         }
275
276         /* don't vacuum large objects for now - something breaks when we do */
277         if ( (strlen(rname) > 4) && rname[0] == 'X' &&
278                 rname[1] == 'i' && rname[2] == 'n' &&
279                 (rname[3] == 'v' || rname[3] == 'x'))
280         {
281             elog (NOTICE, "Rel %.*s: can't vacuum LargeObjects now", 
282                         NAMEDATALEN, rname);
283             ReleaseBuffer(buf);
284             continue;
285         }
286
287         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
288                                  pgcdesc, &n);
289         smgrno = DatumGetInt16(d);
290
291         /* skip write-once storage managers */
292         if (smgriswo(smgrno)) {
293             ReleaseBuffer(buf);
294             continue;
295         }
296
297         d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relkind,
298                                  pgcdesc, &n);
299
300         rkind = DatumGetChar(d);
301
302         /* skip system relations */
303         if (rkind != 'r') {
304             ReleaseBuffer(buf);
305             elog(NOTICE, "Vacuum: can not process index and certain system tables" );
306             continue;
307         }
308                                  
309         /* get a relation list entry for this guy */
310         old = MemoryContextSwitchTo((MemoryContext)portalmem);
311         if (vrl == (VRelList) NULL) {
312             vrl = cur = (VRelList) palloc(sizeof(VRelListData));
313         } else {
314             cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
315             cur = cur->vrl_next;
316         }
317         (void) MemoryContextSwitchTo(old);
318
319         cur->vrl_relid = pgctup->t_oid;
320         cur->vrl_attlist = (VAttList) NULL;
321         cur->vrl_npages = cur->vrl_ntups = 0;
322         cur->vrl_hasindex = false;
323         cur->vrl_next = (VRelList) NULL;
324
325         /* wei hates it if you forget to do this */
326         ReleaseBuffer(buf);
327     }
328     if (found == false)
329         elog(NOTICE, "Vacuum: table not found" );
330
331     
332     heap_close(pgclass);
333     heap_endscan(pgcscan);
334
335     CommitTransactionCommand();
336
337     return (vrl);
338 }
339
340 /*
341  *  _vc_vacone() -- vacuum one heap relation
342  *
343  *      This routine vacuums a single heap, cleans out its indices, and
344  *      updates its statistics npages and ntuples statistics.
345  *
346  *      Doing one heap at a time incurs extra overhead, since we need to
347  *      check that the heap exists again just before we vacuum it.  The
348  *      reason that we do this is so that vacuuming can be spread across
349  *      many small transactions.  Otherwise, two-phase locking would require
350  *      us to lock the entire database during one pass of the vacuum cleaner.
351  */
352 static void
353 _vc_vacone (VRelList curvrl)
354 {
355     Relation pgclass;
356     TupleDesc pgcdesc;
357     HeapTuple pgctup;
358     Buffer pgcbuf;
359     HeapScanDesc pgcscan;
360     Relation onerel;
361     ScanKeyData pgckey;
362     VPageListData Vvpl; /* List of pages to vacuum and/or clean indices */
363     VPageListData Fvpl; /* List of pages with space enough for re-using */
364     VPageDescr *vpp;
365     Relation *Irel;
366     int nindices;
367     int i;
368
369     StartTransactionCommand();
370
371     ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
372                            ObjectIdEqualRegProcedure,
373                            ObjectIdGetDatum(curvrl->vrl_relid));
374
375     pgclass = heap_openr(RelationRelationName);
376     pgcdesc = RelationGetTupleDescriptor(pgclass);
377     pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
378
379     /*
380      *  Race condition -- if the pg_class tuple has gone away since the
381      *  last time we saw it, we don't need to vacuum it.
382      */
383
384     if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf))) {
385         heap_endscan(pgcscan);
386         heap_close(pgclass);
387         CommitTransactionCommand();
388         return;
389     }
390
391     /* now open the class and vacuum it */
392     onerel = heap_open(curvrl->vrl_relid);
393
394     /* we require the relation to be locked until the indices are cleaned */
395     RelationSetLockForWrite(onerel);
396
397     /* scan it */
398     Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
399     _vc_scanheap(curvrl, onerel, &Vvpl, &Fvpl);
400
401     /* Now open/count indices */
402     Irel = (Relation *) NULL;
403     if ( Vvpl.vpl_npages > 0 )
404                     /* Open all indices of this relation */
405         _vc_getindices(curvrl->vrl_relid, &nindices, &Irel);
406     else
407                     /* Count indices only */
408         _vc_getindices(curvrl->vrl_relid, &nindices, NULL);
409     
410     if ( nindices > 0 )
411         curvrl->vrl_hasindex = true;
412     else
413         curvrl->vrl_hasindex = false;
414
415     /* Clean index' relation(s) */
416     if ( Irel != (Relation*) NULL )
417     {
418         for (i = 0; i < nindices; i++)
419             _vc_vaconeind (&Vvpl, Irel[i], curvrl->vrl_ntups);
420     }
421
422     if ( Fvpl.vpl_npages > 0 )          /* Try to shrink heap */
423         _vc_rpfheap (curvrl, onerel, &Vvpl, &Fvpl, nindices, Irel);
424     else if ( Vvpl.vpl_npages > 0 )     /* Clean pages from Vvpl list */
425     {
426         if ( Irel != (Relation*) NULL )
427             _vc_clsindices (nindices, Irel);
428         _vc_vacheap (curvrl, onerel, &Vvpl);
429     }
430
431     /* ok - free Vvpl list of reapped pages */
432     if ( Vvpl.vpl_npages > 0 )
433     {
434         vpp = Vvpl.vpl_pgdesc;
435         for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
436             pfree(*vpp);
437         pfree (Vvpl.vpl_pgdesc);
438         if ( Fvpl.vpl_npages > 0 )
439             pfree (Fvpl.vpl_pgdesc);
440     }
441
442     /* all done with this class */
443     heap_close(onerel);
444     heap_endscan(pgcscan);
445     heap_close(pgclass);
446
447     /* update statistics in pg_class */
448     _vc_updstats(curvrl->vrl_relid, curvrl->vrl_npages, curvrl->vrl_ntups,
449                  curvrl->vrl_hasindex);
450
451     CommitTransactionCommand();
452 }
453
454 /*
455  *  _vc_scanheap() -- scan an open heap relation
456  *
457  *      This routine sets commit times, constructs Vvpl list of 
458  *      empty/uninitialized pages and pages with dead tuples and
459  *      ~LP_USED line pointers, constructs Fvpl list of pages
460  *      appropriate for purposes of shrinking and maintains statistics 
461  *      on the number of live tuples in a heap.
462  */
463 static void
464 _vc_scanheap (VRelList curvrl, Relation onerel, 
465                         VPageList Vvpl, VPageList Fvpl)
466 {
467     int nblocks, blkno;
468     ItemId itemid;
469     ItemPointer itemptr;
470     HeapTuple htup;
471     Buffer buf;
472     Page page, tempPage = NULL;
473     OffsetNumber offnum, maxoff;
474     bool pgchanged, tupgone, dobufrel, notup;
475     AbsoluteTime purgetime, expiretime;
476     RelativeTime preservetime;
477     char *relname;
478     VPageDescr vpc, vp;
479     uint32 nvac, ntups, nunused, ncrash, nempg, nnepg, nchpg, nemend;
480     Size frsize, frsusf;
481     Size min_tlen = MAXTUPLEN;
482     Size max_tlen = 0;
483     int i;
484     struct rusage ru0, ru1;
485
486     getrusage(RUSAGE_SELF, &ru0);
487
488     nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
489     frsize = frsusf = 0;
490     
491     relname = (RelationGetRelationName(onerel))->data;
492
493     nblocks = RelationGetNumberOfBlocks(onerel);
494
495     /* calculate the purge time: tuples that expired before this time
496        will be archived or deleted */
497     purgetime = GetCurrentTransactionStartTime();
498     expiretime = (AbsoluteTime)onerel->rd_rel->relexpires;
499     preservetime = (RelativeTime)onerel->rd_rel->relpreserved;
500
501     if (RelativeTimeIsValid(preservetime) && (preservetime)) {
502         purgetime -= preservetime;
503         if (AbsoluteTimeIsBackwardCompatiblyValid(expiretime) &&
504             expiretime > purgetime)
505             purgetime = expiretime;
506     }
507
508     else if (AbsoluteTimeIsBackwardCompatiblyValid(expiretime))
509         purgetime = expiretime;
510
511     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
512     vpc->vpd_nusd = 0;
513             
514     for (blkno = 0; blkno < nblocks; blkno++) {
515         buf = ReadBuffer(onerel, blkno);
516         page = BufferGetPage(buf);
517         vpc->vpd_blkno = blkno;
518         vpc->vpd_noff = 0;
519
520         if (PageIsNew(page)) {
521             elog (NOTICE, "Rel %.*s: Uninitialized page %u - fixing",
522                 NAMEDATALEN, relname, blkno);
523             PageInit (page, BufferGetPageSize (buf), 0);
524             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
525             frsize += (vpc->vpd_free - sizeof (ItemIdData));
526             nnepg++;
527             nemend++;
528             _vc_reappage (Vvpl, vpc);
529             WriteBuffer(buf);
530             continue;
531         }
532
533         if (PageIsEmpty(page)) {
534             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
535             frsize += (vpc->vpd_free - sizeof (ItemIdData));
536             nempg++;
537             nemend++;
538             _vc_reappage (Vvpl, vpc);
539             ReleaseBuffer(buf);
540             continue;
541         }
542
543         pgchanged = false;
544         notup = true;
545         maxoff = PageGetMaxOffsetNumber(page);
546         for (offnum = FirstOffsetNumber;
547              offnum <= maxoff;
548              offnum = OffsetNumberNext(offnum)) {
549             itemid = PageGetItemId(page, offnum);
550
551             /*
552              * Collect un-used items too - it's possible to have
553              * indices pointing here after crash.
554              */
555             if (!ItemIdIsUsed(itemid)) {
556                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
557                 nunused++;
558                 continue;
559             }
560
561             htup = (HeapTuple) PageGetItem(page, itemid);
562             tupgone = false;
563
564             if (!AbsoluteTimeIsBackwardCompatiblyValid(htup->t_tmin) && 
565                 TransactionIdIsValid((TransactionId)htup->t_xmin)) {
566
567                 if (TransactionIdDidAbort(htup->t_xmin)) {
568                     tupgone = true;
569                 } else if (TransactionIdDidCommit(htup->t_xmin)) {
570                     htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
571                     pgchanged = true;
572                 } else if ( !TransactionIdIsInProgress (htup->t_xmin) ) {
573                     /* 
574                      * Not Aborted, Not Committed, Not in Progress -
575                      * so it from crashed process. - vadim 11/26/96
576                      */
577                     ncrash++;
578                     tupgone = true;
579                 }
580                 else {
581                     elog (MESSLEV, "Rel %.*s: InsertTransactionInProgress %u for TID %u/%u",
582                         NAMEDATALEN, relname, htup->t_xmin, blkno, offnum);
583                 }
584             }
585
586             if (TransactionIdIsValid((TransactionId)htup->t_xmax)) {
587                 if (TransactionIdDidAbort(htup->t_xmax)) {
588                     StoreInvalidTransactionId(&(htup->t_xmax));
589                     pgchanged = true;
590                 } else if (TransactionIdDidCommit(htup->t_xmax)) {
591                     if (!AbsoluteTimeIsBackwardCompatiblyReal(htup->t_tmax)) {
592
593                         htup->t_tmax = TransactionIdGetCommitTime(htup->t_xmax);  
594                         pgchanged = true;
595                     }
596
597                     /*
598                      *  Reap the dead tuple if its expiration time is
599                      *  before purgetime.
600                      */
601
602                     if (htup->t_tmax < purgetime) {
603                         tupgone = true;
604                     }
605                 }
606             }
607
608             /*
609              * Is it possible at all ? - vadim 11/26/96
610              */
611             if ( !TransactionIdIsValid((TransactionId)htup->t_xmin) )
612             {
613                 elog (NOTICE, "TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
614 DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.", 
615                         TransactionIdIsValid((TransactionId)htup->t_xmax),
616                         tupgone);
617             }
618             
619             /*
620              * It's possibly! But from where it comes ?
621              * And should we fix it ?  - vadim 11/28/96
622              */
623             itemptr = &(htup->t_ctid);
624             if ( !ItemPointerIsValid (itemptr) || 
625                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno )
626             {
627                 elog (NOTICE, "ITEM POINTER IS INVALID: %u/%u FOR %u/%u. TUPGONE %d.", 
628                         BlockIdGetBlockNumber(&(itemptr->ip_blkid)), 
629                         itemptr->ip_posid, blkno, offnum, tupgone);
630             }
631
632             /*
633              * Other checks...
634              */
635             if ( htup->t_len != itemid->lp_len )
636             {
637                 elog (NOTICE, "PAGEHEADER' LEN %u IS NOT THE SAME AS HTUP' %u FOR %u/%u.TUPGONE %d.", 
638                         itemid->lp_len, htup->t_len, blkno, offnum, tupgone);
639             }
640             if ( !OidIsValid(htup->t_oid) )
641             {
642                 elog (NOTICE, "OID IS INVALID FOR %u/%u.TUPGONE %d.", 
643                         blkno, offnum, tupgone);
644             }
645             
646             if (tupgone) {
647                 ItemId lpp;
648                                                     
649                 if ( tempPage == (Page) NULL )
650                 {
651                     Size pageSize;
652                     
653                     pageSize = PageGetPageSize(page);
654                     tempPage = (Page) palloc(pageSize);
655                     memmove (tempPage, page, pageSize);
656                 }
657                 
658                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
659
660                 /* mark it unused */
661                 lpp->lp_flags &= ~LP_USED;
662
663                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
664                 nvac++;
665
666             } else {
667                 ntups++;
668                 notup = false;
669                 if ( htup->t_len < min_tlen )
670                     min_tlen = htup->t_len;
671                 if ( htup->t_len > max_tlen )
672                     max_tlen = htup->t_len;
673             }
674         }
675
676         if (pgchanged) {
677             WriteBuffer(buf);
678             dobufrel = false;
679             nchpg++;
680         }
681         else
682             dobufrel = true;
683         if ( tempPage != (Page) NULL )
684         { /* Some tuples are gone */
685             PageRepairFragmentation(tempPage);
686             vpc->vpd_free = ((PageHeader)tempPage)->pd_upper - ((PageHeader)tempPage)->pd_lower;
687             frsize += vpc->vpd_free;
688             _vc_reappage (Vvpl, vpc);
689             pfree (tempPage);
690             tempPage = (Page) NULL;
691         }
692         else if ( vpc->vpd_noff > 0 )
693         { /* there are only ~LP_USED line pointers */
694             vpc->vpd_free = ((PageHeader)page)->pd_upper - ((PageHeader)page)->pd_lower;
695             frsize += vpc->vpd_free;
696             _vc_reappage (Vvpl, vpc);
697         }
698         if ( dobufrel )
699             ReleaseBuffer(buf);
700         if ( notup )
701             nemend++;
702         else
703             nemend = 0;
704     }
705
706     pfree (vpc);
707
708     /* save stats in the rel list for use later */
709     curvrl->vrl_ntups = ntups;
710     curvrl->vrl_npages = nblocks;
711     if ( ntups == 0 )
712         min_tlen = max_tlen = 0;
713     curvrl->vrl_min_tlen = min_tlen;
714     curvrl->vrl_max_tlen = max_tlen;
715     
716     Vvpl->vpl_nemend = nemend;
717     Fvpl->vpl_nemend = nemend;
718
719     /* 
720      * Try to make Fvpl keeping in mind that we can't use free space 
721      * of "empty" end-pages and last page if it reapped.
722      */
723     if ( Vvpl->vpl_npages - nemend > 0 )
724     {
725         int nusf;               /* blocks usefull for re-using */
726         
727         nusf = Vvpl->vpl_npages - nemend;
728         if ( (Vvpl->vpl_pgdesc[nusf-1])->vpd_blkno == nblocks - nemend - 1 )
729             nusf--;
730     
731         for (i = 0; i < nusf; i++)
732         {
733             vp = Vvpl->vpl_pgdesc[i];
734             if ( _vc_enough_space (vp, min_tlen) )
735             {
736                 _vc_vpinsert (Fvpl, vp);
737                 frsusf += vp->vpd_free;
738             }
739         }
740     }
741
742     getrusage(RUSAGE_SELF, &ru1);
743     
744     elog (MESSLEV, "Rel %.*s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
745 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
746         NAMEDATALEN, relname, 
747         nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
748         ntups, nvac, ncrash, nunused, min_tlen, max_tlen, 
749         frsize, frsusf, nemend, Fvpl->vpl_npages,
750         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
751         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
752
753 } /* _vc_scanheap */
754
755
756 /*
757  *  _vc_rpfheap() -- try to repaire relation' fragmentation
758  *
759  *      This routine marks dead tuples as unused and tries re-use dead space
760  *      by moving tuples (and inserting indices if needed). It constructs 
761  *      Nvpl list of free-ed pages (moved tuples) and clean indices
762  *      for them after committing (in hack-manner - without losing locks
763  *      and freeing memory!) current transaction. It truncates relation
764  *      if some end-blocks are gone away.
765  */
766 static void
767 _vc_rpfheap (VRelList curvrl, Relation onerel, 
768                 VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
769 {
770     TransactionId myXID;
771     CommandId myCID;
772     AbsoluteTime myCTM = 0;
773     Buffer buf, ToBuf;
774     int nblocks, blkno;
775     Page page, ToPage = NULL;
776     OffsetNumber offnum = 0, maxoff = 0, newoff, moff;
777     ItemId itemid, newitemid;
778     HeapTuple htup, newtup;
779     TupleDesc tupdesc = NULL;
780     Datum *idatum = NULL;
781     char *inulls = NULL;
782     InsertIndexResult iresult;
783     VPageListData Nvpl;
784     VPageDescr ToVpd = NULL, Fvplast, Vvplast, vpc, *vpp;
785     int ToVpI = 0;
786     IndDesc *Idesc, *idcur;
787     int Fblklast, Vblklast, i;
788     Size tlen;
789     int nmoved, Fnpages, Vnpages;
790     int nchkmvd, ntups;
791     bool isempty, dowrite;
792     Relation archrel;
793     struct rusage ru0, ru1;
794
795     getrusage(RUSAGE_SELF, &ru0);
796
797     myXID = GetCurrentTransactionId();
798     myCID = GetCurrentCommandId();
799         
800     if ( Irel != (Relation*) NULL )     /* preparation for index' inserts */
801     {
802         _vc_mkindesc (onerel, nindices, Irel, &Idesc);
803         tupdesc = RelationGetTupleDescriptor(onerel);
804         idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof (*idatum));
805         inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof (*inulls));
806     }
807
808     /* if the relation has an archive, open it */
809     if (onerel->rd_rel->relarch != 'n')
810     {
811         archrel = _vc_getarchrel(onerel);
812         /* Archive tuples from "empty" end-pages */
813         for ( vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1, 
814                                 i = Vvpl->vpl_nemend; i > 0; i--, vpp-- )
815         {
816             if ( (*vpp)->vpd_noff > 0 )
817             {
818                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
819                 page = BufferGetPage(buf);
820                 Assert ( !PageIsEmpty(page) );
821                 _vc_vacpage (page, *vpp, archrel);
822                 WriteBuffer (buf);
823             }
824         }
825     }
826     else
827         archrel = (Relation) NULL;
828
829     Nvpl.vpl_npages = 0;
830     Fnpages = Fvpl->vpl_npages;
831     Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
832     Fblklast = Fvplast->vpd_blkno;
833     Assert ( Vvpl->vpl_npages > Vvpl->vpl_nemend );
834     Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
835     Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
836     Vblklast = Vvplast->vpd_blkno;
837     Assert ( Vblklast >= Fblklast );
838     ToBuf = InvalidBuffer;
839     nmoved = 0;
840
841     vpc = (VPageDescr) palloc (sizeof(VPageDescrData) + MaxOffsetNumber*sizeof(OffsetNumber));
842     vpc->vpd_nusd = vpc->vpd_noff = 0;
843         
844     nblocks = curvrl->vrl_npages;
845     for (blkno = nblocks - Vvpl->vpl_nemend - 1; ; blkno--)
846     {
847         /* if it's reapped page and it was used by me - quit */
848         if ( blkno == Fblklast && Fvplast->vpd_nusd > 0 )
849             break;
850
851         buf = ReadBuffer(onerel, blkno);
852         page = BufferGetPage(buf);
853
854         vpc->vpd_noff = 0;
855
856         isempty = PageIsEmpty(page);
857
858         dowrite = false;
859         if ( blkno == Vblklast )                /* it's reapped page */
860         {
861             if ( Vvplast->vpd_noff > 0 )        /* there are dead tuples */
862             {                                   /* on this page - clean */
863                 Assert ( ! isempty );
864                 _vc_vacpage (page, Vvplast, archrel);
865                 dowrite = true;
866             }
867             else
868                 Assert ( isempty );
869             Assert ( --Vnpages > 0 );
870             /* get prev reapped page from Vvpl */
871             Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
872             Vblklast = Vvplast->vpd_blkno;
873             if ( blkno == Fblklast )    /* this page in Fvpl too */
874             {
875                 Assert ( --Fnpages > 0 );
876                 Assert ( Fvplast->vpd_nusd == 0 );
877                 /* get prev reapped page from Fvpl */
878                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
879                 Fblklast = Fvplast->vpd_blkno;
880             }
881             Assert ( Fblklast <= Vblklast );
882             if ( isempty )
883             {
884                 ReleaseBuffer(buf);
885                 continue;
886             }
887         }
888         else
889         {
890             Assert ( ! isempty );
891         }
892
893         vpc->vpd_blkno = blkno;
894         maxoff = PageGetMaxOffsetNumber(page);
895         for (offnum = FirstOffsetNumber;
896                 offnum <= maxoff;
897                 offnum = OffsetNumberNext(offnum))
898         {
899             itemid = PageGetItemId(page, offnum);
900
901             if (!ItemIdIsUsed(itemid))
902                 continue;
903
904             htup = (HeapTuple) PageGetItem(page, itemid);
905             tlen = htup->t_len;
906                 
907             /* try to find new page for this tuple */
908             if ( ToBuf == InvalidBuffer ||
909                 ! _vc_enough_space (ToVpd, tlen) )
910             {
911                 if ( ToBuf != InvalidBuffer )
912                 {
913                     WriteBuffer(ToBuf);
914                     ToBuf = InvalidBuffer;
915                     /*
916                      * If no one tuple can't be added to this page -
917                      * remove page from Fvpl. - vadim 11/27/96
918                      */ 
919                     if ( !_vc_enough_space (ToVpd, curvrl->vrl_min_tlen) )
920                     {
921                         if ( ToVpd != Fvplast )
922                         {
923                             Assert ( Fnpages > ToVpI + 1 );
924                             memmove (Fvpl->vpl_pgdesc + ToVpI, 
925                                 Fvpl->vpl_pgdesc + ToVpI + 1, 
926                                 sizeof (VPageDescr*) * (Fnpages - ToVpI - 1));
927                         }
928                         Assert ( Fnpages >= 1 );
929                         Fnpages--;
930                         if ( Fnpages == 0 )
931                             break;
932                         /* get prev reapped page from Fvpl */
933                         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
934                         Fblklast = Fvplast->vpd_blkno;
935                     }
936                 }
937                 for (i=0; i < Fnpages; i++)
938                 {
939                     if ( _vc_enough_space (Fvpl->vpl_pgdesc[i], tlen) )
940                         break;
941                 }
942                 if ( i == Fnpages )
943                     break;                      /* can't move item anywhere */
944                 ToVpI = i;
945                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
946                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
947                 ToPage = BufferGetPage(ToBuf);
948                 /* if this page was not used before - clean it */
949                 if ( ! PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0 )
950                     _vc_vacpage (ToPage, ToVpd, archrel);
951             }
952                 
953             /* copy tuple */
954             newtup = (HeapTuple) palloc (tlen);
955             memmove((char *) newtup, (char *) htup, tlen);
956
957             /* store transaction information */
958             TransactionIdStore(myXID, &(newtup->t_xmin));
959             newtup->t_cmin = myCID;
960             StoreInvalidTransactionId(&(newtup->t_xmax));
961             newtup->t_tmin = INVALID_ABSTIME;
962             newtup->t_tmax = CURRENT_ABSTIME;
963             ItemPointerSetInvalid(&newtup->t_chain);
964
965             /* add tuple to the page */
966             newoff = PageAddItem (ToPage, (Item)newtup, tlen, 
967                                 InvalidOffsetNumber, LP_USED);
968             if ( newoff == InvalidOffsetNumber )
969             {
970                 elog (WARN, "\
971 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
972                 tlen, ToVpd->vpd_blkno, ToVpd->vpd_free, 
973                 ToVpd->vpd_nusd, ToVpd->vpd_noff);
974             }
975             newitemid = PageGetItemId(ToPage, newoff);
976             pfree (newtup);
977             newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
978             ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
979
980             /* now logically delete end-tuple */
981             TransactionIdStore(myXID, &(htup->t_xmax));
982             htup->t_cmax = myCID;
983             memmove ((char*)&(htup->t_chain), (char*)&(newtup->t_ctid), sizeof (newtup->t_ctid));
984
985             ToVpd->vpd_nusd++;
986             nmoved++;
987             ToVpd->vpd_free = ((PageHeader)ToPage)->pd_upper - ((PageHeader)ToPage)->pd_lower;
988             vpc->vpd_voff[vpc->vpd_noff++] = offnum;
989                 
990             /* insert index' tuples if needed */
991             if ( Irel != (Relation*) NULL )
992             {
993                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
994                 {
995                     FormIndexDatum (
996                                 idcur->natts,
997                                 (AttrNumber *)&(idcur->tform->indkey[0]),
998                                 newtup, 
999                                 tupdesc,
1000                                 InvalidBuffer,
1001                                 idatum,
1002                                 inulls,
1003                                 idcur->finfoP);
1004                     iresult = index_insert (
1005                                 Irel[i],
1006                                 idatum,
1007                                 inulls,
1008                                 &(newtup->t_ctid),
1009                                 true);
1010                     if (iresult) pfree(iresult);
1011                 }
1012             }
1013                 
1014         } /* walk along page */
1015
1016         if ( vpc->vpd_noff > 0 )                /* some tuples were moved */
1017         {
1018             _vc_reappage (&Nvpl, vpc);
1019             WriteBuffer(buf);
1020         }
1021         else if ( dowrite )
1022             WriteBuffer(buf);
1023         else
1024             ReleaseBuffer(buf);
1025             
1026         if ( offnum <= maxoff )
1027             break;                              /* some item(s) left */
1028             
1029     } /* walk along relation */
1030         
1031     blkno++;                            /* new number of blocks */
1032
1033     if ( ToBuf != InvalidBuffer )
1034     {
1035         Assert (nmoved > 0);
1036         WriteBuffer(ToBuf);
1037     }
1038
1039     if ( nmoved > 0 )
1040     {
1041         /* 
1042          * We have to commit our tuple' movings before we'll truncate 
1043          * relation, but we shouldn't lose our locks. And so - quick hack: 
1044          * flush buffers and record status of current transaction
1045          * as committed, and continue. - vadim 11/13/96
1046          */
1047         FlushBufferPool(!TransactionFlushEnabled());
1048         TransactionIdCommit(myXID);
1049         FlushBufferPool(!TransactionFlushEnabled());
1050         myCTM = TransactionIdGetCommitTime(myXID);
1051     }
1052         
1053     /* 
1054      * Clean uncleaned reapped pages from Vvpl list 
1055      * and set commit' times  for inserted tuples
1056      */
1057     nchkmvd = 0;
1058     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1059     {
1060         Assert ( (*vpp)->vpd_blkno < blkno );
1061         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1062         page = BufferGetPage(buf);
1063         if ( (*vpp)->vpd_nusd == 0 )    /* this page was not used */
1064         {
1065             /* noff == 0 in empty pages only - such pages should be re-used */
1066             Assert ( (*vpp)->vpd_noff > 0 );
1067             _vc_vacpage (page, *vpp, archrel);
1068         }
1069         else                            /* this page was used */
1070         {
1071             ntups = 0;
1072             moff = PageGetMaxOffsetNumber(page);
1073             for (newoff = FirstOffsetNumber;
1074                         newoff <= moff;
1075                         newoff = OffsetNumberNext(newoff))
1076             {
1077                 itemid = PageGetItemId(page, newoff);
1078                 if (!ItemIdIsUsed(itemid))
1079                     continue;
1080                 htup = (HeapTuple) PageGetItem(page, itemid);
1081                 if ( TransactionIdEquals((TransactionId)htup->t_xmin, myXID) )
1082                 {
1083                     htup->t_tmin = myCTM;
1084                     ntups++;
1085                 }
1086             }
1087             Assert ( (*vpp)->vpd_nusd == ntups );
1088             nchkmvd += ntups;
1089         }
1090         WriteBuffer (buf);
1091     }
1092     Assert ( nmoved == nchkmvd );
1093
1094     getrusage(RUSAGE_SELF, &ru1);
1095     
1096     elog (MESSLEV, "Rel %.*s: Pages: %u --> %u; Tuple(s) moved: %u. \
1097 Elapsed %u/%u sec.",
1098                 NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
1099                 nblocks, blkno, nmoved,
1100                 ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1101                 ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1102
1103     if ( Nvpl.vpl_npages > 0 )
1104     {
1105         /* vacuum indices again if needed */
1106         if ( Irel != (Relation*) NULL )
1107         {
1108             VPageDescr *vpleft, *vpright, vpsave;
1109                 
1110             /* re-sort Nvpl.vpl_pgdesc */
1111             for (vpleft = Nvpl.vpl_pgdesc, 
1112                 vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1113                 vpleft < vpright; vpleft++, vpright--)
1114             {
1115                 vpsave = *vpleft; *vpleft = *vpright; *vpright = vpsave;
1116             }
1117             for (i = 0; i < nindices; i++)
1118                 _vc_vaconeind (&Nvpl, Irel[i], curvrl->vrl_ntups);
1119         }
1120
1121         /* 
1122          * clean moved tuples from last page in Nvpl list
1123          * if some tuples left there
1124          */
1125         if ( vpc->vpd_noff > 0 && offnum <= maxoff )
1126         {
1127             Assert (vpc->vpd_blkno == blkno - 1);
1128             buf = ReadBuffer(onerel, vpc->vpd_blkno);
1129             page = BufferGetPage (buf);
1130             ntups = 0;
1131             maxoff = offnum;
1132             for (offnum = FirstOffsetNumber;
1133                         offnum < maxoff;
1134                         offnum = OffsetNumberNext(offnum))
1135             {
1136                 itemid = PageGetItemId(page, offnum);
1137                 if (!ItemIdIsUsed(itemid))
1138                     continue;
1139                 htup = (HeapTuple) PageGetItem(page, itemid);
1140                 Assert ( TransactionIdEquals((TransactionId)htup->t_xmax, myXID) );
1141                 itemid->lp_flags &= ~LP_USED;
1142                 ntups++;
1143             }
1144             Assert ( vpc->vpd_noff == ntups );
1145             PageRepairFragmentation(page);
1146             WriteBuffer (buf);
1147         }
1148
1149         /* now - free new list of reapped pages */
1150         vpp = Nvpl.vpl_pgdesc;
1151         for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1152             pfree(*vpp);
1153         pfree (Nvpl.vpl_pgdesc);
1154     }
1155         
1156     /* truncate relation */
1157     if ( blkno < nblocks )
1158     {
1159         blkno = smgrtruncate (onerel->rd_rel->relsmgr, onerel, blkno);
1160         Assert ( blkno >= 0 );
1161         curvrl->vrl_npages = blkno;     /* set new number of blocks */
1162     }
1163
1164     if ( archrel != (Relation) NULL )
1165         heap_close(archrel);
1166
1167     if ( Irel != (Relation*) NULL )     /* pfree index' allocations */
1168     {
1169         pfree (Idesc);
1170         pfree (idatum);
1171         pfree (inulls);
1172         _vc_clsindices (nindices, Irel);
1173     }
1174
1175     pfree (vpc);
1176
1177 } /* _vc_rpfheap */
1178
1179 /*
1180  *  _vc_vacheap() -- free dead tuples
1181  *
1182  *      This routine marks dead tuples as unused and truncates relation
1183  *      if there are "empty" end-blocks.
1184  */
1185 static void
1186 _vc_vacheap (VRelList curvrl, Relation onerel, VPageList Vvpl)
1187 {
1188     Buffer buf;
1189     Page page;
1190     VPageDescr *vpp;
1191     Relation archrel;
1192     int nblocks;
1193     int i;
1194
1195     nblocks = Vvpl->vpl_npages;
1196     /* if the relation has an archive, open it */
1197     if (onerel->rd_rel->relarch != 'n')
1198         archrel = _vc_getarchrel(onerel);
1199     else
1200     {
1201         archrel = (Relation) NULL;
1202         nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1203     }
1204         
1205     for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1206     {
1207         if ( (*vpp)->vpd_noff > 0 )
1208         {
1209             buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1210             page = BufferGetPage (buf);
1211             _vc_vacpage (page, *vpp, archrel);
1212             WriteBuffer (buf);
1213         }
1214     }
1215
1216     /* truncate relation if there are some empty end-pages */
1217     if ( Vvpl->vpl_nemend > 0 )
1218     {
1219         Assert ( curvrl->vrl_npages >= Vvpl->vpl_nemend );
1220         nblocks = curvrl->vrl_npages - Vvpl->vpl_nemend;
1221         elog (MESSLEV, "Rel %.*s: Pages: %u --> %u.",
1222                 NAMEDATALEN, (RelationGetRelationName(onerel))->data, 
1223                 curvrl->vrl_npages, nblocks);
1224
1225         /* 
1226          * we have to flush "empty" end-pages (if changed, but who knows it)
1227          * before truncation 
1228          */
1229         FlushBufferPool(!TransactionFlushEnabled());
1230
1231         nblocks = smgrtruncate (onerel->rd_rel->relsmgr, onerel, nblocks);
1232         Assert ( nblocks >= 0 );
1233         curvrl->vrl_npages = nblocks;   /* set new number of blocks */
1234     }
1235
1236     if ( archrel != (Relation) NULL )
1237         heap_close(archrel);
1238
1239 } /* _vc_vacheap */
1240
1241 /*
1242  *  _vc_vacpage() -- free (and archive if needed) dead tuples on a page
1243  *                   and repaire its fragmentation.
1244  */
1245 static void
1246 _vc_vacpage (Page page, VPageDescr vpd, Relation archrel)
1247 {
1248     ItemId itemid;
1249     HeapTuple htup;
1250     int i;
1251     
1252     Assert ( vpd->vpd_nusd == 0 );
1253     for (i=0; i < vpd->vpd_noff; i++)
1254     {
1255         itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1256         if ( archrel != (Relation) NULL && ItemIdIsUsed(itemid) )
1257         {
1258             htup = (HeapTuple) PageGetItem (page, itemid);
1259             _vc_archive (archrel, htup);
1260         }
1261         itemid->lp_flags &= ~LP_USED;
1262     }
1263     PageRepairFragmentation(page);
1264
1265 } /* _vc_vacpage */
1266
1267 /*
1268  *  _vc_vaconeind() -- vacuum one index relation.
1269  *
1270  *      Vpl is the VPageList of the heap we're currently vacuuming.
1271  *      It's locked. Indrel is an index relation on the vacuumed heap. 
1272  *      We don't set locks on the index relation here, since the indexed 
1273  *      access methods support locking at different granularities. 
1274  *      We let them handle it.
1275  *
1276  *      Finally, we arrange to update the index relation's statistics in
1277  *      pg_class.
1278  */
1279 static void
1280 _vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1281 {
1282     RetrieveIndexResult res;
1283     IndexScanDesc iscan;
1284     ItemPointer heapptr;
1285     int nvac;
1286     int nitups;
1287     int nipages;
1288     VPageDescr vp;
1289     struct rusage ru0, ru1;
1290
1291     getrusage(RUSAGE_SELF, &ru0);
1292
1293     /* walk through the entire index */
1294     iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1295     nvac = 0;
1296     nitups = 0;
1297
1298     while ((res = index_getnext(iscan, ForwardScanDirection))
1299            != (RetrieveIndexResult) NULL) {
1300         heapptr = &res->heap_iptr;
1301
1302         if ( (vp = _vc_tidreapped (heapptr, vpl)) != (VPageDescr) NULL)
1303         {
1304 #if 0
1305             elog(DEBUG, "<%x,%x> -> <%x,%x>",
1306                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1307                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1308                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1309                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1310 #endif
1311             if ( vp->vpd_noff == 0 ) 
1312             {                           /* this is EmptyPage !!! */
1313                 elog (NOTICE, "Ind %.*s: pointer to EmptyPage (blk %u off %u) - fixing",
1314                         NAMEDATALEN, indrel->rd_rel->relname.data,
1315                         vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1316             }
1317             ++nvac;
1318             index_delete(indrel, &res->index_iptr);
1319         } else {
1320             nitups++;
1321         }
1322
1323         /* be tidy */
1324         pfree(res);
1325     }
1326
1327     index_endscan(iscan);
1328
1329     /* now update statistics in pg_class */
1330     nipages = RelationGetNumberOfBlocks(indrel);
1331     _vc_updstats(indrel->rd_id, nipages, nitups, false);
1332
1333     getrusage(RUSAGE_SELF, &ru1);
1334
1335     elog (MESSLEV, "Ind %.*s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1336         NAMEDATALEN, indrel->rd_rel->relname.data, nipages, nitups, nvac,
1337         ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec, 
1338         ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1339
1340     if ( nitups != nhtups )
1341         elog (NOTICE, "NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1342                 nitups, nhtups);
1343
1344 } /* _vc_vaconeind */
1345
1346 /*
1347  *  _vc_tidreapped() -- is a particular tid reapped?
1348  *
1349  *      vpl->VPageDescr_array is sorted in right order.
1350  */
1351 static VPageDescr
1352 _vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1353 {
1354     OffsetNumber ioffno;
1355     OffsetNumber *voff;
1356     VPageDescr vp, *vpp;
1357     VPageDescrData vpd;
1358
1359     vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1360     ioffno = ItemPointerGetOffsetNumber(itemptr);
1361         
1362     vp = &vpd;
1363     vpp = (VPageDescr*) _vc_find_eq ((char*)(vpl->vpl_pgdesc), 
1364                 vpl->vpl_npages, sizeof (VPageDescr), (char*)&vp, 
1365                 _vc_cmp_blk);
1366
1367     if ( vpp == (VPageDescr*) NULL )
1368         return ((VPageDescr)NULL);
1369     vp = *vpp;
1370
1371     /* ok - we are on true page */
1372
1373     if ( vp->vpd_noff == 0 ) {          /* this is EmptyPage !!! */
1374         return (vp);
1375     }
1376     
1377     voff = (OffsetNumber*) _vc_find_eq ((char*)(vp->vpd_voff), 
1378                 vp->vpd_noff, sizeof (OffsetNumber), (char*)&ioffno, 
1379                 _vc_cmp_offno);
1380
1381     if ( voff == (OffsetNumber*) NULL )
1382         return ((VPageDescr)NULL);
1383
1384     return (vp);
1385
1386 } /* _vc_tidreapped */
1387
1388 /*
1389  *  _vc_updstats() -- update pg_class statistics for one relation
1390  *
1391  *      This routine works for both index and heap relation entries in
1392  *      pg_class.  We violate no-overwrite semantics here by storing new
1393  *      values for ntuples, npages, and hasindex directly in the pg_class
1394  *      tuple that's already on the page.  The reason for this is that if
1395  *      we updated these tuples in the usual way, then every tuple in pg_class
1396  *      would be replaced every day.  This would make planning and executing
1397  *      historical queries very expensive.
1398  */
1399 static void
1400 _vc_updstats(Oid relid, int npages, int ntuples, bool hasindex)
1401 {
1402     Relation rd;
1403     HeapScanDesc sdesc;
1404     HeapTuple tup;
1405     Buffer buf;
1406     Form_pg_class pgcform;
1407     ScanKeyData skey;
1408
1409     /*
1410      * update number of tuples and number of pages in pg_class
1411      */
1412     ScanKeyEntryInitialize(&skey, 0x0, ObjectIdAttributeNumber,
1413                            ObjectIdEqualRegProcedure,
1414                            ObjectIdGetDatum(relid));
1415
1416     rd = heap_openr(RelationRelationName);
1417     sdesc = heap_beginscan(rd, false, NowTimeQual, 1, &skey);
1418
1419     if (!HeapTupleIsValid(tup = heap_getnext(sdesc, 0, &buf)))
1420         elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1421                    relid);
1422
1423     /* overwrite the existing statistics in the tuple */
1424     _vc_setpagelock(rd, BufferGetBlockNumber(buf));
1425     pgcform = (Form_pg_class) GETSTRUCT(tup);
1426     pgcform->reltuples = ntuples;
1427     pgcform->relpages = npages;
1428     pgcform->relhasindex = hasindex;
1429  
1430     /* XXX -- after write, should invalidate relcache in other backends */
1431     WriteNoReleaseBuffer(buf);  /* heap_endscan release scan' buffers ? */
1432
1433     /* that's all, folks */
1434     heap_endscan(sdesc);
1435     heap_close(rd);
1436
1437 }
1438
1439 static void _vc_setpagelock(Relation rel, BlockNumber blkno)
1440 {
1441     ItemPointerData itm;
1442
1443     ItemPointerSet(&itm, blkno, 1);
1444
1445     RelationSetLockForWritePage(rel, &itm);
1446 }
1447
1448
1449 /*
1450  *  _vc_reappage() -- save a page on the array of reapped pages.
1451  *
1452  *      As a side effect of the way that the vacuuming loop for a given
1453  *      relation works, higher pages come after lower pages in the array
1454  *      (and highest tid on a page is last).
1455  */
1456 static void 
1457 _vc_reappage(VPageList vpl, VPageDescr vpc)
1458 {
1459     VPageDescr newvpd;
1460
1461     /* allocate a VPageDescrData entry */
1462     newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff*sizeof(OffsetNumber));
1463
1464     /* fill it in */
1465     if ( vpc->vpd_noff > 0 )
1466         memmove (newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff*sizeof(OffsetNumber));
1467     newvpd->vpd_blkno = vpc->vpd_blkno;
1468     newvpd->vpd_free = vpc->vpd_free;
1469     newvpd->vpd_nusd = vpc->vpd_nusd;
1470     newvpd->vpd_noff = vpc->vpd_noff;
1471
1472     /* insert this page into vpl list */
1473     _vc_vpinsert (vpl, newvpd);
1474     
1475 } /* _vc_reappage */
1476
1477 static void
1478 _vc_vpinsert (VPageList vpl, VPageDescr vpnew)
1479 {
1480
1481     /* allocate a VPageDescr entry if needed */
1482     if ( vpl->vpl_npages == 0 )
1483         vpl->vpl_pgdesc = (VPageDescr*) palloc(100*sizeof(VPageDescr));
1484     else if ( vpl->vpl_npages % 100 == 0 )
1485         vpl->vpl_pgdesc = (VPageDescr*) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages+100)*sizeof(VPageDescr));
1486     vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
1487     (vpl->vpl_npages)++;
1488     
1489 }
1490
1491 static void
1492 _vc_free(Portal p, VRelList vrl)
1493 {
1494     VRelList p_vrl;
1495     VAttList p_val, val;
1496     MemoryContext old;
1497     PortalVariableMemory pmem;
1498
1499     pmem = PortalGetVariableMemory(p);
1500     old = MemoryContextSwitchTo((MemoryContext)pmem);
1501
1502     while (vrl != (VRelList) NULL) {
1503
1504         /* free attribute list */
1505         val = vrl->vrl_attlist;
1506         while (val != (VAttList) NULL) {
1507             p_val = val;
1508             val = val->val_next;
1509             pfree(p_val);
1510         }
1511
1512         /* free rel list entry */
1513         p_vrl = vrl;
1514         vrl = vrl->vrl_next;
1515         pfree(p_vrl);
1516     }
1517
1518     (void) MemoryContextSwitchTo(old);
1519 }
1520
1521 /*
1522  *  _vc_getarchrel() -- open the archive relation for a heap relation
1523  *
1524  *      The archive relation is named 'a,XXXXX' for the heap relation
1525  *      whose relid is XXXXX.
1526  */
1527
1528 #define ARCHIVE_PREFIX  "a,"
1529
1530 static Relation
1531 _vc_getarchrel(Relation heaprel)
1532 {
1533     Relation archrel;
1534     char *archrelname;
1535
1536     archrelname = palloc(sizeof(ARCHIVE_PREFIX) + NAMEDATALEN); /* bogus */
1537     sprintf(archrelname, "%s%d", ARCHIVE_PREFIX, heaprel->rd_id);
1538
1539     archrel = heap_openr(archrelname);
1540
1541     pfree(archrelname);
1542     return (archrel);
1543 }
1544
1545 /*
1546  *  _vc_archive() -- write a tuple to an archive relation
1547  *
1548  *      In the future, this will invoke the archived accessd method.  For
1549  *      now, archive relations are on mag disk.
1550  */
1551 static void
1552 _vc_archive(Relation archrel, HeapTuple htup)
1553 {
1554     doinsert(archrel, htup);
1555 }
1556
1557 static bool
1558 _vc_isarchrel(char *rname)
1559 {
1560     if (strncmp(ARCHIVE_PREFIX, rname,strlen(ARCHIVE_PREFIX)) == 0)
1561         return (true);
1562
1563     return (false);
1564 }
1565
1566 static char *
1567 _vc_find_eq (char *bot, int nelem, int size, char *elm, int (*compar)(char *, char *))
1568 {
1569     int res;
1570     int last = nelem - 1;
1571     int celm = nelem / 2;
1572     bool last_move, first_move;
1573     
1574     last_move = first_move = true;
1575     for ( ; ; )
1576     {
1577         if ( first_move == true )
1578         {
1579             res = compar (bot, elm);
1580             if ( res > 0 )
1581                 return (NULL);
1582             if ( res == 0 )
1583                 return (bot);
1584             first_move = false;
1585         }
1586         if ( last_move == true )
1587         {
1588             res = compar (elm, bot + last*size);
1589             if ( res > 0 )
1590                 return (NULL);
1591             if ( res == 0 )
1592                 return (bot + last*size);
1593             last_move = false;
1594         }
1595         res = compar (elm, bot + celm*size);
1596         if ( res == 0 )
1597             return (bot + celm*size);
1598         if ( res < 0 )
1599         {
1600             if ( celm == 0 )
1601                 return (NULL);
1602             last = celm - 1;
1603             celm = celm / 2;
1604             last_move = true;
1605             continue;
1606         }
1607         
1608         if ( celm == last )
1609             return (NULL);
1610             
1611         last = last - celm - 1;
1612         bot = bot + (celm+1)*size;
1613         celm = (last + 1) / 2;
1614         first_move = true;
1615     }
1616
1617 } /* _vc_find_eq */
1618
1619 static int 
1620 _vc_cmp_blk (char *left, char *right)
1621 {
1622     BlockNumber lblk, rblk;
1623
1624     lblk = (*((VPageDescr*)left))->vpd_blkno;
1625     rblk = (*((VPageDescr*)right))->vpd_blkno;
1626
1627     if ( lblk < rblk )
1628         return (-1);
1629     if ( lblk == rblk )
1630         return (0);
1631     return (1);
1632
1633 } /* _vc_cmp_blk */
1634
1635 static int 
1636 _vc_cmp_offno (char *left, char *right)
1637 {
1638
1639     if ( *(OffsetNumber*)left < *(OffsetNumber*)right )
1640         return (-1);
1641     if ( *(OffsetNumber*)left == *(OffsetNumber*)right )
1642         return (0);
1643     return (1);
1644
1645 } /* _vc_cmp_offno */
1646
1647
1648 static void
1649 _vc_getindices (Oid relid, int *nindices, Relation **Irel)
1650 {
1651     Relation pgindex;
1652     Relation irel;
1653     TupleDesc pgidesc;
1654     HeapTuple pgitup;
1655     HeapScanDesc pgiscan;
1656     Datum d;
1657     int i, k;
1658     bool n;
1659     ScanKeyData pgikey;
1660     Oid *ioid;
1661
1662     *nindices = i = 0;
1663     
1664     ioid = (Oid *) palloc(10*sizeof(Oid));
1665
1666     /* prepare a heap scan on the pg_index relation */
1667     pgindex = heap_openr(IndexRelationName);
1668     pgidesc = RelationGetTupleDescriptor(pgindex);
1669
1670     ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
1671                            ObjectIdEqualRegProcedure,
1672                            ObjectIdGetDatum(relid));
1673
1674     pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
1675
1676     while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL))) {
1677         d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
1678                                  pgidesc, &n);
1679         i++;
1680         if ( i % 10 == 0 )
1681             ioid = (Oid *) repalloc(ioid, (i+10)*sizeof(Oid));
1682         ioid[i-1] = DatumGetObjectId(d);
1683     }
1684
1685     heap_endscan(pgiscan);
1686     heap_close(pgindex);
1687
1688     if ( i == 0 ) {     /* No one index found */
1689         pfree(ioid);
1690         return;
1691     }
1692
1693     if ( Irel != (Relation **) NULL )
1694         *Irel = (Relation *) palloc(i * sizeof(Relation));
1695     
1696     for (k = 0; i > 0; )
1697     {
1698         irel = index_open(ioid[--i]);
1699         if ( irel != (Relation) NULL )
1700         {
1701             if ( Irel != (Relation **) NULL )
1702                 (*Irel)[k] = irel;
1703             else
1704                 index_close (irel);
1705             k++;
1706         }
1707         else
1708             elog (NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
1709     }
1710     *nindices = k;
1711     pfree(ioid);
1712
1713     if ( Irel != (Relation **) NULL && *nindices == 0 )
1714     {
1715         pfree (*Irel);
1716         *Irel = (Relation *) NULL;
1717     }
1718
1719 } /* _vc_getindices */
1720
1721
1722 static void
1723 _vc_clsindices (int nindices, Relation *Irel)
1724 {
1725
1726     if ( Irel == (Relation*) NULL )
1727         return;
1728
1729     while (nindices--) {
1730         index_close (Irel[nindices]);
1731     }
1732     pfree (Irel);
1733
1734 } /* _vc_clsindices */
1735
1736
1737 static void
1738 _vc_mkindesc (Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
1739 {
1740     IndDesc *idcur;
1741     HeapTuple pgIndexTup;
1742     AttrNumber *attnumP;
1743     int natts;
1744     int i;
1745
1746     *Idesc = (IndDesc *) palloc (nindices * sizeof (IndDesc));
1747     
1748     for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++) {
1749         pgIndexTup =
1750                 SearchSysCacheTuple(INDEXRELID,
1751                                 ObjectIdGetDatum(Irel[i]->rd_id),
1752                                 0,0,0);
1753         Assert(pgIndexTup);
1754         idcur->tform = (IndexTupleForm)GETSTRUCT(pgIndexTup);
1755         for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
1756                 *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
1757                 attnumP++, natts++);
1758         if (idcur->tform->indproc != InvalidOid) {
1759             idcur->finfoP = &(idcur->finfo);
1760             FIgetnArgs(idcur->finfoP) = natts;
1761             natts = 1;
1762             FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
1763             *(FIgetname(idcur->finfoP)) = '\0';
1764         } else
1765             idcur->finfoP = (FuncIndexInfo *) NULL;
1766         
1767         idcur->natts = natts;
1768     }
1769     
1770 } /* _vc_mkindesc */
1771
1772
1773 static bool
1774 _vc_enough_space (VPageDescr vpd, Size len)
1775 {
1776
1777     len = DOUBLEALIGN(len);
1778
1779     if ( len > vpd->vpd_free )
1780         return (false);
1781     
1782     if ( vpd->vpd_nusd < vpd->vpd_noff )        /* there are free itemid(s) */
1783         return (true);                          /* and len <= free_space */
1784     
1785     /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
1786     if ( len <= vpd->vpd_free - sizeof (ItemIdData) )
1787         return (true);
1788     
1789     return (false);
1790     
1791 } /* _vc_enough_space */