]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
From: t-ishii@sra.co.jp
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *        the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.68 1998/07/26 04:30:25 scrappy Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/types.h>
15 #include <sys/file.h>
16 #include <string.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include "postgres.h"
22
23 #include "access/genam.h"
24 #include "access/heapam.h"
25 #include "access/transam.h"
26 #include "access/xact.h"
27 #include "catalog/catalog.h"
28 #include "catalog/catname.h"
29 #include "catalog/index.h"
30 #ifdef MULTIBYTE
31 #include "catalog/pg_class_mb.h"
32 #else
33 #include "catalog/pg_class.h"
34 #endif
35 #include "catalog/pg_index.h"
36 #include "catalog/pg_operator.h"
37 #include "catalog/pg_statistic.h"
38 #include "catalog/pg_type.h"
39 #include "commands/vacuum.h"
40 #include "fmgr.h"
41 #include "parser/parse_oper.h"
42 #include "storage/bufmgr.h"
43 #include "storage/bufpage.h"
44 #include "storage/shmem.h"
45 #include "storage/smgr.h"
46 #include "storage/lmgr.h"
47 #include "utils/builtins.h"
48 #include "utils/inval.h"
49 #include "utils/mcxt.h"
50 #include "utils/portal.h"
51 #include "utils/syscache.h"
52
53 #ifndef HAVE_GETRUSAGE
54 #include <rusagestub.h>
55 #else
56 #include <sys/time.h>
57 #include <sys/resource.h>
58 #endif
59
60  /* #include <port-protos.h> *//* Why? */
61
62 extern int      BlowawayRelationBuffers(Relation rdesc, BlockNumber block);
63
64 bool            VacuumRunning = false;
65
66 static Portal vc_portal;
67
68 static int      MESSAGE_LEVEL;          /* message level */
69
/*
 * Swap two lvalues of the named type in place.
 *
 * Each macro expands to a braced block that declares a temporary called
 * "tmp", so neither argument may itself be named tmp, and both arguments
 * are evaluated more than once (do not pass expressions with side effects).
 */
#define swapLong(a,b)	{long tmp; tmp=(a); (a)=(b); (b)=tmp;}
#define swapInt(a,b)	{int tmp; tmp=(a); (a)=(b); (b)=tmp;}
#define swapDatum(a,b)	{Datum tmp; tmp=(a); (a)=(b); (b)=tmp;}

/* True when an "=" comparison function address is set in *stats. */
#define VacAttrStatsEqValid(stats) ( (stats)->f_cmpeq.fn_addr != NULL )

/*
 * True when both the "<" and ">" comparison function addresses are set in
 * *stats and its outfunc is a valid procedure id.
 */
#define VacAttrStatsLtGtValid(stats) ( (stats)->f_cmplt.fn_addr != NULL && \
									   (stats)->f_cmpgt.fn_addr != NULL && \
									   RegProcedureIsValid((stats)->outfunc) )
77
78
79 /* non-export function prototypes */
80 static void vc_init(void);
81 static void vc_shutdown(void);
82 static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
83 static VRelList vc_getrels(NameData *VacRelP);
84 static void vc_vacone(Oid relid, bool analyze, List *va_cols);
85 static void vc_scanheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
86 static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
87 static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vpl);
88 static void vc_vacpage(Page page, VPageDescr vpd);
89 static void vc_vaconeind(VPageList vpl, Relation indrel, int nhtups);
90 static void vc_scanoneind(Relation indrel, int nhtups);
91 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup);
92 static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
93 static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats);
94 static void vc_delhilowstats(Oid relid, int attcnt, int *attnums);
95 static void vc_setpagelock(Relation rel, BlockNumber blkno);
96 static VPageDescr vc_tidreapped(ItemPointer itemptr, VPageList vpl);
97 static void vc_reappage(VPageList vpl, VPageDescr vpc);
98 static void vc_vpinsert(VPageList vpl, VPageDescr vpnew);
99 static void vc_free(VRelList vrl);
100 static void vc_getindices(Oid relid, int *nindices, Relation **Irel);
101 static void vc_clsindices(int nindices, Relation *Irel);
102 static void vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
103 static char *vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *));
104 static int      vc_cmp_blk(char *left, char *right);
105 static int      vc_cmp_offno(char *left, char *right);
106 static bool vc_enough_space(VPageDescr vpd, Size len);
107
108 void
109 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
110 {
111         char       *pname;
112         MemoryContext old;
113         PortalVariableMemory pmem;
114         NameData        VacRel;
115         List       *le;
116         List       *va_cols = NIL;
117
118         /*
119          * Create a portal for safe memory across transctions.  We need to
120          * palloc the name space for it because our hash function expects the
121          * name to be on a longword boundary.  CreatePortal copies the name to
122          * safe storage for us.
123          */
124         pname = (char *) palloc(strlen(VACPNAME) + 1);
125         strcpy(pname, VACPNAME);
126         vc_portal = CreatePortal(pname);
127         pfree(pname);
128
129         if (verbose)
130                 MESSAGE_LEVEL = NOTICE;
131         else
132                 MESSAGE_LEVEL = DEBUG;
133
134         /* vacrel gets de-allocated on transaction commit */
135         if (vacrel)
136                 strcpy(VacRel.data, vacrel);
137
138         pmem = PortalGetVariableMemory(vc_portal);
139         old = MemoryContextSwitchTo((MemoryContext) pmem);
140
141         if (va_spec != NIL && !analyze)
142                 elog(ERROR, "Can't vacuum columns, only tables.  You can 'vacuum analyze' columns.");
143
144         foreach(le, va_spec)
145         {
146                 char       *col = (char *) lfirst(le);
147                 char       *dest;
148
149                 dest = (char *) palloc(strlen(col) + 1);
150                 strcpy(dest, col);
151                 va_cols = lappend(va_cols, dest);
152         }
153         MemoryContextSwitchTo(old);
154
155         /* initialize vacuum cleaner */
156         vc_init();
157
158         /* vacuum the database */
159         if (vacrel)
160                 vc_vacuum(&VacRel, analyze, va_cols);
161         else
162                 vc_vacuum(NULL, analyze, NIL);
163
164         PortalDestroy(&vc_portal);
165
166         /* clean up */
167         vc_shutdown();
168 }
169
170 /*
171  *      vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
172  *
173  *              We run exactly one vacuum cleaner at a time.  We use the file system
174  *              to guarantee an exclusive lock on vacuuming, since a single vacuum
175  *              cleaner instantiation crosses transaction boundaries, and we'd lose
176  *              postgres-style locks at the end of every transaction.
177  *
178  *              The strangeness with committing and starting transactions in the
179  *              init and shutdown routines is due to the fact that the vacuum cleaner
180  *              is invoked via a sql command, and so is already executing inside
181  *              a transaction.  We need to leave ourselves in a predictable state
182  *              on entry and exit to the vacuum cleaner.  We commit the transaction
183  *              started in PostgresMain() inside vc_init(), and start one in
184  *              vc_shutdown() to match the commit waiting for us back in
185  *              PostgresMain().
186  */
187 static void
188 vc_init()
189 {
190         int                     fd;
191
192         if ((fd = open("pg_vlock", O_CREAT | O_EXCL, 0600)) < 0)
193                 elog(ERROR, "can't create lock file -- another vacuum cleaner running?");
194
195         close(fd);
196
197         /*
198          * By here, exclusive open on the lock file succeeded.  If we abort
199          * for any reason during vacuuming, we need to remove the lock file.
200          * This global variable is checked in the transaction manager on xact
201          * abort, and the routine vc_abort() is called if necessary.
202          */
203
204         VacuumRunning = true;
205
206         /* matches the StartTransaction in PostgresMain() */
207         CommitTransactionCommand();
208 }
209
210 static void
211 vc_shutdown()
212 {
213         /* on entry, not in a transaction */
214         if (unlink("pg_vlock") < 0)
215                 elog(ERROR, "vacuum: can't destroy lock file!");
216
217         /* okay, we're done */
218         VacuumRunning = false;
219
220         /* matches the CommitTransaction in PostgresMain() */
221         StartTransactionCommand();
222
223 }
224
225 void
226 vc_abort()
227 {
228         /* on abort, remove the vacuum cleaner lock file */
229         unlink("pg_vlock");
230
231         VacuumRunning = false;
232 }
233
234 /*
235  *      vc_vacuum() -- vacuum the database.
236  *
237  *              This routine builds a list of relations to vacuum, and then calls
238  *              code that vacuums them one at a time.  We are careful to vacuum each
239  *              relation in a separate transaction in order to avoid holding too many
240  *              locks at one time.
241  */
242 static void
243 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
244 {
245         VRelList        vrl,
246                                 cur;
247
248         /* get list of relations */
249         vrl = vc_getrels(VacRelP);
250
251         if (analyze && VacRelP == NULL && vrl != NULL)
252                 vc_delhilowstats(InvalidOid, 0, NULL);
253
254         /* vacuum each heap relation */
255         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
256                 vc_vacone(cur->vrl_relid, analyze, va_cols);
257
258         vc_free(vrl);
259 }
260
261 static VRelList
262 vc_getrels(NameData *VacRelP)
263 {
264         Relation        pgclass;
265         TupleDesc       pgcdesc;
266         HeapScanDesc pgcscan;
267         HeapTuple       pgctup;
268         Buffer          buf;
269         PortalVariableMemory portalmem;
270         MemoryContext old;
271         VRelList        vrl,
272                                 cur;
273         Datum           d;
274         char       *rname;
275         char            rkind;
276         bool            n;
277         ScanKeyData pgckey;
278         bool            found = false;
279
280         StartTransactionCommand();
281
282         if (VacRelP->data)
283         {
284                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
285                                                            F_NAMEEQ,
286                                                            PointerGetDatum(VacRelP->data));
287         }
288         else
289         {
290                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
291                                                   F_CHAREQ, CharGetDatum('r'));
292         }
293
294         portalmem = PortalGetVariableMemory(vc_portal);
295         vrl = cur = (VRelList) NULL;
296
297         pgclass = heap_openr(RelationRelationName);
298         pgcdesc = RelationGetTupleDescriptor(pgclass);
299
300         pgcscan = heap_beginscan(pgclass, false, false, 1, &pgckey);
301
302         while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf)))
303         {
304
305                 found = true;
306
307                 d = heap_getattr(pgctup, Anum_pg_class_relname, pgcdesc, &n);
308                 rname = (char *) d;
309
310                 /*
311                  * don't vacuum large objects for now - something breaks when we
312                  * do
313                  */
314                 if ((strlen(rname) >= 5) && rname[0] == 'x' &&
315                         rname[1] == 'i' && rname[2] == 'n' &&
316                         (rname[3] == 'v' || rname[3] == 'x') &&
317                         rname[4] >= '0' && rname[4] <= '9')
318                 {
319                         elog(NOTICE, "Rel %s: can't vacuum LargeObjects now",
320                                  rname);
321                         ReleaseBuffer(buf);
322                         continue;
323                 }
324
325                 d = heap_getattr(pgctup, Anum_pg_class_relkind, pgcdesc, &n);
326
327                 rkind = DatumGetChar(d);
328
329                 /* skip system relations */
330                 if (rkind != 'r')
331                 {
332                         ReleaseBuffer(buf);
333                         elog(NOTICE, "Vacuum: can not process index and certain system tables");
334                         continue;
335                 }
336
337                 /* get a relation list entry for this guy */
338                 old = MemoryContextSwitchTo((MemoryContext) portalmem);
339                 if (vrl == (VRelList) NULL)
340                         vrl = cur = (VRelList) palloc(sizeof(VRelListData));
341                 else
342                 {
343                         cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
344                         cur = cur->vrl_next;
345                 }
346                 MemoryContextSwitchTo(old);
347
348                 cur->vrl_relid = pgctup->t_oid;
349                 cur->vrl_next = (VRelList) NULL;
350
351                 /* wei hates it if you forget to do this */
352                 ReleaseBuffer(buf);
353         }
354         if (found == false)
355                 elog(NOTICE, "Vacuum: table not found");
356
357
358         heap_endscan(pgcscan);
359         heap_close(pgclass);
360
361         CommitTransactionCommand();
362
363         return (vrl);
364 }
365
366 /*
367  *      vc_vacone() -- vacuum one heap relation
368  *
369  *              This routine vacuums a single heap, cleans out its indices, and
370  *              updates its statistics npages and ntups statistics.
371  *
372  *              Doing one heap at a time incurs extra overhead, since we need to
373  *              check that the heap exists again just before we vacuum it.      The
374  *              reason that we do this is so that vacuuming can be spread across
375  *              many small transactions.  Otherwise, two-phase locking would require
376  *              us to lock the entire database during one pass of the vacuum cleaner.
377  */
378 static void
379 vc_vacone(Oid relid, bool analyze, List *va_cols)
380 {
381         Relation        pgclass;
382         TupleDesc       pgcdesc;
383         HeapTuple       pgctup,
384                                 pgttup;
385         Buffer          pgcbuf;
386         HeapScanDesc pgcscan;
387         Relation        onerel;
388         ScanKeyData pgckey;
389         VPageListData Vvpl;                     /* List of pages to vacuum and/or clean
390                                                                  * indices */
391         VPageListData Fvpl;                     /* List of pages with space enough for
392                                                                  * re-using */
393         VPageDescr *vpp;
394         Relation   *Irel;
395         int32           nindices,
396                                 i;
397         VRelStats  *vacrelstats;
398
399         StartTransactionCommand();
400
401         ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
402                                                    F_OIDEQ,
403                                                    ObjectIdGetDatum(relid));
404
405         pgclass = heap_openr(RelationRelationName);
406         pgcdesc = RelationGetTupleDescriptor(pgclass);
407         pgcscan = heap_beginscan(pgclass, false, false, 1, &pgckey);
408
409         /*
410          * Race condition -- if the pg_class tuple has gone away since the
411          * last time we saw it, we don't need to vacuum it.
412          */
413
414         if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf)))
415         {
416                 heap_endscan(pgcscan);
417                 heap_close(pgclass);
418                 CommitTransactionCommand();
419                 return;
420         }
421
422         /* now open the class and vacuum it */
423         onerel = heap_open(relid);
424
425         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
426         vacrelstats->relid = relid;
427         vacrelstats->npages = vacrelstats->ntups = 0;
428         vacrelstats->hasindex = false;
429         if (analyze && !IsSystemRelationName((RelationGetRelationName(onerel))->data))
430         {
431                 int                     attr_cnt,
432                                    *attnums = NULL;
433                 AttributeTupleForm *attr;
434
435                 attr_cnt = onerel->rd_att->natts;
436                 attr = onerel->rd_att->attrs;
437
438                 if (va_cols != NIL)
439                 {
440                         int                     tcnt = 0;
441                         List       *le;
442
443                         if (length(va_cols) > attr_cnt)
444                                 elog(ERROR, "vacuum: too many attributes specified for relation %s",
445                                          (RelationGetRelationName(onerel))->data);
446                         attnums = (int *) palloc(attr_cnt * sizeof(int));
447                         foreach(le, va_cols)
448                         {
449                                 char       *col = (char *) lfirst(le);
450
451                                 for (i = 0; i < attr_cnt; i++)
452                                 {
453                                         if (namestrcmp(&(attr[i]->attname), col) == 0)
454                                                 break;
455                                 }
456                                 if (i < attr_cnt)               /* found */
457                                         attnums[tcnt++] = i;
458                                 else
459                                 {
460                                         elog(ERROR, "vacuum: there is no attribute %s in %s",
461                                                  col, (RelationGetRelationName(onerel))->data);
462                                 }
463                         }
464                         attr_cnt = tcnt;
465                 }
466
467                 vacrelstats->vacattrstats =
468                         (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));
469
470                 for (i = 0; i < attr_cnt; i++)
471                 {
472                         Operator        func_operator;
473                         OperatorTupleForm pgopform;
474                         VacAttrStats *stats;
475
476                         stats = &vacrelstats->vacattrstats[i];
477                         stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
478                         memmove(stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
479                         stats->best = stats->guess1 = stats->guess2 = 0;
480                         stats->max = stats->min = 0;
481                         stats->best_len = stats->guess1_len = stats->guess2_len = 0;
482                         stats->max_len = stats->min_len = 0;
483                         stats->initialized = false;
484                         stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
485                         stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
486
487                         func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
488                         if (func_operator != NULL)
489                         {
490                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
491                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpeq));
492                         }
493                         else
494                                 stats->f_cmpeq.fn_addr = NULL;
495
496                         func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
497                         if (func_operator != NULL)
498                         {
499                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
500                                 fmgr_info(pgopform->oprcode, &(stats->f_cmplt));
501                         }
502                         else
503                                 stats->f_cmplt.fn_addr = NULL;
504
505                         func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
506                         if (func_operator != NULL)
507                         {
508                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
509                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpgt));
510                         }
511                         else
512                                 stats->f_cmpgt.fn_addr = NULL;
513
514                         pgttup = SearchSysCacheTuple(TYPOID,
515                                                                  ObjectIdGetDatum(stats->attr->atttypid),
516                                                                                  0, 0, 0);
517                         if (HeapTupleIsValid(pgttup))
518                                 stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
519                         else
520                                 stats->outfunc = InvalidOid;
521                 }
522                 vacrelstats->va_natts = attr_cnt;
523                 vc_delhilowstats(relid, ((attnums) ? attr_cnt : 0), attnums);
524                 if (attnums)
525                         pfree(attnums);
526         }
527         else
528         {
529                 vacrelstats->va_natts = 0;
530                 vacrelstats->vacattrstats = (VacAttrStats *) NULL;
531         }
532
533         /* we require the relation to be locked until the indices are cleaned */
534         RelationSetLockForWrite(onerel);
535
536         /* scan it */
537         Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
538         vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
539
540         /* Now open indices */
541         Irel = (Relation *) NULL;
542         vc_getindices(vacrelstats->relid, &nindices, &Irel);
543
544         if (nindices > 0)
545                 vacrelstats->hasindex = true;
546         else
547                 vacrelstats->hasindex = false;
548
549         /* Clean/scan index relation(s) */
550         if (Irel != (Relation *) NULL)
551         {
552                 if (Vvpl.vpl_npages > 0)
553                 {
554                         for (i = 0; i < nindices; i++)
555                                 vc_vaconeind(&Vvpl, Irel[i], vacrelstats->ntups);
556                 }
557                 else
558 /* just scan indices to update statistic */
559                 {
560                         for (i = 0; i < nindices; i++)
561                                 vc_scanoneind(Irel[i], vacrelstats->ntups);
562                 }
563         }
564
565         if (Fvpl.vpl_npages > 0)        /* Try to shrink heap */
566                 vc_rpfheap(vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
567         else
568         {
569                 if (Irel != (Relation *) NULL)
570                         vc_clsindices(nindices, Irel);
571                 if (Vvpl.vpl_npages > 0)/* Clean pages from Vvpl list */
572                         vc_vacheap(vacrelstats, onerel, &Vvpl);
573         }
574
575         /* ok - free Vvpl list of reapped pages */
576         if (Vvpl.vpl_npages > 0)
577         {
578                 vpp = Vvpl.vpl_pgdesc;
579                 for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
580                         pfree(*vpp);
581                 pfree(Vvpl.vpl_pgdesc);
582                 if (Fvpl.vpl_npages > 0)
583                         pfree(Fvpl.vpl_pgdesc);
584         }
585
586         /* all done with this class */
587         heap_close(onerel);
588         heap_endscan(pgcscan);
589         heap_close(pgclass);
590
591         /* update statistics in pg_class */
592         vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
593                                 vacrelstats->hasindex, vacrelstats);
594
595         /* next command frees attribute stats */
596
597         CommitTransactionCommand();
598 }
599
600 /*
601  *      vc_scanheap() -- scan an open heap relation
602  *
603  *              This routine sets commit times, constructs Vvpl list of
604  *              empty/uninitialized pages and pages with dead tuples and
605  *              ~LP_USED line pointers, constructs Fvpl list of pages
606  *              appropriate for purposes of shrinking and maintains statistics
607  *              on the number of live tuples in a heap.
608  */
609 static void
610 vc_scanheap(VRelStats *vacrelstats, Relation onerel,
611                         VPageList Vvpl, VPageList Fvpl)
612 {
613         int                     nblocks,
614                                 blkno;
615         ItemId          itemid;
616         ItemPointer itemptr;
617         HeapTuple       htup;
618         Buffer          buf;
619         Page            page,
620                                 tempPage = NULL;
621         OffsetNumber offnum,
622                                 maxoff;
623         bool            pgchanged,
624                                 tupgone,
625                                 dobufrel,
626                                 notup;
627         char       *relname;
628         VPageDescr      vpc,
629                                 vp;
630         uint32          nvac,
631                                 ntups,
632                                 nunused,
633                                 ncrash,
634                                 nempg,
635                                 nnepg,
636                                 nchpg,
637                                 nemend;
638         Size            frsize,
639                                 frsusf;
640         Size            min_tlen = MAXTUPLEN;
641         Size            max_tlen = 0;
642         int32           i /* , attr_cnt */ ;
643         struct rusage ru0,
644                                 ru1;
645         bool            do_shrinking = true;
646
647         getrusage(RUSAGE_SELF, &ru0);
648
649         nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
650         frsize = frsusf = 0;
651
652         relname = (RelationGetRelationName(onerel))->data;
653
654         nblocks = RelationGetNumberOfBlocks(onerel);
655
656         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
657         vpc->vpd_nusd = 0;
658
659         elog(MESSAGE_LEVEL, "--Relation %s--", relname);
660         
661         for (blkno = 0; blkno < nblocks; blkno++)
662         {
663                 buf = ReadBuffer(onerel, blkno);
664                 page = BufferGetPage(buf);
665                 vpc->vpd_blkno = blkno;
666                 vpc->vpd_noff = 0;
667
668                 if (PageIsNew(page))
669                 {
670                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
671                                  relname, blkno);
672                         PageInit(page, BufferGetPageSize(buf), 0);
673                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
674                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
675                         nnepg++;
676                         nemend++;
677                         vc_reappage(Vvpl, vpc);
678                         WriteBuffer(buf);
679                         continue;
680                 }
681
682                 if (PageIsEmpty(page))
683                 {
684                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
685                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
686                         nempg++;
687                         nemend++;
688                         vc_reappage(Vvpl, vpc);
689                         ReleaseBuffer(buf);
690                         continue;
691                 }
692
693                 pgchanged = false;
694                 notup = true;
695                 maxoff = PageGetMaxOffsetNumber(page);
696                 for (offnum = FirstOffsetNumber;
697                          offnum <= maxoff;
698                          offnum = OffsetNumberNext(offnum))
699                 {
700                         itemid = PageGetItemId(page, offnum);
701
702                         /*
703                          * Collect un-used items too - it's possible to have indices
704                          * pointing here after crash.
705                          */
706                         if (!ItemIdIsUsed(itemid))
707                         {
708                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
709                                 nunused++;
710                                 continue;
711                         }
712
713                         htup = (HeapTuple) PageGetItem(page, itemid);
714                         tupgone = false;
715
716                         if (!(htup->t_infomask & HEAP_XMIN_COMMITTED))
717                         {
718                                 if (htup->t_infomask & HEAP_XMIN_INVALID)
719                                         tupgone = true;
720                                 else
721                                 {
722                                         if (TransactionIdDidAbort(htup->t_xmin))
723                                                 tupgone = true;
724                                         else if (TransactionIdDidCommit(htup->t_xmin))
725                                         {
726                                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
727                                                 pgchanged = true;
728                                         }
729                                         else if (!TransactionIdIsInProgress(htup->t_xmin))
730                                         {
731
732                                                 /*
733                                                  * Not Aborted, Not Committed, Not in Progress -
734                                                  * so it's from crashed process. - vadim 11/26/96
735                                                  */
736                                                 ncrash++;
737                                                 tupgone = true;
738                                         }
739                                         else
740                                         {
741                                                 elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
742                                                          relname, blkno, offnum, htup->t_xmin);
743                                                 do_shrinking = false;
744                                         }
745                                 }
746                         }
747
748                         /*
749                          * here we are concerned about tuples with xmin committed and
750                          * xmax unknown or committed
751                          */
752                         if (htup->t_infomask & HEAP_XMIN_COMMITTED &&
753                                 !(htup->t_infomask & HEAP_XMAX_INVALID))
754                         {
755                                 if (htup->t_infomask & HEAP_XMAX_COMMITTED)
756                                         tupgone = true;
757                                 else if (TransactionIdDidAbort(htup->t_xmax))
758                                 {
759                                         htup->t_infomask |= HEAP_XMAX_INVALID;
760                                         pgchanged = true;
761                                 }
762                                 else if (TransactionIdDidCommit(htup->t_xmax))
763                                         tupgone = true;
764                                 else if (!TransactionIdIsInProgress(htup->t_xmax))
765                                 {
766
767                                         /*
768                                          * Not Aborted, Not Committed, Not in Progress - so it
769                                          * from crashed process. - vadim 06/02/97
770                                          */
771                                         htup->t_infomask |= HEAP_XMAX_INVALID;;
772                                         pgchanged = true;
773                                 }
774                                 else
775                                 {
776                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
777                                                  relname, blkno, offnum, htup->t_xmax);
778                                         do_shrinking = false;
779                                 }
780                         }
781
782                         /*
783                          * It's possibly! But from where it comes ? And should we fix
784                          * it ?  - vadim 11/28/96
785                          */
786                         itemptr = &(htup->t_ctid);
787                         if (!ItemPointerIsValid(itemptr) ||
788                                 BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno)
789                         {
790                                 elog(NOTICE, "Rel %s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.",
791                                          relname, blkno, offnum,
792                                          BlockIdGetBlockNumber(&(itemptr->ip_blkid)),
793                                          itemptr->ip_posid, tupgone);
794                         }
795
796                         /*
797                          * Other checks...
798                          */
799                         if (htup->t_len != itemid->lp_len)
800                         {
801                                 elog(NOTICE, "Rel %s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.",
802                                          relname, blkno, offnum,
803                                          itemid->lp_len, htup->t_len, tupgone);
804                         }
805                         if (!OidIsValid(htup->t_oid))
806                         {
807                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
808                                          relname, blkno, offnum, tupgone);
809                         }
810
811                         if (tupgone)
812                         {
813                                 ItemId          lpp;
814
815                                 if (tempPage == (Page) NULL)
816                                 {
817                                         Size            pageSize;
818
819                                         pageSize = PageGetPageSize(page);
820                                         tempPage = (Page) palloc(pageSize);
821                                         memmove(tempPage, page, pageSize);
822                                 }
823
824                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
825
826                                 /* mark it unused */
827                                 lpp->lp_flags &= ~LP_USED;
828
829                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
830                                 nvac++;
831
832                         }
833                         else
834                         {
835                                 ntups++;
836                                 notup = false;
837                                 if (htup->t_len < min_tlen)
838                                         min_tlen = htup->t_len;
839                                 if (htup->t_len > max_tlen)
840                                         max_tlen = htup->t_len;
841                                 vc_attrstats(onerel, vacrelstats, htup);
842                         }
843                 }
844
845                 if (pgchanged)
846                 {
847                         WriteBuffer(buf);
848                         dobufrel = false;
849                         nchpg++;
850                 }
851                 else
852                         dobufrel = true;
853                 if (tempPage != (Page) NULL)
854                 {                                               /* Some tuples are gone */
855                         PageRepairFragmentation(tempPage);
856                         vpc->vpd_free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
857                         frsize += vpc->vpd_free;
858                         vc_reappage(Vvpl, vpc);
859                         pfree(tempPage);
860                         tempPage = (Page) NULL;
861                 }
862                 else if (vpc->vpd_noff > 0)
863                 {                                               /* there are only ~LP_USED line pointers */
864                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
865                         frsize += vpc->vpd_free;
866                         vc_reappage(Vvpl, vpc);
867                 }
868                 if (dobufrel)
869                         ReleaseBuffer(buf);
870                 if (notup)
871                         nemend++;
872                 else
873                         nemend = 0;
874         }
875
876         pfree(vpc);
877
878         /* save stats in the rel list for use later */
879         vacrelstats->ntups = ntups;
880         vacrelstats->npages = nblocks;
881 /*        vacrelstats->natts = attr_cnt;*/
882         if (ntups == 0)
883                 min_tlen = max_tlen = 0;
884         vacrelstats->min_tlen = min_tlen;
885         vacrelstats->max_tlen = max_tlen;
886
887         Vvpl->vpl_nemend = nemend;
888         Fvpl->vpl_nemend = nemend;
889
890         /*
891          * Try to make Fvpl keeping in mind that we can't use free space of
892          * "empty" end-pages and last page if it reapped.
893          */
894         if (do_shrinking && Vvpl->vpl_npages - nemend > 0)
895         {
896                 int                     nusf;           /* blocks usefull for re-using */
897
898                 nusf = Vvpl->vpl_npages - nemend;
899                 if ((Vvpl->vpl_pgdesc[nusf - 1])->vpd_blkno == nblocks - nemend - 1)
900                         nusf--;
901
902                 for (i = 0; i < nusf; i++)
903                 {
904                         vp = Vvpl->vpl_pgdesc[i];
905                         if (vc_enough_space(vp, min_tlen))
906                         {
907                                 vc_vpinsert(Fvpl, vp);
908                                 frsusf += vp->vpd_free;
909                         }
910                 }
911         }
912
913         getrusage(RUSAGE_SELF, &ru1);
914
915         elog(MESSAGE_LEVEL, "Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
916 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
917                  nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
918                  ntups, nvac, ncrash, nunused, min_tlen, max_tlen,
919                  frsize, frsusf, nemend, Fvpl->vpl_npages,
920                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
921                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
922
923 }       /* vc_scanheap */
924
925
926 /*
927  *      vc_rpfheap() -- try to repaire relation' fragmentation
928  *
929  *              This routine marks dead tuples as unused and tries re-use dead space
930  *              by moving tuples (and inserting indices if needed). It constructs
931  *              Nvpl list of free-ed pages (moved tuples) and clean indices
932  *              for them after committing (in hack-manner - without losing locks
933  *              and freeing memory!) current transaction. It truncates relation
934  *              if some end-blocks are gone away.
935  */
936 static void
937 vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
938                    VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
939 {
940         TransactionId myXID;
941         CommandId       myCID;
942         Buffer          buf,
943                                 ToBuf;
944         int                     nblocks,
945                                 blkno;
946         Page            page,
947                                 ToPage = NULL;
948         OffsetNumber offnum = 0,
949                                 maxoff = 0,
950                                 newoff,
951                                 moff;
952         ItemId          itemid,
953                                 newitemid;
954         HeapTuple       htup,
955                                 newtup;
956         TupleDesc       tupdesc = NULL;
957         Datum      *idatum = NULL;
958         char       *inulls = NULL;
959         InsertIndexResult iresult;
960         VPageListData Nvpl;
961         VPageDescr      ToVpd = NULL,
962                                 Fvplast,
963                                 Vvplast,
964                                 vpc,
965                            *vpp;
966         int                     ToVpI = 0;
967         IndDesc    *Idesc,
968                            *idcur;
969         int                     Fblklast,
970                                 Vblklast,
971                                 i;
972         Size            tlen;
973         int                     nmoved,
974                                 Fnpages,
975                                 Vnpages;
976         int                     nchkmvd,
977                                 ntups;
978         bool            isempty,
979                                 dowrite;
980         struct rusage ru0,
981                                 ru1;
982
983         getrusage(RUSAGE_SELF, &ru0);
984
985         myXID = GetCurrentTransactionId();
986         myCID = GetCurrentCommandId();
987
988         if (Irel != (Relation *) NULL)          /* preparation for index' inserts */
989         {
990                 vc_mkindesc(onerel, nindices, Irel, &Idesc);
991                 tupdesc = RelationGetTupleDescriptor(onerel);
992                 idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof(*idatum));
993                 inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof(*inulls));
994         }
995
996         Nvpl.vpl_npages = 0;
997         Fnpages = Fvpl->vpl_npages;
998         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
999         Fblklast = Fvplast->vpd_blkno;
1000         Assert(Vvpl->vpl_npages > Vvpl->vpl_nemend);
1001         Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
1002         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1003         Vblklast = Vvplast->vpd_blkno;
1004         Assert(Vblklast >= Fblklast);
1005         ToBuf = InvalidBuffer;
1006         nmoved = 0;
1007
1008         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
1009         vpc->vpd_nusd = vpc->vpd_noff = 0;
1010
1011         nblocks = vacrelstats->npages;
1012         for (blkno = nblocks - Vvpl->vpl_nemend - 1;; blkno--)
1013         {
1014                 /* if it's reapped page and it was used by me - quit */
1015                 if (blkno == Fblklast && Fvplast->vpd_nusd > 0)
1016                         break;
1017
1018                 buf = ReadBuffer(onerel, blkno);
1019                 page = BufferGetPage(buf);
1020
1021                 vpc->vpd_noff = 0;
1022
1023                 isempty = PageIsEmpty(page);
1024
1025                 dowrite = false;
1026                 if (blkno == Vblklast)  /* it's reapped page */
1027                 {
1028                         if (Vvplast->vpd_noff > 0)      /* there are dead tuples */
1029                         {                                       /* on this page - clean */
1030                                 Assert(!isempty);
1031                                 vc_vacpage(page, Vvplast);
1032                                 dowrite = true;
1033                         }
1034                         else
1035                                 Assert(isempty);
1036                         --Vnpages;
1037                         Assert(Vnpages > 0);
1038                         /* get prev reapped page from Vvpl */
1039                         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1040                         Vblklast = Vvplast->vpd_blkno;
1041                         if (blkno == Fblklast)          /* this page in Fvpl too */
1042                         {
1043                                 --Fnpages;
1044                                 Assert(Fnpages > 0);
1045                                 Assert(Fvplast->vpd_nusd == 0);
1046                                 /* get prev reapped page from Fvpl */
1047                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1048                                 Fblklast = Fvplast->vpd_blkno;
1049                         }
1050                         Assert(Fblklast <= Vblklast);
1051                         if (isempty)
1052                         {
1053                                 ReleaseBuffer(buf);
1054                                 continue;
1055                         }
1056                 }
1057                 else
1058                         Assert(!isempty);
1059
1060                 vpc->vpd_blkno = blkno;
1061                 maxoff = PageGetMaxOffsetNumber(page);
1062                 for (offnum = FirstOffsetNumber;
1063                          offnum <= maxoff;
1064                          offnum = OffsetNumberNext(offnum))
1065                 {
1066                         itemid = PageGetItemId(page, offnum);
1067
1068                         if (!ItemIdIsUsed(itemid))
1069                                 continue;
1070
1071                         htup = (HeapTuple) PageGetItem(page, itemid);
1072                         tlen = htup->t_len;
1073
1074                         /* try to find new page for this tuple */
1075                         if (ToBuf == InvalidBuffer ||
1076                                 !vc_enough_space(ToVpd, tlen))
1077                         {
1078                                 if (ToBuf != InvalidBuffer)
1079                                 {
1080                                         WriteBuffer(ToBuf);
1081                                         ToBuf = InvalidBuffer;
1082
1083                                         /*
1084                                          * If no one tuple can't be added to this page -
1085                                          * remove page from Fvpl. - vadim 11/27/96
1086                                          *
1087                                          * But we can't remove last page - this is our
1088                                          * "show-stopper" !!!   - vadim 02/25/98
1089                                          */
1090                                         if (ToVpd != Fvplast &&
1091                                                 !vc_enough_space(ToVpd, vacrelstats->min_tlen))
1092                                         {
1093                                                 Assert(Fnpages > ToVpI + 1);
1094                                                 memmove(Fvpl->vpl_pgdesc + ToVpI,
1095                                                                 Fvpl->vpl_pgdesc + ToVpI + 1,
1096                                                    sizeof(VPageDescr *) * (Fnpages - ToVpI - 1));
1097                                                 Fnpages--;
1098                                                 Assert(Fvplast == Fvpl->vpl_pgdesc[Fnpages - 1]);
1099                                         }
1100                                 }
1101                                 for (i = 0; i < Fnpages; i++)
1102                                 {
1103                                         if (vc_enough_space(Fvpl->vpl_pgdesc[i], tlen))
1104                                                 break;
1105                                 }
1106                                 if (i == Fnpages)
1107                                         break;          /* can't move item anywhere */
1108                                 ToVpI = i;
1109                                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1110                                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1111                                 ToPage = BufferGetPage(ToBuf);
1112                                 /* if this page was not used before - clean it */
1113                                 if (!PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0)
1114                                         vc_vacpage(ToPage, ToVpd);
1115                         }
1116
1117                         /* copy tuple */
1118                         newtup = (HeapTuple) palloc(tlen);
1119                         memmove((char *) newtup, (char *) htup, tlen);
1120
1121                         /* store transaction information */
1122                         TransactionIdStore(myXID, &(newtup->t_xmin));
1123                         newtup->t_cmin = myCID;
1124                         StoreInvalidTransactionId(&(newtup->t_xmax));
1125                         /* set xmin to unknown and xmax to invalid */
1126                         newtup->t_infomask &= ~(HEAP_XACT_MASK);
1127                         newtup->t_infomask |= HEAP_XMAX_INVALID;
1128
1129                         /* add tuple to the page */
1130                         newoff = PageAddItem(ToPage, (Item) newtup, tlen,
1131                                                                  InvalidOffsetNumber, LP_USED);
1132                         if (newoff == InvalidOffsetNumber)
1133                         {
1134                                 elog(ERROR, "\
1135 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1136                                          tlen, ToVpd->vpd_blkno, ToVpd->vpd_free,
1137                                          ToVpd->vpd_nusd, ToVpd->vpd_noff);
1138                         }
1139                         newitemid = PageGetItemId(ToPage, newoff);
1140                         pfree(newtup);
1141                         newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1142                         ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1143
1144                         /* now logically delete end-tuple */
1145                         TransactionIdStore(myXID, &(htup->t_xmax));
1146                         htup->t_cmax = myCID;
1147                         /* set xmax to unknown */
1148                         htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED);
1149
1150                         ToVpd->vpd_nusd++;
1151                         nmoved++;
1152                         ToVpd->vpd_free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1153                         vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1154
1155                         /* insert index' tuples if needed */
1156                         if (Irel != (Relation *) NULL)
1157                         {
1158                                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1159                                 {
1160                                         FormIndexDatum(
1161                                                                    idcur->natts,
1162                                                            (AttrNumber *) &(idcur->tform->indkey[0]),
1163                                                                    newtup,
1164                                                                    tupdesc,
1165                                                                    InvalidBuffer,
1166                                                                    idatum,
1167                                                                    inulls,
1168                                                                    idcur->finfoP);
1169                                         iresult = index_insert(
1170                                                                                    Irel[i],
1171                                                                                    idatum,
1172                                                                                    inulls,
1173                                                                                    &(newtup->t_ctid),
1174                                                                                    onerel);
1175                                         if (iresult)
1176                                                 pfree(iresult);
1177                                 }
1178                         }
1179
1180                 }                                               /* walk along page */
1181
1182                 if (vpc->vpd_noff > 0)  /* some tuples were moved */
1183                 {
1184                         vc_reappage(&Nvpl, vpc);
1185                         WriteBuffer(buf);
1186                 }
1187                 else if (dowrite)
1188                         WriteBuffer(buf);
1189                 else
1190                         ReleaseBuffer(buf);
1191
1192                 if (offnum <= maxoff)
1193                         break;                          /* some item(s) left */
1194
1195         }                                                       /* walk along relation */
1196
1197         blkno++;                                        /* new number of blocks */
1198
1199         if (ToBuf != InvalidBuffer)
1200         {
1201                 Assert(nmoved > 0);
1202                 WriteBuffer(ToBuf);
1203         }
1204
1205         if (nmoved > 0)
1206         {
1207
1208                 /*
1209                  * We have to commit our tuple' movings before we'll truncate
1210                  * relation, but we shouldn't lose our locks. And so - quick hack:
1211                  * flush buffers and record status of current transaction as
1212                  * committed, and continue. - vadim 11/13/96
1213                  */
1214                 FlushBufferPool(!TransactionFlushEnabled());
1215                 TransactionIdCommit(myXID);
1216                 FlushBufferPool(!TransactionFlushEnabled());
1217         }
1218
1219         /*
1220          * Clean uncleaned reapped pages from Vvpl list and set xmin committed
1221          * for inserted tuples
1222          */
1223         nchkmvd = 0;
1224         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1225         {
1226                 Assert((*vpp)->vpd_blkno < blkno);
1227                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1228                 page = BufferGetPage(buf);
1229                 if ((*vpp)->vpd_nusd == 0)              /* this page was not used */
1230                 {
1231
1232                         /*
1233                          * noff == 0 in empty pages only - such pages should be
1234                          * re-used
1235                          */
1236                         Assert((*vpp)->vpd_noff > 0);
1237                         vc_vacpage(page, *vpp);
1238                 }
1239                 else
1240 /* this page was used */
1241                 {
1242                         ntups = 0;
1243                         moff = PageGetMaxOffsetNumber(page);
1244                         for (newoff = FirstOffsetNumber;
1245                                  newoff <= moff;
1246                                  newoff = OffsetNumberNext(newoff))
1247                         {
1248                                 itemid = PageGetItemId(page, newoff);
1249                                 if (!ItemIdIsUsed(itemid))
1250                                         continue;
1251                                 htup = (HeapTuple) PageGetItem(page, itemid);
1252                                 if (TransactionIdEquals((TransactionId) htup->t_xmin, myXID))
1253                                 {
1254                                         htup->t_infomask |= HEAP_XMIN_COMMITTED;
1255                                         ntups++;
1256                                 }
1257                         }
1258                         Assert((*vpp)->vpd_nusd == ntups);
1259                         nchkmvd += ntups;
1260                 }
1261                 WriteBuffer(buf);
1262         }
1263         Assert(nmoved == nchkmvd);
1264
1265         getrusage(RUSAGE_SELF, &ru1);
1266
1267         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. \
1268 Elapsed %u/%u sec.",
1269                  (RelationGetRelationName(onerel))->data,
1270                  nblocks, blkno, nmoved,
1271                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1272                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1273
1274         if (Nvpl.vpl_npages > 0)
1275         {
1276                 /* vacuum indices again if needed */
1277                 if (Irel != (Relation *) NULL)
1278                 {
1279                         VPageDescr *vpleft,
1280                                            *vpright,
1281                                                 vpsave;
1282
1283                         /* re-sort Nvpl.vpl_pgdesc */
1284                         for (vpleft = Nvpl.vpl_pgdesc,
1285                                  vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1286                                  vpleft < vpright; vpleft++, vpright--)
1287                         {
1288                                 vpsave = *vpleft;
1289                                 *vpleft = *vpright;
1290                                 *vpright = vpsave;
1291                         }
1292                         for (i = 0; i < nindices; i++)
1293                                 vc_vaconeind(&Nvpl, Irel[i], vacrelstats->ntups);
1294                 }
1295
1296                 /*
1297                  * clean moved tuples from last page in Nvpl list if some tuples
1298                  * left there
1299                  */
1300                 if (vpc->vpd_noff > 0 && offnum <= maxoff)
1301                 {
1302                         Assert(vpc->vpd_blkno == blkno - 1);
1303                         buf = ReadBuffer(onerel, vpc->vpd_blkno);
1304                         page = BufferGetPage(buf);
1305                         ntups = 0;
1306                         maxoff = offnum;
1307                         for (offnum = FirstOffsetNumber;
1308                                  offnum < maxoff;
1309                                  offnum = OffsetNumberNext(offnum))
1310                         {
1311                                 itemid = PageGetItemId(page, offnum);
1312                                 if (!ItemIdIsUsed(itemid))
1313                                         continue;
1314                                 htup = (HeapTuple) PageGetItem(page, itemid);
1315                                 Assert(TransactionIdEquals((TransactionId) htup->t_xmax, myXID));
1316                                 itemid->lp_flags &= ~LP_USED;
1317                                 ntups++;
1318                         }
1319                         Assert(vpc->vpd_noff == ntups);
1320                         PageRepairFragmentation(page);
1321                         WriteBuffer(buf);
1322                 }
1323
1324                 /* now - free new list of reapped pages */
1325                 vpp = Nvpl.vpl_pgdesc;
1326                 for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1327                         pfree(*vpp);
1328                 pfree(Nvpl.vpl_pgdesc);
1329         }
1330
1331         /* truncate relation */
1332         if (blkno < nblocks)
1333         {
1334                 i = BlowawayRelationBuffers(onerel, blkno);
1335                 if (i < 0)
1336                         elog(FATAL, "VACUUM (vc_rpfheap): BlowawayRelationBuffers returned %d", i);
1337                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
1338                 Assert(blkno >= 0);
1339                 vacrelstats->npages = blkno;    /* set new number of blocks */
1340         }
1341
1342         if (Irel != (Relation *) NULL)          /* pfree index' allocations */
1343         {
1344                 pfree(Idesc);
1345                 pfree(idatum);
1346                 pfree(inulls);
1347                 vc_clsindices(nindices, Irel);
1348         }
1349
1350         pfree(vpc);
1351
1352 }       /* vc_rpfheap */
1353
1354 /*
1355  *      vc_vacheap() -- free dead tuples
1356  *
1357  *              This routine marks dead tuples as unused and truncates relation
1358  *              if there are "empty" end-blocks.
1359  */
1360 static void
1361 vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1362 {
1363         Buffer          buf;
1364         Page            page;
1365         VPageDescr *vpp;
1366         int                     nblocks;
1367         int                     i;
1368
1369         nblocks = Vvpl->vpl_npages;
1370         nblocks -= Vvpl->vpl_nemend;/* nothing to do with them */
1371
1372         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1373         {
1374                 if ((*vpp)->vpd_noff > 0)
1375                 {
1376                         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1377                         page = BufferGetPage(buf);
1378                         vc_vacpage(page, *vpp);
1379                         WriteBuffer(buf);
1380                 }
1381         }
1382
1383         /* truncate relation if there are some empty end-pages */
1384         if (Vvpl->vpl_nemend > 0)
1385         {
1386                 Assert(vacrelstats->npages >= Vvpl->vpl_nemend);
1387                 nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1388                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1389                          (RelationGetRelationName(onerel))->data,
1390                          vacrelstats->npages, nblocks);
1391
1392                 /*
1393                  * we have to flush "empty" end-pages (if changed, but who knows
1394                  * it) before truncation
1395                  */
1396                 FlushBufferPool(!TransactionFlushEnabled());
1397
1398                 i = BlowawayRelationBuffers(onerel, nblocks);
1399                 if (i < 0)
1400                         elog(FATAL, "VACUUM (vc_vacheap): BlowawayRelationBuffers returned %d", i);
1401
1402                 nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
1403                 Assert(nblocks >= 0);
1404                 vacrelstats->npages = nblocks;  /* set new number of blocks */
1405         }
1406
1407 }       /* vc_vacheap */
1408
1409 /*
1410  *      vc_vacpage() -- free dead tuples on a page
1411  *                                       and repaire its fragmentation.
1412  */
1413 static void
1414 vc_vacpage(Page page, VPageDescr vpd)
1415 {
1416         ItemId          itemid;
1417         int                     i;
1418
1419         Assert(vpd->vpd_nusd == 0);
1420         for (i = 0; i < vpd->vpd_noff; i++)
1421         {
1422                 itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1423                 itemid->lp_flags &= ~LP_USED;
1424         }
1425         PageRepairFragmentation(page);
1426
1427 }       /* vc_vacpage */
1428
1429 /*
1430  *      _vc_scanoneind() -- scan one index relation to update statistic.
1431  *
1432  */
1433 static void
1434 vc_scanoneind(Relation indrel, int nhtups)
1435 {
1436         RetrieveIndexResult res;
1437         IndexScanDesc iscan;
1438         int                     nitups;
1439         int                     nipages;
1440         struct rusage ru0,
1441                                 ru1;
1442
1443         getrusage(RUSAGE_SELF, &ru0);
1444
1445         /* walk through the entire index */
1446         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1447         nitups = 0;
1448
1449         while ((res = index_getnext(iscan, ForwardScanDirection))
1450                    != (RetrieveIndexResult) NULL)
1451         {
1452                 nitups++;
1453                 pfree(res);
1454         }
1455
1456         index_endscan(iscan);
1457
1458         /* now update statistics in pg_class */
1459         nipages = RelationGetNumberOfBlocks(indrel);
1460         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1461
1462         getrusage(RUSAGE_SELF, &ru1);
1463
1464         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1465                  indrel->rd_rel->relname.data, nipages, nitups,
1466                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1467                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1468
1469         if (nitups != nhtups)
1470                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1471                          indrel->rd_rel->relname.data, nitups, nhtups);
1472
1473 }       /* vc_scanoneind */
1474
1475 /*
1476  *      vc_vaconeind() -- vacuum one index relation.
1477  *
1478  *              Vpl is the VPageList of the heap we're currently vacuuming.
1479  *              It's locked. Indrel is an index relation on the vacuumed heap.
1480  *              We don't set locks on the index relation here, since the indexed
1481  *              access methods support locking at different granularities.
1482  *              We let them handle it.
1483  *
1484  *              Finally, we arrange to update the index relation's statistics in
1485  *              pg_class.
1486  */
1487 static void
1488 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1489 {
1490         RetrieveIndexResult res;
1491         IndexScanDesc iscan;
1492         ItemPointer heapptr;
1493         int                     nvac;
1494         int                     nitups;
1495         int                     nipages;
1496         VPageDescr      vp;
1497         struct rusage ru0,
1498                                 ru1;
1499
1500         getrusage(RUSAGE_SELF, &ru0);
1501
1502         /* walk through the entire index */
1503         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1504         nvac = 0;
1505         nitups = 0;
1506
1507         while ((res = index_getnext(iscan, ForwardScanDirection))
1508                    != (RetrieveIndexResult) NULL)
1509         {
1510                 heapptr = &res->heap_iptr;
1511
1512                 if ((vp = vc_tidreapped(heapptr, vpl)) != (VPageDescr) NULL)
1513                 {
1514 #if 0
1515                         elog(DEBUG, "<%x,%x> -> <%x,%x>",
1516                                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1517                                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1518                                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1519                                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1520 #endif
1521                         if (vp->vpd_noff == 0)
1522                         {                                       /* this is EmptyPage !!! */
1523                                 elog(NOTICE, "Ind %s: pointer to EmptyPage (blk %u off %u) - fixing",
1524                                          indrel->rd_rel->relname.data,
1525                                          vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1526                         }
1527                         ++nvac;
1528                         index_delete(indrel, &res->index_iptr);
1529                 }
1530                 else
1531                         nitups++;
1532
1533                 /* be tidy */
1534                 pfree(res);
1535         }
1536
1537         index_endscan(iscan);
1538
1539         /* now update statistics in pg_class */
1540         nipages = RelationGetNumberOfBlocks(indrel);
1541         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1542
1543         getrusage(RUSAGE_SELF, &ru1);
1544
1545         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1546                  indrel->rd_rel->relname.data, nipages, nitups, nvac,
1547                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1548                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1549
1550         if (nitups != nhtups)
1551                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1552                          indrel->rd_rel->relname.data, nitups, nhtups);
1553
1554 }       /* vc_vaconeind */
1555
1556 /*
1557  *      vc_tidreapped() -- is a particular tid reapped?
1558  *
1559  *              vpl->VPageDescr_array is sorted in right order.
1560  */
1561 static VPageDescr
1562 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1563 {
1564         OffsetNumber ioffno;
1565         OffsetNumber *voff;
1566         VPageDescr      vp,
1567                            *vpp;
1568         VPageDescrData vpd;
1569
1570         vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1571         ioffno = ItemPointerGetOffsetNumber(itemptr);
1572
1573         vp = &vpd;
1574         vpp = (VPageDescr *) vc_find_eq((char *) (vpl->vpl_pgdesc),
1575                                            vpl->vpl_npages, sizeof(VPageDescr), (char *) &vp,
1576                                                                         vc_cmp_blk);
1577
1578         if (vpp == (VPageDescr *) NULL)
1579                 return ((VPageDescr) NULL);
1580         vp = *vpp;
1581
1582         /* ok - we are on true page */
1583
1584         if (vp->vpd_noff == 0)
1585         {                                                       /* this is EmptyPage !!! */
1586                 return (vp);
1587         }
1588
1589         voff = (OffsetNumber *) vc_find_eq((char *) (vp->vpd_voff),
1590                                         vp->vpd_noff, sizeof(OffsetNumber), (char *) &ioffno,
1591                                                                            vc_cmp_offno);
1592
1593         if (voff == (OffsetNumber *) NULL)
1594                 return ((VPageDescr) NULL);
1595
1596         return (vp);
1597
1598 }       /* vc_tidreapped */
1599
1600 /*
1601  *      vc_attrstats() -- compute column statistics used by the optimzer
1602  *
1603  *      We compute the column min, max, null and non-null counts.
1604  *      Plus we attempt to find the count of the value that occurs most
1605  *      frequently in each column
1606  *      These figures are used to compute the selectivity of the column
1607  *
1608  *      We use a three-bucked cache to get the most frequent item
1609  *      The 'guess' buckets count hits.  A cache miss causes guess1
1610  *      to get the most hit 'guess' item in the most recent cycle, and
1611  *      the new item goes into guess2.  Whenever the total count of hits
1612  *      of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1613  *
1614  *      This method works perfectly for columns with unique values, and columns
1615  *      with only two unique values, plus nulls.
1616  *
1617  *      It becomes less perfect as the number of unique values increases and
1618  *      their distribution in the table becomes more random.
1619  *
1620  */
1621 static void
1622 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup)
1623 {
1624         int                     i,
1625                                 attr_cnt = vacrelstats->va_natts;
1626         VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1627         TupleDesc       tupDesc = onerel->rd_att;
1628         Datum           value;
1629         bool            isnull;
1630
1631         for (i = 0; i < attr_cnt; i++)
1632         {
1633                 VacAttrStats *stats = &vacattrstats[i];
1634                 bool            value_hit = true;
1635
1636                 value = heap_getattr(htup,
1637                                                          stats->attr->attnum, tupDesc, &isnull);
1638
1639                 if (!VacAttrStatsEqValid(stats))
1640                         continue;
1641
1642                 if (isnull)
1643                         stats->null_cnt++;
1644                 else
1645                 {
1646                         stats->nonnull_cnt++;
1647                         if (stats->initialized == false)
1648                         {
1649                                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1650                                 /* best_cnt gets incremented later */
1651                                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1652                                 stats->guess1_cnt = stats->guess1_hits = 1;
1653                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1654                                 stats->guess2_hits = 1;
1655                                 if (VacAttrStatsLtGtValid(stats))
1656                                 {
1657                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1658                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1659                                 }
1660                                 stats->initialized = true;
1661                         }
1662                         if (VacAttrStatsLtGtValid(stats))
1663                         {
1664                                 if ((*fmgr_faddr(&stats->f_cmplt)) (value, stats->min))
1665                                 {
1666                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1667                                         stats->min_cnt = 0;
1668                                 }
1669                                 if ((*fmgr_faddr(&stats->f_cmpgt)) (value, stats->max))
1670                                 {
1671                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1672                                         stats->max_cnt = 0;
1673                                 }
1674                                 if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->min))
1675                                         stats->min_cnt++;
1676                                 else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->max))
1677                                         stats->max_cnt++;
1678                         }
1679                         if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->best))
1680                                 stats->best_cnt++;
1681                         else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess1))
1682                         {
1683                                 stats->guess1_cnt++;
1684                                 stats->guess1_hits++;
1685                         }
1686                         else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess2))
1687                                 stats->guess2_hits++;
1688                         else
1689                                 value_hit = false;
1690
1691                         if (stats->guess2_hits > stats->guess1_hits)
1692                         {
1693                                 swapDatum(stats->guess1, stats->guess2);
1694                                 swapInt(stats->guess1_len, stats->guess2_len);
1695                                 stats->guess1_cnt = stats->guess2_hits;
1696                                 swapLong(stats->guess1_hits, stats->guess2_hits);
1697                         }
1698                         if (stats->guess1_cnt > stats->best_cnt)
1699                         {
1700                                 swapDatum(stats->best, stats->guess1);
1701                                 swapInt(stats->best_len, stats->guess1_len);
1702                                 swapLong(stats->best_cnt, stats->guess1_cnt);
1703                                 stats->guess1_hits = 1;
1704                                 stats->guess2_hits = 1;
1705                         }
1706                         if (!value_hit)
1707                         {
1708                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1709                                 stats->guess1_hits = 1;
1710                                 stats->guess2_hits = 1;
1711                         }
1712                 }
1713         }
1714         return;
1715 }
1716
1717 /*
1718  *      vc_bucketcpy() -- update pg_class statistics for one relation
1719  *
1720  */
1721 static void
1722 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1723 {
1724         if (attr->attbyval && attr->attlen != -1)
1725                 *bucket = value;
1726         else
1727         {
1728                 int                     len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1729
1730                 if (len > *bucket_len)
1731                 {
1732                         if (*bucket_len != 0)
1733                                 pfree(DatumGetPointer(*bucket));
1734                         *bucket = PointerGetDatum(palloc(len));
1735                         *bucket_len = len;
1736                 }
1737                 memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1738         }
1739 }
1740
1741 /*
1742  *      vc_updstats() -- update pg_class statistics for one relation
1743  *
1744  *              This routine works for both index and heap relation entries in
1745  *              pg_class.  We violate no-overwrite semantics here by storing new
1746  *              values for ntups, npages, and hasindex directly in the pg_class
1747  *              tuple that's already on the page.  The reason for this is that if
1748  *              we updated these tuples in the usual way, then every tuple in pg_class
1749  *              would be replaced every day.  This would make planning and executing
1750  *              historical queries very expensive.
1751  */
1752 static void
1753 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats)
1754 {
1755         Relation        rd,
1756                                 ad,
1757                                 sd;
1758         HeapScanDesc rsdesc,
1759                                 asdesc;
1760         TupleDesc       sdesc;
1761         HeapTuple       rtup,
1762                                 atup,
1763                                 stup;
1764         Buffer          rbuf,
1765                                 abuf;
1766         Form_pg_class pgcform;
1767         ScanKeyData rskey,
1768                                 askey;
1769         AttributeTupleForm attp;
1770
1771         /*
1772          * update number of tuples and number of pages in pg_class
1773          */
1774         ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1775                                                    F_OIDEQ,
1776                                                    ObjectIdGetDatum(relid));
1777
1778         rd = heap_openr(RelationRelationName);
1779         rsdesc = heap_beginscan(rd, false, false, 1, &rskey);
1780
1781         if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1782                 elog(ERROR, "pg_class entry for relid %d vanished during vacuuming",
1783                          relid);
1784
1785         /* overwrite the existing statistics in the tuple */
1786         vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1787         pgcform = (Form_pg_class) GETSTRUCT(rtup);
1788         pgcform->reltuples = ntups;
1789         pgcform->relpages = npages;
1790         pgcform->relhasindex = hasindex;
1791
1792         if (vacrelstats != NULL && vacrelstats->va_natts > 0)
1793         {
1794                 VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1795                 int                     natts = vacrelstats->va_natts;
1796
1797                 ad = heap_openr(AttributeRelationName);
1798                 sd = heap_openr(StatisticRelationName);
1799                 ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1800                                                            F_INT4EQ, relid);
1801
1802                 asdesc = heap_beginscan(ad, false, false, 1, &askey);
1803
1804                 while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1805                 {
1806                         int                     i;
1807                         float32data selratio;           /* average ratio of rows selected
1808                                                                                  * for a random constant */
1809                         VacAttrStats *stats;
1810                         Datum           values[Natts_pg_statistic];
1811                         char            nulls[Natts_pg_statistic];
1812
1813                         attp = (AttributeTupleForm) GETSTRUCT(atup);
1814                         if (attp->attnum <= 0)          /* skip system attributes for now, */
1815                                 /* they are unique anyway */
1816                                 continue;
1817
1818                         for (i = 0; i < natts; i++)
1819                         {
1820                                 if (attp->attnum == vacattrstats[i].attr->attnum)
1821                                         break;
1822                         }
1823                         if (i >= natts)
1824                                 continue;
1825                         stats = &(vacattrstats[i]);
1826
1827                         /* overwrite the existing statistics in the tuple */
1828                         if (VacAttrStatsEqValid(stats))
1829                         {
1830
1831                                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1832
1833                                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1834                                         (stats->null_cnt <= 1 && stats->best_cnt == 1))
1835                                         selratio = 0;
1836                                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1837                                 {
1838                                         double          min_cnt_d = stats->min_cnt,
1839                                                                 max_cnt_d = stats->max_cnt,
1840                                                                 null_cnt_d = stats->null_cnt,
1841                                                                 nonnullcnt_d = stats->nonnull_cnt;              /* prevent overflow */
1842
1843                                         selratio = (min_cnt_d * min_cnt_d + max_cnt_d * max_cnt_d + null_cnt_d * null_cnt_d) /
1844                                                 (nonnullcnt_d + null_cnt_d) / (nonnullcnt_d + null_cnt_d);
1845                                 }
1846                                 else
1847                                 {
1848                                         double          most = (double) (stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1849                                         double          total = ((double) stats->nonnull_cnt) + ((double) stats->null_cnt);
1850
1851                                         /*
1852                                          * we assume count of other values are 20% of best
1853                                          * count in table
1854                                          */
1855                                         selratio = (most * most + 0.20 * most * (total - most)) / total / total;
1856                                 }
1857                                 if (selratio > 1.0)
1858                                         selratio = 1.0;
1859                                 attp->attdisbursion = selratio;
1860                                 WriteNoReleaseBuffer(abuf);
1861
1862                                 /* DO PG_STATISTIC INSERTS */
1863
1864                                 /*
1865                                  * doing system relations, especially pg_statistic is a
1866                                  * problem
1867                                  */
1868                                 if (VacAttrStatsLtGtValid(stats) && stats->initialized  /* &&
1869                                                                                                                                                  * !IsSystemRelationName(
1870                                                                                                                                                  *
1871                                          pgcform->relname.data) */ )
1872                                 {
1873                                         FmgrInfo        out_function;
1874                                         char       *out_string;
1875
1876                                         for (i = 0; i < Natts_pg_statistic; ++i)
1877                                                 nulls[i] = ' ';
1878
1879                                         /* ----------------
1880                                          *      initialize values[]
1881                                          * ----------------
1882                                          */
1883                                         i = 0;
1884                                         values[i++] = (Datum) relid;            /* 1 */
1885                                         values[i++] = (Datum) attp->attnum; /* 2 */
1886                                         values[i++] = (Datum) InvalidOid;       /* 3 */
1887                                         fmgr_info(stats->outfunc, &out_function);
1888                                         out_string = (*fmgr_faddr(&out_function)) (stats->min, stats->attr->atttypid);
1889                                         values[i++] = (Datum) fmgr(F_TEXTIN, out_string);
1890                                         pfree(out_string);
1891                                         out_string = (char *) (*fmgr_faddr(&out_function)) (stats->max, stats->attr->atttypid);
1892                                         values[i++] = (Datum) fmgr(F_TEXTIN, out_string);
1893                                         pfree(out_string);
1894
1895                                         sdesc = sd->rd_att;
1896
1897                                         stup = heap_formtuple(sdesc, values, nulls);
1898
1899                                         /* ----------------
1900                                          *      insert the tuple in the relation and get the tuple's oid.
1901                                          * ----------------
1902                                          */
1903                                         heap_insert(sd, stup);
1904                                         pfree(DatumGetPointer(values[3]));
1905                                         pfree(DatumGetPointer(values[4]));
1906                                         pfree(stup);
1907                                 }
1908                         }
1909                 }
1910                 heap_endscan(asdesc);
1911                 heap_close(ad);
1912                 heap_close(sd);
1913         }
1914
1915         /* XXX -- after write, should invalidate relcache in other backends */
1916         WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1917
1918         /*
1919          * invalidating system relations confuses the function cache of
1920          * pg_operator and pg_opclass
1921          */
1922         if (!IsSystemRelationName(pgcform->relname.data))
1923                 RelationInvalidateHeapTuple(rd, rtup);
1924
1925         /* that's all, folks */
1926         heap_endscan(rsdesc);
1927         heap_close(rd);
1928 }
1929
1930 /*
1931  *      vc_delhilowstats() -- delete pg_statistics rows
1932  *
1933  */
1934 static void
1935 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
1936 {
1937         Relation        pgstatistic;
1938         HeapScanDesc pgsscan;
1939         HeapTuple       pgstup;
1940         ScanKeyData pgskey;
1941
1942         pgstatistic = heap_openr(StatisticRelationName);
1943
1944         if (relid != InvalidOid)
1945         {
1946                 ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
1947                                                            F_OIDEQ,
1948                                                            ObjectIdGetDatum(relid));
1949                 pgsscan = heap_beginscan(pgstatistic, false, false, 1, &pgskey);
1950         }
1951         else
1952                 pgsscan = heap_beginscan(pgstatistic, false, false, 0, NULL);
1953
1954         while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
1955         {
1956                 if (attcnt > 0)
1957                 {
1958                         Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(pgstup);
1959                         int                     i;
1960
1961                         for (i = 0; i < attcnt; i++)
1962                         {
1963                                 if (pgs->staattnum == attnums[i] + 1)
1964                                         break;
1965                         }
1966                         if (i >= attcnt)
1967                                 continue;               /* don't delete it */
1968                 }
1969                 heap_delete(pgstatistic, &pgstup->t_ctid);
1970         }
1971
1972         heap_endscan(pgsscan);
1973         heap_close(pgstatistic);
1974 }
1975
1976 static void
1977 vc_setpagelock(Relation rel, BlockNumber blkno)
1978 {
1979         ItemPointerData itm;
1980
1981         ItemPointerSet(&itm, blkno, 1);
1982
1983         RelationSetLockForWritePage(rel, &itm);
1984 }
1985
1986 /*
1987  *      vc_reappage() -- save a page on the array of reapped pages.
1988  *
1989  *              As a side effect of the way that the vacuuming loop for a given
1990  *              relation works, higher pages come after lower pages in the array
1991  *              (and highest tid on a page is last).
1992  */
1993 static void
1994 vc_reappage(VPageList vpl, VPageDescr vpc)
1995 {
1996         VPageDescr      newvpd;
1997
1998         /* allocate a VPageDescrData entry */
1999         newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff * sizeof(OffsetNumber));
2000
2001         /* fill it in */
2002         if (vpc->vpd_noff > 0)
2003                 memmove(newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff * sizeof(OffsetNumber));
2004         newvpd->vpd_blkno = vpc->vpd_blkno;
2005         newvpd->vpd_free = vpc->vpd_free;
2006         newvpd->vpd_nusd = vpc->vpd_nusd;
2007         newvpd->vpd_noff = vpc->vpd_noff;
2008
2009         /* insert this page into vpl list */
2010         vc_vpinsert(vpl, newvpd);
2011
2012 }       /* vc_reappage */
2013
2014 static void
2015 vc_vpinsert(VPageList vpl, VPageDescr vpnew)
2016 {
2017
2018         /* allocate a VPageDescr entry if needed */
2019         if (vpl->vpl_npages == 0)
2020                 vpl->vpl_pgdesc = (VPageDescr *) palloc(100 * sizeof(VPageDescr));
2021         else if (vpl->vpl_npages % 100 == 0)
2022                 vpl->vpl_pgdesc = (VPageDescr *) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages + 100) * sizeof(VPageDescr));
2023         vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
2024         (vpl->vpl_npages)++;
2025
2026 }
2027
2028 static void
2029 vc_free(VRelList vrl)
2030 {
2031         VRelList        p_vrl;
2032         MemoryContext old;
2033         PortalVariableMemory pmem;
2034
2035         pmem = PortalGetVariableMemory(vc_portal);
2036         old = MemoryContextSwitchTo((MemoryContext) pmem);
2037
2038         while (vrl != (VRelList) NULL)
2039         {
2040
2041                 /* free rel list entry */
2042                 p_vrl = vrl;
2043                 vrl = vrl->vrl_next;
2044                 pfree(p_vrl);
2045         }
2046
2047         MemoryContextSwitchTo(old);
2048 }
2049
/*
 *	vc_find_eq() -- binary search for an element equal to *elm
 *
 *		bot		address of the first element of an ascending sorted array
 *		nelem	number of elements in the array
 *		size	size of one element in bytes
 *		elm		address of the key to look for
 *		compar	comparison function returning <0, 0 or >0 for its two
 *				element addresses
 *
 *		Returns the address of a matching element, or NULL when there is
 *		none.  An empty array (nelem <= 0) never matches; without that
 *		check the first slot would be read although it does not exist.
 */
static char *
vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *))
{
	int			res;
	int			last = nelem - 1;
	int			celm = nelem / 2;
	bool		last_move,
				first_move;

	/* nothing to search: do not touch bot[0] */
	if (nelem <= 0)
		return (NULL);

	/*
	 * first_move / last_move tell whether the lower / upper bound of the
	 * current window has changed and therefore must be compared again.
	 */
	last_move = first_move = true;
	for (;;)
	{
		if (first_move == true)
		{
			/* key below the lowest element of the window? */
			res = compar(bot, elm);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot);
			first_move = false;
		}
		if (last_move == true)
		{
			/* key above the highest element of the window? */
			res = compar(elm, bot + last * size);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot + last * size);
			last_move = false;
		}
		res = compar(elm, bot + celm * size);
		if (res == 0)
			return (bot + celm * size);
		if (res < 0)
		{
			/* continue in the part below the probe */
			if (celm == 0)
				return (NULL);
			last = celm - 1;
			celm = celm / 2;
			last_move = true;
			continue;
		}

		/* continue in the part above the probe */
		if (celm == last)
			return (NULL);

		last = last - celm - 1;
		bot = bot + (celm + 1) * size;
		celm = (last + 1) / 2;
		first_move = true;
	}

}	/* vc_find_eq */
2103
2104 static int
2105 vc_cmp_blk(char *left, char *right)
2106 {
2107         BlockNumber lblk,
2108                                 rblk;
2109
2110         lblk = (*((VPageDescr *) left))->vpd_blkno;
2111         rblk = (*((VPageDescr *) right))->vpd_blkno;
2112
2113         if (lblk < rblk)
2114                 return (-1);
2115         if (lblk == rblk)
2116                 return (0);
2117         return (1);
2118
2119 }       /* vc_cmp_blk */
2120
2121 static int
2122 vc_cmp_offno(char *left, char *right)
2123 {
2124
2125         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2126                 return (-1);
2127         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2128                 return (0);
2129         return (1);
2130
2131 }       /* vc_cmp_offno */
2132
2133
2134 static void
2135 vc_getindices(Oid relid, int *nindices, Relation **Irel)
2136 {
2137         Relation        pgindex;
2138         Relation        irel;
2139         TupleDesc       pgidesc;
2140         HeapTuple       pgitup;
2141         HeapScanDesc pgiscan;
2142         Datum           d;
2143         int                     i,
2144                                 k;
2145         bool            n;
2146         ScanKeyData pgikey;
2147         Oid                *ioid;
2148
2149         *nindices = i = 0;
2150
2151         ioid = (Oid *) palloc(10 * sizeof(Oid));
2152
2153         /* prepare a heap scan on the pg_index relation */
2154         pgindex = heap_openr(IndexRelationName);
2155         pgidesc = RelationGetTupleDescriptor(pgindex);
2156
2157         ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2158                                                    F_OIDEQ,
2159                                                    ObjectIdGetDatum(relid));
2160
2161         pgiscan = heap_beginscan(pgindex, false, false, 1, &pgikey);
2162
2163         while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL)))
2164         {
2165                 d = heap_getattr(pgitup, Anum_pg_index_indexrelid,
2166                                                  pgidesc, &n);
2167                 i++;
2168                 if (i % 10 == 0)
2169                         ioid = (Oid *) repalloc(ioid, (i + 10) * sizeof(Oid));
2170                 ioid[i - 1] = DatumGetObjectId(d);
2171         }
2172
2173         heap_endscan(pgiscan);
2174         heap_close(pgindex);
2175
2176         if (i == 0)
2177         {                                                       /* No one index found */
2178                 pfree(ioid);
2179                 return;
2180         }
2181
2182         if (Irel != (Relation **) NULL)
2183                 *Irel = (Relation *) palloc(i * sizeof(Relation));
2184
2185         for (k = 0; i > 0;)
2186         {
2187                 irel = index_open(ioid[--i]);
2188                 if (irel != (Relation) NULL)
2189                 {
2190                         if (Irel != (Relation **) NULL)
2191                                 (*Irel)[k] = irel;
2192                         else
2193                                 index_close(irel);
2194                         k++;
2195                 }
2196                 else
2197                         elog(NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2198         }
2199         *nindices = k;
2200         pfree(ioid);
2201
2202         if (Irel != (Relation **) NULL && *nindices == 0)
2203         {
2204                 pfree(*Irel);
2205                 *Irel = (Relation *) NULL;
2206         }
2207
2208 }       /* vc_getindices */
2209
2210
2211 static void
2212 vc_clsindices(int nindices, Relation *Irel)
2213 {
2214
2215         if (Irel == (Relation *) NULL)
2216                 return;
2217
2218         while (nindices--)
2219                 index_close(Irel[nindices]);
2220         pfree(Irel);
2221
2222 }       /* vc_clsindices */
2223
2224
2225 static void
2226 vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2227 {
2228         IndDesc    *idcur;
2229         HeapTuple       pgIndexTup;
2230         AttrNumber *attnumP;
2231         int                     natts;
2232         int                     i;
2233
2234         *Idesc = (IndDesc *) palloc(nindices * sizeof(IndDesc));
2235
2236         for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++)
2237         {
2238                 pgIndexTup =
2239                         SearchSysCacheTuple(INDEXRELID,
2240                                                                 ObjectIdGetDatum(Irel[i]->rd_id),
2241                                                                 0, 0, 0);
2242                 Assert(pgIndexTup);
2243                 idcur->tform = (IndexTupleForm) GETSTRUCT(pgIndexTup);
2244                 for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2245                          *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2246                          attnumP++, natts++);
2247                 if (idcur->tform->indproc != InvalidOid)
2248                 {
2249                         idcur->finfoP = &(idcur->finfo);
2250                         FIgetnArgs(idcur->finfoP) = natts;
2251                         natts = 1;
2252                         FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2253                         *(FIgetname(idcur->finfoP)) = '\0';
2254                 }
2255                 else
2256                         idcur->finfoP = (FuncIndexInfo *) NULL;
2257
2258                 idcur->natts = natts;
2259         }
2260
2261 }       /* vc_mkindesc */
2262
2263
2264 static bool
2265 vc_enough_space(VPageDescr vpd, Size len)
2266 {
2267
2268         len = DOUBLEALIGN(len);
2269
2270         if (len > vpd->vpd_free)
2271                 return (false);
2272
2273         if (vpd->vpd_nusd < vpd->vpd_noff)      /* there are free itemid(s) */
2274                 return (true);                  /* and len <= free_space */
2275
2276         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2277         if (len <= vpd->vpd_free - sizeof(ItemIdData))
2278                 return (true);
2279
2280         return (false);
2281
2282 }       /* vc_enough_space */