]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Subselects with =, >, etc.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *        the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.60 1998/02/03 19:26:33 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/types.h>
15 #include <sys/file.h>
16 #include <string.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <fmgr.h>
24 #include <utils/portal.h>
25 #include <access/genam.h>
26 #include <access/heapam.h>
27 #include <access/xact.h>
28 #include <storage/bufmgr.h>
29 #include <access/transam.h>
30 #include <catalog/pg_index.h>
31 #include <catalog/index.h>
32 #include <catalog/catname.h>
33 #include <catalog/catalog.h>
34 #include <catalog/pg_class.h>
35 #include <catalog/pg_proc.h>
36 #include <catalog/pg_statistic.h>
37 #include <catalog/pg_type.h>
38 #include <catalog/pg_operator.h>
39 #include <parser/parse_oper.h>
40 #include <storage/smgr.h>
41 #include <storage/lmgr.h>
42 #include <utils/inval.h>
43 #include <utils/mcxt.h>
44 #include <utils/inval.h>
45 #include <utils/syscache.h>
46 #include <utils/builtins.h>
47 #include <commands/vacuum.h>
48 #include <storage/bufpage.h>
49 #include "storage/shmem.h"
50 #ifndef HAVE_GETRUSAGE
51 #include <rusagestub.h>
52 #else
53 #include <sys/time.h>
54 #include <sys/resource.h>
55 #endif
56
57 /* #include <port-protos.h> */ /* Why? */
58
59 extern int BlowawayRelationBuffers(Relation rdesc, BlockNumber block);
60
61 bool            VacuumRunning = false;
62
63 static Portal vc_portal;
64
65 static int      MESSAGE_LEVEL;          /* message level */
66
/*
 * Swap helpers: exchange the values of two lvalues of the named type.
 *
 * Each one expands to a single do { } while (0) statement, so a use followed
 * by ';' is safe as the body of an unbraced if/else.  The temporary has a
 * name that is unlikely to collide with an argument.  Both arguments are
 * evaluated twice, so they must be free of side effects.
 */
#define swapLong(a,b)	do { long swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_; } while (0)
#define swapInt(a,b)	do { int swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_; } while (0)
#define swapDatum(a,b)	do { Datum swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_; } while (0)

/* true if the attribute's "=" function pointer has been filled in */
#define VacAttrStatsEqValid(stats) ( (stats)->f_cmpeq.fn_addr != NULL )

/*
 * true if both the "<" and ">" function pointers have been filled in and
 * the attribute's output procedure is valid
 */
#define VacAttrStatsLtGtValid(stats) ( (stats)->f_cmplt.fn_addr != NULL && \
									   (stats)->f_cmpgt.fn_addr != NULL && \
									   RegProcedureIsValid((stats)->outfunc) )
74
75
76 /* non-export function prototypes */
77 static void vc_init(void);
78 static void vc_shutdown(void);
79 static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
80 static VRelList vc_getrels(NameData *VacRelP);
81 static void vc_vacone(Oid relid, bool analyze, List *va_cols);
82 static void vc_scanheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
83 static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
84 static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vpl);
85 static void vc_vacpage(Page page, VPageDescr vpd);
86 static void vc_vaconeind(VPageList vpl, Relation indrel, int nhtups);
87 static void vc_scanoneind(Relation indrel, int nhtups);
88 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup);
89 static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
90 static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats);
91 static void vc_delhilowstats(Oid relid, int attcnt, int *attnums);
92 static void vc_setpagelock(Relation rel, BlockNumber blkno);
93 static VPageDescr vc_tidreapped(ItemPointer itemptr, VPageList vpl);
94 static void vc_reappage(VPageList vpl, VPageDescr vpc);
95 static void vc_vpinsert(VPageList vpl, VPageDescr vpnew);
96 static void vc_free(VRelList vrl);
97 static void vc_getindices(Oid relid, int *nindices, Relation **Irel);
98 static void vc_clsindices(int nindices, Relation *Irel);
99 static void vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
100 static char *vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *));
101 static int      vc_cmp_blk(char *left, char *right);
102 static int      vc_cmp_offno(char *left, char *right);
103 static bool vc_enough_space(VPageDescr vpd, Size len);
104
105 void
106 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
107 {
108         char       *pname;
109         MemoryContext old;
110         PortalVariableMemory pmem;
111         NameData        VacRel;
112         List       *le;
113         List       *va_cols = NIL;
114
115         /*
116          * Create a portal for safe memory across transctions.  We need to
117          * palloc the name space for it because our hash function expects the
118          * name to be on a longword boundary.  CreatePortal copies the name to
119          * safe storage for us.
120          */
121         pname = (char *) palloc(strlen(VACPNAME) + 1);
122         strcpy(pname, VACPNAME);
123         vc_portal = CreatePortal(pname);
124         pfree(pname);
125
126         if (verbose)
127                 MESSAGE_LEVEL = NOTICE;
128         else
129                 MESSAGE_LEVEL = DEBUG;
130
131         /* vacrel gets de-allocated on transaction commit */
132         if (vacrel)
133                 strcpy(VacRel.data, vacrel);
134
135         pmem = PortalGetVariableMemory(vc_portal);
136         old = MemoryContextSwitchTo((MemoryContext) pmem);
137
138         if (va_spec == NIL || analyze)
139                 elog(ERROR,"Can't vacuum columns, only tables.  You can 'vacuum analyze' columns.");
140
141         foreach(le, va_spec)
142         {
143                 char       *col = (char *) lfirst(le);
144                 char       *dest;
145
146                 dest = (char *) palloc(strlen(col) + 1);
147                 strcpy(dest, col);
148                 va_cols = lappend(va_cols, dest);
149         }
150         MemoryContextSwitchTo(old);
151
152         /* initialize vacuum cleaner */
153         vc_init();
154
155         /* vacuum the database */
156         if (vacrel)
157                 vc_vacuum(&VacRel, analyze, va_cols);
158         else
159                 vc_vacuum(NULL, analyze, NIL);
160
161         PortalDestroy(&vc_portal);
162
163         /* clean up */
164         vc_shutdown();
165 }
166
167 /*
168  *      vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
169  *
170  *              We run exactly one vacuum cleaner at a time.  We use the file system
171  *              to guarantee an exclusive lock on vacuuming, since a single vacuum
172  *              cleaner instantiation crosses transaction boundaries, and we'd lose
173  *              postgres-style locks at the end of every transaction.
174  *
175  *              The strangeness with committing and starting transactions in the
176  *              init and shutdown routines is due to the fact that the vacuum cleaner
177  *              is invoked via a sql command, and so is already executing inside
178  *              a transaction.  We need to leave ourselves in a predictable state
179  *              on entry and exit to the vacuum cleaner.  We commit the transaction
180  *              started in PostgresMain() inside vc_init(), and start one in
181  *              vc_shutdown() to match the commit waiting for us back in
182  *              PostgresMain().
183  */
184 static void
185 vc_init()
186 {
187         int                     fd;
188
189         if ((fd = open("pg_vlock", O_CREAT | O_EXCL, 0600)) < 0)
190                 elog(ERROR, "can't create lock file -- another vacuum cleaner running?");
191
192         close(fd);
193
194         /*
195          * By here, exclusive open on the lock file succeeded.  If we abort
196          * for any reason during vacuuming, we need to remove the lock file.
197          * This global variable is checked in the transaction manager on xact
198          * abort, and the routine vc_abort() is called if necessary.
199          */
200
201         VacuumRunning = true;
202
203         /* matches the StartTransaction in PostgresMain() */
204         CommitTransactionCommand();
205 }
206
207 static void
208 vc_shutdown()
209 {
210         /* on entry, not in a transaction */
211         if (unlink("pg_vlock") < 0)
212                 elog(ERROR, "vacuum: can't destroy lock file!");
213
214         /* okay, we're done */
215         VacuumRunning = false;
216
217         /* matches the CommitTransaction in PostgresMain() */
218         StartTransactionCommand();
219
220 }
221
222 void
223 vc_abort()
224 {
225         /* on abort, remove the vacuum cleaner lock file */
226         unlink("pg_vlock");
227
228         VacuumRunning = false;
229 }
230
231 /*
232  *      vc_vacuum() -- vacuum the database.
233  *
234  *              This routine builds a list of relations to vacuum, and then calls
235  *              code that vacuums them one at a time.  We are careful to vacuum each
236  *              relation in a separate transaction in order to avoid holding too many
237  *              locks at one time.
238  */
239 static void
240 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
241 {
242         VRelList        vrl,
243                                 cur;
244
245         /* get list of relations */
246         vrl = vc_getrels(VacRelP);
247
248         if (analyze && VacRelP == NULL && vrl != NULL)
249                 vc_delhilowstats(InvalidOid, 0, NULL);
250
251         /* vacuum each heap relation */
252         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
253                 vc_vacone(cur->vrl_relid, analyze, va_cols);
254
255         vc_free(vrl);
256 }
257
258 static VRelList
259 vc_getrels(NameData *VacRelP)
260 {
261         Relation        pgclass;
262         TupleDesc       pgcdesc;
263         HeapScanDesc pgcscan;
264         HeapTuple       pgctup;
265         Buffer          buf;
266         PortalVariableMemory portalmem;
267         MemoryContext old;
268         VRelList        vrl,
269                                 cur;
270         Datum           d;
271         char       *rname;
272         char            rkind;
273         bool            n;
274         ScanKeyData pgckey;
275         bool            found = false;
276
277         StartTransactionCommand();
278
279         if (VacRelP->data)
280         {
281                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
282                                                            NameEqualRegProcedure,
283                                                            PointerGetDatum(VacRelP->data));
284         }
285         else
286         {
287                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
288                                                   CharacterEqualRegProcedure, CharGetDatum('r'));
289         }
290
291         portalmem = PortalGetVariableMemory(vc_portal);
292         vrl = cur = (VRelList) NULL;
293
294         pgclass = heap_openr(RelationRelationName);
295         pgcdesc = RelationGetTupleDescriptor(pgclass);
296
297         pgcscan = heap_beginscan(pgclass, false, false, 1, &pgckey);
298
299         while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf)))
300         {
301
302                 found = true;
303
304                 d = heap_getattr(pgctup, Anum_pg_class_relname, pgcdesc, &n);
305                 rname = (char *) d;
306
307                 /*
308                  * don't vacuum large objects for now - something breaks when we
309                  * do
310                  */
311                 if ((strlen(rname) >= 5) && rname[0] == 'x' &&
312                         rname[1] == 'i' && rname[2] == 'n' &&
313                         (rname[3] == 'v' || rname[3] == 'x') &&
314                         rname[4] >= '0' && rname[4] <= '9')
315                 {
316                         elog(NOTICE, "Rel %s: can't vacuum LargeObjects now",
317                                  rname);
318                         ReleaseBuffer(buf);
319                         continue;
320                 }
321
322                 d = heap_getattr(pgctup, Anum_pg_class_relkind, pgcdesc, &n);
323
324                 rkind = DatumGetChar(d);
325
326                 /* skip system relations */
327                 if (rkind != 'r')
328                 {
329                         ReleaseBuffer(buf);
330                         elog(NOTICE, "Vacuum: can not process index and certain system tables");
331                         continue;
332                 }
333
334                 /* get a relation list entry for this guy */
335                 old = MemoryContextSwitchTo((MemoryContext) portalmem);
336                 if (vrl == (VRelList) NULL)
337                 {
338                         vrl = cur = (VRelList) palloc(sizeof(VRelListData));
339                 }
340                 else
341                 {
342                         cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
343                         cur = cur->vrl_next;
344                 }
345                 MemoryContextSwitchTo(old);
346
347                 cur->vrl_relid = pgctup->t_oid;
348                 cur->vrl_next = (VRelList) NULL;
349
350                 /* wei hates it if you forget to do this */
351                 ReleaseBuffer(buf);
352         }
353         if (found == false)
354                 elog(NOTICE, "Vacuum: table not found");
355
356
357         heap_endscan(pgcscan);
358         heap_close(pgclass);
359
360         CommitTransactionCommand();
361
362         return (vrl);
363 }
364
365 /*
366  *      vc_vacone() -- vacuum one heap relation
367  *
368  *              This routine vacuums a single heap, cleans out its indices, and
369  *              updates its statistics npages and ntups statistics.
370  *
371  *              Doing one heap at a time incurs extra overhead, since we need to
372  *              check that the heap exists again just before we vacuum it.      The
373  *              reason that we do this is so that vacuuming can be spread across
374  *              many small transactions.  Otherwise, two-phase locking would require
375  *              us to lock the entire database during one pass of the vacuum cleaner.
376  */
377 static void
378 vc_vacone(Oid relid, bool analyze, List *va_cols)
379 {
380         Relation        pgclass;
381         TupleDesc       pgcdesc;
382         HeapTuple       pgctup,
383                                 pgttup;
384         Buffer          pgcbuf;
385         HeapScanDesc pgcscan;
386         Relation        onerel;
387         ScanKeyData pgckey;
388         VPageListData Vvpl;                     /* List of pages to vacuum and/or clean
389                                                                  * indices */
390         VPageListData Fvpl;                     /* List of pages with space enough for
391                                                                  * re-using */
392         VPageDescr *vpp;
393         Relation   *Irel;
394         int32           nindices,
395                                 i;
396         VRelStats  *vacrelstats;
397
398         StartTransactionCommand();
399
400         ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
401                                                    ObjectIdEqualRegProcedure,
402                                                    ObjectIdGetDatum(relid));
403
404         pgclass = heap_openr(RelationRelationName);
405         pgcdesc = RelationGetTupleDescriptor(pgclass);
406         pgcscan = heap_beginscan(pgclass, false, false, 1, &pgckey);
407
408         /*
409          * Race condition -- if the pg_class tuple has gone away since the
410          * last time we saw it, we don't need to vacuum it.
411          */
412
413         if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf)))
414         {
415                 heap_endscan(pgcscan);
416                 heap_close(pgclass);
417                 CommitTransactionCommand();
418                 return;
419         }
420
421         /* now open the class and vacuum it */
422         onerel = heap_open(relid);
423
424         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
425         vacrelstats->relid = relid;
426         vacrelstats->npages = vacrelstats->ntups = 0;
427         vacrelstats->hasindex = false;
428         if (analyze && !IsSystemRelationName((RelationGetRelationName(onerel))->data))
429         {
430                 int                     attr_cnt,
431                                    *attnums = NULL;
432                 AttributeTupleForm *attr;
433
434                 attr_cnt = onerel->rd_att->natts;
435                 attr = onerel->rd_att->attrs;
436
437                 if (va_cols != NIL)
438                 {
439                         int                     tcnt = 0;
440                         List       *le;
441
442                         if (length(va_cols) > attr_cnt)
443                                 elog(ERROR, "vacuum: too many attributes specified for relation %s",
444                                          (RelationGetRelationName(onerel))->data);
445                         attnums = (int *) palloc(attr_cnt * sizeof(int));
446                         foreach(le, va_cols)
447                         {
448                                 char       *col = (char *) lfirst(le);
449
450                                 for (i = 0; i < attr_cnt; i++)
451                                 {
452                                         if (namestrcmp(&(attr[i]->attname), col) == 0)
453                                                 break;
454                                 }
455                                 if (i < attr_cnt)               /* found */
456                                         attnums[tcnt++] = i;
457                                 else
458                                 {
459                                         elog(ERROR, "vacuum: there is no attribute %s in %s",
460                                                  col, (RelationGetRelationName(onerel))->data);
461                                 }
462                         }
463                         attr_cnt = tcnt;
464                 }
465
466                 vacrelstats->vacattrstats =
467                         (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));
468
469                 for (i = 0; i < attr_cnt; i++)
470                 {
471                         Operator        func_operator;
472                         OperatorTupleForm pgopform;
473                         VacAttrStats *stats;
474
475                         stats = &vacrelstats->vacattrstats[i];
476                         stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
477                         memmove(stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
478                         stats->best = stats->guess1 = stats->guess2 = 0;
479                         stats->max = stats->min = 0;
480                         stats->best_len = stats->guess1_len = stats->guess2_len = 0;
481                         stats->max_len = stats->min_len = 0;
482                         stats->initialized = false;
483                         stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
484                         stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
485
486                         func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
487                         if (func_operator != NULL)
488                         {
489                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
490                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpeq));
491                         }
492                         else
493                                 stats->f_cmpeq.fn_addr = NULL;
494
495                         func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
496                         if (func_operator != NULL)
497                         {
498                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
499                                 fmgr_info(pgopform->oprcode, &(stats->f_cmplt));
500                         }
501                         else
502                                 stats->f_cmplt.fn_addr = NULL;
503
504                         func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
505                         if (func_operator != NULL)
506                         {
507                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
508                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpgt));
509                         }
510                         else
511                                 stats->f_cmpgt.fn_addr = NULL;
512
513                         pgttup = SearchSysCacheTuple(TYPOID,
514                                                                  ObjectIdGetDatum(stats->attr->atttypid),
515                                                                                  0, 0, 0);
516                         if (HeapTupleIsValid(pgttup))
517                                 stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
518                         else
519                                 stats->outfunc = InvalidOid;
520                 }
521                 vacrelstats->va_natts = attr_cnt;
522                 vc_delhilowstats(relid, ((attnums) ? attr_cnt : 0), attnums);
523                 if (attnums)
524                         pfree(attnums);
525         }
526         else
527         {
528                 vacrelstats->va_natts = 0;
529                 vacrelstats->vacattrstats = (VacAttrStats *) NULL;
530         }
531
532         /* we require the relation to be locked until the indices are cleaned */
533         RelationSetLockForWrite(onerel);
534
535         /* scan it */
536         Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
537         vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
538
539         /* Now open indices */
540         Irel = (Relation *) NULL;
541         vc_getindices(vacrelstats->relid, &nindices, &Irel);
542
543         if (nindices > 0)
544                 vacrelstats->hasindex = true;
545         else
546                 vacrelstats->hasindex = false;
547
548         /* Clean/scan index relation(s) */
549         if (Irel != (Relation *) NULL)
550         {
551                 if (Vvpl.vpl_npages > 0)
552                 {
553                         for (i = 0; i < nindices; i++)
554                                 vc_vaconeind(&Vvpl, Irel[i], vacrelstats->ntups);
555                 }
556                 else
557 /* just scan indices to update statistic */
558                 {
559                         for (i = 0; i < nindices; i++)
560                                 vc_scanoneind(Irel[i], vacrelstats->ntups);
561                 }
562         }
563
564         if (Fvpl.vpl_npages > 0)        /* Try to shrink heap */
565                 vc_rpfheap(vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
566         else
567         {
568                 if (Irel != (Relation *) NULL)
569                         vc_clsindices(nindices, Irel);
570                 if (Vvpl.vpl_npages > 0)/* Clean pages from Vvpl list */
571                         vc_vacheap(vacrelstats, onerel, &Vvpl);
572         }
573
574         /* ok - free Vvpl list of reapped pages */
575         if (Vvpl.vpl_npages > 0)
576         {
577                 vpp = Vvpl.vpl_pgdesc;
578                 for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
579                         pfree(*vpp);
580                 pfree(Vvpl.vpl_pgdesc);
581                 if (Fvpl.vpl_npages > 0)
582                         pfree(Fvpl.vpl_pgdesc);
583         }
584
585         /* all done with this class */
586         heap_close(onerel);
587         heap_endscan(pgcscan);
588         heap_close(pgclass);
589
590         /* update statistics in pg_class */
591         vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
592                                 vacrelstats->hasindex, vacrelstats);
593
594         /* next command frees attribute stats */
595
596         CommitTransactionCommand();
597 }
598
599 /*
600  *      vc_scanheap() -- scan an open heap relation
601  *
602  *              This routine sets commit times, constructs Vvpl list of
603  *              empty/uninitialized pages and pages with dead tuples and
604  *              ~LP_USED line pointers, constructs Fvpl list of pages
605  *              appropriate for purposes of shrinking and maintains statistics
606  *              on the number of live tuples in a heap.
607  */
608 static void
609 vc_scanheap(VRelStats *vacrelstats, Relation onerel,
610                         VPageList Vvpl, VPageList Fvpl)
611 {
612         int                     nblocks,
613                                 blkno;
614         ItemId          itemid;
615         ItemPointer itemptr;
616         HeapTuple       htup;
617         Buffer          buf;
618         Page            page,
619                                 tempPage = NULL;
620         OffsetNumber offnum,
621                                 maxoff;
622         bool            pgchanged,
623                                 tupgone,
624                                 dobufrel,
625                                 notup;
626         char       *relname;
627         VPageDescr      vpc,
628                                 vp;
629         uint32          nvac,
630                                 ntups,
631                                 nunused,
632                                 ncrash,
633                                 nempg,
634                                 nnepg,
635                                 nchpg,
636                                 nemend;
637         Size            frsize,
638                                 frsusf;
639         Size            min_tlen = MAXTUPLEN;
640         Size            max_tlen = 0;
641         int32           i /* , attr_cnt */ ;
642         struct rusage ru0,
643                                 ru1;
644         bool            do_shrinking = true;
645
646         getrusage(RUSAGE_SELF, &ru0);
647
648         nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
649         frsize = frsusf = 0;
650
651         relname = (RelationGetRelationName(onerel))->data;
652
653         nblocks = RelationGetNumberOfBlocks(onerel);
654
655         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
656         vpc->vpd_nusd = 0;
657
658         for (blkno = 0; blkno < nblocks; blkno++)
659         {
660                 buf = ReadBuffer(onerel, blkno);
661                 page = BufferGetPage(buf);
662                 vpc->vpd_blkno = blkno;
663                 vpc->vpd_noff = 0;
664
665                 if (PageIsNew(page))
666                 {
667                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
668                                  relname, blkno);
669                         PageInit(page, BufferGetPageSize(buf), 0);
670                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
671                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
672                         nnepg++;
673                         nemend++;
674                         vc_reappage(Vvpl, vpc);
675                         WriteBuffer(buf);
676                         continue;
677                 }
678
679                 if (PageIsEmpty(page))
680                 {
681                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
682                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
683                         nempg++;
684                         nemend++;
685                         vc_reappage(Vvpl, vpc);
686                         ReleaseBuffer(buf);
687                         continue;
688                 }
689
690                 pgchanged = false;
691                 notup = true;
692                 maxoff = PageGetMaxOffsetNumber(page);
693                 for (offnum = FirstOffsetNumber;
694                          offnum <= maxoff;
695                          offnum = OffsetNumberNext(offnum))
696                 {
697                         itemid = PageGetItemId(page, offnum);
698
699                         /*
700                          * Collect un-used items too - it's possible to have indices
701                          * pointing here after crash.
702                          */
703                         if (!ItemIdIsUsed(itemid))
704                         {
705                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
706                                 nunused++;
707                                 continue;
708                         }
709
710                         htup = (HeapTuple) PageGetItem(page, itemid);
711                         tupgone = false;
712
713                         if (!(htup->t_infomask & HEAP_XMIN_COMMITTED))
714                         {
715                                 if (htup->t_infomask & HEAP_XMIN_INVALID)
716                                         tupgone = true;
717                                 else
718                                 {
719                                         if (TransactionIdDidAbort(htup->t_xmin))
720                                                 tupgone = true;
721                                         else if (TransactionIdDidCommit(htup->t_xmin))
722                                         {
723                                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
724                                                 pgchanged = true;
725                                         }
726                                         else if (!TransactionIdIsInProgress(htup->t_xmin))
727                                         {
728                                                 /*
729                                                  * Not Aborted, Not Committed, Not in Progress - 
730                                                  * so it's from crashed process. - vadim 11/26/96
731                                                  */
732                                                 ncrash++;
733                                                 tupgone = true;
734                                         }
735                                         else
736                                         {
737                                                 elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
738                                                          relname, blkno, offnum, htup->t_xmin);
739                                                 do_shrinking = false;
740                                         }
741                                 }
742                         }
743
744                         /* 
745                          * here we are concerned about tuples with xmin committed 
746                          * and xmax unknown or committed
747                          */
748                         if (htup->t_infomask & HEAP_XMIN_COMMITTED && 
749                                 !(htup->t_infomask & HEAP_XMAX_INVALID))
750                         {
751                                 if (htup->t_infomask & HEAP_XMAX_COMMITTED)
752                                         tupgone = true;
753                                 else if (TransactionIdDidAbort(htup->t_xmax))
754                                 {
755                                         htup->t_infomask |= HEAP_XMAX_INVALID;
756                                         pgchanged = true;
757                                 }
758                                 else if (TransactionIdDidCommit(htup->t_xmax))
759                                         tupgone = true;
760                                 else if (!TransactionIdIsInProgress(htup->t_xmax))
761                                 {
762                                         /*
763                                          * Not Aborted, Not Committed, Not in Progress - so it
764                                          * from crashed process. - vadim 06/02/97
765                                          */
766                                         htup->t_infomask |= HEAP_XMAX_INVALID;;
767                                         pgchanged = true;
768                                 }
769                                 else
770                                 {
771                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
772                                                  relname, blkno, offnum, htup->t_xmax);
773                                         do_shrinking = false;
774                                 }
775                         }
776
777                         /*
778                          * It's possibly! But from where it comes ? And should we fix
779                          * it ?  - vadim 11/28/96
780                          */
781                         itemptr = &(htup->t_ctid);
782                         if (!ItemPointerIsValid(itemptr) ||
783                                 BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno)
784                         {
785                                 elog(NOTICE, "Rel %s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.",
786                                          relname, blkno, offnum,
787                                          BlockIdGetBlockNumber(&(itemptr->ip_blkid)),
788                                          itemptr->ip_posid, tupgone);
789                         }
790
791                         /*
792                          * Other checks...
793                          */
794                         if (htup->t_len != itemid->lp_len)
795                         {
796                                 elog(NOTICE, "Rel %s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.",
797                                          relname, blkno, offnum,
798                                          itemid->lp_len, htup->t_len, tupgone);
799                         }
800                         if (!OidIsValid(htup->t_oid))
801                         {
802                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
803                                          relname, blkno, offnum, tupgone);
804                         }
805
806                         if (tupgone)
807                         {
808                                 ItemId          lpp;
809
810                                 if (tempPage == (Page) NULL)
811                                 {
812                                         Size            pageSize;
813
814                                         pageSize = PageGetPageSize(page);
815                                         tempPage = (Page) palloc(pageSize);
816                                         memmove(tempPage, page, pageSize);
817                                 }
818
819                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
820
821                                 /* mark it unused */
822                                 lpp->lp_flags &= ~LP_USED;
823
824                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
825                                 nvac++;
826
827                         }
828                         else
829                         {
830                                 ntups++;
831                                 notup = false;
832                                 if (htup->t_len < min_tlen)
833                                         min_tlen = htup->t_len;
834                                 if (htup->t_len > max_tlen)
835                                         max_tlen = htup->t_len;
836                                 vc_attrstats(onerel, vacrelstats, htup);
837                         }
838                 }
839
840                 if (pgchanged)
841                 {
842                         WriteBuffer(buf);
843                         dobufrel = false;
844                         nchpg++;
845                 }
846                 else
847                         dobufrel = true;
848                 if (tempPage != (Page) NULL)
849                 {                                               /* Some tuples are gone */
850                         PageRepairFragmentation(tempPage);
851                         vpc->vpd_free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
852                         frsize += vpc->vpd_free;
853                         vc_reappage(Vvpl, vpc);
854                         pfree(tempPage);
855                         tempPage = (Page) NULL;
856                 }
857                 else if (vpc->vpd_noff > 0)
858                 {                                               /* there are only ~LP_USED line pointers */
859                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
860                         frsize += vpc->vpd_free;
861                         vc_reappage(Vvpl, vpc);
862                 }
863                 if (dobufrel)
864                         ReleaseBuffer(buf);
865                 if (notup)
866                         nemend++;
867                 else
868                         nemend = 0;
869         }
870
871         pfree(vpc);
872
873         /* save stats in the rel list for use later */
874         vacrelstats->ntups = ntups;
875         vacrelstats->npages = nblocks;
876 /*        vacrelstats->natts = attr_cnt;*/
877         if (ntups == 0)
878                 min_tlen = max_tlen = 0;
879         vacrelstats->min_tlen = min_tlen;
880         vacrelstats->max_tlen = max_tlen;
881
882         Vvpl->vpl_nemend = nemend;
883         Fvpl->vpl_nemend = nemend;
884
885         /*
886          * Try to make Fvpl keeping in mind that we can't use free space of
887          * "empty" end-pages and last page if it reapped.
888          */
889         if (do_shrinking && Vvpl->vpl_npages - nemend > 0)
890         {
891                 int                     nusf;           /* blocks usefull for re-using */
892
893                 nusf = Vvpl->vpl_npages - nemend;
894                 if ((Vvpl->vpl_pgdesc[nusf - 1])->vpd_blkno == nblocks - nemend - 1)
895                         nusf--;
896
897                 for (i = 0; i < nusf; i++)
898                 {
899                         vp = Vvpl->vpl_pgdesc[i];
900                         if (vc_enough_space(vp, min_tlen))
901                         {
902                                 vc_vpinsert(Fvpl, vp);
903                                 frsusf += vp->vpd_free;
904                         }
905                 }
906         }
907
908         getrusage(RUSAGE_SELF, &ru1);
909
910         elog(MESSAGE_LEVEL, "Rel %s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
911 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
912                  relname,
913                  nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
914                  ntups, nvac, ncrash, nunused, min_tlen, max_tlen,
915                  frsize, frsusf, nemend, Fvpl->vpl_npages,
916                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
917                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
918
919 }                                                               /* vc_scanheap */
920
921
922 /*
923  *      vc_rpfheap() -- try to repair the relation's fragmentation
924  *
925  *              This routine marks dead tuples as unused and tries to re-use dead
926  *              space by moving tuples from the end of the relation (and inserting
927  *              index entries for them if needed). It builds Nvpl, a list of pages
928  *              that tuples were moved off, and cleans the indices for those pages
929  *              after committing the current transaction (in a hack manner -
930  *              without losing locks or freeing memory!). It truncates the relation
     *              if some end-blocks were emptied.
     *
     *              vacrelstats - supplies npages (starting block count), ntups and
     *                            min_tlen; npages is overwritten if the relation
     *                            gets truncated.
     *              onerel      - the heap relation being compacted.
     *              Vvpl        - list of reaped pages; its last vpl_nemend entries
     *                            are ignored here.
     *              Fvpl        - list of pages whose free space may receive moved
     *                            tuples. Its vpl_pgdesc array is compacted in place
     *                            when a page fills up, but only the local counter
     *                            Fnpages is decremented, not Fvpl->vpl_npages.
     *                            NOTE(review): the last entry is read
     *                            unconditionally, so the caller presumably
     *                            guarantees Fvpl->vpl_npages > 0 - confirm.
     *              nindices,
     *              Irel        - the relation's indices, or Irel == NULL when no
     *                            index maintenance is wanted. When non-NULL they
     *                            are handed to vc_clsindices() before returning.
     *
     *              NOTE(review): the asserts below presumably rely on both page
     *              lists being ordered by ascending block number - confirm
     *              against vc_scanheap().
931  */
932 static void
933 vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
934                    VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
935 {
936         TransactionId myXID;
937         CommandId       myCID;
938         Buffer          buf,
939                                 ToBuf;
940         int                     nblocks,
941                                 blkno;
942         Page            page,
943                                 ToPage = NULL;
944         OffsetNumber offnum = 0,
945                                 maxoff = 0,
946                                 newoff,
947                                 moff;
948         ItemId          itemid,
949                                 newitemid;
950         HeapTuple       htup,
951                                 newtup;
952         TupleDesc       tupdesc = NULL;
953         Datum      *idatum = NULL;
954         char       *inulls = NULL;
955         InsertIndexResult iresult;
956         VPageListData Nvpl;
957         VPageDescr      ToVpd = NULL,
958                                 Fvplast,
959                                 Vvplast,
960                                 vpc,
961                            *vpp;
962         int                     ToVpI = 0;
963         IndDesc    *Idesc,
964                            *idcur;
965         int                     Fblklast,
966                                 Vblklast,
967                                 i;
968         Size            tlen;
969         int                     nmoved,
970                                 Fnpages,
971                                 Vnpages;
972         int                     nchkmvd,
973                                 ntups;
974         bool            isempty,
975                                 dowrite;
976         struct rusage ru0,
977                                 ru1;
978
            /* starting resource usage; compared with ru1 for the final report */
979         getrusage(RUSAGE_SELF, &ru0);
980
            /*
             * Moved copies are stamped with our xid/cid as their inserter and
             * the originals with our xid/cid as their deleter (see below).
             */
981         myXID = GetCurrentTransactionId();
982         myCID = GetCurrentCommandId();
983
984         if (Irel != (Relation *) NULL)          /* preparation for index inserts */
985         {
986                 vc_mkindesc(onerel, nindices, Irel, &Idesc);
987                 tupdesc = RelationGetTupleDescriptor(onerel);
                    /* scratch arrays re-used for every index insert below */
988                 idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof(*idatum));
989                 inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof(*inulls));
990         }
991
            /*
             * Fvplast/Fblklast and Vvplast/Vblklast describe the last entry
             * still pending in Fvpl and in Vvpl (not counting Vvpl's
             * vpl_nemend trailing entries). Fnpages and Vnpages are the local
             * counts of pending entries in each list.
             */
992         Nvpl.vpl_npages = 0;
993         Fnpages = Fvpl->vpl_npages;
994         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
995         Fblklast = Fvplast->vpd_blkno;
996         Assert(Vvpl->vpl_npages > Vvpl->vpl_nemend);
997         Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
998         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
999         Vblklast = Vvplast->vpd_blkno;
1000         Assert(Vblklast >= Fblklast);
1001         ToBuf = InvalidBuffer;
1002         nmoved = 0;
1003
             /*
              * vpc describes the block currently being emptied: vpd_voff[]
              * collects the offsets of the tuples moved off it, with room for
              * MaxOffsetNumber entries.
              */
1004         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
1005         vpc->vpd_nusd = vpc->vpd_noff = 0;
1006
             /*
              * Walk the relation backwards, starting at the last block that is
              * not one of the vpl_nemend trailing pages. The for statement has
              * no condition: the loop is left only through the breaks below.
              */
1007         nblocks = vacrelstats->npages;
1008         for (blkno = nblocks - Vvpl->vpl_nemend - 1;; blkno--)
1009         {
1010                 /* if it's a reaped page and it was used by me - quit */
1011                 if (blkno == Fblklast && Fvplast->vpd_nusd > 0)
1012                         break;
1013
1014                 buf = ReadBuffer(onerel, blkno);
1015                 page = BufferGetPage(buf);
1016
1017                 vpc->vpd_noff = 0;
1018
1019                 isempty = PageIsEmpty(page);
1020
                     /* dowrite: the page was changed here even if no tuple moves off it */
1021                 dowrite = false;
1022                 if (blkno == Vblklast)  /* it's a reaped page */
1023                 {
1024                         if (Vvplast->vpd_noff > 0)      /* there are dead tuples */
1025                         {                                       /* on this page - clean */
1026                                 Assert(!isempty);
1027                                 vc_vacpage(page, Vvplast);
1028                                 dowrite = true;
1029                         }
1030                         else
1031                         {
1032                                 Assert(isempty);
1033                         }
1034                         --Vnpages;
1035                         Assert(Vnpages > 0);
1036                         /* get prev reaped page from Vvpl */
1037                         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1038                         Vblklast = Vvplast->vpd_blkno;
1039                         if (blkno == Fblklast)          /* this page is in Fvpl too */
1040                         {
                                     /*
                                      * We are about to empty this page, so it
                                      * must not be offered as a move target.
                                      */
1041                                 --Fnpages;
1042                                 Assert(Fnpages > 0);
1043                                 Assert(Fvplast->vpd_nusd == 0);
1044                                 /* get prev reaped page from Fvpl */
1045                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1046                                 Fblklast = Fvplast->vpd_blkno;
1047                         }
1048                         Assert(Fblklast <= Vblklast);
1049                         if (isempty)
1050                         {
                                     /* nothing to move off an empty page */
1051                                 ReleaseBuffer(buf);
1052                                 continue;
1053                         }
1054                 }
1055                 else
1056                 {
1057                         Assert(!isempty);
1058                 }
1059
1060                 vpc->vpd_blkno = blkno;
1061                 maxoff = PageGetMaxOffsetNumber(page);
1062                 for (offnum = FirstOffsetNumber;
1063                          offnum <= maxoff;
1064                          offnum = OffsetNumberNext(offnum))
1065                 {
1066                         itemid = PageGetItemId(page, offnum);
1067
1068                         if (!ItemIdIsUsed(itemid))
1069                                 continue;
1070
1071                         htup = (HeapTuple) PageGetItem(page, itemid);
1072                         tlen = htup->t_len;
1073
1074                         /* try to find new page for this tuple */
1075                         if (ToBuf == InvalidBuffer ||
1076                                 !vc_enough_space(ToVpd, tlen))
1077                         {
1078                                 if (ToBuf != InvalidBuffer)
1079                                 {
                                             /* done with the current target page */
1080                                         WriteBuffer(ToBuf);
1081                                         ToBuf = InvalidBuffer;
1082
1083                                         /*
1084                                          * If not even the smallest tuple can be added to
1085                                          * this page - remove it from Fvpl. - vadim 11/27/96
1086                                          */
1087                                         if (!vc_enough_space(ToVpd, vacrelstats->min_tlen))
1088                                         {
1089                                                 if (ToVpd != Fvplast)
1090                                                 {
                                                             /* close the gap left at index ToVpI */
1091                                                         Assert(Fnpages > ToVpI + 1);
1092                                                         memmove(Fvpl->vpl_pgdesc + ToVpI,
1093                                                                         Fvpl->vpl_pgdesc + ToVpI + 1,
1094                                                         sizeof(VPageDescr *) * (Fnpages - ToVpI - 1));
1095                                                 }
1096                                                 Assert(Fnpages >= 1);
1097                                                 Fnpages--;
                                                     /* no target pages left: stop moving (offnum <= maxoff ends the outer loop too) */
1098                                                 if (Fnpages == 0)
1099                                                         break;
1100                                                 /* get prev reaped page from Fvpl */
1101                                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1102                                                 Fblklast = Fvplast->vpd_blkno;
1103                                         }
1104                                 }
                                     /* first-fit search over the remaining Fvpl pages */
1105                                 for (i = 0; i < Fnpages; i++)
1106                                 {
1107                                         if (vc_enough_space(Fvpl->vpl_pgdesc[i], tlen))
1108                                                 break;
1109                                 }
1110                                 if (i == Fnpages)
1111                                         break;          /* can't move item anywhere */
1112                                 ToVpI = i;
1113                                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1114                                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1115                                 ToPage = BufferGetPage(ToBuf);
1116                                 /* if this page was not used before - clean it */
1117                                 if (!PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0)
1118                                         vc_vacpage(ToPage, ToVpd);
1119                         }
1120
1121                         /* copy tuple */
1122                         newtup = (HeapTuple) palloc(tlen);
1123                         memmove((char *) newtup, (char *) htup, tlen);
1124
1125                         /* store transaction information */
1126                         TransactionIdStore(myXID, &(newtup->t_xmin));
1127                         newtup->t_cmin = myCID;
1128                         StoreInvalidTransactionId(&(newtup->t_xmax));
1129                         /* set xmin to unknown and xmax to invalid */
1130                         newtup->t_infomask &= ~(HEAP_XACT_MASK);
1131                         newtup->t_infomask |= HEAP_XMAX_INVALID;
1132
1133                         /* add tuple to the page */
1134                         newoff = PageAddItem(ToPage, (Item) newtup, tlen,
1135                                                                  InvalidOffsetNumber, LP_USED);
1136                         if (newoff == InvalidOffsetNumber)
1137                         {
1138                                 elog(ERROR, "\
1139 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1140                                          tlen, ToVpd->vpd_blkno, ToVpd->vpd_free,
1141                                          ToVpd->vpd_nusd, ToVpd->vpd_noff);
1142                         }
                             /*
                              * The palloc'd scratch copy is no longer needed; from
                              * here on newtup refers to the tuple stored in ToPage,
                              * whose t_ctid is set to its new location.
                              */
1143                         newitemid = PageGetItemId(ToPage, newoff);
1144                         pfree(newtup);
1145                         newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1146                         ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1147
1148                         /* now logically delete end-tuple */
1149                         TransactionIdStore(myXID, &(htup->t_xmax));
1150                         htup->t_cmax = myCID;
1151                         /* set xmax to unknown */
1152                         htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED);
1153
                             /* bookkeeping: target page usage, and source offset for Nvpl */
1154                         ToVpd->vpd_nusd++;
1155                         nmoved++;
1156                         ToVpd->vpd_free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1157                         vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1158
1159                         /* insert index tuples if needed */
1160                         if (Irel != (Relation *) NULL)
1161                         {
1162                                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1163                                 {
1164                                         FormIndexDatum(
1165                                                                    idcur->natts,
1166                                                            (AttrNumber *) &(idcur->tform->indkey[0]),
1167                                                                    newtup,
1168                                                                    tupdesc,
1169                                                                    InvalidBuffer,
1170                                                                    idatum,
1171                                                                    inulls,
1172                                                                    idcur->finfoP);
1173                                         iresult = index_insert(
1174                                                                                    Irel[i],
1175                                                                                    idatum,
1176                                                                                    inulls,
1177                                                                                    &(newtup->t_ctid),
1178                                                                                    onerel);
1179                                         if (iresult)
1180                                                 pfree(iresult);
1181                                 }
1182                         }
1183
1184                 }                                               /* walk along page */
1185
1186                 if (vpc->vpd_noff > 0)  /* some tuples were moved */
1187                 {
                             /* remember this page (and the moved offsets) in Nvpl */
1188                         vc_reappage(&Nvpl, vpc);
1189                         WriteBuffer(buf);
1190                 }
1191                 else if (dowrite)
1192                         WriteBuffer(buf);
1193                 else
1194                         ReleaseBuffer(buf);
1195
                     /* the inner loop was left early: this block cannot be emptied */
1196                 if (offnum <= maxoff)
1197                         break;                          /* some item(s) left */
1198
1199         }                                                       /* walk along relation */
1200
             /* blkno is the last block that must be kept, so blkno + 1 blocks remain */
1201         blkno++;                                        /* new number of blocks */
1202
1203         if (ToBuf != InvalidBuffer)
1204         {
1205                 Assert(nmoved > 0);
1206                 WriteBuffer(ToBuf);
1207         }
1208
1209         if (nmoved > 0)
1210         {
1211
1212                 /*
1213                  * We have to commit our tuple moves before we truncate the
1214                  * relation, but we shouldn't lose our locks. And so - quick hack:
1215                  * flush buffers and record status of current transaction as
1216                  * committed, and continue. - vadim 11/13/96
1217                  */
1218                 FlushBufferPool(!TransactionFlushEnabled());
1219                 TransactionIdCommit(myXID);
1220                 FlushBufferPool(!TransactionFlushEnabled());
1221         }
1222
1223         /*
1224          * Clean uncleaned reaped pages from Vvpl list and set xmin committed
1225          * for inserted tuples. Only the first Vnpages entries are visited:
              * the ones the backward scan above did not reach.
1226          */
1227         nchkmvd = 0;
1228         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1229         {
1230                 Assert((*vpp)->vpd_blkno < blkno);
1231                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1232                 page = BufferGetPage(buf);
1233                 if ((*vpp)->vpd_nusd == 0)              /* this page was not used */
1234                 {
1235
1236                         /*
1237                          * noff == 0 in empty pages only - such pages should be
1238                          * re-used
1239                          */
1240                         Assert((*vpp)->vpd_noff > 0);
1241                         vc_vacpage(page, *vpp);
1242                 }
1243                 else
1244 /* this page was used */
1245                 {
                             /* count the tuples we inserted here; must match vpd_nusd */
1246                         ntups = 0;
1247                         moff = PageGetMaxOffsetNumber(page);
1248                         for (newoff = FirstOffsetNumber;
1249                                  newoff <= moff;
1250                                  newoff = OffsetNumberNext(newoff))
1251                         {
1252                                 itemid = PageGetItemId(page, newoff);
1253                                 if (!ItemIdIsUsed(itemid))
1254                                         continue;
1255                                 htup = (HeapTuple) PageGetItem(page, itemid);
1256                                 if (TransactionIdEquals((TransactionId) htup->t_xmin, myXID))
1257                                 {
1258                                         htup->t_infomask |= HEAP_XMIN_COMMITTED;
1259                                         ntups++;
1260                                 }
1261                         }
1262                         Assert((*vpp)->vpd_nusd == ntups);
1263                         nchkmvd += ntups;
1264                 }
1265                 WriteBuffer(buf);
1266         }
             /* every moved tuple must have been found again on a target page */
1267         Assert(nmoved == nchkmvd);
1268
1269         getrusage(RUSAGE_SELF, &ru1);
1270
1271         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. \
1272 Elapsed %u/%u sec.",
1273                  (RelationGetRelationName(onerel))->data,
1274                  nblocks, blkno, nmoved,
1275                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1276                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1277
1278         if (Nvpl.vpl_npages > 0)
1279         {
1280                 /* vacuum indices again if needed */
1281                 if (Irel != (Relation *) NULL)
1282                 {
1283                         VPageDescr *vpleft,
1284                                            *vpright,
1285                                                 vpsave;
1286
1287                         /*
                              * re-sort Nvpl.vpl_pgdesc: reverse the array in place.
                              * Pages were recorded while scanning backwards, so
                              * presumably vc_reappage() appended them in
                              * descending block order - confirm.
                              */
1288                         for (vpleft = Nvpl.vpl_pgdesc,
1289                                  vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1290                                  vpleft < vpright; vpleft++, vpright--)
1291                         {
1292                                 vpsave = *vpleft;
1293                                 *vpleft = *vpright;
1294                                 *vpright = vpsave;
1295                         }
1296                         for (i = 0; i < nindices; i++)
1297                                 vc_vaconeind(&Nvpl, Irel[i], vacrelstats->ntups);
1298                 }
1299
1300                 /*
1301                  * clean moved tuples from last page in Nvpl list if some tuples
1302                  * left there. offnum is where the move loop stopped, so only
                      * the used items before it are expected to carry our xmax.
1303                  */
1304                 if (vpc->vpd_noff > 0 && offnum <= maxoff)
1305                 {
1306                         Assert(vpc->vpd_blkno == blkno - 1);
1307                         buf = ReadBuffer(onerel, vpc->vpd_blkno);
1308                         page = BufferGetPage(buf);
1309                         ntups = 0;
1310                         maxoff = offnum;
1311                         for (offnum = FirstOffsetNumber;
1312                                  offnum < maxoff;
1313                                  offnum = OffsetNumberNext(offnum))
1314                         {
1315                                 itemid = PageGetItemId(page, offnum);
1316                                 if (!ItemIdIsUsed(itemid))
1317                                         continue;
1318                                 htup = (HeapTuple) PageGetItem(page, itemid);
1319                                 Assert(TransactionIdEquals((TransactionId) htup->t_xmax, myXID));
1320                                 itemid->lp_flags &= ~LP_USED;
1321                                 ntups++;
1322                         }
1323                         Assert(vpc->vpd_noff == ntups);
1324                         PageRepairFragmentation(page);
1325                         WriteBuffer(buf);
1326                 }
1327
1328                 /* now - free new list of reaped pages */
1329                 vpp = Nvpl.vpl_pgdesc;
1330                 for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1331                         pfree(*vpp);
1332                 pfree(Nvpl.vpl_pgdesc);
1333         }
1334
1335         /* truncate relation */
1336         if (blkno < nblocks)
1337         {
                     /* a negative result from BlowawayRelationBuffers is treated as fatal */
1338                 i = BlowawayRelationBuffers(onerel, blkno);
1339                 if (i < 0)
1340                         elog (FATAL, "VACUUM (vc_rpfheap): BlowawayRelationBuffers returned %d", i);
1341                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
1342                 Assert(blkno >= 0);
1343                 vacrelstats->npages = blkno;    /* set new number of blocks */
1344         }
1345
1346         if (Irel != (Relation *) NULL)          /* pfree index allocations */
1347         {
1348                 pfree(Idesc);
1349                 pfree(idatum);
1350                 pfree(inulls);
1351                 vc_clsindices(nindices, Irel);
1352         }
1353
1354         pfree(vpc);
1355
1356 }                                                               /* vc_rpfheap */
1357
1358 /*
1359  *      vc_vacheap() -- free dead tuples
1360  *
1361  *              This routine marks dead tuples as unused and truncates relation
1362  *              if there are "empty" end-blocks.
1363  */
1364 static void
1365 vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1366 {
1367         Buffer          buf;
1368         Page            page;
1369         VPageDescr *vpp;
1370         int                     nblocks;
1371         int                     i;
1372
1373         nblocks = Vvpl->vpl_npages;
1374         nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1375
1376         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1377         {
1378                 if ((*vpp)->vpd_noff > 0)
1379                 {
1380                         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1381                         page = BufferGetPage(buf);
1382                         vc_vacpage(page, *vpp);
1383                         WriteBuffer(buf);
1384                 }
1385         }
1386
1387         /* truncate relation if there are some empty end-pages */
1388         if (Vvpl->vpl_nemend > 0)
1389         {
1390                 Assert(vacrelstats->npages >= Vvpl->vpl_nemend);
1391                 nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1392                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1393                          (RelationGetRelationName(onerel))->data,
1394                          vacrelstats->npages, nblocks);
1395
1396                 /*
1397                  * we have to flush "empty" end-pages (if changed, but who knows
1398                  * it) before truncation
1399                  */
1400                 FlushBufferPool(!TransactionFlushEnabled());
1401                 
1402                 i = BlowawayRelationBuffers(onerel, nblocks);
1403                 if (i < 0)
1404                         elog (FATAL, "VACUUM (vc_vacheap): BlowawayRelationBuffers returned %d", i);
1405
1406                 nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
1407                 Assert(nblocks >= 0);
1408                 vacrelstats->npages = nblocks;  /* set new number of blocks */
1409         }
1410
1411 }                                                               /* vc_vacheap */
1412
1413 /*
1414  *      vc_vacpage() -- free dead tuples on a page
1415  *                                       and repaire its fragmentation.
1416  */
1417 static void
1418 vc_vacpage(Page page, VPageDescr vpd)
1419 {
1420         ItemId          itemid;
1421         int                     i;
1422
1423         Assert(vpd->vpd_nusd == 0);
1424         for (i = 0; i < vpd->vpd_noff; i++)
1425         {
1426                 itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1427                 itemid->lp_flags &= ~LP_USED;
1428         }
1429         PageRepairFragmentation(page);
1430
1431 }                                                               /* vc_vacpage */
1432
1433 /*
1434  *      _vc_scanoneind() -- scan one index relation to update statistic.
1435  *
1436  */
1437 static void
1438 vc_scanoneind(Relation indrel, int nhtups)
1439 {
1440         RetrieveIndexResult res;
1441         IndexScanDesc iscan;
1442         int                     nitups;
1443         int                     nipages;
1444         struct rusage ru0,
1445                                 ru1;
1446
1447         getrusage(RUSAGE_SELF, &ru0);
1448
1449         /* walk through the entire index */
1450         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1451         nitups = 0;
1452
1453         while ((res = index_getnext(iscan, ForwardScanDirection))
1454                    != (RetrieveIndexResult) NULL)
1455         {
1456                 nitups++;
1457                 pfree(res);
1458         }
1459
1460         index_endscan(iscan);
1461
1462         /* now update statistics in pg_class */
1463         nipages = RelationGetNumberOfBlocks(indrel);
1464         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1465
1466         getrusage(RUSAGE_SELF, &ru1);
1467
1468         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1469                  indrel->rd_rel->relname.data, nipages, nitups,
1470                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1471                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1472
1473         if (nitups != nhtups)
1474                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1475                          indrel->rd_rel->relname.data, nitups, nhtups);
1476
1477 }                                                               /* vc_scanoneind */
1478
1479 /*
1480  *      vc_vaconeind() -- vacuum one index relation.
1481  *
1482  *              Vpl is the VPageList of the heap we're currently vacuuming.
1483  *              It's locked. Indrel is an index relation on the vacuumed heap.
1484  *              We don't set locks on the index relation here, since the indexed
1485  *              access methods support locking at different granularities.
1486  *              We let them handle it.
1487  *
1488  *              Finally, we arrange to update the index relation's statistics in
1489  *              pg_class.
1490  */
1491 static void
1492 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1493 {
1494         RetrieveIndexResult res;
1495         IndexScanDesc iscan;
1496         ItemPointer heapptr;
1497         int                     nvac;
1498         int                     nitups;
1499         int                     nipages;
1500         VPageDescr      vp;
1501         struct rusage ru0,
1502                                 ru1;
1503
1504         getrusage(RUSAGE_SELF, &ru0);
1505
1506         /* walk through the entire index */
1507         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1508         nvac = 0;
1509         nitups = 0;
1510
1511         while ((res = index_getnext(iscan, ForwardScanDirection))
1512                    != (RetrieveIndexResult) NULL)
1513         {
1514                 heapptr = &res->heap_iptr;
1515
1516                 if ((vp = vc_tidreapped(heapptr, vpl)) != (VPageDescr) NULL)
1517                 {
1518 #if 0
1519                         elog(DEBUG, "<%x,%x> -> <%x,%x>",
1520                                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1521                                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1522                                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1523                                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1524 #endif
1525                         if (vp->vpd_noff == 0)
1526                         {                                       /* this is EmptyPage !!! */
1527                                 elog(NOTICE, "Ind %s: pointer to EmptyPage (blk %u off %u) - fixing",
1528                                          indrel->rd_rel->relname.data,
1529                                          vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1530                         }
1531                         ++nvac;
1532                         index_delete(indrel, &res->index_iptr);
1533                 }
1534                 else
1535                 {
1536                         nitups++;
1537                 }
1538
1539                 /* be tidy */
1540                 pfree(res);
1541         }
1542
1543         index_endscan(iscan);
1544
1545         /* now update statistics in pg_class */
1546         nipages = RelationGetNumberOfBlocks(indrel);
1547         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1548
1549         getrusage(RUSAGE_SELF, &ru1);
1550
1551         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1552                  indrel->rd_rel->relname.data, nipages, nitups, nvac,
1553                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1554                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1555
1556         if (nitups != nhtups)
1557                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1558                          indrel->rd_rel->relname.data, nitups, nhtups);
1559
1560 }                                                               /* vc_vaconeind */
1561
1562 /*
1563  *      vc_tidreapped() -- is a particular tid reapped?
1564  *
1565  *              vpl->VPageDescr_array is sorted in right order.
1566  */
1567 static VPageDescr
1568 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1569 {
1570         OffsetNumber ioffno;
1571         OffsetNumber *voff;
1572         VPageDescr      vp,
1573                            *vpp;
1574         VPageDescrData vpd;
1575
1576         vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1577         ioffno = ItemPointerGetOffsetNumber(itemptr);
1578
1579         vp = &vpd;
1580         vpp = (VPageDescr *) vc_find_eq((char *) (vpl->vpl_pgdesc),
1581                                            vpl->vpl_npages, sizeof(VPageDescr), (char *) &vp,
1582                                                                         vc_cmp_blk);
1583
1584         if (vpp == (VPageDescr *) NULL)
1585                 return ((VPageDescr) NULL);
1586         vp = *vpp;
1587
1588         /* ok - we are on true page */
1589
1590         if (vp->vpd_noff == 0)
1591         {                                                       /* this is EmptyPage !!! */
1592                 return (vp);
1593         }
1594
1595         voff = (OffsetNumber *) vc_find_eq((char *) (vp->vpd_voff),
1596                                         vp->vpd_noff, sizeof(OffsetNumber), (char *) &ioffno,
1597                                                                            vc_cmp_offno);
1598
1599         if (voff == (OffsetNumber *) NULL)
1600                 return ((VPageDescr) NULL);
1601
1602         return (vp);
1603
1604 }                                                               /* vc_tidreapped */
1605
1606 /*
1607  *      vc_attrstats() -- compute column statistics used by the optimzer
1608  *
1609  *      We compute the column min, max, null and non-null counts.
1610  *      Plus we attempt to find the count of the value that occurs most
1611  *      frequently in each column
1612  *      These figures are used to compute the selectivity of the column
1613  *
1614  *      We use a three-bucked cache to get the most frequent item
1615  *      The 'guess' buckets count hits.  A cache miss causes guess1
1616  *      to get the most hit 'guess' item in the most recent cycle, and
1617  *      the new item goes into guess2.  Whenever the total count of hits
1618  *      of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1619  *
1620  *      This method works perfectly for columns with unique values, and columns
1621  *      with only two unique values, plus nulls.
1622  *
1623  *      It becomes less perfect as the number of unique values increases and
1624  *      their distribution in the table becomes more random.
1625  *
1626  */
1627 static void
1628 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup)
1629 {
1630         int                     i,
1631                                 attr_cnt = vacrelstats->va_natts;
1632         VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1633         TupleDesc       tupDesc = onerel->rd_att;
1634         Datum           value;
1635         bool            isnull;
1636
1637         for (i = 0; i < attr_cnt; i++)
1638         {
1639                 VacAttrStats *stats = &vacattrstats[i];
1640                 bool            value_hit = true;
1641
1642                 value = heap_getattr(htup,
1643                                                          stats->attr->attnum, tupDesc, &isnull);
1644
1645                 if (!VacAttrStatsEqValid(stats))
1646                         continue;
1647
1648                 if (isnull)
1649                         stats->null_cnt++;
1650                 else
1651                 {
1652                         stats->nonnull_cnt++;
1653                         if (stats->initialized == false)
1654                         {
1655                                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1656                                 /* best_cnt gets incremented later */
1657                                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1658                                 stats->guess1_cnt = stats->guess1_hits = 1;
1659                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1660                                 stats->guess2_hits = 1;
1661                                 if (VacAttrStatsLtGtValid(stats))
1662                                 {
1663                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1664                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1665                                 }
1666                                 stats->initialized = true;
1667                         }
1668                         if (VacAttrStatsLtGtValid(stats))
1669                         {
1670                                 if ((*fmgr_faddr(&stats->f_cmplt)) (value, stats->min))
1671                                 {
1672                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1673                                         stats->min_cnt = 0;
1674                                 }
1675                                 if ((*fmgr_faddr(&stats->f_cmpgt)) (value, stats->max))
1676                                 {
1677                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1678                                         stats->max_cnt = 0;
1679                                 }
1680                                 if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->min))
1681                                         stats->min_cnt++;
1682                                 else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->max))
1683                                         stats->max_cnt++;
1684                         }
1685                         if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->best))
1686                                 stats->best_cnt++;
1687                         else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess1))
1688                         {
1689                                 stats->guess1_cnt++;
1690                                 stats->guess1_hits++;
1691                         }
1692                         else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess2))
1693                                 stats->guess2_hits++;
1694                         else
1695                                 value_hit = false;
1696
1697                         if (stats->guess2_hits > stats->guess1_hits)
1698                         {
1699                                 swapDatum(stats->guess1, stats->guess2);
1700                                 swapInt(stats->guess1_len, stats->guess2_len);
1701                                 stats->guess1_cnt = stats->guess2_hits;
1702                                 swapLong(stats->guess1_hits, stats->guess2_hits);
1703                         }
1704                         if (stats->guess1_cnt > stats->best_cnt)
1705                         {
1706                                 swapDatum(stats->best, stats->guess1);
1707                                 swapInt(stats->best_len, stats->guess1_len);
1708                                 swapLong(stats->best_cnt, stats->guess1_cnt);
1709                                 stats->guess1_hits = 1;
1710                                 stats->guess2_hits = 1;
1711                         }
1712                         if (!value_hit)
1713                         {
1714                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1715                                 stats->guess1_hits = 1;
1716                                 stats->guess2_hits = 1;
1717                         }
1718                 }
1719         }
1720         return;
1721 }
1722
1723 /*
1724  *      vc_bucketcpy() -- update pg_class statistics for one relation
1725  *
1726  */
1727 static void
1728 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1729 {
1730         if (attr->attbyval && attr->attlen != -1)
1731                 *bucket = value;
1732         else
1733         {
1734                 int                     len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1735
1736                 if (len > *bucket_len)
1737                 {
1738                         if (*bucket_len != 0)
1739                                 pfree(DatumGetPointer(*bucket));
1740                         *bucket = PointerGetDatum(palloc(len));
1741                         *bucket_len = len;
1742                 }
1743                 memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1744         }
1745 }
1746
1747 /*
1748  *      vc_updstats() -- update pg_class statistics for one relation
1749  *
1750  *              This routine works for both index and heap relation entries in
1751  *              pg_class.  We violate no-overwrite semantics here by storing new
1752  *              values for ntups, npages, and hasindex directly in the pg_class
1753  *              tuple that's already on the page.  The reason for this is that if
1754  *              we updated these tuples in the usual way, then every tuple in pg_class
1755  *              would be replaced every day.  This would make planning and executing
1756  *              historical queries very expensive.
1757  */
1758 static void
1759 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats)
1760 {
1761         Relation        rd,
1762                                 ad,
1763                                 sd;
1764         HeapScanDesc rsdesc,
1765                                 asdesc;
1766         TupleDesc       sdesc;
1767         HeapTuple       rtup,
1768                                 atup,
1769                                 stup;
1770         Buffer          rbuf,
1771                                 abuf;
1772         Form_pg_class pgcform;
1773         ScanKeyData rskey,
1774                                 askey;
1775         AttributeTupleForm attp;
1776
1777         /*
1778          * update number of tuples and number of pages in pg_class
1779          */
1780         ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1781                                                    ObjectIdEqualRegProcedure,
1782                                                    ObjectIdGetDatum(relid));
1783
1784         rd = heap_openr(RelationRelationName);
1785         rsdesc = heap_beginscan(rd, false, false, 1, &rskey);
1786
1787         if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1788                 elog(ERROR, "pg_class entry for relid %d vanished during vacuuming",
1789                          relid);
1790
1791         /* overwrite the existing statistics in the tuple */
1792         vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1793         pgcform = (Form_pg_class) GETSTRUCT(rtup);
1794         pgcform->reltuples = ntups;
1795         pgcform->relpages = npages;
1796         pgcform->relhasindex = hasindex;
1797
1798         if (vacrelstats != NULL && vacrelstats->va_natts > 0)
1799         {
1800                 VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1801                 int                     natts = vacrelstats->va_natts;
1802
1803                 ad = heap_openr(AttributeRelationName);
1804                 sd = heap_openr(StatisticRelationName);
1805                 ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1806                                                            F_INT4EQ, relid);
1807
1808                 asdesc = heap_beginscan(ad, false, false, 1, &askey);
1809
1810                 while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1811                 {
1812                         int                     i;
1813                         float32data selratio;           /* average ratio of rows selected
1814                                                                                  * for a random constant */
1815                         VacAttrStats *stats;
1816                         Datum           values[Natts_pg_statistic];
1817                         char            nulls[Natts_pg_statistic];
1818
1819                         attp = (AttributeTupleForm) GETSTRUCT(atup);
1820                         if (attp->attnum <= 0)          /* skip system attributes for now, */
1821                                 /* they are unique anyway */
1822                                 continue;
1823
1824                         for (i = 0; i < natts; i++)
1825                         {
1826                                 if (attp->attnum == vacattrstats[i].attr->attnum)
1827                                         break;
1828                         }
1829                         if (i >= natts)
1830                                 continue;
1831                         stats = &(vacattrstats[i]);
1832
1833                         /* overwrite the existing statistics in the tuple */
1834                         if (VacAttrStatsEqValid(stats))
1835                         {
1836
1837                                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1838
1839                                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1840                                         (stats->null_cnt <= 1 && stats->best_cnt == 1))
1841                                         selratio = 0;
1842                                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1843                                 {
1844                                         double          min_cnt_d = stats->min_cnt,
1845                                                                 max_cnt_d = stats->max_cnt,
1846                                                                 null_cnt_d = stats->null_cnt,
1847                                                                 nonnullcnt_d = stats->nonnull_cnt;              /* prevent overflow */
1848
1849                                         selratio = (min_cnt_d * min_cnt_d + max_cnt_d * max_cnt_d + null_cnt_d * null_cnt_d) /
1850                                                 (nonnullcnt_d + null_cnt_d) / (nonnullcnt_d + null_cnt_d);
1851                                 }
1852                                 else
1853                                 {
1854                                         double          most = (double) (stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1855                                         double          total = ((double) stats->nonnull_cnt) + ((double) stats->null_cnt);
1856
1857                                         /*
1858                                          * we assume count of other values are 20% of best
1859                                          * count in table
1860                                          */
1861                                         selratio = (most * most + 0.20 * most * (total - most)) / total / total;
1862                                 }
1863                                 if (selratio > 1.0)
1864                                         selratio = 1.0;
1865                                 attp->attdisbursion = selratio;
1866                                 WriteNoReleaseBuffer(abuf);
1867
1868                                 /* DO PG_STATISTIC INSERTS */
1869
1870                                 /*
1871                                  * doing system relations, especially pg_statistic is a
1872                                  * problem
1873                                  */
1874                                 if (VacAttrStatsLtGtValid(stats) && stats->initialized  /* &&
1875                                                                                                                                                  * !IsSystemRelationName(
1876                                                                                                                                                  *
1877                                          pgcform->relname.data) */ )
1878                                 {
1879                                         FmgrInfo        out_function;
1880                                         char       *out_string;
1881
1882                                         for (i = 0; i < Natts_pg_statistic; ++i)
1883                                                 nulls[i] = ' ';
1884
1885                                         /* ----------------
1886                                          *      initialize values[]
1887                                          * ----------------
1888                                          */
1889                                         i = 0;
1890                                         values[i++] = (Datum) relid;            /* 1 */
1891                                         values[i++] = (Datum) attp->attnum; /* 2 */
1892                                         values[i++] = (Datum) InvalidOid;       /* 3 */
1893                                         fmgr_info(stats->outfunc, &out_function);
1894                                         out_string = (*fmgr_faddr(&out_function)) (stats->min, stats->attr->atttypid);
1895                                         values[i++] = (Datum) fmgr(TextInRegProcedure, out_string);
1896                                         pfree(out_string);
1897                                         out_string = (char *) (*fmgr_faddr(&out_function)) (stats->max, stats->attr->atttypid);
1898                                         values[i++] = (Datum) fmgr(TextInRegProcedure, out_string);
1899                                         pfree(out_string);
1900
1901                                         sdesc = sd->rd_att;
1902
1903                                         stup = heap_formtuple(sdesc, values, nulls);
1904
1905                                         /* ----------------
1906                                          *      insert the tuple in the relation and get the tuple's oid.
1907                                          * ----------------
1908                                          */
1909                                         heap_insert(sd, stup);
1910                                         pfree(DatumGetPointer(values[3]));
1911                                         pfree(DatumGetPointer(values[4]));
1912                                         pfree(stup);
1913                                 }
1914                         }
1915                 }
1916                 heap_endscan(asdesc);
1917                 heap_close(ad);
1918                 heap_close(sd);
1919         }
1920
1921         /* XXX -- after write, should invalidate relcache in other backends */
1922         WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1923
1924         /*
1925          * invalidating system relations confuses the function cache of
1926          * pg_operator and pg_opclass
1927          */
1928         if (!IsSystemRelationName(pgcform->relname.data))
1929                 RelationInvalidateHeapTuple(rd, rtup);
1930
1931         /* that's all, folks */
1932         heap_endscan(rsdesc);
1933         heap_close(rd);
1934 }
1935
1936 /*
1937  *      vc_delhilowstats() -- delete pg_statistics rows
1938  *
1939  */
1940 static void
1941 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
1942 {
1943         Relation        pgstatistic;
1944         HeapScanDesc pgsscan;
1945         HeapTuple       pgstup;
1946         ScanKeyData pgskey;
1947
1948         pgstatistic = heap_openr(StatisticRelationName);
1949
1950         if (relid != InvalidOid)
1951         {
1952                 ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
1953                                                            ObjectIdEqualRegProcedure,
1954                                                            ObjectIdGetDatum(relid));
1955                 pgsscan = heap_beginscan(pgstatistic, false, false, 1, &pgskey);
1956         }
1957         else
1958                 pgsscan = heap_beginscan(pgstatistic, false, false, 0, NULL);
1959
1960         while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
1961         {
1962                 if (attcnt > 0)
1963                 {
1964                         Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(pgstup);
1965                         int                     i;
1966
1967                         for (i = 0; i < attcnt; i++)
1968                         {
1969                                 if (pgs->staattnum == attnums[i] + 1)
1970                                         break;
1971                         }
1972                         if (i >= attcnt)
1973                                 continue;               /* don't delete it */
1974                 }
1975                 heap_delete(pgstatistic, &pgstup->t_ctid);
1976         }
1977
1978         heap_endscan(pgsscan);
1979         heap_close(pgstatistic);
1980 }
1981
1982 static void
1983 vc_setpagelock(Relation rel, BlockNumber blkno)
1984 {
1985         ItemPointerData itm;
1986
1987         ItemPointerSet(&itm, blkno, 1);
1988
1989         RelationSetLockForWritePage(rel, &itm);
1990 }
1991
1992 /*
1993  *      vc_reappage() -- save a page on the array of reapped pages.
1994  *
1995  *              As a side effect of the way that the vacuuming loop for a given
1996  *              relation works, higher pages come after lower pages in the array
1997  *              (and highest tid on a page is last).
1998  */
1999 static void
2000 vc_reappage(VPageList vpl, VPageDescr vpc)
2001 {
2002         VPageDescr      newvpd;
2003
2004         /* allocate a VPageDescrData entry */
2005         newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff * sizeof(OffsetNumber));
2006
2007         /* fill it in */
2008         if (vpc->vpd_noff > 0)
2009                 memmove(newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff * sizeof(OffsetNumber));
2010         newvpd->vpd_blkno = vpc->vpd_blkno;
2011         newvpd->vpd_free = vpc->vpd_free;
2012         newvpd->vpd_nusd = vpc->vpd_nusd;
2013         newvpd->vpd_noff = vpc->vpd_noff;
2014
2015         /* insert this page into vpl list */
2016         vc_vpinsert(vpl, newvpd);
2017
2018 }                                                               /* vc_reappage */
2019
2020 static void
2021 vc_vpinsert(VPageList vpl, VPageDescr vpnew)
2022 {
2023
2024         /* allocate a VPageDescr entry if needed */
2025         if (vpl->vpl_npages == 0)
2026                 vpl->vpl_pgdesc = (VPageDescr *) palloc(100 * sizeof(VPageDescr));
2027         else if (vpl->vpl_npages % 100 == 0)
2028                 vpl->vpl_pgdesc = (VPageDescr *) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages + 100) * sizeof(VPageDescr));
2029         vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
2030         (vpl->vpl_npages)++;
2031
2032 }
2033
2034 static void
2035 vc_free(VRelList vrl)
2036 {
2037         VRelList        p_vrl;
2038         MemoryContext old;
2039         PortalVariableMemory pmem;
2040
2041         pmem = PortalGetVariableMemory(vc_portal);
2042         old = MemoryContextSwitchTo((MemoryContext) pmem);
2043
2044         while (vrl != (VRelList) NULL)
2045         {
2046
2047                 /* free rel list entry */
2048                 p_vrl = vrl;
2049                 vrl = vrl->vrl_next;
2050                 pfree(p_vrl);
2051         }
2052
2053         MemoryContextSwitchTo(old);
2054 }
2055
/*
 *	vc_find_eq() -- binary search for an element equal to *elm.
 *
 *		bot		- first element of an array sorted in ascending order
 *				  according to compar
 *		nelem	- number of elements in the array
 *		size	- size of one element in bytes
 *		elm		- the key; it is handed to compar like an array element
 *		compar	- returns <0, 0 or >0 as its first argument sorts before,
 *				  equal to or after its second
 *
 *		Returns a pointer to a matching element, or NULL if there is none.
 *		An empty array (nelem <= 0) yields NULL without touching bot.
 */
static char *
vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *))
{
	int			res;
	int			last = nelem - 1;
	int			celm = nelem / 2;
	bool		last_move,
				first_move;

	/* nothing to search: the probes below would read outside the array */
	if (nelem <= 0)
		return (NULL);

	last_move = first_move = true;
	for (;;)
	{
		/* lower bound moved: is the key below (or at) the first element? */
		if (first_move == true)
		{
			res = compar(bot, elm);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot);
			first_move = false;
		}
		/* upper bound moved: is the key above (or at) the last element? */
		if (last_move == true)
		{
			res = compar(elm, bot + last * size);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot + last * size);
			last_move = false;
		}
		/* probe the middle element */
		res = compar(elm, bot + celm * size);
		if (res == 0)
			return (bot + celm * size);
		if (res < 0)
		{
			/* continue in the lower half */
			if (celm == 0)
				return (NULL);
			last = celm - 1;
			celm = celm / 2;
			last_move = true;
			continue;
		}

		/* continue in the upper half */
		if (celm == last)
			return (NULL);

		last = last - celm - 1;
		bot = bot + (celm + 1) * size;
		celm = (last + 1) / 2;
		first_move = true;
	}

}								/* vc_find_eq */
2109
2110 static int
2111 vc_cmp_blk(char *left, char *right)
2112 {
2113         BlockNumber lblk,
2114                                 rblk;
2115
2116         lblk = (*((VPageDescr *) left))->vpd_blkno;
2117         rblk = (*((VPageDescr *) right))->vpd_blkno;
2118
2119         if (lblk < rblk)
2120                 return (-1);
2121         if (lblk == rblk)
2122                 return (0);
2123         return (1);
2124
2125 }                                                               /* vc_cmp_blk */
2126
2127 static int
2128 vc_cmp_offno(char *left, char *right)
2129 {
2130
2131         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2132                 return (-1);
2133         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2134                 return (0);
2135         return (1);
2136
2137 }                                                               /* vc_cmp_offno */
2138
2139
2140 static void
2141 vc_getindices(Oid relid, int *nindices, Relation **Irel)
2142 {
2143         Relation        pgindex;
2144         Relation        irel;
2145         TupleDesc       pgidesc;
2146         HeapTuple       pgitup;
2147         HeapScanDesc pgiscan;
2148         Datum           d;
2149         int                     i,
2150                                 k;
2151         bool            n;
2152         ScanKeyData pgikey;
2153         Oid                *ioid;
2154
2155         *nindices = i = 0;
2156
2157         ioid = (Oid *) palloc(10 * sizeof(Oid));
2158
2159         /* prepare a heap scan on the pg_index relation */
2160         pgindex = heap_openr(IndexRelationName);
2161         pgidesc = RelationGetTupleDescriptor(pgindex);
2162
2163         ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2164                                                    ObjectIdEqualRegProcedure,
2165                                                    ObjectIdGetDatum(relid));
2166
2167         pgiscan = heap_beginscan(pgindex, false, false, 1, &pgikey);
2168
2169         while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL)))
2170         {
2171                 d = heap_getattr(pgitup, Anum_pg_index_indexrelid,
2172                                                  pgidesc, &n);
2173                 i++;
2174                 if (i % 10 == 0)
2175                         ioid = (Oid *) repalloc(ioid, (i + 10) * sizeof(Oid));
2176                 ioid[i - 1] = DatumGetObjectId(d);
2177         }
2178
2179         heap_endscan(pgiscan);
2180         heap_close(pgindex);
2181
2182         if (i == 0)
2183         {                                                       /* No one index found */
2184                 pfree(ioid);
2185                 return;
2186         }
2187
2188         if (Irel != (Relation **) NULL)
2189                 *Irel = (Relation *) palloc(i * sizeof(Relation));
2190
2191         for (k = 0; i > 0;)
2192         {
2193                 irel = index_open(ioid[--i]);
2194                 if (irel != (Relation) NULL)
2195                 {
2196                         if (Irel != (Relation **) NULL)
2197                                 (*Irel)[k] = irel;
2198                         else
2199                                 index_close(irel);
2200                         k++;
2201                 }
2202                 else
2203                         elog(NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2204         }
2205         *nindices = k;
2206         pfree(ioid);
2207
2208         if (Irel != (Relation **) NULL && *nindices == 0)
2209         {
2210                 pfree(*Irel);
2211                 *Irel = (Relation *) NULL;
2212         }
2213
2214 }                                                               /* vc_getindices */
2215
2216
2217 static void
2218 vc_clsindices(int nindices, Relation *Irel)
2219 {
2220
2221         if (Irel == (Relation *) NULL)
2222                 return;
2223
2224         while (nindices--)
2225         {
2226                 index_close(Irel[nindices]);
2227         }
2228         pfree(Irel);
2229
2230 }                                                               /* vc_clsindices */
2231
2232
2233 static void
2234 vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2235 {
2236         IndDesc    *idcur;
2237         HeapTuple       pgIndexTup;
2238         AttrNumber *attnumP;
2239         int                     natts;
2240         int                     i;
2241
2242         *Idesc = (IndDesc *) palloc(nindices * sizeof(IndDesc));
2243
2244         for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++)
2245         {
2246                 pgIndexTup =
2247                         SearchSysCacheTuple(INDEXRELID,
2248                                                                 ObjectIdGetDatum(Irel[i]->rd_id),
2249                                                                 0, 0, 0);
2250                 Assert(pgIndexTup);
2251                 idcur->tform = (IndexTupleForm) GETSTRUCT(pgIndexTup);
2252                 for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2253                          *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2254                          attnumP++, natts++);
2255                 if (idcur->tform->indproc != InvalidOid)
2256                 {
2257                         idcur->finfoP = &(idcur->finfo);
2258                         FIgetnArgs(idcur->finfoP) = natts;
2259                         natts = 1;
2260                         FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2261                         *(FIgetname(idcur->finfoP)) = '\0';
2262                 }
2263                 else
2264                         idcur->finfoP = (FuncIndexInfo *) NULL;
2265
2266                 idcur->natts = natts;
2267         }
2268
2269 }                                                               /* vc_mkindesc */
2270
2271
2272 static bool
2273 vc_enough_space(VPageDescr vpd, Size len)
2274 {
2275
2276         len = DOUBLEALIGN(len);
2277
2278         if (len > vpd->vpd_free)
2279                 return (false);
2280
2281         if (vpd->vpd_nusd < vpd->vpd_noff)      /* there are free itemid(s) */
2282                 return (true);                  /* and len <= free_space */
2283
2284         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2285         if (len <= vpd->vpd_free - sizeof(ItemIdData))
2286                 return (true);
2287
2288         return (false);
2289
2290 }                                                               /* vc_enough_space */