]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Massive commit to run PGINDENT on all *.c and *.h files.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *        the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.43 1997/09/07 04:41:02 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/types.h>
15 #include <sys/file.h>
16 #include <string.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <fmgr.h>
24 #include <utils/portal.h>
25 #include <access/genam.h>
26 #include <access/heapam.h>
27 #include <access/xact.h>
28 #include <storage/bufmgr.h>
29 #include <access/transam.h>
30 #include <catalog/pg_index.h>
31 #include <catalog/index.h>
32 #include <catalog/catname.h>
33 #include <catalog/catalog.h>
34 #include <catalog/pg_class.h>
35 #include <catalog/pg_proc.h>
36 #include <catalog/pg_statistic.h>
37 #include <catalog/pg_type.h>
38 #include <catalog/pg_operator.h>
39 #include <storage/smgr.h>
40 #include <storage/lmgr.h>
41 #include <utils/inval.h>
42 #include <utils/mcxt.h>
43 #include <utils/inval.h>
44 #include <utils/syscache.h>
45 #include <utils/builtins.h>
46 #include <commands/vacuum.h>
47 #include <parser/catalog_utils.h>
48 #include <storage/bufpage.h>
49 #include "storage/shmem.h"
50 #ifndef HAVE_GETRUSAGE
51 #include <rusagestub.h>
52 #else
53 #include <sys/time.h>
54 #include <sys/resource.h>
55 #endif
56
57 #include <port-protos.h>
58
/*
 * Set to true by vc_init() once the vacuum lock file has been created and
 * reset to false by vc_shutdown() and vc_abort().
 */
bool                    VacuumRunning = false;

/* portal created in vacuum(); its variable memory holds the column list and relation list */
static Portal   vc_portal;

static int              MESSAGE_LEVEL;  /* elog level picked in vacuum(): NOTICE if verbose, else DEBUG */
64
/*
 * Swap helpers.  Each one expands to a braced block that exchanges the two
 * arguments through a temporary named "tmp", so both arguments must be
 * modifiable lvalues of the given type and neither may itself be spelled
 * "tmp".  Arguments are parenthesized so that expressions such as *p or
 * a[i + 1] expand safely.
 */
#define swapLong(a,b)   {long tmp; tmp = (a); (a) = (b); (b) = tmp;}
#define swapInt(a,b)    {int tmp; tmp = (a); (a) = (b); (b) = tmp;}
#define swapDatum(a,b)  {Datum tmp; tmp = (a); (a) = (b); (b) = tmp;}

/* true when an "=" comparison function was stored for the attribute */
#define VacAttrStatsEqValid(stats) ( (stats)->f_cmpeq != NULL )

/*
 * true when both "<" and ">" comparison functions were stored and the
 * attribute's type output procedure is valid
 */
#define VacAttrStatsLtGtValid(stats) ( (stats)->f_cmplt != NULL && \
                                       (stats)->f_cmpgt != NULL && \
                                       RegProcedureIsValid((stats)->outfunc) )
72
73
/*
 * non-export function prototypes
 *
 * vc_abort() is defined in this file too, but it is not static and is
 * therefore not declared in this list.
 */

/* lock file handling plus the transaction commit/start around a vacuum run */
static void             vc_init(void);
static void             vc_shutdown(void);

/* drivers: all selected relations, the relation list itself, one relation */
static void             vc_vacuum(NameData * VacRelP, bool analyze, List * va_cols);
static VRelList vc_getrels(NameData * VacRelP);
static void             vc_vacone(Oid relid, bool analyze, List * va_cols);

/* per-relation work called from vc_vacone() and its helpers */
static void             vc_scanheap(VRelStats * vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
static void             vc_rpfheap(VRelStats * vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation * Irel);
static void             vc_vacheap(VRelStats * vacrelstats, Relation onerel, VPageList vpl);
static void             vc_vacpage(Page page, VPageDescr vpd, Relation archrel);
static void             vc_vaconeind(VPageList vpl, Relation indrel, int nhtups);
static void             vc_scanoneind(Relation indrel, int nhtups);
static void             vc_attrstats(Relation onerel, VRelStats * vacrelstats, HeapTuple htup);
static void             vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum * bucket, int16 * bucket_len);
static void             vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats * vacrelstats);
static void             vc_delhilowstats(Oid relid, int attcnt, int *attnums);
static void             vc_setpagelock(Relation rel, BlockNumber blkno);
static VPageDescr vc_tidreapped(ItemPointer itemptr, VPageList vpl);
static void             vc_reappage(VPageList vpl, VPageDescr vpc);
static void             vc_vpinsert(VPageList vpl, VPageDescr vpnew);
static void             vc_free(VRelList vrl);
static void             vc_getindices(Oid relid, int *nindices, Relation ** Irel);
static void             vc_clsindices(int nindices, Relation * Irel);
static Relation vc_getarchrel(Relation heaprel);
static void             vc_archive(Relation archrel, HeapTuple htup);
static bool             vc_isarchrel(char *rname);
static void             vc_mkindesc(Relation onerel, int nindices, Relation * Irel, IndDesc ** Idesc);
static char    *vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *));
static int              vc_cmp_blk(char *left, char *right);
static int              vc_cmp_offno(char *left, char *right);
static bool             vc_enough_space(VPageDescr vpd, Size len);
105
106 void
107 vacuum(char *vacrel, bool verbose, bool analyze, List * va_spec)
108 {
109         char               *pname;
110         MemoryContext   old;
111         PortalVariableMemory pmem;
112         NameData                VacRel;
113         List               *le;
114         List               *va_cols = NIL;
115
116         /*
117          * Create a portal for safe memory across transctions.  We need to
118          * palloc the name space for it because our hash function expects the
119          * name to be on a longword boundary.  CreatePortal copies the name to
120          * safe storage for us.
121          */
122         pname = (char *) palloc(strlen(VACPNAME) + 1);
123         strcpy(pname, VACPNAME);
124         vc_portal = CreatePortal(pname);
125         pfree(pname);
126
127         if (verbose)
128                 MESSAGE_LEVEL = NOTICE;
129         else
130                 MESSAGE_LEVEL = DEBUG;
131
132         /* vacrel gets de-allocated on transaction commit */
133         if (vacrel)
134                 strcpy(VacRel.data, vacrel);
135
136         pmem = PortalGetVariableMemory(vc_portal);
137         old = MemoryContextSwitchTo((MemoryContext) pmem);
138
139         Assert(va_spec == NIL || analyze);
140         foreach(le, va_spec)
141         {
142                 char               *col = (char *) lfirst(le);
143                 char               *dest;
144
145                 dest = (char *) palloc(strlen(col) + 1);
146                 strcpy(dest, col);
147                 va_cols = lappend(va_cols, dest);
148         }
149         MemoryContextSwitchTo(old);
150
151         /* initialize vacuum cleaner */
152         vc_init();
153
154         /* vacuum the database */
155         if (vacrel)
156                 vc_vacuum(&VacRel, analyze, va_cols);
157         else
158                 vc_vacuum(NULL, analyze, NIL);
159
160         PortalDestroy(&vc_portal);
161
162         /* clean up */
163         vc_shutdown();
164 }
165
166 /*
167  *      vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
168  *
169  *              We run exactly one vacuum cleaner at a time.  We use the file system
170  *              to guarantee an exclusive lock on vacuuming, since a single vacuum
171  *              cleaner instantiation crosses transaction boundaries, and we'd lose
172  *              postgres-style locks at the end of every transaction.
173  *
174  *              The strangeness with committing and starting transactions in the
175  *              init and shutdown routines is due to the fact that the vacuum cleaner
176  *              is invoked via a sql command, and so is already executing inside
177  *              a transaction.  We need to leave ourselves in a predictable state
178  *              on entry and exit to the vacuum cleaner.  We commit the transaction
179  *              started in PostgresMain() inside vc_init(), and start one in
180  *              vc_shutdown() to match the commit waiting for us back in
181  *              PostgresMain().
182  */
183 static void
184 vc_init()
185 {
186         int                             fd;
187
188         if ((fd = open("pg_vlock", O_CREAT | O_EXCL, 0600)) < 0)
189                 elog(WARN, "can't create lock file -- another vacuum cleaner running?");
190
191         close(fd);
192
193         /*
194          * By here, exclusive open on the lock file succeeded.  If we abort
195          * for any reason during vacuuming, we need to remove the lock file.
196          * This global variable is checked in the transaction manager on xact
197          * abort, and the routine vc_abort() is called if necessary.
198          */
199
200         VacuumRunning = true;
201
202         /* matches the StartTransaction in PostgresMain() */
203         CommitTransactionCommand();
204 }
205
206 static void
207 vc_shutdown()
208 {
209         /* on entry, not in a transaction */
210         if (unlink("pg_vlock") < 0)
211                 elog(WARN, "vacuum: can't destroy lock file!");
212
213         /* okay, we're done */
214         VacuumRunning = false;
215
216         /* matches the CommitTransaction in PostgresMain() */
217         StartTransactionCommand();
218
219 }
220
221 void
222 vc_abort()
223 {
224         /* on abort, remove the vacuum cleaner lock file */
225         unlink("pg_vlock");
226
227         VacuumRunning = false;
228 }
229
230 /*
231  *      vc_vacuum() -- vacuum the database.
232  *
233  *              This routine builds a list of relations to vacuum, and then calls
234  *              code that vacuums them one at a time.  We are careful to vacuum each
235  *              relation in a separate transaction in order to avoid holding too many
236  *              locks at one time.
237  */
238 static void
239 vc_vacuum(NameData * VacRelP, bool analyze, List * va_cols)
240 {
241         VRelList                vrl,
242                                         cur;
243
244         /* get list of relations */
245         vrl = vc_getrels(VacRelP);
246
247         if (analyze && VacRelP == NULL && vrl != NULL)
248                 vc_delhilowstats(InvalidOid, 0, NULL);
249
250         /* vacuum each heap relation */
251         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
252                 vc_vacone(cur->vrl_relid, analyze, va_cols);
253
254         vc_free(vrl);
255 }
256
257 static                  VRelList
258 vc_getrels(NameData * VacRelP)
259 {
260         Relation                pgclass;
261         TupleDesc               pgcdesc;
262         HeapScanDesc    pgcscan;
263         HeapTuple               pgctup;
264         Buffer                  buf;
265         PortalVariableMemory portalmem;
266         MemoryContext   old;
267         VRelList                vrl,
268                                         cur;
269         Datum                   d;
270         char               *rname;
271         char                    rkind;
272         int16                   smgrno;
273         bool                    n;
274         ScanKeyData             pgckey;
275         bool                    found = false;
276
277         StartTransactionCommand();
278
279         if (VacRelP->data)
280         {
281                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
282                                                            NameEqualRegProcedure,
283                                                            PointerGetDatum(VacRelP->data));
284         }
285         else
286         {
287                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
288                                                   CharacterEqualRegProcedure, CharGetDatum('r'));
289         }
290
291         portalmem = PortalGetVariableMemory(vc_portal);
292         vrl = cur = (VRelList) NULL;
293
294         pgclass = heap_openr(RelationRelationName);
295         pgcdesc = RelationGetTupleDescriptor(pgclass);
296
297         pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
298
299         while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf)))
300         {
301
302                 found = true;
303
304                 /*
305                  * We have to be careful not to vacuum the archive (since it
306                  * already contains vacuumed tuples), and not to vacuum relations
307                  * on write-once storage managers like the Sony jukebox at
308                  * Berkeley.
309                  */
310
311                 d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relname,
312                                                                  pgcdesc, &n);
313                 rname = (char *) d;
314
315                 /* skip archive relations */
316                 if (vc_isarchrel(rname))
317                 {
318                         ReleaseBuffer(buf);
319                         continue;
320                 }
321
322                 /*
323                  * don't vacuum large objects for now - something breaks when we
324                  * do
325                  */
326                 if ((strlen(rname) >= 5) && rname[0] == 'x' &&
327                         rname[1] == 'i' && rname[2] == 'n' &&
328                         (rname[3] == 'v' || rname[3] == 'x') &&
329                         rname[4] >= '0' && rname[4] <= '9')
330                 {
331                         elog(NOTICE, "Rel %s: can't vacuum LargeObjects now",
332                                  rname);
333                         ReleaseBuffer(buf);
334                         continue;
335                 }
336
337                 d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relsmgr,
338                                                                  pgcdesc, &n);
339                 smgrno = DatumGetInt16(d);
340
341                 /* skip write-once storage managers */
342                 if (smgriswo(smgrno))
343                 {
344                         ReleaseBuffer(buf);
345                         continue;
346                 }
347
348                 d = (Datum) heap_getattr(pgctup, buf, Anum_pg_class_relkind,
349                                                                  pgcdesc, &n);
350
351                 rkind = DatumGetChar(d);
352
353                 /* skip system relations */
354                 if (rkind != 'r')
355                 {
356                         ReleaseBuffer(buf);
357                         elog(NOTICE, "Vacuum: can not process index and certain system tables");
358                         continue;
359                 }
360
361                 /* get a relation list entry for this guy */
362                 old = MemoryContextSwitchTo((MemoryContext) portalmem);
363                 if (vrl == (VRelList) NULL)
364                 {
365                         vrl = cur = (VRelList) palloc(sizeof(VRelListData));
366                 }
367                 else
368                 {
369                         cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
370                         cur = cur->vrl_next;
371                 }
372                 MemoryContextSwitchTo(old);
373
374                 cur->vrl_relid = pgctup->t_oid;
375                 cur->vrl_next = (VRelList) NULL;
376
377                 /* wei hates it if you forget to do this */
378                 ReleaseBuffer(buf);
379         }
380         if (found == false)
381                 elog(NOTICE, "Vacuum: table not found");
382
383
384         heap_endscan(pgcscan);
385         heap_close(pgclass);
386
387         CommitTransactionCommand();
388
389         return (vrl);
390 }
391
392 /*
393  *      vc_vacone() -- vacuum one heap relation
394  *
395  *              This routine vacuums a single heap, cleans out its indices, and
396  *              updates its statistics npages and ntups statistics.
397  *
398  *              Doing one heap at a time incurs extra overhead, since we need to
399  *              check that the heap exists again just before we vacuum it.      The
400  *              reason that we do this is so that vacuuming can be spread across
401  *              many small transactions.  Otherwise, two-phase locking would require
402  *              us to lock the entire database during one pass of the vacuum cleaner.
403  */
404 static void
405 vc_vacone(Oid relid, bool analyze, List * va_cols)
406 {
407         Relation                pgclass;
408         TupleDesc               pgcdesc;
409         HeapTuple               pgctup,
410                                         pgttup;
411         Buffer                  pgcbuf;
412         HeapScanDesc    pgcscan;
413         Relation                onerel;
414         ScanKeyData             pgckey;
415         VPageListData   Vvpl;           /* List of pages to vacuum and/or clean
416                                                                  * indices */
417         VPageListData   Fvpl;           /* List of pages with space enough for
418                                                                  * re-using */
419         VPageDescr         *vpp;
420         Relation           *Irel;
421         int32                   nindices,
422                                         i;
423         VRelStats          *vacrelstats;
424
425         StartTransactionCommand();
426
427         ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
428                                                    ObjectIdEqualRegProcedure,
429                                                    ObjectIdGetDatum(relid));
430
431         pgclass = heap_openr(RelationRelationName);
432         pgcdesc = RelationGetTupleDescriptor(pgclass);
433         pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
434
435         /*
436          * Race condition -- if the pg_class tuple has gone away since the
437          * last time we saw it, we don't need to vacuum it.
438          */
439
440         if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf)))
441         {
442                 heap_endscan(pgcscan);
443                 heap_close(pgclass);
444                 CommitTransactionCommand();
445                 return;
446         }
447
448         /* now open the class and vacuum it */
449         onerel = heap_open(relid);
450
451         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
452         vacrelstats->relid = relid;
453         vacrelstats->npages = vacrelstats->ntups = 0;
454         vacrelstats->hasindex = false;
455         if (analyze && !IsSystemRelationName((RelationGetRelationName(onerel))->data))
456         {
457                 int                             attr_cnt,
458                                            *attnums = NULL;
459                 AttributeTupleForm *attr;
460
461                 attr_cnt = onerel->rd_att->natts;
462                 attr = onerel->rd_att->attrs;
463
464                 if (va_cols != NIL)
465                 {
466                         int                             tcnt = 0;
467                         List               *le;
468
469                         if (length(va_cols) > attr_cnt)
470                                 elog(WARN, "vacuum: too many attributes specified for relation %s",
471                                          (RelationGetRelationName(onerel))->data);
472                         attnums = (int *) palloc(attr_cnt * sizeof(int));
473                         foreach(le, va_cols)
474                         {
475                                 char               *col = (char *) lfirst(le);
476
477                                 for (i = 0; i < attr_cnt; i++)
478                                 {
479                                         if (namestrcmp(&(attr[i]->attname), col) == 0)
480                                                 break;
481                                 }
482                                 if (i < attr_cnt)               /* found */
483                                         attnums[tcnt++] = i;
484                                 else
485                                 {
486                                         elog(WARN, "vacuum: there is no attribute %s in %s",
487                                                  col, (RelationGetRelationName(onerel))->data);
488                                 }
489                         }
490                         attr_cnt = tcnt;
491                 }
492
493                 vacrelstats->vacattrstats =
494                         (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));
495
496                 for (i = 0; i < attr_cnt; i++)
497                 {
498                         Operator                func_operator;
499                         OperatorTupleForm pgopform;
500                         VacAttrStats   *stats;
501
502                         stats = &vacrelstats->vacattrstats[i];
503                         stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
504                         memmove(stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
505                         stats->best = stats->guess1 = stats->guess2 = 0;
506                         stats->max = stats->min = 0;
507                         stats->best_len = stats->guess1_len = stats->guess2_len = 0;
508                         stats->max_len = stats->min_len = 0;
509                         stats->initialized = false;
510                         stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
511                         stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
512
513                         func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
514                         if (func_operator != NULL)
515                         {
516                                 int                             nargs;
517
518                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
519                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpeq), &nargs);
520                         }
521                         else
522                                 stats->f_cmpeq = NULL;
523
524                         func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
525                         if (func_operator != NULL)
526                         {
527                                 int                             nargs;
528
529                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
530                                 fmgr_info(pgopform->oprcode, &(stats->f_cmplt), &nargs);
531                         }
532                         else
533                                 stats->f_cmplt = NULL;
534
535                         func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
536                         if (func_operator != NULL)
537                         {
538                                 int                             nargs;
539
540                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
541                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpgt), &nargs);
542                         }
543                         else
544                                 stats->f_cmpgt = NULL;
545
546                         pgttup = SearchSysCacheTuple(TYPOID,
547                                                                  ObjectIdGetDatum(stats->attr->atttypid),
548                                                                                  0, 0, 0);
549                         if (HeapTupleIsValid(pgttup))
550                                 stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
551                         else
552                                 stats->outfunc = InvalidOid;
553                 }
554                 vacrelstats->va_natts = attr_cnt;
555                 vc_delhilowstats(relid, ((attnums) ? attr_cnt : 0), attnums);
556                 if (attnums)
557                         pfree(attnums);
558         }
559         else
560         {
561                 vacrelstats->va_natts = 0;
562                 vacrelstats->vacattrstats = (VacAttrStats *) NULL;
563         }
564
565         /* we require the relation to be locked until the indices are cleaned */
566         RelationSetLockForWrite(onerel);
567
568         /* scan it */
569         Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
570         vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
571
572         /* Now open indices */
573         Irel = (Relation *) NULL;
574         vc_getindices(vacrelstats->relid, &nindices, &Irel);
575
576         if (nindices > 0)
577                 vacrelstats->hasindex = true;
578         else
579                 vacrelstats->hasindex = false;
580
581         /* Clean/scan index relation(s) */
582         if (Irel != (Relation *) NULL)
583         {
584                 if (Vvpl.vpl_npages > 0)
585                 {
586                         for (i = 0; i < nindices; i++)
587                                 vc_vaconeind(&Vvpl, Irel[i], vacrelstats->ntups);
588                 }
589                 else
590 /* just scan indices to update statistic */
591                 {
592                         for (i = 0; i < nindices; i++)
593                                 vc_scanoneind(Irel[i], vacrelstats->ntups);
594                 }
595         }
596
597         if (Fvpl.vpl_npages > 0)        /* Try to shrink heap */
598                 vc_rpfheap(vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
599         else
600         {
601                 if (Irel != (Relation *) NULL)
602                         vc_clsindices(nindices, Irel);
603                 if (Vvpl.vpl_npages > 0)/* Clean pages from Vvpl list */
604                         vc_vacheap(vacrelstats, onerel, &Vvpl);
605         }
606
607         /* ok - free Vvpl list of reapped pages */
608         if (Vvpl.vpl_npages > 0)
609         {
610                 vpp = Vvpl.vpl_pgdesc;
611                 for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
612                         pfree(*vpp);
613                 pfree(Vvpl.vpl_pgdesc);
614                 if (Fvpl.vpl_npages > 0)
615                         pfree(Fvpl.vpl_pgdesc);
616         }
617
618         /* all done with this class */
619         heap_close(onerel);
620         heap_endscan(pgcscan);
621         heap_close(pgclass);
622
623         /* update statistics in pg_class */
624         vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
625                                 vacrelstats->hasindex, vacrelstats);
626
627         /* next command frees attribute stats */
628
629         CommitTransactionCommand();
630 }
631
632 /*
633  *      vc_scanheap() -- scan an open heap relation
634  *
635  *              This routine sets commit times, constructs Vvpl list of
636  *              empty/uninitialized pages and pages with dead tuples and
637  *              ~LP_USED line pointers, constructs Fvpl list of pages
638  *              appropriate for purposes of shrinking and maintains statistics
639  *              on the number of live tuples in a heap.
640  */
641 static void
642 vc_scanheap(VRelStats * vacrelstats, Relation onerel,
643                         VPageList Vvpl, VPageList Fvpl)
644 {
645         int                             nblocks,
646                                         blkno;
647         ItemId                  itemid;
648         ItemPointer             itemptr;
649         HeapTuple               htup;
650         Buffer                  buf;
651         Page                    page,
652                                         tempPage = NULL;
653         OffsetNumber    offnum,
654                                         maxoff;
655         bool                    pgchanged,
656                                         tupgone,
657                                         dobufrel,
658                                         notup;
659         char               *relname;
660         VPageDescr              vpc,
661                                         vp;
662         uint32                  nvac,
663                                         ntups,
664                                         nunused,
665                                         ncrash,
666                                         nempg,
667                                         nnepg,
668                                         nchpg,
669                                         nemend;
670         Size                    frsize,
671                                         frsusf;
672         Size                    min_tlen = MAXTUPLEN;
673         Size                    max_tlen = 0;
674         int32                   i /* , attr_cnt */ ;
675         struct rusage   ru0,
676                                         ru1;
677         bool                    do_shrinking = true;
678
679         getrusage(RUSAGE_SELF, &ru0);
680
681         nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
682         frsize = frsusf = 0;
683
684         relname = (RelationGetRelationName(onerel))->data;
685
686         nblocks = RelationGetNumberOfBlocks(onerel);
687
688         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
689         vpc->vpd_nusd = 0;
690
691         for (blkno = 0; blkno < nblocks; blkno++)
692         {
693                 buf = ReadBuffer(onerel, blkno);
694                 page = BufferGetPage(buf);
695                 vpc->vpd_blkno = blkno;
696                 vpc->vpd_noff = 0;
697
698                 if (PageIsNew(page))
699                 {
700                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
701                                  relname, blkno);
702                         PageInit(page, BufferGetPageSize(buf), 0);
703                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
704                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
705                         nnepg++;
706                         nemend++;
707                         vc_reappage(Vvpl, vpc);
708                         WriteBuffer(buf);
709                         continue;
710                 }
711
712                 if (PageIsEmpty(page))
713                 {
714                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
715                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
716                         nempg++;
717                         nemend++;
718                         vc_reappage(Vvpl, vpc);
719                         ReleaseBuffer(buf);
720                         continue;
721                 }
722
723                 pgchanged = false;
724                 notup = true;
725                 maxoff = PageGetMaxOffsetNumber(page);
726                 for (offnum = FirstOffsetNumber;
727                          offnum <= maxoff;
728                          offnum = OffsetNumberNext(offnum))
729                 {
730                         itemid = PageGetItemId(page, offnum);
731
732                         /*
733                          * Collect un-used items too - it's possible to have indices
734                          * pointing here after crash.
735                          */
736                         if (!ItemIdIsUsed(itemid))
737                         {
738                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
739                                 nunused++;
740                                 continue;
741                         }
742
743                         htup = (HeapTuple) PageGetItem(page, itemid);
744                         tupgone = false;
745
746                         if (!AbsoluteTimeIsBackwardCompatiblyValid(htup->t_tmin) &&
747                                 TransactionIdIsValid((TransactionId) htup->t_xmin))
748                         {
749
750                                 if (TransactionIdDidAbort(htup->t_xmin))
751                                 {
752                                         tupgone = true;
753                                 }
754                                 else if (TransactionIdDidCommit(htup->t_xmin))
755                                 {
756                                         htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
757                                         pgchanged = true;
758                                 }
759                                 else if (!TransactionIdIsInProgress(htup->t_xmin))
760                                 {
761
762                                         /*
763                                          * Not Aborted, Not Committed, Not in Progress - so it
764                                          * from crashed process. - vadim 11/26/96
765                                          */
766                                         ncrash++;
767                                         tupgone = true;
768                                 }
769                                 else
770                                 {
771                                         elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
772                                                  relname, blkno, offnum, htup->t_xmin);
773                                         do_shrinking = false;
774                                 }
775                         }
776
777                         if (TransactionIdIsValid((TransactionId) htup->t_xmax))
778                         {
779                                 if (TransactionIdDidAbort(htup->t_xmax))
780                                 {
781                                         StoreInvalidTransactionId(&(htup->t_xmax));
782                                         pgchanged = true;
783                                 }
784                                 else if (TransactionIdDidCommit(htup->t_xmax))
785                                         tupgone = true;
786                                 else if (!TransactionIdIsInProgress(htup->t_xmax))
787                                 {
788
789                                         /*
790                                          * Not Aborted, Not Committed, Not in Progress - so it
791                                          * from crashed process. - vadim 06/02/97
792                                          */
793                                         StoreInvalidTransactionId(&(htup->t_xmax));
794                                         pgchanged = true;
795                                 }
796                                 else
797                                 {
798                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
799                                                  relname, blkno, offnum, htup->t_xmax);
800                                         do_shrinking = false;
801                                 }
802                         }
803
804                         /*
805                          * Is it possible at all ? - vadim 11/26/96
806                          */
807                         if (!TransactionIdIsValid((TransactionId) htup->t_xmin))
808                         {
809                                 elog(NOTICE, "Rel %s: TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
810 DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.",
811                                          relname, blkno, offnum,
812                                          TransactionIdIsValid((TransactionId) htup->t_xmax),
813                                          tupgone);
814                         }
815
816                         /*
817                          * It's possibly! But from where it comes ? And should we fix
818                          * it ?  - vadim 11/28/96
819                          */
820                         itemptr = &(htup->t_ctid);
821                         if (!ItemPointerIsValid(itemptr) ||
822                                 BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno)
823                         {
824                                 elog(NOTICE, "Rel %s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.",
825                                          relname, blkno, offnum,
826                                          BlockIdGetBlockNumber(&(itemptr->ip_blkid)),
827                                          itemptr->ip_posid, tupgone);
828                         }
829
830                         /*
831                          * Other checks...
832                          */
833                         if (htup->t_len != itemid->lp_len)
834                         {
835                                 elog(NOTICE, "Rel %s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.",
836                                          relname, blkno, offnum,
837                                          itemid->lp_len, htup->t_len, tupgone);
838                         }
839                         if (!OidIsValid(htup->t_oid))
840                         {
841                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
842                                          relname, blkno, offnum, tupgone);
843                         }
844
845                         if (tupgone)
846                         {
847                                 ItemId                  lpp;
848
849                                 if (tempPage == (Page) NULL)
850                                 {
851                                         Size                    pageSize;
852
853                                         pageSize = PageGetPageSize(page);
854                                         tempPage = (Page) palloc(pageSize);
855                                         memmove(tempPage, page, pageSize);
856                                 }
857
858                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
859
860                                 /* mark it unused */
861                                 lpp->lp_flags &= ~LP_USED;
862
863                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
864                                 nvac++;
865
866                         }
867                         else
868                         {
869                                 ntups++;
870                                 notup = false;
871                                 if (htup->t_len < min_tlen)
872                                         min_tlen = htup->t_len;
873                                 if (htup->t_len > max_tlen)
874                                         max_tlen = htup->t_len;
875                                 vc_attrstats(onerel, vacrelstats, htup);
876                         }
877                 }
878
879                 if (pgchanged)
880                 {
881                         WriteBuffer(buf);
882                         dobufrel = false;
883                         nchpg++;
884                 }
885                 else
886                         dobufrel = true;
887                 if (tempPage != (Page) NULL)
888                 {                                               /* Some tuples are gone */
889                         PageRepairFragmentation(tempPage);
890                         vpc->vpd_free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
891                         frsize += vpc->vpd_free;
892                         vc_reappage(Vvpl, vpc);
893                         pfree(tempPage);
894                         tempPage = (Page) NULL;
895                 }
896                 else if (vpc->vpd_noff > 0)
897                 {                                               /* there are only ~LP_USED line pointers */
898                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
899                         frsize += vpc->vpd_free;
900                         vc_reappage(Vvpl, vpc);
901                 }
902                 if (dobufrel)
903                         ReleaseBuffer(buf);
904                 if (notup)
905                         nemend++;
906                 else
907                         nemend = 0;
908         }
909
910         pfree(vpc);
911
912         /* save stats in the rel list for use later */
913         vacrelstats->ntups = ntups;
914         vacrelstats->npages = nblocks;
915 /*        vacrelstats->natts = attr_cnt;*/
916         if (ntups == 0)
917                 min_tlen = max_tlen = 0;
918         vacrelstats->min_tlen = min_tlen;
919         vacrelstats->max_tlen = max_tlen;
920
921         Vvpl->vpl_nemend = nemend;
922         Fvpl->vpl_nemend = nemend;
923
924         /*
925          * Try to make Fvpl keeping in mind that we can't use free space of
926          * "empty" end-pages and last page if it reapped.
927          */
928         if (do_shrinking && Vvpl->vpl_npages - nemend > 0)
929         {
930                 int                             nusf;   /* blocks usefull for re-using */
931
932                 nusf = Vvpl->vpl_npages - nemend;
933                 if ((Vvpl->vpl_pgdesc[nusf - 1])->vpd_blkno == nblocks - nemend - 1)
934                         nusf--;
935
936                 for (i = 0; i < nusf; i++)
937                 {
938                         vp = Vvpl->vpl_pgdesc[i];
939                         if (vc_enough_space(vp, min_tlen))
940                         {
941                                 vc_vpinsert(Fvpl, vp);
942                                 frsusf += vp->vpd_free;
943                         }
944                 }
945         }
946
947         getrusage(RUSAGE_SELF, &ru1);
948
949         elog(MESSAGE_LEVEL, "Rel %s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
950 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
951                  relname,
952                  nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
953                  ntups, nvac, ncrash, nunused, min_tlen, max_tlen,
954                  frsize, frsusf, nemend, Fvpl->vpl_npages,
955                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
956                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
957
958 }                                                               /* vc_scanheap */
959
960
961 /*
962  *      vc_rpfheap() -- try to repair relation's fragmentation
963  *
964  *              This routine marks dead tuples as unused and tries to re-use dead
965  *              space by moving tuples from the end of the relation into free space
966  *              on earlier pages (inserting index tuples for them if needed). It
967  *              builds the Nvpl list of pages that tuples were moved off and cleans
968  *              indices for them after committing (in hack-manner - without losing
969  *              locks and freeing memory!) the current transaction. It truncates
     *              the relation if some end-blocks are gone away.
     *
     *              vacrelstats - per-relation stats; npages, min_tlen and ntups are
     *                      read here, and npages is overwritten with the new block
     *                      count if the relation gets truncated.
     *              onerel - the heap relation being compacted.
     *              Vvpl - list of reaped pages; its last vpl_nemend entries are the
     *                      "empty" end-pages.
     *              Fvpl - list of pages with free space that tuples may be moved to.
     *                      Entries of its vpl_pgdesc array are removed in place once
     *                      vc_enough_space() says a page can't take even a tuple of
     *                      min_tlen bytes, and vpd_nusd/vpd_free of its descriptors
     *                      are updated. vpl_npages itself is only read (the local
     *                      copy Fnpages is what gets decremented).
     *              nindices, Irel - number of indices and the array of index
     *                      relations. When Irel is not NULL, index tuples are inserted
     *                      for every moved tuple and vc_clsindices() is called on
     *                      Irel before returning.
     *
     *              NOTE(review): Fvpl->vpl_npages is assumed to be >= 1 on entry
     *              (Fvpl->vpl_pgdesc[Fnpages - 1] is read unconditionally) - confirm
     *              against the caller.
970  */
971 static void
972 vc_rpfheap(VRelStats * vacrelstats, Relation onerel,
973                    VPageList Vvpl, VPageList Fvpl, int nindices, Relation * Irel)
974 {
975         TransactionId   myXID;
976         CommandId               myCID;
977         AbsoluteTime    myCTM = 0;      /* commit time of myXID, set once tuples are moved */
978         Buffer                  buf,
979                                         ToBuf;
980         int                             nblocks,
981                                         blkno;
982         Page                    page,
983                                         ToPage = NULL;
984         OffsetNumber    offnum = 0,
985                                         maxoff = 0,
986                                         newoff,
987                                         moff;
988         ItemId                  itemid,
989                                         newitemid;
990         HeapTuple               htup,
991                                         newtup;
992         TupleDesc               tupdesc = NULL;
993         Datum              *idatum = NULL;
994         char               *inulls = NULL;
995         InsertIndexResult iresult;
996         VPageListData   Nvpl;   /* pages that tuples were moved off */
997         VPageDescr              ToVpd = NULL,
998                                         Fvplast,
999                                         Vvplast,
1000                                         vpc,
1001                                    *vpp;
1002         int                             ToVpI = 0;      /* index of ToVpd in Fvpl->vpl_pgdesc */
1003         IndDesc            *Idesc,
1004                                    *idcur;
1005         int                             Fblklast,
1006                                         Vblklast,
1007                                         i;
1008         Size                    tlen;
1009         int                             nmoved,
1010                                         Fnpages,
1011                                         Vnpages;
1012         int                             nchkmvd,
1013                                         ntups;
1014         bool                    isempty,
1015                                         dowrite;
1016         Relation                archrel;
1017         struct rusage   ru0,
1018                                         ru1;
1019
1020         getrusage(RUSAGE_SELF, &ru0);
1021
1022         myXID = GetCurrentTransactionId();
1023         myCID = GetCurrentCommandId();
1024
1025         if (Irel != (Relation *) NULL)          /* preparation for index inserts */
1026         {
1027                 vc_mkindesc(onerel, nindices, Irel, &Idesc);
1028                 tupdesc = RelationGetTupleDescriptor(onerel);
1029                 idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof(*idatum));
1030                 inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof(*inulls));
1031         }
1032
1033         /* if the relation has an archive, open it */
1034         if (onerel->rd_rel->relarch != 'n')
1035         {
1036                 archrel = vc_getarchrel(onerel);
1037                 /* Archive tuples from "empty" end-pages */
             /* (walks the last vpl_nemend entries of Vvpl, from the end backwards) */
1038                 for (vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1,
1039                          i = Vvpl->vpl_nemend; i > 0; i--, vpp--)
1040                 {
1041                         if ((*vpp)->vpd_noff > 0)
1042                         {
1043                                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1044                                 page = BufferGetPage(buf);
1045                                 Assert(!PageIsEmpty(page));
1046                                 vc_vacpage(page, *vpp, archrel);
1047                                 WriteBuffer(buf);
1048                         }
1049                 }
1050         }
1051         else
1052                 archrel = (Relation) NULL;
1053
             /*
              * Only vpl_npages of Nvpl is initialized here; presumably
              * vc_reappage() allocates vpl_pgdesc on first insert - TODO confirm.
              *
              * Fvplast/Fblklast and Vvplast/Vblklast track the last (highest
              * block) entry still of interest in Fvpl and in the non-"empty"
              * part of Vvpl; Fnpages/Vnpages are the matching local counts.
              */
1054         Nvpl.vpl_npages = 0;
1055         Fnpages = Fvpl->vpl_npages;
1056         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1057         Fblklast = Fvplast->vpd_blkno;
1058         Assert(Vvpl->vpl_npages > Vvpl->vpl_nemend);
1059         Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
1060         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1061         Vblklast = Vvplast->vpd_blkno;
1062         Assert(Vblklast >= Fblklast);
1063         ToBuf = InvalidBuffer;
1064         nmoved = 0;
1065
             /* scratch descriptor: collects offsets of tuples moved off the current page */
1066         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
1067         vpc->vpd_nusd = vpc->vpd_noff = 0;
1068
1069         nblocks = vacrelstats->npages;
             /*
              * Walk the relation backwards, starting at the last page that is not
              * an "empty" end-page. The for statement has no condition of its own:
              * the loop is left only through the break statements below.
              */
1070         for (blkno = nblocks - Vvpl->vpl_nemend - 1;; blkno--)
1071         {
1072                 /* if it's a reaped page and it was used by me - quit */
1073                 if (blkno == Fblklast && Fvplast->vpd_nusd > 0)
1074                         break;
1075
1076                 buf = ReadBuffer(onerel, blkno);
1077                 page = BufferGetPage(buf);
1078
1079                 vpc->vpd_noff = 0;
1080
1081                 isempty = PageIsEmpty(page);
1082
1083                 dowrite = false;
1084                 if (blkno == Vblklast)  /* it's a reaped page */
1085                 {
1086                         if (Vvplast->vpd_noff > 0)      /* there are dead tuples */
1087                         {                                       /* on this page - clean */
1088                                 Assert(!isempty);
1089                                 vc_vacpage(page, Vvplast, archrel);
1090                                 dowrite = true;
1091                         }
1092                         else
1093                         {
1094                                 Assert(isempty);
1095                         }
1096                         --Vnpages;
1097                         Assert(Vnpages > 0);
1098                         /* get prev reaped page from Vvpl */
1099                         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1100                         Vblklast = Vvplast->vpd_blkno;
1101                         if (blkno == Fblklast)          /* this page is in Fvpl too */
1102                         {
1103                                 --Fnpages;
1104                                 Assert(Fnpages > 0);
1105                                 Assert(Fvplast->vpd_nusd == 0);
1106                                 /* get prev reaped page from Fvpl */
1107                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1108                                 Fblklast = Fvplast->vpd_blkno;
1109                         }
1110                         Assert(Fblklast <= Vblklast);
1111                         if (isempty)
1112                         {
                             /* nothing to move off an empty page */
1113                                 ReleaseBuffer(buf);
1114                                 continue;
1115                         }
1116                 }
1117                 else
1118                 {
1119                         Assert(!isempty);
1120                 }
1121
1122                 vpc->vpd_blkno = blkno;
1123                 maxoff = PageGetMaxOffsetNumber(page);
                 /*
                  * Try to move every used item of this page to a page from Fvpl.
                  * A break inside this loop leaves offnum <= maxoff, which the
                  * check after the loop uses to stop the walk along the relation.
                  */
1124                 for (offnum = FirstOffsetNumber;
1125                          offnum <= maxoff;
1126                          offnum = OffsetNumberNext(offnum))
1127                 {
1128                         itemid = PageGetItemId(page, offnum);
1129
1130                         if (!ItemIdIsUsed(itemid))
1131                                 continue;
1132
1133                         htup = (HeapTuple) PageGetItem(page, itemid);
1134                         tlen = htup->t_len;
1135
1136                         /* try to find new page for this tuple */
1137                         if (ToBuf == InvalidBuffer ||
1138                                 !vc_enough_space(ToVpd, tlen))
1139                         {
1140                                 if (ToBuf != InvalidBuffer)
1141                                 {
                                     /* done with the current target page for now */
1142                                         WriteBuffer(ToBuf);
1143                                         ToBuf = InvalidBuffer;
1144
1145                                         /*
1146                                          * If not even one tuple can be added to this page
1147                                          * any more - remove page from Fvpl. - vadim 11/27/96
                                          *
                                          * NOTE(review): the elements of vpl_pgdesc are
                                          * VPageDescr (see the Fvplast assignments), so
                                          * sizeof(VPageDescr) would be the matching element
                                          * size for the memmove below. VPageDescr is itself
                                          * a pointer type here (vpc is palloc'ed as one), so
                                          * the two sizes presumably coincide - confirm.
1148                                          */
1149                                         if (!vc_enough_space(ToVpd, vacrelstats->min_tlen))
1150                                         {
1151                                                 if (ToVpd != Fvplast)
1152                                                 {
1153                                                         Assert(Fnpages > ToVpI + 1);
1154                                                         memmove(Fvpl->vpl_pgdesc + ToVpI,
1155                                                                         Fvpl->vpl_pgdesc + ToVpI + 1,
1156                                                         sizeof(VPageDescr *) * (Fnpages - ToVpI - 1));
1157                                                 }
1158                                                 Assert(Fnpages >= 1);
1159                                                 Fnpages--;
1160                                                 if (Fnpages == 0)
1161                                                         break;  /* no target pages left */
1162                                                 /* get prev reaped page from Fvpl */
1163                                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1164                                                 Fblklast = Fvplast->vpd_blkno;
1165                                         }
1166                                 }
                                 /* first page in Fvpl with enough room for this tuple */
1167                                 for (i = 0; i < Fnpages; i++)
1168                                 {
1169                                         if (vc_enough_space(Fvpl->vpl_pgdesc[i], tlen))
1170                                                 break;
1171                                 }
1172                                 if (i == Fnpages)
1173                                         break;          /* can't move item anywhere */
1174                                 ToVpI = i;
1175                                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1176                                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1177                                 ToPage = BufferGetPage(ToBuf);
1178                                 /* if this page was not used before - clean it */
1179                                 if (!PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0)
1180                                         vc_vacpage(ToPage, ToVpd, archrel);
1181                         }
1182
1183                         /* copy tuple */
1184                         newtup = (HeapTuple) palloc(tlen);
1185                         memmove((char *) newtup, (char *) htup, tlen);
1186
1187                         /* store transaction information: the copy is inserted by myXID */
1188                         TransactionIdStore(myXID, &(newtup->t_xmin));
1189                         newtup->t_cmin = myCID;
1190                         StoreInvalidTransactionId(&(newtup->t_xmax));
1191                         newtup->t_tmin = INVALID_ABSTIME;
1192                         newtup->t_tmax = CURRENT_ABSTIME;
1193                         ItemPointerSetInvalid(&newtup->t_chain);
1194
1195                         /*
                          * add tuple to the page
                          *
                          * NOTE(review): newoff is used below even when it is
                          * InvalidOffsetNumber, so this relies on elog(WARN, ...)
                          * not returning - presumably it aborts; confirm. tlen is
                          * a Size but is formatted with %u - verify on platforms
                          * where Size is wider than unsigned int.
                          */
1196                         newoff = PageAddItem(ToPage, (Item) newtup, tlen,
1197                                                                  InvalidOffsetNumber, LP_USED);
1198                         if (newoff == InvalidOffsetNumber)
1199                         {
1200                                 elog(WARN, "\
1201 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1202                                          tlen, ToVpd->vpd_blkno, ToVpd->vpd_free,
1203                                          ToVpd->vpd_nusd, ToVpd->vpd_noff);
1204                         }
1205                         newitemid = PageGetItemId(ToPage, newoff);
1206                         pfree(newtup);
                         /* from here on newtup points at the copy stored on ToPage */
1207                         newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1208                         ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1209
1210                         /* now logically delete end-tuple and chain it to the new copy */
1211                         TransactionIdStore(myXID, &(htup->t_xmax));
1212                         htup->t_cmax = myCID;
1213                         memmove((char *) &(htup->t_chain), (char *) &(newtup->t_ctid), sizeof(newtup->t_ctid));
1214
1215                         ToVpd->vpd_nusd++;
1216                         nmoved++;
1217                         ToVpd->vpd_free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1218                         vpc->vpd_voff[vpc->vpd_noff++] = offnum;        /* remember the moved item */
1219
1220                         /* insert index tuples if needed */
1221                         if (Irel != (Relation *) NULL)
1222                         {
1223                                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1224                                 {
1225                                         FormIndexDatum(
1226                                                                    idcur->natts,
1227                                                           (AttrNumber *) & (idcur->tform->indkey[0]),
1228                                                                    newtup,
1229                                                                    tupdesc,
1230                                                                    InvalidBuffer,
1231                                                                    idatum,
1232                                                                    inulls,
1233                                                                    idcur->finfoP);
1234                                         iresult = index_insert(
1235                                                                                    Irel[i],
1236                                                                                    idatum,
1237                                                                                    inulls,
1238                                                                                    &(newtup->t_ctid),
1239                                                                                    onerel);
1240                                         if (iresult)
1241                                                 pfree(iresult);
1242                                 }
1243                         }
1244
1245                 }                                               /* walk along page */
1246
1247                 if (vpc->vpd_noff > 0)  /* some tuples were moved */
1248                 {
1249                         vc_reappage(&Nvpl, vpc);
1250                         WriteBuffer(buf);
1251                 }
1252                 else if (dowrite)       /* nothing moved, but page was cleaned above */
1253                         WriteBuffer(buf);
1254                 else
1255                         ReleaseBuffer(buf);
1256
1257                 if (offnum <= maxoff)
1258                         break;                          /* some item(s) left */
1259
1260         }                                                       /* walk along relation */
1261
             /* blkno is the last page that has to stay, so blkno + 1 pages remain */
1262         blkno++;                                        /* new number of blocks */
1263
1264         if (ToBuf != InvalidBuffer)
1265         {
1266                 Assert(nmoved > 0);
1267                 WriteBuffer(ToBuf);
1268         }
1269
1270         if (nmoved > 0)
1271         {
1272
1273                 /*
1274                  * We have to commit our tuple movings before we truncate the
1275                  * relation, but we shouldn't lose our locks. And so - quick hack:
1276                  * flush buffers and record status of current transaction as
1277                  * committed, and continue. - vadim 11/13/96
1278                  */
1279                 FlushBufferPool(!TransactionFlushEnabled());
1280                 TransactionIdCommit(myXID);
1281                 FlushBufferPool(!TransactionFlushEnabled());
1282                 myCTM = TransactionIdGetCommitTime(myXID);
1283         }
1284
1285         /*
1286          * Clean uncleaned reaped pages from Vvpl list and set commit times
1287          * for inserted tuples. Vnpages was decremented for every reaped page
              * the backward walk above visited, so only the remaining (lower)
              * entries of Vvpl are processed here.
1288          */
1289         nchkmvd = 0;
1290         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1291         {
1292                 Assert((*vpp)->vpd_blkno < blkno);
1293                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1294                 page = BufferGetPage(buf);
1295                 if ((*vpp)->vpd_nusd == 0)              /* this page was not used */
1296                 {
1297
1298                         /*
1299                          * noff == 0 in empty pages only - such pages should be
1300                          * re-used
1301                          */
1302                         Assert((*vpp)->vpd_noff > 0);
1303                         vc_vacpage(page, *vpp, archrel);
1304                 }
1305                 else
1306 /* this page was used: stamp the tuples we inserted with our commit time */
1307                 {
1308                         ntups = 0;
1309                         moff = PageGetMaxOffsetNumber(page);
1310                         for (newoff = FirstOffsetNumber;
1311                                  newoff <= moff;
1312                                  newoff = OffsetNumberNext(newoff))
1313                         {
1314                                 itemid = PageGetItemId(page, newoff);
1315                                 if (!ItemIdIsUsed(itemid))
1316                                         continue;
1317                                 htup = (HeapTuple) PageGetItem(page, itemid);
1318                                 if (TransactionIdEquals((TransactionId) htup->t_xmin, myXID))
1319                                 {
1320                                         htup->t_tmin = myCTM;
1321                                         ntups++;
1322                                 }
1323                         }
1324                         Assert((*vpp)->vpd_nusd == ntups);
1325                         nchkmvd += ntups;
1326                 }
1327                 WriteBuffer(buf);
1328         }
1329         Assert(nmoved == nchkmvd);      /* every moved tuple was found and stamped */
1330
1331         getrusage(RUSAGE_SELF, &ru1);
1332
             /*
              * Of the two elapsed values the first is system time (ru_stime) and
              * the second is user time (ru_utime).
              * NOTE(review): the tv_sec differences are printed with %u - verify
              * that this matches the type of tv_sec on the target platforms.
              */
1333         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. \
1334 Elapsed %u/%u sec.",
1335                  (RelationGetRelationName(onerel))->data,
1336                  nblocks, blkno, nmoved,
1337                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1338                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1339
1340         if (Nvpl.vpl_npages > 0)
1341         {
1342                 /* vacuum indices again if needed */
1343                 if (Irel != (Relation *) NULL)
1344                 {
1345                         VPageDescr         *vpleft,
1346                                                    *vpright,
1347                                                         vpsave;
1348
1349                         /*
                          * re-sort Nvpl.vpl_pgdesc: pages were added while walking
                          * the relation backwards, so reverse the array in place
                          * (presumably vc_vaconeind() wants ascending block order -
                          * TODO confirm)
                          */
1350                         for (vpleft = Nvpl.vpl_pgdesc,
1351                                  vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1352                                  vpleft < vpright; vpleft++, vpright--)
1353                         {
1354                                 vpsave = *vpleft;
1355                                 *vpleft = *vpright;
1356                                 *vpright = vpsave;
1357                         }
1358                         for (i = 0; i < nindices; i++)
1359                                 vc_vaconeind(&Nvpl, Irel[i], vacrelstats->ntups);
1360                 }
1361
1362                 /*
1363                  * clean moved tuples from last page in Nvpl list if some tuples
1364                  * left there (offnum/maxoff still hold the values from the page
                  * on which the walk along the relation stopped; offnum is the
                  * first item that could not be moved)
1365                  */
1366                 if (vpc->vpd_noff > 0 && offnum <= maxoff)
1367                 {
1368                         Assert(vpc->vpd_blkno == blkno - 1);
1369                         buf = ReadBuffer(onerel, vpc->vpd_blkno);
1370                         page = BufferGetPage(buf);
1371                         ntups = 0;
1372                         maxoff = offnum;        /* only items before the stuck one were moved */
1373                         for (offnum = FirstOffsetNumber;
1374                                  offnum < maxoff;
1375                                  offnum = OffsetNumberNext(offnum))
1376                         {
1377                                 itemid = PageGetItemId(page, offnum);
1378                                 if (!ItemIdIsUsed(itemid))
1379                                         continue;
1380                                 htup = (HeapTuple) PageGetItem(page, itemid);
1381                                 Assert(TransactionIdEquals((TransactionId) htup->t_xmax, myXID));
1382                                 itemid->lp_flags &= ~LP_USED;   /* mark it unused */
1383                                 ntups++;
1384                         }
1385                         Assert(vpc->vpd_noff == ntups);
1386                         PageRepairFragmentation(page);
1387                         WriteBuffer(buf);
1388                 }
1389
1390                 /* now - free new list of reaped pages */
1391                 vpp = Nvpl.vpl_pgdesc;
1392                 for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1393                         pfree(*vpp);
1394                 pfree(Nvpl.vpl_pgdesc);
1395         }
1396
1397         /* truncate relation down to the new number of blocks */
1398         if (blkno < nblocks)
1399         {
1400                 blkno = smgrtruncate(onerel->rd_rel->relsmgr, onerel, blkno);
1401                 Assert(blkno >= 0);
1402                 vacrelstats->npages = blkno;    /* set new number of blocks */
1403         }
1404
1405         if (archrel != (Relation) NULL)
1406                 heap_close(archrel);
1407
1408         if (Irel != (Relation *) NULL)          /* pfree index allocations */
1409         {
1410                 pfree(Idesc);
1411                 pfree(idatum);
1412                 pfree(inulls);
1413                 vc_clsindices(nindices, Irel);
1414         }
1415
1416         pfree(vpc);
1417
1418 }                                                               /* vc_rpfheap */
1419
1420 /*
1421  *      vc_vacheap() -- free dead tuples
1422  *
1423  *              This routine marks dead tuples as unused and truncates relation
1424  *              if there are "empty" end-blocks.
      *
      *              vacrelstats->npages is read as the current block count of the
      *              relation and is overwritten with the value returned by
      *              smgrtruncate() when a truncation is performed.
      *              onerel is the heap relation whose pages are read and written.
      *              Vvpl supplies vpl_npages page descriptors in vpl_pgdesc, of
      *              which vpl_nemend are counted as empty end-pages.
1425  */
1426 static void
1427 vc_vacheap(VRelStats * vacrelstats, Relation onerel, VPageList Vvpl)
1428 {
1429         Buffer                  buf;
1430         Page                    page;
1431         VPageDescr         *vpp;
1432         Relation                archrel;
1433         int                             nblocks;
1434         int                             i;
1435
             /* start with every listed page; trimmed below if nothing is archived */
1436         nblocks = Vvpl->vpl_npages;
1437         /* if the relation has an archive, open it */
1438         if (onerel->rd_rel->relarch != 'n')
1439                 archrel = vc_getarchrel(onerel);
1440         else
1441         {
1442                 archrel = (Relation) NULL;
                     /* without an archive the last vpl_nemend descriptors are not visited */
1443                 nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1444         }
1445
             /* clean each of the first nblocks pages that has offsets recorded */
1446         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1447         {
1448                 if ((*vpp)->vpd_noff > 0)
1449                 {
1450                         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1451                         page = BufferGetPage(buf);
1452                         vc_vacpage(page, *vpp, archrel);
1453                         WriteBuffer(buf);
1454                 }
1455         }
1456
1457         /* truncate relation if there are some empty end-pages */
1458         if (Vvpl->vpl_nemend > 0)
1459         {
1460                 Assert(vacrelstats->npages >= Vvpl->vpl_nemend);
                     /* nblocks is reused from here on as the target block count */
1461                 nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1462                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1463                          (RelationGetRelationName(onerel))->data,
1464                          vacrelstats->npages, nblocks);
1465
1466                 /*
1467                  * we have to flush "empty" end-pages (if changed, but who knows
1468                  * it) before truncation
1469                  */
1470                 FlushBufferPool(!TransactionFlushEnabled());
1471
                     /* the value returned by smgrtruncate() becomes the new page count */
1472                 nblocks = smgrtruncate(onerel->rd_rel->relsmgr, onerel, nblocks);
1473                 Assert(nblocks >= 0);
1474                 vacrelstats->npages = nblocks;  /* set new number of blocks */
1475         }
1476
1477         if (archrel != (Relation) NULL)
1478                 heap_close(archrel);
1479
1480 }                                                               /* vc_vacheap */
1481
1482 /*
1483  *      vc_vacpage() -- free (and archive if needed) dead tuples on a page
1484  *                                       and repair its fragmentation.
      *
      *              page is the heap page to clean.  vpd lists, in vpd_voff, the
      *              vpd_noff offset numbers whose line pointers are to be cleared.
      *              archrel is the archive relation, or NULL when nothing is to
      *              be archived.
1485  */
1486 static void
1487 vc_vacpage(Page page, VPageDescr vpd, Relation archrel)
1488 {
1489         ItemId                  itemid;
1490         HeapTuple               htup;
1491         int                             i;
1492
1493         Assert(vpd->vpd_nusd == 0);
1494         for (i = 0; i < vpd->vpd_noff; i++)
1495         {
                     /* offsets in vpd_voff are 1-based, pd_linp is indexed from 0 */
1496                 itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
                     /* archive the tuple first, but only while its line pointer is still in use */
1497                 if (archrel != (Relation) NULL && ItemIdIsUsed(itemid))
1498                 {
1499                         htup = (HeapTuple) PageGetItem(page, itemid);
1500                         vc_archive(archrel, htup);
1501                 }
                     /* the LP_USED flag is cleared unconditionally */
1502                 itemid->lp_flags &= ~LP_USED;
1503         }
1504         PageRepairFragmentation(page);
1505
1506 }                                                               /* vc_vacpage */
1507
1508 /*
1509  *      vc_scanoneind() -- scan one index relation to update statistic.
1510  *
      *              Counts the entries returned by a full forward scan of indrel,
      *              stores the page and tuple counts through vc_updstats(), logs
      *              the counts with rusage deltas, and raises a NOTICE when the
      *              index tuple count differs from nhtups (the heap tuple count
      *              supplied by the caller).  Nothing is deleted here.
1511  */
1512 static void
1513 vc_scanoneind(Relation indrel, int nhtups)
1514 {
1515         RetrieveIndexResult res;
1516         IndexScanDesc   iscan;
1517         int                             nitups;
1518         int                             nipages;
1519         struct rusage   ru0,
1520                                         ru1;
1521
1522         getrusage(RUSAGE_SELF, &ru0);
1523
1524         /* walk through the entire index */
             /* the scan is started with zero scan keys */
1525         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1526         nitups = 0;
1527
1528         while ((res = index_getnext(iscan, ForwardScanDirection))
1529                    != (RetrieveIndexResult) NULL)
1530         {
1531                 nitups++;
                     /* each result is freed as soon as it has been counted */
1532                 pfree(res);
1533         }
1534
1535         index_endscan(iscan);
1536
1537         /* now update statistics in pg_class */
1538         nipages = RelationGetNumberOfBlocks(indrel);
             /* hasindex is passed as false and no attribute statistics are supplied */
1539         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1540
1541         getrusage(RUSAGE_SELF, &ru1);
1542
             /*
              * The first elapsed figure is the system-time delta, the second the
              * user-time delta.
              * NOTE(review): int counters and tv_sec differences are all printed
              * with %u here - confirm the argument types match the format.
              */
1543         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1544                  indrel->rd_rel->relname.data, nipages, nitups,
1545                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1546                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1547
1548         if (nitups != nhtups)
1549                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1550                          indrel->rd_rel->relname.data, nitups, nhtups);
1551
1552 }                                                               /* vc_scanoneind */
1553
1554 /*
1555  *      vc_vaconeind() -- vacuum one index relation.
1556  *
1557  *              Vpl is the VPageList of the heap we're currently vacuuming.
1558  *              It's locked. Indrel is an index relation on the vacuumed heap.
1559  *              We don't set locks on the index relation here, since the indexed
1560  *              access methods support locking at different granularities.
1561  *              We let them handle it.
1562  *
1563  *              Finally, we arrange to update the index relation's statistics in
1564  *              pg_class.
      *
      *              Every index entry whose heap pointer is reported as reaped by
      *              vc_tidreapped() is removed with index_delete(); all other
      *              entries are counted.  The surviving count is stored through
      *              vc_updstats() and compared against nhtups, the heap tuple
      *              count supplied by the caller.
1565  */
1566 static void
1567 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1568 {
1569         RetrieveIndexResult res;
1570         IndexScanDesc   iscan;
1571         ItemPointer             heapptr;
1572         int                             nvac;
1573         int                             nitups;
1574         int                             nipages;
1575         VPageDescr              vp;
1576         struct rusage   ru0,
1577                                         ru1;
1578
1579         getrusage(RUSAGE_SELF, &ru0);
1580
1581         /* walk through the entire index */
1582         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1583         nvac = 0;               /* index entries deleted */
1584         nitups = 0;             /* index entries kept */
1585
1586         while ((res = index_getnext(iscan, ForwardScanDirection))
1587                    != (RetrieveIndexResult) NULL)
1588         {
1589                 heapptr = &res->heap_iptr;
1590
                     /* a non-NULL descriptor means the heap tid is in the reaped list */
1591                 if ((vp = vc_tidreapped(heapptr, vpl)) != (VPageDescr) NULL)
1592                 {
1593 #if 0
1594                         elog(DEBUG, "<%x,%x> -> <%x,%x>",
1595                                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1596                                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1597                                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1598                                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1599 #endif
                             /*
                              * A descriptor without recorded offsets is reported, and the
                              * index entry is still deleted below.
                              */
1600                         if (vp->vpd_noff == 0)
1601                         {                                       /* this is EmptyPage !!! */
1602                                 elog(NOTICE, "Ind %s: pointer to EmptyPage (blk %u off %u) - fixing",
1603                                          indrel->rd_rel->relname.data,
1604                                          vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1605                         }
1606                         ++nvac;
1607                         index_delete(indrel, &res->index_iptr);
1608                 }
1609                 else
1610                 {
1611                         nitups++;
1612                 }
1613
1614                 /* be tidy */
1615                 pfree(res);
1616         }
1617
1618         index_endscan(iscan);
1619
1620         /* now update statistics in pg_class */
1621         nipages = RelationGetNumberOfBlocks(indrel);
             /* only the kept entries are reported as the tuple count */
1622         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1623
1624         getrusage(RUSAGE_SELF, &ru1);
1625
             /*
              * The first elapsed figure is the system-time delta, the second the
              * user-time delta.
              * NOTE(review): int counters and tv_sec differences are all printed
              * with %u here - confirm the argument types match the format.
              */
1626         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1627                  indrel->rd_rel->relname.data, nipages, nitups, nvac,
1628                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1629                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1630
1631         if (nitups != nhtups)
1632                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1633                          indrel->rd_rel->relname.data, nitups, nhtups);
1634
1635 }                                                               /* vc_vaconeind */
1636
1637 /*
1638  *      vc_tidreapped() -- is a particular tid reapped?
1639  *
1640  *              vpl->vpl_pgdesc is expected to be sorted in the right order
      *              for vc_find_eq() (presumably by block number - confirm
      *              against vc_cmp_blk).
      *
      *              Returns the page descriptor when the block of itemptr is in
      *              the list and either the descriptor has no offsets recorded
      *              (EmptyPage) or the offset of itemptr is among vpd_voff.
      *              Returns NULL otherwise.
1641  */
1642 static                  VPageDescr
1643 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1644 {
1645         OffsetNumber    ioffno;
1646         OffsetNumber   *voff;
1647         VPageDescr              vp,
1648                                    *vpp;
1649         VPageDescrData  vpd;
1650
             /* vpd is a stack-local search key; only its block number is filled in */
1651         vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1652         ioffno = ItemPointerGetOffsetNumber(itemptr);
1653
             /*
              * The array elements are VPageDescr pointers, so the key handed to
              * vc_find_eq() is the address of a pointer to the local descriptor.
              */
1654         vp = &vpd;
1655         vpp = (VPageDescr *) vc_find_eq((char *) (vpl->vpl_pgdesc),
1656                                            vpl->vpl_npages, sizeof(VPageDescr), (char *) &vp,
1657                                                                         vc_cmp_blk);
1658
             /* block is not in the list at all */
1659         if (vpp == (VPageDescr *) NULL)
1660                 return ((VPageDescr) NULL);
1661         vp = *vpp;
1662
1663         /* ok - we are on true page */
1664
1665         if (vp->vpd_noff == 0)
1666         {                                                       /* this is EmptyPage !!! */
1667                 return (vp);
1668         }
1669
             /* second lookup: is this offset one of those recorded for the page? */
1670         voff = (OffsetNumber *) vc_find_eq((char *) (vp->vpd_voff),
1671                                         vp->vpd_noff, sizeof(OffsetNumber), (char *) &ioffno,
1672                                                                            vc_cmp_offno);
1673
1674         if (voff == (OffsetNumber *) NULL)
1675                 return ((VPageDescr) NULL);
1676
1677         return (vp);
1678
1679 }                                                               /* vc_tidreapped */
1680
1681 /*
1682  *      vc_attrstats() -- compute column statistics used by the optimizer
1683  *
1684  *      We compute the column min, max, null and non-null counts.
1685  *      Plus we attempt to find the count of the value that occurs most
1686  *      frequently in each column.
1687  *      These figures are used to compute the selectivity of the column.
1688  *
1689  *      We use a three-bucket cache to get the most frequent item.
1690  *      The 'guess' buckets count hits.  A cache miss causes guess1
1691  *      to get the most hit 'guess' item in the most recent cycle, and
1692  *      the new item goes into guess2.  Whenever the total count of hits
1693  *      of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1694  *
1695  *      This method works perfectly for columns with unique values, and columns
1696  *      with only two unique values, plus nulls.
1697  *
1698  *      It becomes less perfect as the number of unique values increases and
1699  *      their distribution in the table becomes more random.
1700  *
      *      This routine folds one heap tuple (htup) into the per-attribute
      *      entries of vacrelstats->vacattrstats; va_natts gives the number
      *      of entries.  The tuple descriptor is taken from onerel->rd_att.
1701  */
1702 static void
1703 vc_attrstats(Relation onerel, VRelStats * vacrelstats, HeapTuple htup)
1704 {
1705         int                             i,
1706                                         attr_cnt = vacrelstats->va_natts;
1707         VacAttrStats   *vacattrstats = vacrelstats->vacattrstats;
1708         TupleDesc               tupDesc = onerel->rd_att;
1709         Datum                   value;
1710         bool                    isnull;
1711
1712         for (i = 0; i < attr_cnt; i++)
1713         {
1714                 VacAttrStats   *stats = &vacattrstats[i];
                     /* cleared below when the value matches none of best, guess1, guess2 */
1715                 bool                    value_hit = true;
1716
                     /* note: the attribute is fetched before the validity check below */
1717                 value = (Datum) heap_getattr(htup, InvalidBuffer,
1718                                                                   stats->attr->attnum, tupDesc, &isnull);
1719
                     /* presumably tests that an equality function is available - confirm */
1720                 if (!VacAttrStatsEqValid(stats))
1721                         continue;
1722
1723                 if (isnull)
1724                         stats->null_cnt++;
1725                 else
1726                 {
1727                         stats->nonnull_cnt++;
                             /* the first non-null value seeds every bucket */
1728                         if (stats->initialized == false)
1729                         {
1730                                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1731                                 /* best_cnt gets incremented later */
1732                                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1733                                 stats->guess1_cnt = stats->guess1_hits = 1;
1734                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1735                                 stats->guess2_hits = 1;
1736                                 if (VacAttrStatsLtGtValid(stats))
1737                                 {
1738                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1739                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1740                                 }
1741                                 stats->initialized = true;
1742                         }
                             /* min and max are tracked only under the LtGt validity test */
1743                         if (VacAttrStatsLtGtValid(stats))
1744                         {
                                     /* a new extreme resets its counter; it is counted just below */
1745                                 if ((*(stats->f_cmplt)) (value, stats->min))
1746                                 {
1747                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1748                                         stats->min_cnt = 0;
1749                                 }
1750                                 if ((*(stats->f_cmpgt)) (value, stats->max))
1751                                 {
1752                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1753                                         stats->max_cnt = 0;
1754                                 }
                                     /* a value equal to both min and max is counted as min only */
1755                                 if ((*(stats->f_cmpeq)) (value, stats->min))
1756                                         stats->min_cnt++;
1757                                 else if ((*(stats->f_cmpeq)) (value, stats->max))
1758                                         stats->max_cnt++;
1759                         }
                             /* classify the value: best, then guess1, then guess2, else a miss */
1760                         if ((*(stats->f_cmpeq)) (value, stats->best))
1761                                 stats->best_cnt++;
1762                         else if ((*(stats->f_cmpeq)) (value, stats->guess1))
1763                         {
1764                                 stats->guess1_cnt++;
1765                                 stats->guess1_hits++;
1766                         }
1767                         else if ((*(stats->f_cmpeq)) (value, stats->guess2))
1768                                 stats->guess2_hits++;
1769                         else
1770                                 value_hit = false;
1771
                             /*
                              * guess2 overtook guess1: exchange the two buckets.  guess1_cnt
                              * takes the hit count of guess2 as it was before the hits
                              * are swapped.
                              */
1772                         if (stats->guess2_hits > stats->guess1_hits)
1773                         {
1774                                 swapDatum(stats->guess1, stats->guess2);
1775                                 swapInt(stats->guess1_len, stats->guess2_len);
1776                                 stats->guess1_cnt = stats->guess2_hits;
1777                                 swapLong(stats->guess1_hits, stats->guess2_hits);
1778                         }
                             /* guess1 overtook best: exchange them and restart both hit counters */
1779                         if (stats->guess1_cnt > stats->best_cnt)
1780                         {
1781                                 swapDatum(stats->best, stats->guess1);
1782                                 swapInt(stats->best_len, stats->guess1_len);
1783                                 swapLong(stats->best_cnt, stats->guess1_cnt);
1784                                 stats->guess1_hits = 1;
1785                                 stats->guess2_hits = 1;
1786                         }
                             /* on a miss the new value replaces guess2 and both hit counters restart */
1787                         if (!value_hit)
1788                         {
1789                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1790                                 stats->guess1_hits = 1;
1791                                 stats->guess2_hits = 1;
1792                         }
1793                 }
1794         }
1795         return;
1796 }
1797
1798 /*
1799  *      vc_bucketcpy() -- copy an attribute value into a statistics bucket
1800  *
      *              attr describes the attribute (attbyval and attlen are read).
      *              value is the datum to store.  bucket receives the value and
      *              bucket_len tracks the size of the buffer allocated for it.
      *
      *              Pass-by-value attributes whose attlen is not -1 are stored
      *              directly in *bucket.  Everything else is copied into a
      *              palloc'd buffer that is replaced by a larger one whenever
      *              the new value does not fit.
1801  */
1802 static void
1803 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum * bucket, int16 * bucket_len)
1804 {
1805         if (attr->attbyval && attr->attlen != -1)
1806                 *bucket = value;
1807         else
1808         {
                     /* attlen of -1 means the length is read from the value with VARSIZE */
1809                 int                             len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1810
                     /* grow only; a bucket_len of zero means nothing was allocated yet */
1811                 if (len > *bucket_len)
1812                 {
1813                         if (*bucket_len != 0)
1814                                 pfree(DatumGetPointer(*bucket));
1815                         *bucket = PointerGetDatum(palloc(len));
                             /* NOTE(review): len is an int but *bucket_len is int16 - confirm lengths always fit */
1816                         *bucket_len = len;
1817                 }
1818                 memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1819         }
1820 }
1821
1822 /*
1823  *      vc_updstats() -- update pg_class statistics for one relation
1824  *
1825  *              This routine works for both index and heap relation entries in
1826  *              pg_class.  We violate no-overwrite semantics here by storing new
1827  *              values for ntups, npages, and hasindex directly in the pg_class
1828  *              tuple that's already on the page.  The reason for this is that if
1829  *              we updated these tuples in the usual way, then every tuple in pg_class
1830  *              would be replaced every day.  This would make planning and executing
1831  *              historical queries very expensive.
      *
      *              relid selects the pg_class row.  npages, ntups and hasindex
      *              are written to relpages, reltuples and relhasindex.
      *              vacrelstats may be NULL (the index callers in this file pass
      *              NULL); when it is non-NULL and va_natts > 0, attdisbursion is
      *              also overwritten in the matching pg_attribute rows and one
      *              pg_statistic row holding the min and max values is inserted
      *              per attribute that passes the LtGt validity test.
1832  */
1833 static void
1834 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats * vacrelstats)
1835 {
1836         Relation                rd,
1837                                         ad,
1838                                         sd;
1839         HeapScanDesc    rsdesc,
1840                                         asdesc;
1841         TupleDesc               sdesc;
1842         HeapTuple               rtup,
1843                                         atup,
1844                                         stup;
1845         Buffer                  rbuf,
1846                                         abuf;
1847         Form_pg_class   pgcform;
1848         ScanKeyData             rskey,
1849                                         askey;
1850         AttributeTupleForm attp;
1851
1852         /*
1853          * update number of tuples and number of pages in pg_class
1854          */
             /* scan key: object id attribute equal to relid */
1855         ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1856                                                    ObjectIdEqualRegProcedure,
1857                                                    ObjectIdGetDatum(relid));
1858
1859         rd = heap_openr(RelationRelationName);
1860         rsdesc = heap_beginscan(rd, false, NowTimeQual, 1, &rskey);
1861
             /*
              * NOTE(review): the code below uses rtup and rbuf unconditionally, so
              * elog(WARN) presumably does not return here - confirm.
              */
1862         if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1863                 elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1864                          relid);
1865
1866         /* overwrite the existing statistics in the tuple */
1867         vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1868         pgcform = (Form_pg_class) GETSTRUCT(rtup);
1869         pgcform->reltuples = ntups;
1870         pgcform->relpages = npages;
1871         pgcform->relhasindex = hasindex;
1872
             /* per-attribute statistics are handled only when the caller supplied them */
1873         if (vacrelstats != NULL && vacrelstats->va_natts > 0)
1874         {
1875                 VacAttrStats   *vacattrstats = vacrelstats->vacattrstats;
1876                 int                             natts = vacrelstats->va_natts;
1877
1878                 ad = heap_openr(AttributeRelationName);
1879                 sd = heap_openr(StatisticRelationName);
                     /* scan key: attrelid equal to relid */
1880                 ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1881                                                            F_INT4EQ, relid);
1882
1883                 asdesc = heap_beginscan(ad, false, NowTimeQual, 1, &askey);
1884
1885                 while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1886                 {
1887                         int                             i;
1888                         float32data             selratio;       /* average ratio of rows selected
1889                                                                                  * for a random constant */
1890                         VacAttrStats   *stats;
1891                         Datum                   values[Natts_pg_statistic];
1892                         char                    nulls[Natts_pg_statistic];
1893
1894                         attp = (AttributeTupleForm) GETSTRUCT(atup);
1895                         if (attp->attnum <= 0)          /* skip system attributes for now, */
1896                                 /* they are unique anyway */
1897                                 continue;
1898
                             /* find the collected statistics entry for this attribute number */
1899                         for (i = 0; i < natts; i++)
1900                         {
1901                                 if (attp->attnum == vacattrstats[i].attr->attnum)
1902                                         break;
1903                         }
                             /* no statistics were collected for this attribute */
1904                         if (i >= natts)
1905                                 continue;
1906                         stats = &(vacattrstats[i]);
1907
1908                         /* overwrite the existing statistics in the tuple */
1909                         if (VacAttrStatsEqValid(stats))
1910                         {
1911
1912                                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1913
                                     /*
                                      * Case 1: no rows were counted, or best_cnt is 1 with at
                                      * most one null - the ratio is set to zero.
                                      */
1914                                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1915                                         (stats->null_cnt <= 1 && stats->best_cnt == 1))
1916                                         selratio = 0;
                                     /*
                                      * Case 2: every non-null value equals min or max - sum of
                                      * squared counts (min, max, null) over the squared total.
                                      */
1917                                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1918                                 {
1919                                         double                  min_cnt_d = stats->min_cnt,
1920                                                                         max_cnt_d = stats->max_cnt,
1921                                                                         null_cnt_d = stats->null_cnt,
1922                                                                         nonnullcnt_d = stats->nonnull_cnt;      /* prevent overflow */
1923
1924                                         selratio = (min_cnt_d * min_cnt_d + max_cnt_d * max_cnt_d + null_cnt_d * null_cnt_d) /
1925                                                 (nonnullcnt_d + null_cnt_d) / (nonnullcnt_d + null_cnt_d);
1926                                 }
                                     /*
                                      * Case 3: estimate from the larger of best_cnt and null_cnt.
                                      */
1927                                 else
1928                                 {
1929                                         double                  most = (double) (stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1930                                         double                  total = ((double) stats->nonnull_cnt) + ((double) stats->null_cnt);
1931
1932                                         /*
1933                                          * we assume count of other values are 20% of best
1934                                          * count in table
1935                                          */
1936                                         selratio = (most * most + 0.20 * most * (total - most)) / total / total;
1937                                 }
                                     /* the ratio is capped at 1.0 before it is stored */
1938                                 if (selratio > 1.0)
1939                                         selratio = 1.0;
1940                                 attp->attdisbursion = selratio;
1941                                 WriteNoReleaseBuffer(abuf);
1942
1943                                 /* DO PG_STATISTIC INSERTS */
1944
1945                                 /*
1946                                  * doing system relations, especially pg_statistic is a
1947                                  * problem
1948                                  */
1949                                 if (VacAttrStatsLtGtValid(stats) && stats->initialized  /* &&
1950                                                                                                                                                  * !IsSystemRelationName(
1951                                                                                                                                                  * pgcform->relname.data)
1952                                                 */ )
1953                                 {
1954                                         func_ptr                out_function;
1955                                         char               *out_string;
1956                                         int                             dummy;
1957
                                             /* i is reused from here on; stats was already fetched above */
                                             /* presumably a blank marks a column as not null - confirm */
1958                                         for (i = 0; i < Natts_pg_statistic; ++i)
1959                                                 nulls[i] = ' ';
1960
1961                                         /* ----------------
1962                                          *      initialize values[]
1963                                          * ----------------
1964                                          */
                                             /*
                                              * Five slots are filled: relid, attribute number, an
                                              * invalid oid, and the text form of min and of max.
                                              * NOTE(review): confirm this matches Natts_pg_statistic.
                                              */
1965                                         i = 0;
1966                                         values[i++] = (Datum) relid;            /* 1 */
1967                                         values[i++] = (Datum) attp->attnum; /* 2 */
1968                                         values[i++] = (Datum) InvalidOid;       /* 3 */
                                             /* min and max go through the output function, then TextInRegProcedure */
1969                                         fmgr_info(stats->outfunc, &out_function, &dummy);
1970                                         out_string = (*out_function) (stats->min, stats->attr->atttypid);
1971                                         values[i++] = (Datum) fmgr(TextInRegProcedure, out_string);
1972                                         pfree(out_string);
1973                                         out_string = (char *) (*out_function) (stats->max, stats->attr->atttypid);
1974                                         values[i++] = (Datum) fmgr(TextInRegProcedure, out_string);
1975                                         pfree(out_string);
1976
1977                                         sdesc = sd->rd_att;
1978
1979                                         stup = heap_formtuple(sdesc, values, nulls);
1980
1981                                         /* ----------------
1982                                          *      insert the tuple in the relation and get the tuple's oid.
1983                                          * ----------------
1984                                          */
1985                                         heap_insert(sd, stup);
                                             /* slots 3 and 4 hold the two results of TextInRegProcedure */
1986                                         pfree(DatumGetPointer(values[3]));
1987                                         pfree(DatumGetPointer(values[4]));
1988                                         pfree(stup);
1989                                 }
1990                         }
1991                 }
1992                 heap_endscan(asdesc);
1993                 heap_close(ad);
1994                 heap_close(sd);
1995         }
1996
1997         /* XXX -- after write, should invalidate relcache in other backends */
1998         WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1999
2000         /*
2001          * invalidating system relations confuses the function cache of
2002          * pg_operator and pg_opclass
2003          */
2004         if (!IsSystemRelationName(pgcform->relname.data))
2005                 RelationInvalidateHeapTuple(rd, rtup);
2006
2007         /* that's all, folks */
2008         heap_endscan(rsdesc);
2009         heap_close(rd);
2010 }
2011
2012 /*
2013  *      vc_delhilowstats() -- delete pg_statistics rows
2014  *
      *              relid restricts the scan to rows whose starelid equals it;
      *              with InvalidOid every row of the statistics relation is
      *              scanned.  When attcnt > 0 only rows whose staattnum equals
      *              attnums[i] + 1 for some i are deleted; otherwise every
      *              scanned row is deleted.
2015  */
2016 static void
2017 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
2018 {
2019         Relation                pgstatistic;
2020         HeapScanDesc    pgsscan;
2021         HeapTuple               pgstup;
2022         ScanKeyData             pgskey;
2023
2024         pgstatistic = heap_openr(StatisticRelationName);
2025
             /* keyed scan for one relation, unkeyed scan otherwise */
2026         if (relid != InvalidOid)
2027         {
2028                 ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
2029                                                            ObjectIdEqualRegProcedure,
2030                                                            ObjectIdGetDatum(relid));
2031                 pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 1, &pgskey);
2032         }
2033         else
2034                 pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 0, NULL);
2035
2036         while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
2037         {
2038                 if (attcnt > 0)
2039                 {
2040                         Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(pgstup);
2041                         int                             i;
2042
                             /* presumably attnums holds 0-based numbers, hence the + 1 - confirm with callers */
2043                         for (i = 0; i < attcnt; i++)
2044                         {
2045                                 if (pgs->staattnum == attnums[i] + 1)
2046                                         break;
2047                         }
2048                         if (i >= attcnt)
2049                                 continue;               /* don't delete it */
2050                 }
2051                 heap_delete(pgstatistic, &pgstup->t_ctid);
2052         }
2053
2054         heap_endscan(pgsscan);
2055         heap_close(pgstatistic);
2056 }
2057
2058 static void
2059 vc_setpagelock(Relation rel, BlockNumber blkno)
2060 {
2061         ItemPointerData itm;
2062
2063         ItemPointerSet(&itm, blkno, 1);
2064
2065         RelationSetLockForWritePage(rel, &itm);
2066 }
2067
2068 /*
2069  *      vc_reappage() -- save a page on the array of reapped pages.
2070  *
2071  *              As a side effect of the way that the vacuuming loop for a given
2072  *              relation works, higher pages come after lower pages in the array
2073  *              (and highest tid on a page is last).
2074  */
2075 static void
2076 vc_reappage(VPageList vpl, VPageDescr vpc)
2077 {
2078         VPageDescr              newvpd;
2079
2080         /* allocate a VPageDescrData entry */
2081         newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff * sizeof(OffsetNumber));
2082
2083         /* fill it in */
2084         if (vpc->vpd_noff > 0)
2085                 memmove(newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff * sizeof(OffsetNumber));
2086         newvpd->vpd_blkno = vpc->vpd_blkno;
2087         newvpd->vpd_free = vpc->vpd_free;
2088         newvpd->vpd_nusd = vpc->vpd_nusd;
2089         newvpd->vpd_noff = vpc->vpd_noff;
2090
2091         /* insert this page into vpl list */
2092         vc_vpinsert(vpl, newvpd);
2093
2094 }                                                               /* vc_reappage */
2095
2096 static void
2097 vc_vpinsert(VPageList vpl, VPageDescr vpnew)
2098 {
2099
2100         /* allocate a VPageDescr entry if needed */
2101         if (vpl->vpl_npages == 0)
2102                 vpl->vpl_pgdesc = (VPageDescr *) palloc(100 * sizeof(VPageDescr));
2103         else if (vpl->vpl_npages % 100 == 0)
2104                 vpl->vpl_pgdesc = (VPageDescr *) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages + 100) * sizeof(VPageDescr));
2105         vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
2106         (vpl->vpl_npages)++;
2107
2108 }
2109
2110 static void
2111 vc_free(VRelList vrl)
2112 {
2113         VRelList                p_vrl;
2114         MemoryContext   old;
2115         PortalVariableMemory pmem;
2116
2117         pmem = PortalGetVariableMemory(vc_portal);
2118         old = MemoryContextSwitchTo((MemoryContext) pmem);
2119
2120         while (vrl != (VRelList) NULL)
2121         {
2122
2123                 /* free rel list entry */
2124                 p_vrl = vrl;
2125                 vrl = vrl->vrl_next;
2126                 pfree(p_vrl);
2127         }
2128
2129         MemoryContextSwitchTo(old);
2130 }
2131
2132 /*
2133  *      vc_getarchrel() -- open the archive relation for a heap relation
2134  *
2135  *              The archive relation is named 'a,XXXXX' for the heap relation
2136  *              whose relid is XXXXX.
2137  */
2138
2139 #define ARCHIVE_PREFIX  "a,"
2140
2141 static                  Relation
2142 vc_getarchrel(Relation heaprel)
2143 {
2144         Relation                archrel;
2145         char               *archrelname;
2146
2147         archrelname = palloc(sizeof(ARCHIVE_PREFIX) + NAMEDATALEN); /* bogus */
2148         sprintf(archrelname, "%s%d", ARCHIVE_PREFIX, heaprel->rd_id);
2149
2150         archrel = heap_openr(archrelname);
2151
2152         pfree(archrelname);
2153         return (archrel);
2154 }
2155
2156 /*
2157  *      vc_archive() -- write a tuple to an archive relation
2158  *
2159  *              In the future, this will invoke the archived accessd method.  For
2160  *              now, archive relations are on mag disk.
2161  */
2162 static void
2163 vc_archive(Relation archrel, HeapTuple htup)
2164 {
2165         doinsert(archrel, htup);
2166 }
2167
2168 static                  bool
2169 vc_isarchrel(char *rname)
2170 {
2171         if (strncmp(ARCHIVE_PREFIX, rname, strlen(ARCHIVE_PREFIX)) == 0)
2172                 return (true);
2173
2174         return (false);
2175 }
2176
/*
 *	vc_find_eq() -- binary search for an element equal to *elm
 *
 *		bot		first element of the array; the array must be sorted in
 *				ascending order according to compar
 *		nelem	number of elements in the array
 *		size	size of one element, in bytes
 *		elm		the key being looked for
 *		compar	three-way comparison: negative, zero or positive when its
 *				first argument is less than, equal to or greater than its
 *				second
 *
 *		Returns a pointer to a matching element, or NULL when there is
 *		none.  An empty array (nelem <= 0) never matches and is never
 *		dereferenced.
 *
 *		Whenever an end of the search window has just moved, the key is
 *		first compared against that end, so a key lying outside the window
 *		is rejected without further probing.
 */
static char *
vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *))
{
	int			res;
	int			last = nelem - 1;	/* index of the window's last element */
	int			celm = nelem / 2;	/* index of the element probed next */
	bool		last_move,
				first_move;

	/* nothing to search: do not look at bot[0] */
	if (nelem <= 0)
		return (NULL);

	last_move = first_move = true;
	for (;;)
	{
		if (first_move)
		{
			/* lower end moved: key below it means no match */
			res = compar(bot, elm);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot);
			first_move = false;
		}
		if (last_move)
		{
			/* upper end moved: key above it means no match */
			res = compar(elm, bot + last * size);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot + last * size);
			last_move = false;
		}
		res = compar(elm, bot + celm * size);
		if (res == 0)
			return (bot + celm * size);
		if (res < 0)
		{
			/* continue in the part below the probed element */
			if (celm == 0)
				return (NULL);
			last = celm - 1;
			celm = celm / 2;
			last_move = true;
			continue;
		}

		/* continue in the part above the probed element */
		if (celm == last)
			return (NULL);

		last = last - celm - 1;
		bot = bot + (celm + 1) * size;
		celm = (last + 1) / 2;
		first_move = true;
	}

}								/* vc_find_eq */
2230
2231 static int
2232 vc_cmp_blk(char *left, char *right)
2233 {
2234         BlockNumber             lblk,
2235                                         rblk;
2236
2237         lblk = (*((VPageDescr *) left))->vpd_blkno;
2238         rblk = (*((VPageDescr *) right))->vpd_blkno;
2239
2240         if (lblk < rblk)
2241                 return (-1);
2242         if (lblk == rblk)
2243                 return (0);
2244         return (1);
2245
2246 }                                                               /* vc_cmp_blk */
2247
2248 static int
2249 vc_cmp_offno(char *left, char *right)
2250 {
2251
2252         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2253                 return (-1);
2254         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2255                 return (0);
2256         return (1);
2257
2258 }                                                               /* vc_cmp_offno */
2259
2260
2261 static void
2262 vc_getindices(Oid relid, int *nindices, Relation ** Irel)
2263 {
2264         Relation                pgindex;
2265         Relation                irel;
2266         TupleDesc               pgidesc;
2267         HeapTuple               pgitup;
2268         HeapScanDesc    pgiscan;
2269         Datum                   d;
2270         int                             i,
2271                                         k;
2272         bool                    n;
2273         ScanKeyData             pgikey;
2274         Oid                        *ioid;
2275
2276         *nindices = i = 0;
2277
2278         ioid = (Oid *) palloc(10 * sizeof(Oid));
2279
2280         /* prepare a heap scan on the pg_index relation */
2281         pgindex = heap_openr(IndexRelationName);
2282         pgidesc = RelationGetTupleDescriptor(pgindex);
2283
2284         ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2285                                                    ObjectIdEqualRegProcedure,
2286                                                    ObjectIdGetDatum(relid));
2287
2288         pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
2289
2290         while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL)))
2291         {
2292                 d = (Datum) heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
2293                                                                  pgidesc, &n);
2294                 i++;
2295                 if (i % 10 == 0)
2296                         ioid = (Oid *) repalloc(ioid, (i + 10) * sizeof(Oid));
2297                 ioid[i - 1] = DatumGetObjectId(d);
2298         }
2299
2300         heap_endscan(pgiscan);
2301         heap_close(pgindex);
2302
2303         if (i == 0)
2304         {                                                       /* No one index found */
2305                 pfree(ioid);
2306                 return;
2307         }
2308
2309         if (Irel != (Relation **) NULL)
2310                 *Irel = (Relation *) palloc(i * sizeof(Relation));
2311
2312         for (k = 0; i > 0;)
2313         {
2314                 irel = index_open(ioid[--i]);
2315                 if (irel != (Relation) NULL)
2316                 {
2317                         if (Irel != (Relation **) NULL)
2318                                 (*Irel)[k] = irel;
2319                         else
2320                                 index_close(irel);
2321                         k++;
2322                 }
2323                 else
2324                         elog(NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2325         }
2326         *nindices = k;
2327         pfree(ioid);
2328
2329         if (Irel != (Relation **) NULL && *nindices == 0)
2330         {
2331                 pfree(*Irel);
2332                 *Irel = (Relation *) NULL;
2333         }
2334
2335 }                                                               /* vc_getindices */
2336
2337
2338 static void
2339 vc_clsindices(int nindices, Relation * Irel)
2340 {
2341
2342         if (Irel == (Relation *) NULL)
2343                 return;
2344
2345         while (nindices--)
2346         {
2347                 index_close(Irel[nindices]);
2348         }
2349         pfree(Irel);
2350
2351 }                                                               /* vc_clsindices */
2352
2353
2354 static void
2355 vc_mkindesc(Relation onerel, int nindices, Relation * Irel, IndDesc ** Idesc)
2356 {
2357         IndDesc            *idcur;
2358         HeapTuple               pgIndexTup;
2359         AttrNumber         *attnumP;
2360         int                             natts;
2361         int                             i;
2362
2363         *Idesc = (IndDesc *) palloc(nindices * sizeof(IndDesc));
2364
2365         for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++)
2366         {
2367                 pgIndexTup =
2368                         SearchSysCacheTuple(INDEXRELID,
2369                                                                 ObjectIdGetDatum(Irel[i]->rd_id),
2370                                                                 0, 0, 0);
2371                 Assert(pgIndexTup);
2372                 idcur->tform = (IndexTupleForm) GETSTRUCT(pgIndexTup);
2373                 for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2374                          *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2375                          attnumP++, natts++);
2376                 if (idcur->tform->indproc != InvalidOid)
2377                 {
2378                         idcur->finfoP = &(idcur->finfo);
2379                         FIgetnArgs(idcur->finfoP) = natts;
2380                         natts = 1;
2381                         FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2382                         *(FIgetname(idcur->finfoP)) = '\0';
2383                 }
2384                 else
2385                         idcur->finfoP = (FuncIndexInfo *) NULL;
2386
2387                 idcur->natts = natts;
2388         }
2389
2390 }                                                               /* vc_mkindesc */
2391
2392
2393 static                  bool
2394 vc_enough_space(VPageDescr vpd, Size len)
2395 {
2396
2397         len = DOUBLEALIGN(len);
2398
2399         if (len > vpd->vpd_free)
2400                 return (false);
2401
2402         if (vpd->vpd_nusd < vpd->vpd_noff)      /* there are free itemid(s) */
2403                 return (true);                  /* and len <= free_space */
2404
2405         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2406         if (len <= vpd->vpd_free - sizeof(ItemIdData))
2407                 return (true);
2408
2409         return (false);
2410
2411 }                                                               /* vc_enough_space */