]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Break parser functions into smaller files, group together.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *        the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.53 1997/11/25 21:59:09 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/types.h>
15 #include <sys/file.h>
16 #include <string.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <fmgr.h>
24 #include <utils/portal.h>
25 #include <access/genam.h>
26 #include <access/heapam.h>
27 #include <access/xact.h>
28 #include <storage/bufmgr.h>
29 #include <access/transam.h>
30 #include <catalog/pg_index.h>
31 #include <catalog/index.h>
32 #include <catalog/catname.h>
33 #include <catalog/catalog.h>
34 #include <catalog/pg_class.h>
35 #include <catalog/pg_proc.h>
36 #include <catalog/pg_statistic.h>
37 #include <catalog/pg_type.h>
38 #include <catalog/pg_operator.h>
39 #include <parser/parse_oper.h>
40 #include <storage/smgr.h>
41 #include <storage/lmgr.h>
42 #include <utils/inval.h>
43 #include <utils/mcxt.h>
44 #include <utils/inval.h>
45 #include <utils/syscache.h>
46 #include <utils/builtins.h>
47 #include <commands/vacuum.h>
48 #include <storage/bufpage.h>
49 #include "storage/shmem.h"
50 #ifndef HAVE_GETRUSAGE
51 #include <rusagestub.h>
52 #else
53 #include <sys/time.h>
54 #include <sys/resource.h>
55 #endif
56
57 #include <port-protos.h>
58
/*
 * Declared here rather than pulled in from a header.
 * NOTE(review): not called in the portion of the file examined; confirm
 * which header should own this prototype.
 */
extern int BlowawayRelationBuffers(Relation rdesc, BlockNumber block);

/*
 * True while the vacuum lock file is held: set by vc_init() once the lock
 * file has been created, cleared again by vc_shutdown() and vc_abort().
 */
bool            VacuumRunning = false;

/*
 * Portal created by vacuum(); its variable memory holds data that must
 * survive the per-relation transactions (column names, relation list).
 */
static Portal vc_portal;

/* elog level for progress messages: NOTICE when verbose, DEBUG otherwise */
static int      MESSAGE_LEVEL;          /* message level */
66
/*
 * Swap two lvalues of the given type.
 *
 * The arguments are parenthesized so that expressions such as *p or
 * arr[i + 1] expand correctly, and the temporary has a name unlikely to
 * collide with a caller's variable (a caller variable called "tmp" would
 * otherwise be shadowed and the swap silently broken).
 *
 * The expansion is deliberately left as a bare brace block, not
 * do { } while (0), so that existing call sites keep compiling whether or
 * not they are followed by a semicolon.  Do not use these as the
 * unbraced body of an if that has an else.
 */
#define swapLong(a,b)   {long swap_tmp_; swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_;}
#define swapInt(a,b)    {int swap_tmp_; swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_;}
#define swapDatum(a,b)  {Datum swap_tmp_; swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_;}

/* True when an equality comparison function was found for the attribute. */
#define VacAttrStatsEqValid(stats) ( (stats)->f_cmpeq != NULL )

/*
 * True when both ordering comparison functions were found and the
 * attribute's type has a valid output function.
 */
#define VacAttrStatsLtGtValid(stats) ( (stats)->f_cmplt != NULL && \
                                       (stats)->f_cmpgt != NULL && \
                                       RegProcedureIsValid((stats)->outfunc) )
74
75
/* non-export function prototypes */

/* lock-file setup/teardown around a vacuum run */
static void vc_init(void);
static void vc_shutdown(void);

/* top-level driver: build the relation list, then vacuum one at a time */
static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
static VRelList vc_getrels(NameData *VacRelP);
static void vc_vacone(Oid relid, bool analyze, List *va_cols);

/*
 * Heap, index and statistics workers called from vc_vacone() and below.
 * NOTE(review): apart from vc_scanheap(), the bodies of these routines
 * were not examined when writing this comment; see each definition.
 */
static void vc_scanheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vpl);
static void vc_vacpage(Page page, VPageDescr vpd);
static void vc_vaconeind(VPageList vpl, Relation indrel, int nhtups);
static void vc_scanoneind(Relation indrel, int nhtups);
static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup);
static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats);
static void vc_delhilowstats(Oid relid, int attcnt, int *attnums);
static void vc_setpagelock(Relation rel, BlockNumber blkno);
static VPageDescr vc_tidreapped(ItemPointer itemptr, VPageList vpl);
static void vc_reappage(VPageList vpl, VPageDescr vpc);
static void vc_vpinsert(VPageList vpl, VPageDescr vpnew);
static void vc_free(VRelList vrl);
static void vc_getindices(Oid relid, int *nindices, Relation **Irel);
static void vc_clsindices(int nindices, Relation *Irel);
static void vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
static char *vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *));
static int      vc_cmp_blk(char *left, char *right);
static int      vc_cmp_offno(char *left, char *right);
static bool vc_enough_space(VPageDescr vpd, Size len);
104
105 void
106 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
107 {
108         char       *pname;
109         MemoryContext old;
110         PortalVariableMemory pmem;
111         NameData        VacRel;
112         List       *le;
113         List       *va_cols = NIL;
114
115         /*
116          * Create a portal for safe memory across transctions.  We need to
117          * palloc the name space for it because our hash function expects the
118          * name to be on a longword boundary.  CreatePortal copies the name to
119          * safe storage for us.
120          */
121         pname = (char *) palloc(strlen(VACPNAME) + 1);
122         strcpy(pname, VACPNAME);
123         vc_portal = CreatePortal(pname);
124         pfree(pname);
125
126         if (verbose)
127                 MESSAGE_LEVEL = NOTICE;
128         else
129                 MESSAGE_LEVEL = DEBUG;
130
131         /* vacrel gets de-allocated on transaction commit */
132         if (vacrel)
133                 strcpy(VacRel.data, vacrel);
134
135         pmem = PortalGetVariableMemory(vc_portal);
136         old = MemoryContextSwitchTo((MemoryContext) pmem);
137
138         Assert(va_spec == NIL || analyze);
139         foreach(le, va_spec)
140         {
141                 char       *col = (char *) lfirst(le);
142                 char       *dest;
143
144                 dest = (char *) palloc(strlen(col) + 1);
145                 strcpy(dest, col);
146                 va_cols = lappend(va_cols, dest);
147         }
148         MemoryContextSwitchTo(old);
149
150         /* initialize vacuum cleaner */
151         vc_init();
152
153         /* vacuum the database */
154         if (vacrel)
155                 vc_vacuum(&VacRel, analyze, va_cols);
156         else
157                 vc_vacuum(NULL, analyze, NIL);
158
159         PortalDestroy(&vc_portal);
160
161         /* clean up */
162         vc_shutdown();
163 }
164
165 /*
166  *      vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
167  *
168  *              We run exactly one vacuum cleaner at a time.  We use the file system
169  *              to guarantee an exclusive lock on vacuuming, since a single vacuum
170  *              cleaner instantiation crosses transaction boundaries, and we'd lose
171  *              postgres-style locks at the end of every transaction.
172  *
173  *              The strangeness with committing and starting transactions in the
174  *              init and shutdown routines is due to the fact that the vacuum cleaner
175  *              is invoked via a sql command, and so is already executing inside
176  *              a transaction.  We need to leave ourselves in a predictable state
177  *              on entry and exit to the vacuum cleaner.  We commit the transaction
178  *              started in PostgresMain() inside vc_init(), and start one in
179  *              vc_shutdown() to match the commit waiting for us back in
180  *              PostgresMain().
181  */
182 static void
183 vc_init()
184 {
185         int                     fd;
186
187         if ((fd = open("pg_vlock", O_CREAT | O_EXCL, 0600)) < 0)
188                 elog(WARN, "can't create lock file -- another vacuum cleaner running?");
189
190         close(fd);
191
192         /*
193          * By here, exclusive open on the lock file succeeded.  If we abort
194          * for any reason during vacuuming, we need to remove the lock file.
195          * This global variable is checked in the transaction manager on xact
196          * abort, and the routine vc_abort() is called if necessary.
197          */
198
199         VacuumRunning = true;
200
201         /* matches the StartTransaction in PostgresMain() */
202         CommitTransactionCommand();
203 }
204
205 static void
206 vc_shutdown()
207 {
208         /* on entry, not in a transaction */
209         if (unlink("pg_vlock") < 0)
210                 elog(WARN, "vacuum: can't destroy lock file!");
211
212         /* okay, we're done */
213         VacuumRunning = false;
214
215         /* matches the CommitTransaction in PostgresMain() */
216         StartTransactionCommand();
217
218 }
219
220 void
221 vc_abort()
222 {
223         /* on abort, remove the vacuum cleaner lock file */
224         unlink("pg_vlock");
225
226         VacuumRunning = false;
227 }
228
229 /*
230  *      vc_vacuum() -- vacuum the database.
231  *
232  *              This routine builds a list of relations to vacuum, and then calls
233  *              code that vacuums them one at a time.  We are careful to vacuum each
234  *              relation in a separate transaction in order to avoid holding too many
235  *              locks at one time.
236  */
237 static void
238 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
239 {
240         VRelList        vrl,
241                                 cur;
242
243         /* get list of relations */
244         vrl = vc_getrels(VacRelP);
245
246         if (analyze && VacRelP == NULL && vrl != NULL)
247                 vc_delhilowstats(InvalidOid, 0, NULL);
248
249         /* vacuum each heap relation */
250         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
251                 vc_vacone(cur->vrl_relid, analyze, va_cols);
252
253         vc_free(vrl);
254 }
255
256 static VRelList
257 vc_getrels(NameData *VacRelP)
258 {
259         Relation        pgclass;
260         TupleDesc       pgcdesc;
261         HeapScanDesc pgcscan;
262         HeapTuple       pgctup;
263         Buffer          buf;
264         PortalVariableMemory portalmem;
265         MemoryContext old;
266         VRelList        vrl,
267                                 cur;
268         Datum           d;
269         char       *rname;
270         char            rkind;
271         bool            n;
272         ScanKeyData pgckey;
273         bool            found = false;
274
275         StartTransactionCommand();
276
277         if (VacRelP->data)
278         {
279                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
280                                                            NameEqualRegProcedure,
281                                                            PointerGetDatum(VacRelP->data));
282         }
283         else
284         {
285                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
286                                                   CharacterEqualRegProcedure, CharGetDatum('r'));
287         }
288
289         portalmem = PortalGetVariableMemory(vc_portal);
290         vrl = cur = (VRelList) NULL;
291
292         pgclass = heap_openr(RelationRelationName);
293         pgcdesc = RelationGetTupleDescriptor(pgclass);
294
295         pgcscan = heap_beginscan(pgclass, false, false, 1, &pgckey);
296
297         while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf)))
298         {
299
300                 found = true;
301
302                 d = heap_getattr(pgctup, buf, Anum_pg_class_relname, pgcdesc, &n);
303                 rname = (char *) d;
304
305                 /*
306                  * don't vacuum large objects for now - something breaks when we
307                  * do
308                  */
309                 if ((strlen(rname) >= 5) && rname[0] == 'x' &&
310                         rname[1] == 'i' && rname[2] == 'n' &&
311                         (rname[3] == 'v' || rname[3] == 'x') &&
312                         rname[4] >= '0' && rname[4] <= '9')
313                 {
314                         elog(NOTICE, "Rel %s: can't vacuum LargeObjects now",
315                                  rname);
316                         ReleaseBuffer(buf);
317                         continue;
318                 }
319
320                 d = heap_getattr(pgctup, buf, Anum_pg_class_relkind, pgcdesc, &n);
321
322                 rkind = DatumGetChar(d);
323
324                 /* skip system relations */
325                 if (rkind != 'r')
326                 {
327                         ReleaseBuffer(buf);
328                         elog(NOTICE, "Vacuum: can not process index and certain system tables");
329                         continue;
330                 }
331
332                 /* get a relation list entry for this guy */
333                 old = MemoryContextSwitchTo((MemoryContext) portalmem);
334                 if (vrl == (VRelList) NULL)
335                 {
336                         vrl = cur = (VRelList) palloc(sizeof(VRelListData));
337                 }
338                 else
339                 {
340                         cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
341                         cur = cur->vrl_next;
342                 }
343                 MemoryContextSwitchTo(old);
344
345                 cur->vrl_relid = pgctup->t_oid;
346                 cur->vrl_next = (VRelList) NULL;
347
348                 /* wei hates it if you forget to do this */
349                 ReleaseBuffer(buf);
350         }
351         if (found == false)
352                 elog(NOTICE, "Vacuum: table not found");
353
354
355         heap_endscan(pgcscan);
356         heap_close(pgclass);
357
358         CommitTransactionCommand();
359
360         return (vrl);
361 }
362
/*
 *      vc_vacone() -- vacuum one heap relation
 *
 *              This routine vacuums a single heap, cleans out its indices, and
 *              updates its npages and ntups statistics.
 *
 *              Doing one heap at a time incurs extra overhead, since we need to
 *              check that the heap exists again just before we vacuum it.      The
 *              reason that we do this is so that vacuuming can be spread across
 *              many small transactions.  Otherwise, two-phase locking would require
 *              us to lock the entire database during one pass of the vacuum cleaner.
 *
 *              relid    OID of the relation to vacuum.
 *              analyze  when true (and the relation does not have a system
 *                               relation name) per-attribute statistics state is set up.
 *              va_cols  names of the columns to analyze, or NIL for all columns.
 *
 *              The whole routine runs inside one transaction that it starts
 *              and commits itself.
 */
static void
vc_vacone(Oid relid, bool analyze, List *va_cols)
{
        Relation        pgclass;
        TupleDesc       pgcdesc;
        HeapTuple       pgctup,
                                pgttup;
        Buffer          pgcbuf;
        HeapScanDesc pgcscan;
        Relation        onerel;
        ScanKeyData pgckey;
        VPageListData Vvpl;                     /* List of pages to vacuum and/or clean
                                                                 * indices */
        VPageListData Fvpl;                     /* List of pages with space enough for
                                                                 * re-using */
        VPageDescr *vpp;
        Relation   *Irel;
        int32           nindices,
                                i;
        VRelStats  *vacrelstats;

        StartTransactionCommand();

        /* look the relation up in pg_class again, by OID */
        ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
                                                   ObjectIdEqualRegProcedure,
                                                   ObjectIdGetDatum(relid));

        pgclass = heap_openr(RelationRelationName);
        pgcdesc = RelationGetTupleDescriptor(pgclass);
        pgcscan = heap_beginscan(pgclass, false, false, 1, &pgckey);

        /*
         * Race condition -- if the pg_class tuple has gone away since the
         * last time we saw it, we don't need to vacuum it.
         */

        if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf)))
        {
                heap_endscan(pgcscan);
                heap_close(pgclass);
                CommitTransactionCommand();
                return;
        }

        /* now open the class and vacuum it */
        onerel = heap_open(relid);

        vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
        vacrelstats->relid = relid;
        vacrelstats->npages = vacrelstats->ntups = 0;
        vacrelstats->hasindex = false;
        /* attribute statistics are only prepared for non-system relation names */
        if (analyze && !IsSystemRelationName((RelationGetRelationName(onerel))->data))
        {
                int                     attr_cnt,
                                   *attnums = NULL;
                AttributeTupleForm *attr;

                attr_cnt = onerel->rd_att->natts;
                attr = onerel->rd_att->attrs;

                /*
                 * An explicit column list was given: translate each name into its
                 * zero-based index in the relation's attribute array.
                 */
                if (va_cols != NIL)
                {
                        int                     tcnt = 0;
                        List       *le;

                        if (length(va_cols) > attr_cnt)
                                elog(WARN, "vacuum: too many attributes specified for relation %s",
                                         (RelationGetRelationName(onerel))->data);
                        attnums = (int *) palloc(attr_cnt * sizeof(int));
                        foreach(le, va_cols)
                        {
                                char       *col = (char *) lfirst(le);

                                /* linear search for the named attribute */
                                for (i = 0; i < attr_cnt; i++)
                                {
                                        if (namestrcmp(&(attr[i]->attname), col) == 0)
                                                break;
                                }
                                if (i < attr_cnt)               /* found */
                                        attnums[tcnt++] = i;
                                else
                                {
                                        elog(WARN, "vacuum: there is no attribute %s in %s",
                                                 col, (RelationGetRelationName(onerel))->data);
                                }
                        }
                        /* from here on only the requested attributes are counted */
                        attr_cnt = tcnt;
                }

                vacrelstats->vacattrstats =
                        (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));

                /* initialize one statistics slot per analyzed attribute */
                for (i = 0; i < attr_cnt; i++)
                {
                        Operator        func_operator;
                        OperatorTupleForm pgopform;
                        VacAttrStats *stats;

                        stats = &vacrelstats->vacattrstats[i];
                        /* keep a private copy of the attribute's descriptor */
                        stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
                        memmove(stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
                        stats->best = stats->guess1 = stats->guess2 = 0;
                        stats->max = stats->min = 0;
                        stats->best_len = stats->guess1_len = stats->guess2_len = 0;
                        stats->max_len = stats->min_len = 0;
                        stats->initialized = false;
                        stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
                        stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;

                        /*
                         * Resolve the "=", "<" and ">" operators for the attribute's
                         * type; a comparison function left NULL marks the operator as
                         * unavailable (see VacAttrStatsEqValid/LtGtValid).
                         */
                        func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
                        if (func_operator != NULL)
                        {
                                int                     nargs;

                                pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
                                fmgr_info(pgopform->oprcode, &(stats->f_cmpeq), &nargs);
                        }
                        else
                                stats->f_cmpeq = NULL;

                        func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
                        if (func_operator != NULL)
                        {
                                int                     nargs;

                                pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
                                fmgr_info(pgopform->oprcode, &(stats->f_cmplt), &nargs);
                        }
                        else
                                stats->f_cmplt = NULL;

                        func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
                        if (func_operator != NULL)
                        {
                                int                     nargs;

                                pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
                                fmgr_info(pgopform->oprcode, &(stats->f_cmpgt), &nargs);
                        }
                        else
                                stats->f_cmpgt = NULL;

                        /* remember the type's output function, if the type is known */
                        pgttup = SearchSysCacheTuple(TYPOID,
                                                                 ObjectIdGetDatum(stats->attr->atttypid),
                                                                                 0, 0, 0);
                        if (HeapTupleIsValid(pgttup))
                                stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
                        else
                                stats->outfunc = InvalidOid;
                }
                vacrelstats->va_natts = attr_cnt;
                /*
                 * A count of 0 is passed when no column list was given.
                 * NOTE(review): presumably 0 means "all attributes of relid" --
                 * confirm in vc_delhilowstats().
                 */
                vc_delhilowstats(relid, ((attnums) ? attr_cnt : 0), attnums);
                if (attnums)
                        pfree(attnums);
        }
        else
        {
                vacrelstats->va_natts = 0;
                vacrelstats->vacattrstats = (VacAttrStats *) NULL;
        }

        /* we require the relation to be locked until the indices are cleaned */
        RelationSetLockForWrite(onerel);

        /* scan it */
        Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
        vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);

        /* Now open indices */
        Irel = (Relation *) NULL;
        vc_getindices(vacrelstats->relid, &nindices, &Irel);

        if (nindices > 0)
                vacrelstats->hasindex = true;
        else
                vacrelstats->hasindex = false;

        /* Clean/scan index relation(s) */
        if (Irel != (Relation *) NULL)
        {
                if (Vvpl.vpl_npages > 0)
                {
                        for (i = 0; i < nindices; i++)
                                vc_vaconeind(&Vvpl, Irel[i], vacrelstats->ntups);
                }
                else
                /* just scan indices to update statistic */
                {
                        for (i = 0; i < nindices; i++)
                                vc_scanoneind(Irel[i], vacrelstats->ntups);
                }
        }

        if (Fvpl.vpl_npages > 0)        /* Try to shrink heap */
                vc_rpfheap(vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
        else
        {
                /* no shrinking: the indices are closed here instead */
                if (Irel != (Relation *) NULL)
                        vc_clsindices(nindices, Irel);
                if (Vvpl.vpl_npages > 0)/* Clean pages from Vvpl list */
                        vc_vacheap(vacrelstats, onerel, &Vvpl);
        }

        /* ok - free Vvpl list of reapped pages */
        if (Vvpl.vpl_npages > 0)
        {
                vpp = Vvpl.vpl_pgdesc;
                for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
                        pfree(*vpp);
                pfree(Vvpl.vpl_pgdesc);
                /*
                 * Only the Fvpl array itself is freed.
                 * NOTE(review): presumably its entries are the descriptors
                 * already freed through Vvpl above -- confirm in vc_scanheap().
                 */
                if (Fvpl.vpl_npages > 0)
                        pfree(Fvpl.vpl_pgdesc);
        }

        /* all done with this class */
        heap_close(onerel);
        heap_endscan(pgcscan);
        heap_close(pgclass);

        /* update statistics in pg_class */
        vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
                                vacrelstats->hasindex, vacrelstats);

        /* next command frees attribute stats */

        CommitTransactionCommand();
}
602
603 /*
604  *      vc_scanheap() -- scan an open heap relation
605  *
606  *              This routine sets commit times, constructs Vvpl list of
607  *              empty/uninitialized pages and pages with dead tuples and
608  *              ~LP_USED line pointers, constructs Fvpl list of pages
609  *              appropriate for purposes of shrinking and maintains statistics
610  *              on the number of live tuples in a heap.
611  */
612 static void
613 vc_scanheap(VRelStats *vacrelstats, Relation onerel,
614                         VPageList Vvpl, VPageList Fvpl)
615 {
616         int                     nblocks,
617                                 blkno;
618         ItemId          itemid;
619         ItemPointer itemptr;
620         HeapTuple       htup;
621         Buffer          buf;
622         Page            page,
623                                 tempPage = NULL;
624         OffsetNumber offnum,
625                                 maxoff;
626         bool            pgchanged,
627                                 tupgone,
628                                 dobufrel,
629                                 notup;
630         char       *relname;
631         VPageDescr      vpc,
632                                 vp;
633         uint32          nvac,
634                                 ntups,
635                                 nunused,
636                                 ncrash,
637                                 nempg,
638                                 nnepg,
639                                 nchpg,
640                                 nemend;
641         Size            frsize,
642                                 frsusf;
643         Size            min_tlen = MAXTUPLEN;
644         Size            max_tlen = 0;
645         int32           i /* , attr_cnt */ ;
646         struct rusage ru0,
647                                 ru1;
648         bool            do_shrinking = true;
649
650         getrusage(RUSAGE_SELF, &ru0);
651
652         nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
653         frsize = frsusf = 0;
654
655         relname = (RelationGetRelationName(onerel))->data;
656
657         nblocks = RelationGetNumberOfBlocks(onerel);
658
659         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
660         vpc->vpd_nusd = 0;
661
662         for (blkno = 0; blkno < nblocks; blkno++)
663         {
664                 buf = ReadBuffer(onerel, blkno);
665                 page = BufferGetPage(buf);
666                 vpc->vpd_blkno = blkno;
667                 vpc->vpd_noff = 0;
668
669                 if (PageIsNew(page))
670                 {
671                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
672                                  relname, blkno);
673                         PageInit(page, BufferGetPageSize(buf), 0);
674                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
675                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
676                         nnepg++;
677                         nemend++;
678                         vc_reappage(Vvpl, vpc);
679                         WriteBuffer(buf);
680                         continue;
681                 }
682
683                 if (PageIsEmpty(page))
684                 {
685                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
686                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
687                         nempg++;
688                         nemend++;
689                         vc_reappage(Vvpl, vpc);
690                         ReleaseBuffer(buf);
691                         continue;
692                 }
693
694                 pgchanged = false;
695                 notup = true;
696                 maxoff = PageGetMaxOffsetNumber(page);
697                 for (offnum = FirstOffsetNumber;
698                          offnum <= maxoff;
699                          offnum = OffsetNumberNext(offnum))
700                 {
701                         itemid = PageGetItemId(page, offnum);
702
703                         /*
704                          * Collect un-used items too - it's possible to have indices
705                          * pointing here after crash.
706                          */
707                         if (!ItemIdIsUsed(itemid))
708                         {
709                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
710                                 nunused++;
711                                 continue;
712                         }
713
714                         htup = (HeapTuple) PageGetItem(page, itemid);
715                         tupgone = false;
716
717                         if (!(htup->t_infomask & HEAP_XMIN_COMMITTED))
718                         {
719                                 if (htup->t_infomask & HEAP_XMIN_INVALID)
720                                         tupgone = true;
721                                 else
722                                 {
723                                         if (TransactionIdDidAbort(htup->t_xmin))
724                                                 tupgone = true;
725                                         else if (TransactionIdDidCommit(htup->t_xmin))
726                                         {
727                                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
728                                                 pgchanged = true;
729                                         }
730                                         else if (!TransactionIdIsInProgress(htup->t_xmin))
731                                         {
732                                                 /*
733                                                  * Not Aborted, Not Committed, Not in Progress - 
734                                                  * so it's from crashed process. - vadim 11/26/96
735                                                  */
736                                                 ncrash++;
737                                                 tupgone = true;
738                                         }
739                                         else
740                                         {
741                                                 elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
742                                                          relname, blkno, offnum, htup->t_xmin);
743                                                 do_shrinking = false;
744                                         }
745                                 }
746                         }
747
748                         /* 
749                          * here we are concerned about tuples with xmin committed 
750                          * and xmax unknown or committed
751                          */
752                         if (htup->t_infomask & HEAP_XMIN_COMMITTED && 
753                                 !(htup->t_infomask & HEAP_XMAX_INVALID))
754                         {
755                                 if (htup->t_infomask & HEAP_XMAX_COMMITTED)
756                                         tupgone = true;
757                                 else if (TransactionIdDidAbort(htup->t_xmax))
758                                 {
759                                         htup->t_infomask |= HEAP_XMAX_INVALID;
760                                         pgchanged = true;
761                                 }
762                                 else if (TransactionIdDidCommit(htup->t_xmax))
763                                         tupgone = true;
764                                 else if (!TransactionIdIsInProgress(htup->t_xmax))
765                                 {
766                                         /*
767                                          * Not Aborted, Not Committed, Not in Progress - so it
768                                          * from crashed process. - vadim 06/02/97
769                                          */
770                                         htup->t_infomask |= HEAP_XMAX_INVALID;;
771                                         pgchanged = true;
772                                 }
773                                 else
774                                 {
775                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
776                                                  relname, blkno, offnum, htup->t_xmax);
777                                         do_shrinking = false;
778                                 }
779                         }
780
781                         /*
782                          * It's possibly! But from where it comes ? And should we fix
783                          * it ?  - vadim 11/28/96
784                          */
785                         itemptr = &(htup->t_ctid);
786                         if (!ItemPointerIsValid(itemptr) ||
787                                 BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno)
788                         {
789                                 elog(NOTICE, "Rel %s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.",
790                                          relname, blkno, offnum,
791                                          BlockIdGetBlockNumber(&(itemptr->ip_blkid)),
792                                          itemptr->ip_posid, tupgone);
793                         }
794
795                         /*
796                          * Other checks...
797                          */
798                         if (htup->t_len != itemid->lp_len)
799                         {
800                                 elog(NOTICE, "Rel %s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.",
801                                          relname, blkno, offnum,
802                                          itemid->lp_len, htup->t_len, tupgone);
803                         }
804                         if (!OidIsValid(htup->t_oid))
805                         {
806                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
807                                          relname, blkno, offnum, tupgone);
808                         }
809
810                         if (tupgone)
811                         {
812                                 ItemId          lpp;
813
814                                 if (tempPage == (Page) NULL)
815                                 {
816                                         Size            pageSize;
817
818                                         pageSize = PageGetPageSize(page);
819                                         tempPage = (Page) palloc(pageSize);
820                                         memmove(tempPage, page, pageSize);
821                                 }
822
823                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
824
825                                 /* mark it unused */
826                                 lpp->lp_flags &= ~LP_USED;
827
828                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
829                                 nvac++;
830
831                         }
832                         else
833                         {
834                                 ntups++;
835                                 notup = false;
836                                 if (htup->t_len < min_tlen)
837                                         min_tlen = htup->t_len;
838                                 if (htup->t_len > max_tlen)
839                                         max_tlen = htup->t_len;
840                                 vc_attrstats(onerel, vacrelstats, htup);
841                         }
842                 }
843
844                 if (pgchanged)
845                 {
846                         WriteBuffer(buf);
847                         dobufrel = false;
848                         nchpg++;
849                 }
850                 else
851                         dobufrel = true;
852                 if (tempPage != (Page) NULL)
853                 {                                               /* Some tuples are gone */
854                         PageRepairFragmentation(tempPage);
855                         vpc->vpd_free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
856                         frsize += vpc->vpd_free;
857                         vc_reappage(Vvpl, vpc);
858                         pfree(tempPage);
859                         tempPage = (Page) NULL;
860                 }
861                 else if (vpc->vpd_noff > 0)
862                 {                                               /* there are only ~LP_USED line pointers */
863                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
864                         frsize += vpc->vpd_free;
865                         vc_reappage(Vvpl, vpc);
866                 }
867                 if (dobufrel)
868                         ReleaseBuffer(buf);
869                 if (notup)
870                         nemend++;
871                 else
872                         nemend = 0;
873         }
874
875         pfree(vpc);
876
877         /* save stats in the rel list for use later */
878         vacrelstats->ntups = ntups;
879         vacrelstats->npages = nblocks;
880 /*        vacrelstats->natts = attr_cnt;*/
881         if (ntups == 0)
882                 min_tlen = max_tlen = 0;
883         vacrelstats->min_tlen = min_tlen;
884         vacrelstats->max_tlen = max_tlen;
885
886         Vvpl->vpl_nemend = nemend;
887         Fvpl->vpl_nemend = nemend;
888
889         /*
890          * Try to make Fvpl keeping in mind that we can't use free space of
891          * "empty" end-pages and last page if it reapped.
892          */
893         if (do_shrinking && Vvpl->vpl_npages - nemend > 0)
894         {
895                 int                     nusf;           /* blocks usefull for re-using */
896
897                 nusf = Vvpl->vpl_npages - nemend;
898                 if ((Vvpl->vpl_pgdesc[nusf - 1])->vpd_blkno == nblocks - nemend - 1)
899                         nusf--;
900
901                 for (i = 0; i < nusf; i++)
902                 {
903                         vp = Vvpl->vpl_pgdesc[i];
904                         if (vc_enough_space(vp, min_tlen))
905                         {
906                                 vc_vpinsert(Fvpl, vp);
907                                 frsusf += vp->vpd_free;
908                         }
909                 }
910         }
911
912         getrusage(RUSAGE_SELF, &ru1);
913
914         elog(MESSAGE_LEVEL, "Rel %s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
915 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
916                  relname,
917                  nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
918                  ntups, nvac, ncrash, nunused, min_tlen, max_tlen,
919                  frsize, frsusf, nemend, Fvpl->vpl_npages,
920                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
921                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
922
923 }                                                               /* vc_scanheap */
924
925
926 /*
927  *      vc_rpfheap() -- try to repair the relation's fragmentation
928  *
929  *              This routine marks dead tuples as unused and tries to re-use dead
930  *              space by moving tuples (and inserting index entries if needed). It
931  *              constructs the Nvpl list of freed pages (moved tuples) and cleans
932  *              the indices for them after committing (in a hack manner - without
933  *              losing locks and freeing memory!) the current transaction. It
934  *              truncates the relation if some end-blocks are gone.
     *
     *              vacrelstats - npages, min_tlen and ntups are read here; npages
     *                      is overwritten if the relation gets truncated.
     *              onerel - relation whose pages are read, rewritten and truncated.
     *              Vvpl - list of reaped pages; its last vpl_nemend entries are
     *                      not visited.
     *              Fvpl - list of pages tuples may be moved to. It must hold at
     *                      least one page (the last entry is read unconditionally).
     *                      Entries of vpl_pgdesc may be shifted down when a page
     *                      fills up; vpl_npages itself is not updated.
     *              nindices, Irel - indices that get an entry for every moved
     *                      tuple; Irel == NULL means no index work at all.
935  */
936 static void
937 vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
938                    VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
939 {
940         TransactionId myXID;
941         CommandId       myCID;
942         Buffer          buf,
943                                 ToBuf;
944         int                     nblocks,
945                                 blkno;
946         Page            page,
947                                 ToPage = NULL;
948         OffsetNumber offnum = 0,
949                                 maxoff = 0,
950                                 newoff,
951                                 moff;
952         ItemId          itemid,
953                                 newitemid;
954         HeapTuple       htup,
955                                 newtup;
956         TupleDesc       tupdesc = NULL;
957         Datum      *idatum = NULL;
958         char       *inulls = NULL;
959         InsertIndexResult iresult;
960         VPageListData Nvpl;
961         VPageDescr      ToVpd = NULL,
962                                 Fvplast,
963                                 Vvplast,
964                                 vpc,
965                            *vpp;
966         int                     ToVpI = 0;
967         IndDesc    *Idesc,
968                            *idcur;
969         int                     Fblklast,
970                                 Vblklast,
971                                 i;
972         Size            tlen;
973         int                     nmoved,
974                                 Fnpages,
975                                 Vnpages;
976         int                     nchkmvd,
977                                 ntups;
978         bool            isempty,
979                                 dowrite;
980         struct rusage ru0,
981                                 ru1;
982
983         getrusage(RUSAGE_SELF, &ru0);
984
            /* every moved tuple is stamped with these transaction/command ids */
985         myXID = GetCurrentTransactionId();
986         myCID = GetCurrentCommandId();
987
988         if (Irel != (Relation *) NULL)          /* preparation for index inserts */
989         {
990                 vc_mkindesc(onerel, nindices, Irel, &Idesc);
991                 tupdesc = RelationGetTupleDescriptor(onerel);
992                 idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof(*idatum));
993                 inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof(*inulls));
994         }
995
            /*
             * Start from the last entries of both lists: Fvplast/Fblklast describe
             * the last page of Fvpl, Vvplast/Vblklast the last page of Vvpl that
             * is not one of its vpl_nemend trailing entries. Fnpages and Vnpages
             * are local counts; they shrink as the walk below proceeds.
             */
996         Nvpl.vpl_npages = 0;
997         Fnpages = Fvpl->vpl_npages;
998         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
999         Fblklast = Fvplast->vpd_blkno;
1000         Assert(Vvpl->vpl_npages > Vvpl->vpl_nemend);
1001         Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
1002         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1003         Vblklast = Vvplast->vpd_blkno;
1004         Assert(Vblklast >= Fblklast);
1005         ToBuf = InvalidBuffer;
1006         nmoved = 0;
1007
             /* vpc records the offsets of the tuples moved off the page being scanned */
1008         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
1009         vpc->vpd_nusd = vpc->vpd_noff = 0;
1010
             /*
              * Walk the relation backward, starting at the last block that is not
              * one of the vpl_nemend trailing blocks. The loop has no condition of
              * its own: it is left only through the break statements below.
              */
1011         nblocks = vacrelstats->npages;
1012         for (blkno = nblocks - Vvpl->vpl_nemend - 1;; blkno--)
1013         {
1014                 /* if it's a reaped page that I already moved tuples onto - quit */
1015                 if (blkno == Fblklast && Fvplast->vpd_nusd > 0)
1016                         break;
1017
1018                 buf = ReadBuffer(onerel, blkno);
1019                 page = BufferGetPage(buf);
1020
1021                 vpc->vpd_noff = 0;
1022
1023                 isempty = PageIsEmpty(page);
1024
1025                 dowrite = false;
1026                 if (blkno == Vblklast)  /* it's a reaped page */
1027                 {
1028                         if (Vvplast->vpd_noff > 0)      /* there are dead tuples */
1029                         {                                       /* on this page - clean */
1030                                 Assert(!isempty);
1031                                 vc_vacpage(page, Vvplast);
1032                                 dowrite = true;
1033                         }
1034                         else
1035                         {
1036                                 Assert(isempty);
1037                         }
1038                         --Vnpages;
1039                         Assert(Vnpages > 0);
1040                         /* get the previous reaped page from Vvpl */
1041                         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1042                         Vblklast = Vvplast->vpd_blkno;
1043                         if (blkno == Fblklast)          /* this page is in Fvpl too */
1044                         {
1045                                 --Fnpages;
1046                                 Assert(Fnpages > 0);
1047                                 Assert(Fvplast->vpd_nusd == 0);
1048                                 /* get the previous reaped page from Fvpl */
1049                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1050                                 Fblklast = Fvplast->vpd_blkno;
1051                         }
1052                         Assert(Fblklast <= Vblklast);
                             /* nothing to move off an empty page - go on to the previous block */
1053                         if (isempty)
1054                         {
1055                                 ReleaseBuffer(buf);
1056                                 continue;
1057                         }
1058                 }
1059                 else
1060                 {
1061                         Assert(!isempty);
1062                 }
1063
                     /* try to move every used item of this page to a page from Fvpl */
1064                 vpc->vpd_blkno = blkno;
1065                 maxoff = PageGetMaxOffsetNumber(page);
1066                 for (offnum = FirstOffsetNumber;
1067                          offnum <= maxoff;
1068                          offnum = OffsetNumberNext(offnum))
1069                 {
1070                         itemid = PageGetItemId(page, offnum);
1071
1072                         if (!ItemIdIsUsed(itemid))
1073                                 continue;
1074
1075                         htup = (HeapTuple) PageGetItem(page, itemid);
1076                         tlen = htup->t_len;
1077
1078                         /* try to find a new page for this tuple */
1079                         if (ToBuf == InvalidBuffer ||
1080                                 !vc_enough_space(ToVpd, tlen))
1081                         {
1082                                 if (ToBuf != InvalidBuffer)
1083                                 {
1084                                         WriteBuffer(ToBuf);
1085                                         ToBuf = InvalidBuffer;
1086
1087                                         /*
1088                                          * If not even a tuple of the minimal length can be
1089                                          * added to this page - remove the page from Fvpl.
                                              * - vadim 11/27/96
                                              *
                                              * NOTE(review): the memmove below sizes elements as
                                              * VPageDescr * while the array appears to hold
                                              * VPageDescr; the sizes coincide only if both are
                                              * pointer types - confirm.
1090                                          */
1091                                         if (!vc_enough_space(ToVpd, vacrelstats->min_tlen))
1092                                         {
1093                                                 if (ToVpd != Fvplast)
1094                                                 {
1095                                                         Assert(Fnpages > ToVpI + 1);
1096                                                         memmove(Fvpl->vpl_pgdesc + ToVpI,
1097                                                                         Fvpl->vpl_pgdesc + ToVpI + 1,
1098                                                         sizeof(VPageDescr *) * (Fnpages - ToVpI - 1));
1099                                                 }
1100                                                 Assert(Fnpages >= 1);
1101                                                 Fnpages--;
                                                     /* no target pages left at all - stop moving */
1102                                                 if (Fnpages == 0)
1103                                                         break;
1104                                                 /* get the previous reaped page from Fvpl */
1105                                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1106                                                 Fblklast = Fvplast->vpd_blkno;
1107                                         }
1108                                 }
                                     /* take the first remaining Fvpl page the tuple fits on */
1109                                 for (i = 0; i < Fnpages; i++)
1110                                 {
1111                                         if (vc_enough_space(Fvpl->vpl_pgdesc[i], tlen))
1112                                                 break;
1113                                 }
1114                                 if (i == Fnpages)
1115                                         break;          /* can't move item anywhere */
1116                                 ToVpI = i;
1117                                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1118                                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1119                                 ToPage = BufferGetPage(ToBuf);
1120                                 /* if this page was not used before - clean it */
1121                                 if (!PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0)
1122                                         vc_vacpage(ToPage, ToVpd);
1123                         }
1124
1125                         /* copy tuple */
1126                         newtup = (HeapTuple) palloc(tlen);
1127                         memmove((char *) newtup, (char *) htup, tlen);
1128
1129                         /* store transaction information */
1130                         TransactionIdStore(myXID, &(newtup->t_xmin));
1131                         newtup->t_cmin = myCID;
1132                         StoreInvalidTransactionId(&(newtup->t_xmax));
1133                         /* set xmin to unknown and xmax to invalid */
1134                         newtup->t_infomask &= ~(HEAP_XACT_MASK);
1135                         newtup->t_infomask |= HEAP_XMAX_INVALID;
1136
1137                         /* add tuple to the page */
1138                         newoff = PageAddItem(ToPage, (Item) newtup, tlen,
1139                                                                  InvalidOffsetNumber, LP_USED);
1140                         if (newoff == InvalidOffsetNumber)
1141                         {
1142                                 elog(WARN, "\
1143 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1144                                          tlen, ToVpd->vpd_blkno, ToVpd->vpd_free,
1145                                          ToVpd->vpd_nusd, ToVpd->vpd_noff);
1146                         }
                             /*
                              * From here on newtup is the copy stored on ToPage, not the
                              * palloc'ed scratch copy (which is freed); its t_ctid is set
                              * to its new location.
                              */
1147                         newitemid = PageGetItemId(ToPage, newoff);
1148                         pfree(newtup);
1149                         newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1150                         ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1151
1152                         /* now logically delete end-tuple */
1153                         TransactionIdStore(myXID, &(htup->t_xmax));
1154                         htup->t_cmax = myCID;
1155                         /* set xmax to unknown */
1156                         htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED);
1157
                             /* bookkeeping: target page usage, free space left, moved offset */
1158                         ToVpd->vpd_nusd++;
1159                         nmoved++;
1160                         ToVpd->vpd_free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1161                         vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1162
1163                         /* insert index tuples if needed */
1164                         if (Irel != (Relation *) NULL)
1165                         {
1166                                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1167                                 {
1168                                         FormIndexDatum(
1169                                                                    idcur->natts,
1170                                                            (AttrNumber *) &(idcur->tform->indkey[0]),
1171                                                                    newtup,
1172                                                                    tupdesc,
1173                                                                    InvalidBuffer,
1174                                                                    idatum,
1175                                                                    inulls,
1176                                                                    idcur->finfoP);
1177                                         iresult = index_insert(
1178                                                                                    Irel[i],
1179                                                                                    idatum,
1180                                                                                    inulls,
1181                                                                                    &(newtup->t_ctid),
1182                                                                                    onerel);
1183                                         if (iresult)
1184                                                 pfree(iresult);
1185                                 }
1186                         }
1187
1188                 }                                               /* walk along page */
1189
                     /*
                      * Remember this page in Nvpl if tuples were moved off it.
                      * (vc_reappage presumably stores a copy of vpc, since vpc is
                      * re-used for the next page - confirm.)
                      */
1190                 if (vpc->vpd_noff > 0)  /* some tuples were moved */
1191                 {
1192                         vc_reappage(&Nvpl, vpc);
1193                         WriteBuffer(buf);
1194                 }
1195                 else if (dowrite)
1196                         WriteBuffer(buf);
1197                 else
1198                         ReleaseBuffer(buf);
1199
                     /* the item loop was left by break - stop walking the relation */
1200                 if (offnum <= maxoff)
1201                         break;                          /* some item(s) left */
1202
1203         }                                                       /* walk along relation */
1204
1205         blkno++;                                        /* new number of blocks */
1206
1207         if (ToBuf != InvalidBuffer)
1208         {
1209                 Assert(nmoved > 0);
1210                 WriteBuffer(ToBuf);
1211         }
1212
1213         if (nmoved > 0)
1214         {
1215
1216                 /*
1217                  * We have to commit our tuple moves before we truncate the
1218                  * relation, but we shouldn't lose our locks. And so - quick hack:
1219                  * flush buffers and record status of current transaction as
1220                  * committed, and continue. - vadim 11/13/96
1221                  */
1222                 FlushBufferPool(!TransactionFlushEnabled());
1223                 TransactionIdCommit(myXID);
1224                 FlushBufferPool(!TransactionFlushEnabled());
1225         }
1226
1227         /*
1228          * Clean the not yet cleaned reaped pages from the Vvpl list and set
1229          * xmin committed for inserted tuples. Only the first Vnpages entries
              * are visited, i.e. the ones the backward walk did not reach.
1230          */
1231         nchkmvd = 0;
1232         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1233         {
1234                 Assert((*vpp)->vpd_blkno < blkno);
1235                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1236                 page = BufferGetPage(buf);
1237                 if ((*vpp)->vpd_nusd == 0)              /* this page was not used */
1238                 {
1239
1240                         /*
1241                          * noff == 0 in empty pages only - such pages should be
1242                          * re-used
1243                          */
1244                         Assert((*vpp)->vpd_noff > 0);
1245                         vc_vacpage(page, *vpp);
1246                 }
1247                 else
1248 /* this page was used */
1249                 {
                             /* count the tuples moved here and cross-check against vpd_nusd */
1250                         ntups = 0;
1251                         moff = PageGetMaxOffsetNumber(page);
1252                         for (newoff = FirstOffsetNumber;
1253                                  newoff <= moff;
1254                                  newoff = OffsetNumberNext(newoff))
1255                         {
1256                                 itemid = PageGetItemId(page, newoff);
1257                                 if (!ItemIdIsUsed(itemid))
1258                                         continue;
1259                                 htup = (HeapTuple) PageGetItem(page, itemid);
1260                                 if (TransactionIdEquals((TransactionId) htup->t_xmin, myXID))
1261                                 {
1262                                         htup->t_infomask |= HEAP_XMIN_COMMITTED;
1263                                         ntups++;
1264                                 }
1265                         }
1266                         Assert((*vpp)->vpd_nusd == ntups);
1267                         nchkmvd += ntups;
1268                 }
1269                 WriteBuffer(buf);
1270         }
1271         Assert(nmoved == nchkmvd);
1272
1273         getrusage(RUSAGE_SELF, &ru1);
1274
1275         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. \
1276 Elapsed %u/%u sec.",
1277                  (RelationGetRelationName(onerel))->data,
1278                  nblocks, blkno, nmoved,
1279                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1280                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1281
1282         if (Nvpl.vpl_npages > 0)
1283         {
1284                 /* vacuum indices again if needed */
1285                 if (Irel != (Relation *) NULL)
1286                 {
1287                         VPageDescr *vpleft,
1288                                            *vpright,
1289                                                 vpsave;
1290
1291                         /*
                              * reverse the order of Nvpl.vpl_pgdesc in place (the list was
                              * filled while walking the relation backward)
                              */
1292                         for (vpleft = Nvpl.vpl_pgdesc,
1293                                  vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1294                                  vpleft < vpright; vpleft++, vpright--)
1295                         {
1296                                 vpsave = *vpleft;
1297                                 *vpleft = *vpright;
1298                                 *vpright = vpsave;
1299                         }
1300                         for (i = 0; i < nindices; i++)
1301                                 vc_vaconeind(&Nvpl, Irel[i], vacrelstats->ntups);
1302                 }
1303
1304                 /*
1305                  * clean moved tuples from the last page in the Nvpl list if some
1306                  * tuples were left there. vpc still describes the last page the
                      * backward walk scanned, and offnum is the offset of the first
                      * item that could not be moved, so only offsets below it are
                      * marked unused.
1307                  */
1308                 if (vpc->vpd_noff > 0 && offnum <= maxoff)
1309                 {
1310                         Assert(vpc->vpd_blkno == blkno - 1);
1311                         buf = ReadBuffer(onerel, vpc->vpd_blkno);
1312                         page = BufferGetPage(buf);
1313                         ntups = 0;
1314                         maxoff = offnum;
1315                         for (offnum = FirstOffsetNumber;
1316                                  offnum < maxoff;
1317                                  offnum = OffsetNumberNext(offnum))
1318                         {
1319                                 itemid = PageGetItemId(page, offnum);
1320                                 if (!ItemIdIsUsed(itemid))
1321                                         continue;
1322                                 htup = (HeapTuple) PageGetItem(page, itemid);
1323                                 Assert(TransactionIdEquals((TransactionId) htup->t_xmax, myXID));
1324                                 itemid->lp_flags &= ~LP_USED;
1325                                 ntups++;
1326                         }
1327                         Assert(vpc->vpd_noff == ntups);
1328                         PageRepairFragmentation(page);
1329                         WriteBuffer(buf);
1330                 }
1331
1332                 /* now - free the new list of reaped pages */
1333                 vpp = Nvpl.vpl_pgdesc;
1334                 for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1335                         pfree(*vpp);
1336                 pfree(Nvpl.vpl_pgdesc);
1337         }
1338
1339         /*
              * truncate relation
              *
              * NOTE(review): a negative result from smgrtruncate is only caught by
              * the Assert below before being stored in npages - confirm that is
              * acceptable when assertions are compiled out.
              */
1340         if (blkno < nblocks)
1341         {
1342                 i = BlowawayRelationBuffers(onerel, blkno);
1343                 if (i < 0)
1344                         elog (FATAL, "VACUUM (vc_rpfheap): BlowawayRelationBuffers returned %d", i);
1345                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
1346                 Assert(blkno >= 0);
1347                 vacrelstats->npages = blkno;    /* set new number of blocks */
1348         }
1349
1350         if (Irel != (Relation *) NULL)          /* pfree index allocations */
1351         {
1352                 pfree(Idesc);
1353                 pfree(idatum);
1354                 pfree(inulls);
1355                 vc_clsindices(nindices, Irel);
1356         }
1357
1358         pfree(vpc);
1359
1360 }                                                               /* vc_rpfheap */
1361
1362 /*
1363  *      vc_vacheap() -- free dead tuples
1364  *
1365  *              This routine marks dead tuples as unused and truncates relation
1366  *              if there are "empty" end-blocks.
1367  */
1368 static void
1369 vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1370 {
1371         Buffer          buf;
1372         Page            page;
1373         VPageDescr *vpp;
1374         int                     nblocks;
1375         int                     i;
1376
1377         nblocks = Vvpl->vpl_npages;
1378         nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1379
1380         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1381         {
1382                 if ((*vpp)->vpd_noff > 0)
1383                 {
1384                         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1385                         page = BufferGetPage(buf);
1386                         vc_vacpage(page, *vpp);
1387                         WriteBuffer(buf);
1388                 }
1389         }
1390
1391         /* truncate relation if there are some empty end-pages */
1392         if (Vvpl->vpl_nemend > 0)
1393         {
1394                 Assert(vacrelstats->npages >= Vvpl->vpl_nemend);
1395                 nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1396                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1397                          (RelationGetRelationName(onerel))->data,
1398                          vacrelstats->npages, nblocks);
1399
1400                 /*
1401                  * we have to flush "empty" end-pages (if changed, but who knows
1402                  * it) before truncation
1403                  */
1404                 FlushBufferPool(!TransactionFlushEnabled());
1405                 
1406                 i = BlowawayRelationBuffers(onerel, nblocks);
1407                 if (i < 0)
1408                         elog (FATAL, "VACUUM (vc_vacheap): BlowawayRelationBuffers returned %d", i);
1409
1410                 nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
1411                 Assert(nblocks >= 0);
1412                 vacrelstats->npages = nblocks;  /* set new number of blocks */
1413         }
1414
1415 }                                                               /* vc_vacheap */
1416
1417 /*
1418  *      vc_vacpage() -- free dead tuples on a page
1419  *                                       and repaire its fragmentation.
1420  */
1421 static void
1422 vc_vacpage(Page page, VPageDescr vpd)
1423 {
1424         ItemId          itemid;
1425         int                     i;
1426
1427         Assert(vpd->vpd_nusd == 0);
1428         for (i = 0; i < vpd->vpd_noff; i++)
1429         {
1430                 itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1431                 itemid->lp_flags &= ~LP_USED;
1432         }
1433         PageRepairFragmentation(page);
1434
1435 }                                                               /* vc_vacpage */
1436
1437 /*
1438  *      _vc_scanoneind() -- scan one index relation to update statistic.
1439  *
1440  */
1441 static void
1442 vc_scanoneind(Relation indrel, int nhtups)
1443 {
1444         RetrieveIndexResult res;
1445         IndexScanDesc iscan;
1446         int                     nitups;
1447         int                     nipages;
1448         struct rusage ru0,
1449                                 ru1;
1450
1451         getrusage(RUSAGE_SELF, &ru0);
1452
1453         /* walk through the entire index */
1454         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1455         nitups = 0;
1456
1457         while ((res = index_getnext(iscan, ForwardScanDirection))
1458                    != (RetrieveIndexResult) NULL)
1459         {
1460                 nitups++;
1461                 pfree(res);
1462         }
1463
1464         index_endscan(iscan);
1465
1466         /* now update statistics in pg_class */
1467         nipages = RelationGetNumberOfBlocks(indrel);
1468         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1469
1470         getrusage(RUSAGE_SELF, &ru1);
1471
1472         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1473                  indrel->rd_rel->relname.data, nipages, nitups,
1474                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1475                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1476
1477         if (nitups != nhtups)
1478                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1479                          indrel->rd_rel->relname.data, nitups, nhtups);
1480
1481 }                                                               /* vc_scanoneind */
1482
1483 /*
1484  *      vc_vaconeind() -- vacuum one index relation.
1485  *
1486  *              Vpl is the VPageList of the heap we're currently vacuuming.
1487  *              It's locked. Indrel is an index relation on the vacuumed heap.
1488  *              We don't set locks on the index relation here, since the indexed
1489  *              access methods support locking at different granularities.
1490  *              We let them handle it.
1491  *
1492  *              Finally, we arrange to update the index relation's statistics in
1493  *              pg_class.
1494  */
1495 static void
1496 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1497 {
1498         RetrieveIndexResult res;
1499         IndexScanDesc iscan;
1500         ItemPointer heapptr;
1501         int                     nvac;
1502         int                     nitups;
1503         int                     nipages;
1504         VPageDescr      vp;
1505         struct rusage ru0,
1506                                 ru1;
1507
1508         getrusage(RUSAGE_SELF, &ru0);
1509
1510         /* walk through the entire index */
1511         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1512         nvac = 0;
1513         nitups = 0;
1514
1515         while ((res = index_getnext(iscan, ForwardScanDirection))
1516                    != (RetrieveIndexResult) NULL)
1517         {
1518                 heapptr = &res->heap_iptr;
1519
1520                 if ((vp = vc_tidreapped(heapptr, vpl)) != (VPageDescr) NULL)
1521                 {
1522 #if 0
1523                         elog(DEBUG, "<%x,%x> -> <%x,%x>",
1524                                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1525                                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1526                                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1527                                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1528 #endif
1529                         if (vp->vpd_noff == 0)
1530                         {                                       /* this is EmptyPage !!! */
1531                                 elog(NOTICE, "Ind %s: pointer to EmptyPage (blk %u off %u) - fixing",
1532                                          indrel->rd_rel->relname.data,
1533                                          vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1534                         }
1535                         ++nvac;
1536                         index_delete(indrel, &res->index_iptr);
1537                 }
1538                 else
1539                 {
1540                         nitups++;
1541                 }
1542
1543                 /* be tidy */
1544                 pfree(res);
1545         }
1546
1547         index_endscan(iscan);
1548
1549         /* now update statistics in pg_class */
1550         nipages = RelationGetNumberOfBlocks(indrel);
1551         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1552
1553         getrusage(RUSAGE_SELF, &ru1);
1554
1555         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1556                  indrel->rd_rel->relname.data, nipages, nitups, nvac,
1557                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1558                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1559
1560         if (nitups != nhtups)
1561                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1562                          indrel->rd_rel->relname.data, nitups, nhtups);
1563
1564 }                                                               /* vc_vaconeind */
1565
1566 /*
1567  *      vc_tidreapped() -- is a particular tid reapped?
1568  *
1569  *              vpl->VPageDescr_array is sorted in right order.
1570  */
1571 static VPageDescr
1572 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1573 {
1574         OffsetNumber ioffno;
1575         OffsetNumber *voff;
1576         VPageDescr      vp,
1577                            *vpp;
1578         VPageDescrData vpd;
1579
1580         vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1581         ioffno = ItemPointerGetOffsetNumber(itemptr);
1582
1583         vp = &vpd;
1584         vpp = (VPageDescr *) vc_find_eq((char *) (vpl->vpl_pgdesc),
1585                                            vpl->vpl_npages, sizeof(VPageDescr), (char *) &vp,
1586                                                                         vc_cmp_blk);
1587
1588         if (vpp == (VPageDescr *) NULL)
1589                 return ((VPageDescr) NULL);
1590         vp = *vpp;
1591
1592         /* ok - we are on true page */
1593
1594         if (vp->vpd_noff == 0)
1595         {                                                       /* this is EmptyPage !!! */
1596                 return (vp);
1597         }
1598
1599         voff = (OffsetNumber *) vc_find_eq((char *) (vp->vpd_voff),
1600                                         vp->vpd_noff, sizeof(OffsetNumber), (char *) &ioffno,
1601                                                                            vc_cmp_offno);
1602
1603         if (voff == (OffsetNumber *) NULL)
1604                 return ((VPageDescr) NULL);
1605
1606         return (vp);
1607
1608 }                                                               /* vc_tidreapped */
1609
1610 /*
1611  *      vc_attrstats() -- compute column statistics used by the optimzer
1612  *
1613  *      We compute the column min, max, null and non-null counts.
1614  *      Plus we attempt to find the count of the value that occurs most
1615  *      frequently in each column
1616  *      These figures are used to compute the selectivity of the column
1617  *
1618  *      We use a three-bucked cache to get the most frequent item
1619  *      The 'guess' buckets count hits.  A cache miss causes guess1
1620  *      to get the most hit 'guess' item in the most recent cycle, and
1621  *      the new item goes into guess2.  Whenever the total count of hits
1622  *      of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1623  *
1624  *      This method works perfectly for columns with unique values, and columns
1625  *      with only two unique values, plus nulls.
1626  *
1627  *      It becomes less perfect as the number of unique values increases and
1628  *      their distribution in the table becomes more random.
1629  *
1630  */
1631 static void
1632 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup)
1633 {
1634         int                     i,
1635                                 attr_cnt = vacrelstats->va_natts;
1636         VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1637         TupleDesc       tupDesc = onerel->rd_att;
1638         Datum           value;
1639         bool            isnull;
1640
1641         for (i = 0; i < attr_cnt; i++)
1642         {
1643                 VacAttrStats *stats = &vacattrstats[i];
1644                 bool            value_hit = true;
1645
1646                 value = heap_getattr(htup, InvalidBuffer,
1647                                                          stats->attr->attnum, tupDesc, &isnull);
1648
1649                 if (!VacAttrStatsEqValid(stats))
1650                         continue;
1651
1652                 if (isnull)
1653                         stats->null_cnt++;
1654                 else
1655                 {
1656                         stats->nonnull_cnt++;
1657                         if (stats->initialized == false)
1658                         {
1659                                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1660                                 /* best_cnt gets incremented later */
1661                                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1662                                 stats->guess1_cnt = stats->guess1_hits = 1;
1663                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1664                                 stats->guess2_hits = 1;
1665                                 if (VacAttrStatsLtGtValid(stats))
1666                                 {
1667                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1668                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1669                                 }
1670                                 stats->initialized = true;
1671                         }
1672                         if (VacAttrStatsLtGtValid(stats))
1673                         {
1674                                 if ((*(stats->f_cmplt)) (value, stats->min))
1675                                 {
1676                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1677                                         stats->min_cnt = 0;
1678                                 }
1679                                 if ((*(stats->f_cmpgt)) (value, stats->max))
1680                                 {
1681                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1682                                         stats->max_cnt = 0;
1683                                 }
1684                                 if ((*(stats->f_cmpeq)) (value, stats->min))
1685                                         stats->min_cnt++;
1686                                 else if ((*(stats->f_cmpeq)) (value, stats->max))
1687                                         stats->max_cnt++;
1688                         }
1689                         if ((*(stats->f_cmpeq)) (value, stats->best))
1690                                 stats->best_cnt++;
1691                         else if ((*(stats->f_cmpeq)) (value, stats->guess1))
1692                         {
1693                                 stats->guess1_cnt++;
1694                                 stats->guess1_hits++;
1695                         }
1696                         else if ((*(stats->f_cmpeq)) (value, stats->guess2))
1697                                 stats->guess2_hits++;
1698                         else
1699                                 value_hit = false;
1700
1701                         if (stats->guess2_hits > stats->guess1_hits)
1702                         {
1703                                 swapDatum(stats->guess1, stats->guess2);
1704                                 swapInt(stats->guess1_len, stats->guess2_len);
1705                                 stats->guess1_cnt = stats->guess2_hits;
1706                                 swapLong(stats->guess1_hits, stats->guess2_hits);
1707                         }
1708                         if (stats->guess1_cnt > stats->best_cnt)
1709                         {
1710                                 swapDatum(stats->best, stats->guess1);
1711                                 swapInt(stats->best_len, stats->guess1_len);
1712                                 swapLong(stats->best_cnt, stats->guess1_cnt);
1713                                 stats->guess1_hits = 1;
1714                                 stats->guess2_hits = 1;
1715                         }
1716                         if (!value_hit)
1717                         {
1718                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1719                                 stats->guess1_hits = 1;
1720                                 stats->guess2_hits = 1;
1721                         }
1722                 }
1723         }
1724         return;
1725 }
1726
1727 /*
1728  *      vc_bucketcpy() -- update pg_class statistics for one relation
1729  *
1730  */
1731 static void
1732 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1733 {
1734         if (attr->attbyval && attr->attlen != -1)
1735                 *bucket = value;
1736         else
1737         {
1738                 int                     len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1739
1740                 if (len > *bucket_len)
1741                 {
1742                         if (*bucket_len != 0)
1743                                 pfree(DatumGetPointer(*bucket));
1744                         *bucket = PointerGetDatum(palloc(len));
1745                         *bucket_len = len;
1746                 }
1747                 memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1748         }
1749 }
1750
1751 /*
1752  *      vc_updstats() -- update pg_class statistics for one relation
1753  *
1754  *              This routine works for both index and heap relation entries in
1755  *              pg_class.  We violate no-overwrite semantics here by storing new
1756  *              values for ntups, npages, and hasindex directly in the pg_class
1757  *              tuple that's already on the page.  The reason for this is that if
1758  *              we updated these tuples in the usual way, then every tuple in pg_class
1759  *              would be replaced every day.  This would make planning and executing
1760  *              historical queries very expensive.
1761  */
1762 static void
1763 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats)
1764 {
1765         Relation        rd,
1766                                 ad,
1767                                 sd;
1768         HeapScanDesc rsdesc,
1769                                 asdesc;
1770         TupleDesc       sdesc;
1771         HeapTuple       rtup,
1772                                 atup,
1773                                 stup;
1774         Buffer          rbuf,
1775                                 abuf;
1776         Form_pg_class pgcform;
1777         ScanKeyData rskey,
1778                                 askey;
1779         AttributeTupleForm attp;
1780
1781         /*
1782          * update number of tuples and number of pages in pg_class
1783          */
1784         ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1785                                                    ObjectIdEqualRegProcedure,
1786                                                    ObjectIdGetDatum(relid));
1787
1788         rd = heap_openr(RelationRelationName);
1789         rsdesc = heap_beginscan(rd, false, false, 1, &rskey);
1790
1791         if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1792                 elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1793                          relid);
1794
1795         /* overwrite the existing statistics in the tuple */
1796         vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1797         pgcform = (Form_pg_class) GETSTRUCT(rtup);
1798         pgcform->reltuples = ntups;
1799         pgcform->relpages = npages;
1800         pgcform->relhasindex = hasindex;
1801
1802         if (vacrelstats != NULL && vacrelstats->va_natts > 0)
1803         {
1804                 VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1805                 int                     natts = vacrelstats->va_natts;
1806
1807                 ad = heap_openr(AttributeRelationName);
1808                 sd = heap_openr(StatisticRelationName);
1809                 ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1810                                                            F_INT4EQ, relid);
1811
1812                 asdesc = heap_beginscan(ad, false, false, 1, &askey);
1813
1814                 while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1815                 {
1816                         int                     i;
1817                         float32data selratio;           /* average ratio of rows selected
1818                                                                                  * for a random constant */
1819                         VacAttrStats *stats;
1820                         Datum           values[Natts_pg_statistic];
1821                         char            nulls[Natts_pg_statistic];
1822
1823                         attp = (AttributeTupleForm) GETSTRUCT(atup);
1824                         if (attp->attnum <= 0)          /* skip system attributes for now, */
1825                                 /* they are unique anyway */
1826                                 continue;
1827
1828                         for (i = 0; i < natts; i++)
1829                         {
1830                                 if (attp->attnum == vacattrstats[i].attr->attnum)
1831                                         break;
1832                         }
1833                         if (i >= natts)
1834                                 continue;
1835                         stats = &(vacattrstats[i]);
1836
1837                         /* overwrite the existing statistics in the tuple */
1838                         if (VacAttrStatsEqValid(stats))
1839                         {
1840
1841                                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1842
1843                                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1844                                         (stats->null_cnt <= 1 && stats->best_cnt == 1))
1845                                         selratio = 0;
1846                                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1847                                 {
1848                                         double          min_cnt_d = stats->min_cnt,
1849                                                                 max_cnt_d = stats->max_cnt,
1850                                                                 null_cnt_d = stats->null_cnt,
1851                                                                 nonnullcnt_d = stats->nonnull_cnt;              /* prevent overflow */
1852
1853                                         selratio = (min_cnt_d * min_cnt_d + max_cnt_d * max_cnt_d + null_cnt_d * null_cnt_d) /
1854                                                 (nonnullcnt_d + null_cnt_d) / (nonnullcnt_d + null_cnt_d);
1855                                 }
1856                                 else
1857                                 {
1858                                         double          most = (double) (stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1859                                         double          total = ((double) stats->nonnull_cnt) + ((double) stats->null_cnt);
1860
1861                                         /*
1862                                          * we assume count of other values are 20% of best
1863                                          * count in table
1864                                          */
1865                                         selratio = (most * most + 0.20 * most * (total - most)) / total / total;
1866                                 }
1867                                 if (selratio > 1.0)
1868                                         selratio = 1.0;
1869                                 attp->attdisbursion = selratio;
1870                                 WriteNoReleaseBuffer(abuf);
1871
1872                                 /* DO PG_STATISTIC INSERTS */
1873
1874                                 /*
1875                                  * doing system relations, especially pg_statistic is a
1876                                  * problem
1877                                  */
1878                                 if (VacAttrStatsLtGtValid(stats) && stats->initialized  /* &&
1879                                                                                                                                                  * !IsSystemRelationName(
1880                                                                                                                                                  *
1881                                          pgcform->relname.data) */ )
1882                                 {
1883                                         func_ptr        out_function;
1884                                         char       *out_string;
1885                                         int                     dummy;
1886
1887                                         for (i = 0; i < Natts_pg_statistic; ++i)
1888                                                 nulls[i] = ' ';
1889
1890                                         /* ----------------
1891                                          *      initialize values[]
1892                                          * ----------------
1893                                          */
1894                                         i = 0;
1895                                         values[i++] = (Datum) relid;            /* 1 */
1896                                         values[i++] = (Datum) attp->attnum; /* 2 */
1897                                         values[i++] = (Datum) InvalidOid;       /* 3 */
1898                                         fmgr_info(stats->outfunc, &out_function, &dummy);
1899                                         out_string = (*out_function) (stats->min, stats->attr->atttypid);
1900                                         values[i++] = (Datum) fmgr(TextInRegProcedure, out_string);
1901                                         pfree(out_string);
1902                                         out_string = (char *) (*out_function) (stats->max, stats->attr->atttypid);
1903                                         values[i++] = (Datum) fmgr(TextInRegProcedure, out_string);
1904                                         pfree(out_string);
1905
1906                                         sdesc = sd->rd_att;
1907
1908                                         stup = heap_formtuple(sdesc, values, nulls);
1909
1910                                         /* ----------------
1911                                          *      insert the tuple in the relation and get the tuple's oid.
1912                                          * ----------------
1913                                          */
1914                                         heap_insert(sd, stup);
1915                                         pfree(DatumGetPointer(values[3]));
1916                                         pfree(DatumGetPointer(values[4]));
1917                                         pfree(stup);
1918                                 }
1919                         }
1920                 }
1921                 heap_endscan(asdesc);
1922                 heap_close(ad);
1923                 heap_close(sd);
1924         }
1925
1926         /* XXX -- after write, should invalidate relcache in other backends */
1927         WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1928
1929         /*
1930          * invalidating system relations confuses the function cache of
1931          * pg_operator and pg_opclass
1932          */
1933         if (!IsSystemRelationName(pgcform->relname.data))
1934                 RelationInvalidateHeapTuple(rd, rtup);
1935
1936         /* that's all, folks */
1937         heap_endscan(rsdesc);
1938         heap_close(rd);
1939 }
1940
1941 /*
1942  *      vc_delhilowstats() -- delete pg_statistics rows
1943  *
1944  */
1945 static void
1946 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
1947 {
1948         Relation        pgstatistic;
1949         HeapScanDesc pgsscan;
1950         HeapTuple       pgstup;
1951         ScanKeyData pgskey;
1952
1953         pgstatistic = heap_openr(StatisticRelationName);
1954
1955         if (relid != InvalidOid)
1956         {
1957                 ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
1958                                                            ObjectIdEqualRegProcedure,
1959                                                            ObjectIdGetDatum(relid));
1960                 pgsscan = heap_beginscan(pgstatistic, false, false, 1, &pgskey);
1961         }
1962         else
1963                 pgsscan = heap_beginscan(pgstatistic, false, false, 0, NULL);
1964
1965         while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
1966         {
1967                 if (attcnt > 0)
1968                 {
1969                         Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(pgstup);
1970                         int                     i;
1971
1972                         for (i = 0; i < attcnt; i++)
1973                         {
1974                                 if (pgs->staattnum == attnums[i] + 1)
1975                                         break;
1976                         }
1977                         if (i >= attcnt)
1978                                 continue;               /* don't delete it */
1979                 }
1980                 heap_delete(pgstatistic, &pgstup->t_ctid);
1981         }
1982
1983         heap_endscan(pgsscan);
1984         heap_close(pgstatistic);
1985 }
1986
1987 static void
1988 vc_setpagelock(Relation rel, BlockNumber blkno)
1989 {
1990         ItemPointerData itm;
1991
1992         ItemPointerSet(&itm, blkno, 1);
1993
1994         RelationSetLockForWritePage(rel, &itm);
1995 }
1996
1997 /*
1998  *      vc_reappage() -- save a page on the array of reapped pages.
1999  *
2000  *              As a side effect of the way that the vacuuming loop for a given
2001  *              relation works, higher pages come after lower pages in the array
2002  *              (and highest tid on a page is last).
2003  */
2004 static void
2005 vc_reappage(VPageList vpl, VPageDescr vpc)
2006 {
2007         VPageDescr      newvpd;
2008
2009         /* allocate a VPageDescrData entry */
2010         newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff * sizeof(OffsetNumber));
2011
2012         /* fill it in */
2013         if (vpc->vpd_noff > 0)
2014                 memmove(newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff * sizeof(OffsetNumber));
2015         newvpd->vpd_blkno = vpc->vpd_blkno;
2016         newvpd->vpd_free = vpc->vpd_free;
2017         newvpd->vpd_nusd = vpc->vpd_nusd;
2018         newvpd->vpd_noff = vpc->vpd_noff;
2019
2020         /* insert this page into vpl list */
2021         vc_vpinsert(vpl, newvpd);
2022
2023 }                                                               /* vc_reappage */
2024
2025 static void
2026 vc_vpinsert(VPageList vpl, VPageDescr vpnew)
2027 {
2028
2029         /* allocate a VPageDescr entry if needed */
2030         if (vpl->vpl_npages == 0)
2031                 vpl->vpl_pgdesc = (VPageDescr *) palloc(100 * sizeof(VPageDescr));
2032         else if (vpl->vpl_npages % 100 == 0)
2033                 vpl->vpl_pgdesc = (VPageDescr *) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages + 100) * sizeof(VPageDescr));
2034         vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
2035         (vpl->vpl_npages)++;
2036
2037 }
2038
2039 static void
2040 vc_free(VRelList vrl)
2041 {
2042         VRelList        p_vrl;
2043         MemoryContext old;
2044         PortalVariableMemory pmem;
2045
2046         pmem = PortalGetVariableMemory(vc_portal);
2047         old = MemoryContextSwitchTo((MemoryContext) pmem);
2048
2049         while (vrl != (VRelList) NULL)
2050         {
2051
2052                 /* free rel list entry */
2053                 p_vrl = vrl;
2054                 vrl = vrl->vrl_next;
2055                 pfree(p_vrl);
2056         }
2057
2058         MemoryContextSwitchTo(old);
2059 }
2060
/*
 *	vc_find_eq() -- search a sorted array for an element equal to elm
 *
 *		bot		address of the first array element
 *		nelem	number of elements in the array
 *		size	size of one element, in bytes
 *		elm		address of the key to look for
 *		compar	three-way comparison routine (<0, 0, >0); it is called
 *				both as compar(element, key) and as compar(key, element),
 *				so it must be symmetric in its argument types
 *
 *		The array must be sorted in ascending order according to compar.
 *		Returns the address of a matching element, or NULL if there is
 *		none (including when the array is empty).
 *
 *		The search bisects the current window [bot, bot + last * size];
 *		whenever the window's lower (resp. upper) edge has just moved, the
 *		key is first checked against that edge, which lets the search stop
 *		early when the key lies outside the window.
 */
static char *
vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *))
{
	int			res;
	int			last = nelem - 1;
	int			celm = nelem / 2;
	bool		last_move,
				first_move;

	/* an empty array has no element to compare with: don't touch bot[] */
	if (nelem <= 0)
		return (NULL);

	last_move = first_move = true;
	for (;;)
	{
		if (first_move == true)
		{
			/* lower edge moved: key below it means no match */
			res = compar(bot, elm);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot);
			first_move = false;
		}
		if (last_move == true)
		{
			/* upper edge moved: key above it means no match */
			res = compar(elm, bot + last * size);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot + last * size);
			last_move = false;
		}
		res = compar(elm, bot + celm * size);
		if (res == 0)
			return (bot + celm * size);
		if (res < 0)
		{
			/* continue in the lower half, keeping bot where it is */
			if (celm == 0)
				return (NULL);
			last = celm - 1;
			celm = celm / 2;
			last_move = true;
			continue;
		}

		if (celm == last)
			return (NULL);

		/* continue in the upper half: rebase bot past the probe */
		last = last - celm - 1;
		bot = bot + (celm + 1) * size;
		celm = (last + 1) / 2;
		first_move = true;
	}

}								/* vc_find_eq */
2114
2115 static int
2116 vc_cmp_blk(char *left, char *right)
2117 {
2118         BlockNumber lblk,
2119                                 rblk;
2120
2121         lblk = (*((VPageDescr *) left))->vpd_blkno;
2122         rblk = (*((VPageDescr *) right))->vpd_blkno;
2123
2124         if (lblk < rblk)
2125                 return (-1);
2126         if (lblk == rblk)
2127                 return (0);
2128         return (1);
2129
2130 }                                                               /* vc_cmp_blk */
2131
2132 static int
2133 vc_cmp_offno(char *left, char *right)
2134 {
2135
2136         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2137                 return (-1);
2138         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2139                 return (0);
2140         return (1);
2141
2142 }                                                               /* vc_cmp_offno */
2143
2144
2145 static void
2146 vc_getindices(Oid relid, int *nindices, Relation **Irel)
2147 {
2148         Relation        pgindex;
2149         Relation        irel;
2150         TupleDesc       pgidesc;
2151         HeapTuple       pgitup;
2152         HeapScanDesc pgiscan;
2153         Datum           d;
2154         int                     i,
2155                                 k;
2156         bool            n;
2157         ScanKeyData pgikey;
2158         Oid                *ioid;
2159
2160         *nindices = i = 0;
2161
2162         ioid = (Oid *) palloc(10 * sizeof(Oid));
2163
2164         /* prepare a heap scan on the pg_index relation */
2165         pgindex = heap_openr(IndexRelationName);
2166         pgidesc = RelationGetTupleDescriptor(pgindex);
2167
2168         ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2169                                                    ObjectIdEqualRegProcedure,
2170                                                    ObjectIdGetDatum(relid));
2171
2172         pgiscan = heap_beginscan(pgindex, false, false, 1, &pgikey);
2173
2174         while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL)))
2175         {
2176                 d = heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
2177                                                  pgidesc, &n);
2178                 i++;
2179                 if (i % 10 == 0)
2180                         ioid = (Oid *) repalloc(ioid, (i + 10) * sizeof(Oid));
2181                 ioid[i - 1] = DatumGetObjectId(d);
2182         }
2183
2184         heap_endscan(pgiscan);
2185         heap_close(pgindex);
2186
2187         if (i == 0)
2188         {                                                       /* No one index found */
2189                 pfree(ioid);
2190                 return;
2191         }
2192
2193         if (Irel != (Relation **) NULL)
2194                 *Irel = (Relation *) palloc(i * sizeof(Relation));
2195
2196         for (k = 0; i > 0;)
2197         {
2198                 irel = index_open(ioid[--i]);
2199                 if (irel != (Relation) NULL)
2200                 {
2201                         if (Irel != (Relation **) NULL)
2202                                 (*Irel)[k] = irel;
2203                         else
2204                                 index_close(irel);
2205                         k++;
2206                 }
2207                 else
2208                         elog(NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2209         }
2210         *nindices = k;
2211         pfree(ioid);
2212
2213         if (Irel != (Relation **) NULL && *nindices == 0)
2214         {
2215                 pfree(*Irel);
2216                 *Irel = (Relation *) NULL;
2217         }
2218
2219 }                                                               /* vc_getindices */
2220
2221
2222 static void
2223 vc_clsindices(int nindices, Relation *Irel)
2224 {
2225
2226         if (Irel == (Relation *) NULL)
2227                 return;
2228
2229         while (nindices--)
2230         {
2231                 index_close(Irel[nindices]);
2232         }
2233         pfree(Irel);
2234
2235 }                                                               /* vc_clsindices */
2236
2237
2238 static void
2239 vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2240 {
2241         IndDesc    *idcur;
2242         HeapTuple       pgIndexTup;
2243         AttrNumber *attnumP;
2244         int                     natts;
2245         int                     i;
2246
2247         *Idesc = (IndDesc *) palloc(nindices * sizeof(IndDesc));
2248
2249         for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++)
2250         {
2251                 pgIndexTup =
2252                         SearchSysCacheTuple(INDEXRELID,
2253                                                                 ObjectIdGetDatum(Irel[i]->rd_id),
2254                                                                 0, 0, 0);
2255                 Assert(pgIndexTup);
2256                 idcur->tform = (IndexTupleForm) GETSTRUCT(pgIndexTup);
2257                 for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2258                          *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2259                          attnumP++, natts++);
2260                 if (idcur->tform->indproc != InvalidOid)
2261                 {
2262                         idcur->finfoP = &(idcur->finfo);
2263                         FIgetnArgs(idcur->finfoP) = natts;
2264                         natts = 1;
2265                         FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2266                         *(FIgetname(idcur->finfoP)) = '\0';
2267                 }
2268                 else
2269                         idcur->finfoP = (FuncIndexInfo *) NULL;
2270
2271                 idcur->natts = natts;
2272         }
2273
2274 }                                                               /* vc_mkindesc */
2275
2276
2277 static bool
2278 vc_enough_space(VPageDescr vpd, Size len)
2279 {
2280
2281         len = DOUBLEALIGN(len);
2282
2283         if (len > vpd->vpd_free)
2284                 return (false);
2285
2286         if (vpd->vpd_nusd < vpd->vpd_noff)      /* there are free itemid(s) */
2287                 return (true);                  /* and len <= free_space */
2288
2289         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2290         if (len <= vpd->vpd_free - sizeof(ItemIdData))
2291                 return (true);
2292
2293         return (false);
2294
2295 }                                                               /* vc_enough_space */