]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Blowaway relation buffers from buffer pool before truncation.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *        the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.48 1997/09/22 07:12:33 vadim Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/types.h>
15 #include <sys/file.h>
16 #include <string.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include <postgres.h>
22
23 #include <fmgr.h>
24 #include <utils/portal.h>
25 #include <access/genam.h>
26 #include <access/heapam.h>
27 #include <access/xact.h>
28 #include <storage/bufmgr.h>
29 #include <access/transam.h>
30 #include <catalog/pg_index.h>
31 #include <catalog/index.h>
32 #include <catalog/catname.h>
33 #include <catalog/catalog.h>
34 #include <catalog/pg_class.h>
35 #include <catalog/pg_proc.h>
36 #include <catalog/pg_statistic.h>
37 #include <catalog/pg_type.h>
38 #include <catalog/pg_operator.h>
39 #include <storage/smgr.h>
40 #include <storage/lmgr.h>
41 #include <utils/inval.h>
42 #include <utils/mcxt.h>
43 #include <utils/inval.h>
44 #include <utils/syscache.h>
45 #include <utils/builtins.h>
46 #include <commands/vacuum.h>
47 #include <parser/catalog_utils.h>
48 #include <storage/bufpage.h>
49 #include "storage/shmem.h"
50 #ifndef HAVE_GETRUSAGE
51 #include <rusagestub.h>
52 #else
53 #include <sys/time.h>
54 #include <sys/resource.h>
55 #endif
56
57 #include <port-protos.h>
58
59 extern int BlowawayRelationBuffers(Relation rdesc, BlockNumber block);
60
/*
 * True while a vacuum holds the "pg_vlock" lock file: set in vc_init(),
 * cleared in vc_shutdown() and vc_abort().
 */
61 bool            VacuumRunning = false;
62
/*
 * Portal created by vacuum(); its variable memory holds data that has to
 * survive the per-relation transactions (relation list, column names).
 */
63 static Portal vc_portal;
64
65 static int      MESSAGE_LEVEL;          /* elog level for progress reports: NOTICE if verbose, else DEBUG */
66
/*
 * Swap helpers.  Both arguments must be modifiable lvalues of the matching
 * type.  Each expands to a braced block, so use them only as a complete
 * statement.  Arguments are parenthesized and the temporary has a name that
 * is unlikely to collide with a caller's variable.
 */
#define swapLong(a,b)	{long swap_tmp_; swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_;}
#define swapInt(a,b)	{int swap_tmp_; swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_;}
#define swapDatum(a,b)	{Datum swap_tmp_; swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_;}

/* True when an equality function was found for the attribute's type. */
#define VacAttrStatsEqValid(stats) ( (stats)->f_cmpeq != NULL )

/*
 * True when both ordering functions and a valid output procedure are
 * available for the attribute's type.
 */
#define VacAttrStatsLtGtValid(stats) ( (stats)->f_cmplt != NULL && \
									   (stats)->f_cmpgt != NULL && \
									   RegProcedureIsValid((stats)->outfunc) )
74
75
76 /* non-export function prototypes */
77 static void vc_init(void);
78 static void vc_shutdown(void);
79 static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
80 static VRelList vc_getrels(NameData *VacRelP);
81 static void vc_vacone(Oid relid, bool analyze, List *va_cols);
82 static void vc_scanheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
83 static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
84 static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vpl);
85 static void vc_vacpage(Page page, VPageDescr vpd, Relation archrel);
86 static void vc_vaconeind(VPageList vpl, Relation indrel, int nhtups);
87 static void vc_scanoneind(Relation indrel, int nhtups);
88 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup);
89 static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
90 static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats);
91 static void vc_delhilowstats(Oid relid, int attcnt, int *attnums);
92 static void vc_setpagelock(Relation rel, BlockNumber blkno);
93 static VPageDescr vc_tidreapped(ItemPointer itemptr, VPageList vpl);
94 static void vc_reappage(VPageList vpl, VPageDescr vpc);
95 static void vc_vpinsert(VPageList vpl, VPageDescr vpnew);
96 static void vc_free(VRelList vrl);
97 static void vc_getindices(Oid relid, int *nindices, Relation **Irel);
98 static void vc_clsindices(int nindices, Relation *Irel);
99 static Relation vc_getarchrel(Relation heaprel);
100 static void vc_archive(Relation archrel, HeapTuple htup);
101 static bool vc_isarchrel(char *rname);
102 static void vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
103 static char *vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *));
104 static int      vc_cmp_blk(char *left, char *right);
105 static int      vc_cmp_offno(char *left, char *right);
106 static bool vc_enough_space(VPageDescr vpd, Size len);
107
108 void
109 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
110 {
111         char       *pname;
112         MemoryContext old;
113         PortalVariableMemory pmem;
114         NameData        VacRel;
115         List       *le;
116         List       *va_cols = NIL;
117
118         /*
119          * Create a portal for safe memory across transctions.  We need to
120          * palloc the name space for it because our hash function expects the
121          * name to be on a longword boundary.  CreatePortal copies the name to
122          * safe storage for us.
123          */
124         pname = (char *) palloc(strlen(VACPNAME) + 1);
125         strcpy(pname, VACPNAME);
126         vc_portal = CreatePortal(pname);
127         pfree(pname);
128
129         if (verbose)
130                 MESSAGE_LEVEL = NOTICE;
131         else
132                 MESSAGE_LEVEL = DEBUG;
133
134         /* vacrel gets de-allocated on transaction commit */
135         if (vacrel)
136                 strcpy(VacRel.data, vacrel);
137
138         pmem = PortalGetVariableMemory(vc_portal);
139         old = MemoryContextSwitchTo((MemoryContext) pmem);
140
141         Assert(va_spec == NIL || analyze);
142         foreach(le, va_spec)
143         {
144                 char       *col = (char *) lfirst(le);
145                 char       *dest;
146
147                 dest = (char *) palloc(strlen(col) + 1);
148                 strcpy(dest, col);
149                 va_cols = lappend(va_cols, dest);
150         }
151         MemoryContextSwitchTo(old);
152
153         /* initialize vacuum cleaner */
154         vc_init();
155
156         /* vacuum the database */
157         if (vacrel)
158                 vc_vacuum(&VacRel, analyze, va_cols);
159         else
160                 vc_vacuum(NULL, analyze, NIL);
161
162         PortalDestroy(&vc_portal);
163
164         /* clean up */
165         vc_shutdown();
166 }
167
168 /*
169  *      vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
170  *
171  *              We run exactly one vacuum cleaner at a time.  We use the file system
172  *              to guarantee an exclusive lock on vacuuming, since a single vacuum
173  *              cleaner instantiation crosses transaction boundaries, and we'd lose
174  *              postgres-style locks at the end of every transaction.
175  *
176  *              The strangeness with committing and starting transactions in the
177  *              init and shutdown routines is due to the fact that the vacuum cleaner
178  *              is invoked via a sql command, and so is already executing inside
179  *              a transaction.  We need to leave ourselves in a predictable state
180  *              on entry and exit to the vacuum cleaner.  We commit the transaction
181  *              started in PostgresMain() inside vc_init(), and start one in
182  *              vc_shutdown() to match the commit waiting for us back in
183  *              PostgresMain().
184  */
185 static void
186 vc_init()
187 {
188         int                     fd;
189
190         if ((fd = open("pg_vlock", O_CREAT | O_EXCL, 0600)) < 0)
191                 elog(WARN, "can't create lock file -- another vacuum cleaner running?");
192
193         close(fd);
194
195         /*
196          * By here, exclusive open on the lock file succeeded.  If we abort
197          * for any reason during vacuuming, we need to remove the lock file.
198          * This global variable is checked in the transaction manager on xact
199          * abort, and the routine vc_abort() is called if necessary.
200          */
201
202         VacuumRunning = true;
203
204         /* matches the StartTransaction in PostgresMain() */
205         CommitTransactionCommand();
206 }
207
208 static void
209 vc_shutdown()
210 {
211         /* on entry, not in a transaction */
212         if (unlink("pg_vlock") < 0)
213                 elog(WARN, "vacuum: can't destroy lock file!");
214
215         /* okay, we're done */
216         VacuumRunning = false;
217
218         /* matches the CommitTransaction in PostgresMain() */
219         StartTransactionCommand();
220
221 }
222
223 void
224 vc_abort()
225 {
226         /* on abort, remove the vacuum cleaner lock file */
227         unlink("pg_vlock");
228
229         VacuumRunning = false;
230 }
231
232 /*
233  *      vc_vacuum() -- vacuum the database.
234  *
235  *              This routine builds a list of relations to vacuum, and then calls
236  *              code that vacuums them one at a time.  We are careful to vacuum each
237  *              relation in a separate transaction in order to avoid holding too many
238  *              locks at one time.
239  */
240 static void
241 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
242 {
243         VRelList        vrl,
244                                 cur;
245
246         /* get list of relations */
247         vrl = vc_getrels(VacRelP);
248
249         if (analyze && VacRelP == NULL && vrl != NULL)
250                 vc_delhilowstats(InvalidOid, 0, NULL);
251
252         /* vacuum each heap relation */
253         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
254                 vc_vacone(cur->vrl_relid, analyze, va_cols);
255
256         vc_free(vrl);
257 }
258
259 static VRelList
260 vc_getrels(NameData *VacRelP)
261 {
262         Relation        pgclass;
263         TupleDesc       pgcdesc;
264         HeapScanDesc pgcscan;
265         HeapTuple       pgctup;
266         Buffer          buf;
267         PortalVariableMemory portalmem;
268         MemoryContext old;
269         VRelList        vrl,
270                                 cur;
271         Datum           d;
272         char       *rname;
273         char            rkind;
274         int16           smgrno;
275         bool            n;
276         ScanKeyData pgckey;
277         bool            found = false;
278
279         StartTransactionCommand();
280
281         if (VacRelP->data)
282         {
283                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
284                                                            NameEqualRegProcedure,
285                                                            PointerGetDatum(VacRelP->data));
286         }
287         else
288         {
289                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
290                                                   CharacterEqualRegProcedure, CharGetDatum('r'));
291         }
292
293         portalmem = PortalGetVariableMemory(vc_portal);
294         vrl = cur = (VRelList) NULL;
295
296         pgclass = heap_openr(RelationRelationName);
297         pgcdesc = RelationGetTupleDescriptor(pgclass);
298
299         pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
300
301         while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf)))
302         {
303
304                 found = true;
305
306                 /*
307                  * We have to be careful not to vacuum the archive (since it
308                  * already contains vacuumed tuples), and not to vacuum relations
309                  * on write-once storage managers like the Sony jukebox at
310                  * Berkeley.
311                  */
312
313                 d = heap_getattr(pgctup, buf, Anum_pg_class_relname, pgcdesc, &n);
314                 rname = (char *) d;
315
316                 /* skip archive relations */
317                 if (vc_isarchrel(rname))
318                 {
319                         ReleaseBuffer(buf);
320                         continue;
321                 }
322
323                 /*
324                  * don't vacuum large objects for now - something breaks when we
325                  * do
326                  */
327                 if ((strlen(rname) >= 5) && rname[0] == 'x' &&
328                         rname[1] == 'i' && rname[2] == 'n' &&
329                         (rname[3] == 'v' || rname[3] == 'x') &&
330                         rname[4] >= '0' && rname[4] <= '9')
331                 {
332                         elog(NOTICE, "Rel %s: can't vacuum LargeObjects now",
333                                  rname);
334                         ReleaseBuffer(buf);
335                         continue;
336                 }
337
338                 d = heap_getattr(pgctup, buf, Anum_pg_class_relsmgr, pgcdesc, &n);
339                 smgrno = DatumGetInt16(d);
340
341                 /* skip write-once storage managers */
342                 if (smgriswo(smgrno))
343                 {
344                         ReleaseBuffer(buf);
345                         continue;
346                 }
347
348                 d = heap_getattr(pgctup, buf, Anum_pg_class_relkind, pgcdesc, &n);
349
350                 rkind = DatumGetChar(d);
351
352                 /* skip system relations */
353                 if (rkind != 'r')
354                 {
355                         ReleaseBuffer(buf);
356                         elog(NOTICE, "Vacuum: can not process index and certain system tables");
357                         continue;
358                 }
359
360                 /* get a relation list entry for this guy */
361                 old = MemoryContextSwitchTo((MemoryContext) portalmem);
362                 if (vrl == (VRelList) NULL)
363                 {
364                         vrl = cur = (VRelList) palloc(sizeof(VRelListData));
365                 }
366                 else
367                 {
368                         cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
369                         cur = cur->vrl_next;
370                 }
371                 MemoryContextSwitchTo(old);
372
373                 cur->vrl_relid = pgctup->t_oid;
374                 cur->vrl_next = (VRelList) NULL;
375
376                 /* wei hates it if you forget to do this */
377                 ReleaseBuffer(buf);
378         }
379         if (found == false)
380                 elog(NOTICE, "Vacuum: table not found");
381
382
383         heap_endscan(pgcscan);
384         heap_close(pgclass);
385
386         CommitTransactionCommand();
387
388         return (vrl);
389 }
390
391 /*
392  *      vc_vacone() -- vacuum one heap relation
393  *
394  *              This routine vacuums a single heap, cleans out its indices, and
395  *              updates its statistics npages and ntups statistics.
396  *
397  *              Doing one heap at a time incurs extra overhead, since we need to
398  *              check that the heap exists again just before we vacuum it.      The
399  *              reason that we do this is so that vacuuming can be spread across
400  *              many small transactions.  Otherwise, two-phase locking would require
401  *              us to lock the entire database during one pass of the vacuum cleaner.
402  */
403 static void
404 vc_vacone(Oid relid, bool analyze, List *va_cols)
405 {
406         Relation        pgclass;
407         TupleDesc       pgcdesc;
408         HeapTuple       pgctup,
409                                 pgttup;
410         Buffer          pgcbuf;
411         HeapScanDesc pgcscan;
412         Relation        onerel;
413         ScanKeyData pgckey;
414         VPageListData Vvpl;                     /* List of pages to vacuum and/or clean
415                                                                  * indices */
416         VPageListData Fvpl;                     /* List of pages with space enough for
417                                                                  * re-using */
418         VPageDescr *vpp;
419         Relation   *Irel;
420         int32           nindices,
421                                 i;
422         VRelStats  *vacrelstats;
423
424         StartTransactionCommand();
425
426         ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
427                                                    ObjectIdEqualRegProcedure,
428                                                    ObjectIdGetDatum(relid));
429
430         pgclass = heap_openr(RelationRelationName);
431         pgcdesc = RelationGetTupleDescriptor(pgclass);
432         pgcscan = heap_beginscan(pgclass, false, NowTimeQual, 1, &pgckey);
433
434         /*
435          * Race condition -- if the pg_class tuple has gone away since the
436          * last time we saw it, we don't need to vacuum it.
437          */
438
439         if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf)))
440         {
441                 heap_endscan(pgcscan);
442                 heap_close(pgclass);
443                 CommitTransactionCommand();
444                 return;
445         }
446
447         /* now open the class and vacuum it */
448         onerel = heap_open(relid);
449
450         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
451         vacrelstats->relid = relid;
452         vacrelstats->npages = vacrelstats->ntups = 0;
453         vacrelstats->hasindex = false;
454         if (analyze && !IsSystemRelationName((RelationGetRelationName(onerel))->data))
455         {
456                 int                     attr_cnt,
457                                    *attnums = NULL;
458                 AttributeTupleForm *attr;
459
460                 attr_cnt = onerel->rd_att->natts;
461                 attr = onerel->rd_att->attrs;
462
463                 if (va_cols != NIL)
464                 {
465                         int                     tcnt = 0;
466                         List       *le;
467
468                         if (length(va_cols) > attr_cnt)
469                                 elog(WARN, "vacuum: too many attributes specified for relation %s",
470                                          (RelationGetRelationName(onerel))->data);
471                         attnums = (int *) palloc(attr_cnt * sizeof(int));
472                         foreach(le, va_cols)
473                         {
474                                 char       *col = (char *) lfirst(le);
475
476                                 for (i = 0; i < attr_cnt; i++)
477                                 {
478                                         if (namestrcmp(&(attr[i]->attname), col) == 0)
479                                                 break;
480                                 }
481                                 if (i < attr_cnt)               /* found */
482                                         attnums[tcnt++] = i;
483                                 else
484                                 {
485                                         elog(WARN, "vacuum: there is no attribute %s in %s",
486                                                  col, (RelationGetRelationName(onerel))->data);
487                                 }
488                         }
489                         attr_cnt = tcnt;
490                 }
491
492                 vacrelstats->vacattrstats =
493                         (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));
494
495                 for (i = 0; i < attr_cnt; i++)
496                 {
497                         Operator        func_operator;
498                         OperatorTupleForm pgopform;
499                         VacAttrStats *stats;
500
501                         stats = &vacrelstats->vacattrstats[i];
502                         stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
503                         memmove(stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
504                         stats->best = stats->guess1 = stats->guess2 = 0;
505                         stats->max = stats->min = 0;
506                         stats->best_len = stats->guess1_len = stats->guess2_len = 0;
507                         stats->max_len = stats->min_len = 0;
508                         stats->initialized = false;
509                         stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
510                         stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
511
512                         func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
513                         if (func_operator != NULL)
514                         {
515                                 int                     nargs;
516
517                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
518                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpeq), &nargs);
519                         }
520                         else
521                                 stats->f_cmpeq = NULL;
522
523                         func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
524                         if (func_operator != NULL)
525                         {
526                                 int                     nargs;
527
528                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
529                                 fmgr_info(pgopform->oprcode, &(stats->f_cmplt), &nargs);
530                         }
531                         else
532                                 stats->f_cmplt = NULL;
533
534                         func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
535                         if (func_operator != NULL)
536                         {
537                                 int                     nargs;
538
539                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
540                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpgt), &nargs);
541                         }
542                         else
543                                 stats->f_cmpgt = NULL;
544
545                         pgttup = SearchSysCacheTuple(TYPOID,
546                                                                  ObjectIdGetDatum(stats->attr->atttypid),
547                                                                                  0, 0, 0);
548                         if (HeapTupleIsValid(pgttup))
549                                 stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
550                         else
551                                 stats->outfunc = InvalidOid;
552                 }
553                 vacrelstats->va_natts = attr_cnt;
554                 vc_delhilowstats(relid, ((attnums) ? attr_cnt : 0), attnums);
555                 if (attnums)
556                         pfree(attnums);
557         }
558         else
559         {
560                 vacrelstats->va_natts = 0;
561                 vacrelstats->vacattrstats = (VacAttrStats *) NULL;
562         }
563
564         /* we require the relation to be locked until the indices are cleaned */
565         RelationSetLockForWrite(onerel);
566
567         /* scan it */
568         Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
569         vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
570
571         /* Now open indices */
572         Irel = (Relation *) NULL;
573         vc_getindices(vacrelstats->relid, &nindices, &Irel);
574
575         if (nindices > 0)
576                 vacrelstats->hasindex = true;
577         else
578                 vacrelstats->hasindex = false;
579
580         /* Clean/scan index relation(s) */
581         if (Irel != (Relation *) NULL)
582         {
583                 if (Vvpl.vpl_npages > 0)
584                 {
585                         for (i = 0; i < nindices; i++)
586                                 vc_vaconeind(&Vvpl, Irel[i], vacrelstats->ntups);
587                 }
588                 else
589 /* just scan indices to update statistic */
590                 {
591                         for (i = 0; i < nindices; i++)
592                                 vc_scanoneind(Irel[i], vacrelstats->ntups);
593                 }
594         }
595
596         if (Fvpl.vpl_npages > 0)        /* Try to shrink heap */
597                 vc_rpfheap(vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
598         else
599         {
600                 if (Irel != (Relation *) NULL)
601                         vc_clsindices(nindices, Irel);
602                 if (Vvpl.vpl_npages > 0)/* Clean pages from Vvpl list */
603                         vc_vacheap(vacrelstats, onerel, &Vvpl);
604         }
605
606         /* ok - free Vvpl list of reapped pages */
607         if (Vvpl.vpl_npages > 0)
608         {
609                 vpp = Vvpl.vpl_pgdesc;
610                 for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
611                         pfree(*vpp);
612                 pfree(Vvpl.vpl_pgdesc);
613                 if (Fvpl.vpl_npages > 0)
614                         pfree(Fvpl.vpl_pgdesc);
615         }
616
617         /* all done with this class */
618         heap_close(onerel);
619         heap_endscan(pgcscan);
620         heap_close(pgclass);
621
622         /* update statistics in pg_class */
623         vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
624                                 vacrelstats->hasindex, vacrelstats);
625
626         /* next command frees attribute stats */
627
628         CommitTransactionCommand();
629 }
630
631 /*
632  *      vc_scanheap() -- scan an open heap relation
633  *
634  *              This routine sets commit times, constructs Vvpl list of
635  *              empty/uninitialized pages and pages with dead tuples and
636  *              ~LP_USED line pointers, constructs Fvpl list of pages
637  *              appropriate for purposes of shrinking and maintains statistics
638  *              on the number of live tuples in a heap.
639  */
640 static void
641 vc_scanheap(VRelStats *vacrelstats, Relation onerel,
642                         VPageList Vvpl, VPageList Fvpl)
643 {
644         int                     nblocks,
645                                 blkno;
646         ItemId          itemid;
647         ItemPointer itemptr;
648         HeapTuple       htup;
649         Buffer          buf;
650         Page            page,
651                                 tempPage = NULL;
652         OffsetNumber offnum,
653                                 maxoff;
654         bool            pgchanged,
655                                 tupgone,
656                                 dobufrel,
657                                 notup;
658         char       *relname;
659         VPageDescr      vpc,
660                                 vp;
661         uint32          nvac,
662                                 ntups,
663                                 nunused,
664                                 ncrash,
665                                 nempg,
666                                 nnepg,
667                                 nchpg,
668                                 nemend;
669         Size            frsize,
670                                 frsusf;
671         Size            min_tlen = MAXTUPLEN;
672         Size            max_tlen = 0;
673         int32           i /* , attr_cnt */ ;
674         struct rusage ru0,
675                                 ru1;
676         bool            do_shrinking = true;
677
678         getrusage(RUSAGE_SELF, &ru0);
679
680         nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
681         frsize = frsusf = 0;
682
683         relname = (RelationGetRelationName(onerel))->data;
684
685         nblocks = RelationGetNumberOfBlocks(onerel);
686
687         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
688         vpc->vpd_nusd = 0;
689
690         for (blkno = 0; blkno < nblocks; blkno++)
691         {
692                 buf = ReadBuffer(onerel, blkno);
693                 page = BufferGetPage(buf);
694                 vpc->vpd_blkno = blkno;
695                 vpc->vpd_noff = 0;
696
697                 if (PageIsNew(page))
698                 {
699                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
700                                  relname, blkno);
701                         PageInit(page, BufferGetPageSize(buf), 0);
702                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
703                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
704                         nnepg++;
705                         nemend++;
706                         vc_reappage(Vvpl, vpc);
707                         WriteBuffer(buf);
708                         continue;
709                 }
710
711                 if (PageIsEmpty(page))
712                 {
713                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
714                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
715                         nempg++;
716                         nemend++;
717                         vc_reappage(Vvpl, vpc);
718                         ReleaseBuffer(buf);
719                         continue;
720                 }
721
722                 pgchanged = false;
723                 notup = true;
724                 maxoff = PageGetMaxOffsetNumber(page);
725                 for (offnum = FirstOffsetNumber;
726                          offnum <= maxoff;
727                          offnum = OffsetNumberNext(offnum))
728                 {
729                         itemid = PageGetItemId(page, offnum);
730
731                         /*
732                          * Collect un-used items too - it's possible to have indices
733                          * pointing here after crash.
734                          */
735                         if (!ItemIdIsUsed(itemid))
736                         {
737                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
738                                 nunused++;
739                                 continue;
740                         }
741
742                         htup = (HeapTuple) PageGetItem(page, itemid);
743                         tupgone = false;
744
745                         if (!AbsoluteTimeIsBackwardCompatiblyValid(htup->t_tmin) &&
746                                 TransactionIdIsValid((TransactionId) htup->t_xmin))
747                         {
748
749                                 if (TransactionIdDidAbort(htup->t_xmin))
750                                 {
751                                         tupgone = true;
752                                 }
753                                 else if (TransactionIdDidCommit(htup->t_xmin))
754                                 {
755                                         htup->t_tmin = TransactionIdGetCommitTime(htup->t_xmin);
756                                         pgchanged = true;
757                                 }
758                                 else if (!TransactionIdIsInProgress(htup->t_xmin))
759                                 {
760
761                                         /*
762                                          * Not Aborted, Not Committed, Not in Progress - so it
763                                          * from crashed process. - vadim 11/26/96
764                                          */
765                                         ncrash++;
766                                         tupgone = true;
767                                 }
768                                 else
769                                 {
770                                         elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
771                                                  relname, blkno, offnum, htup->t_xmin);
772                                         do_shrinking = false;
773                                 }
774                         }
775
776                         if (TransactionIdIsValid((TransactionId) htup->t_xmax))
777                         {
778                                 if (TransactionIdDidAbort(htup->t_xmax))
779                                 {
780                                         StoreInvalidTransactionId(&(htup->t_xmax));
781                                         pgchanged = true;
782                                 }
783                                 else if (TransactionIdDidCommit(htup->t_xmax))
784                                         tupgone = true;
785                                 else if (!TransactionIdIsInProgress(htup->t_xmax))
786                                 {
787
788                                         /*
789                                          * Not Aborted, Not Committed, Not in Progress - so it
790                                          * from crashed process. - vadim 06/02/97
791                                          */
792                                         StoreInvalidTransactionId(&(htup->t_xmax));
793                                         pgchanged = true;
794                                 }
795                                 else
796                                 {
797                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
798                                                  relname, blkno, offnum, htup->t_xmax);
799                                         do_shrinking = false;
800                                 }
801                         }
802
803                         /*
804                          * Is it possible at all ? - vadim 11/26/96
805                          */
806                         if (!TransactionIdIsValid((TransactionId) htup->t_xmin))
807                         {
808                                 elog(NOTICE, "Rel %s: TID %u/%u: INSERT_TRANSACTION_ID IS INVALID. \
809 DELETE_TRANSACTION_ID_VALID %d, TUPGONE %d.",
810                                          relname, blkno, offnum,
811                                          TransactionIdIsValid((TransactionId) htup->t_xmax),
812                                          tupgone);
813                         }
814
815                         /*
816                          * It's possibly! But from where it comes ? And should we fix
817                          * it ?  - vadim 11/28/96
818                          */
819                         itemptr = &(htup->t_ctid);
820                         if (!ItemPointerIsValid(itemptr) ||
821                                 BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno)
822                         {
823                                 elog(NOTICE, "Rel %s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.",
824                                          relname, blkno, offnum,
825                                          BlockIdGetBlockNumber(&(itemptr->ip_blkid)),
826                                          itemptr->ip_posid, tupgone);
827                         }
828
829                         /*
830                          * Other checks...
831                          */
832                         if (htup->t_len != itemid->lp_len)
833                         {
834                                 elog(NOTICE, "Rel %s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.",
835                                          relname, blkno, offnum,
836                                          itemid->lp_len, htup->t_len, tupgone);
837                         }
838                         if (!OidIsValid(htup->t_oid))
839                         {
840                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
841                                          relname, blkno, offnum, tupgone);
842                         }
843
844                         if (tupgone)
845                         {
846                                 ItemId          lpp;
847
848                                 if (tempPage == (Page) NULL)
849                                 {
850                                         Size            pageSize;
851
852                                         pageSize = PageGetPageSize(page);
853                                         tempPage = (Page) palloc(pageSize);
854                                         memmove(tempPage, page, pageSize);
855                                 }
856
857                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
858
859                                 /* mark it unused */
860                                 lpp->lp_flags &= ~LP_USED;
861
862                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
863                                 nvac++;
864
865                         }
866                         else
867                         {
868                                 ntups++;
869                                 notup = false;
870                                 if (htup->t_len < min_tlen)
871                                         min_tlen = htup->t_len;
872                                 if (htup->t_len > max_tlen)
873                                         max_tlen = htup->t_len;
874                                 vc_attrstats(onerel, vacrelstats, htup);
875                         }
876                 }
877
878                 if (pgchanged)
879                 {
880                         WriteBuffer(buf);
881                         dobufrel = false;
882                         nchpg++;
883                 }
884                 else
885                         dobufrel = true;
886                 if (tempPage != (Page) NULL)
887                 {                                               /* Some tuples are gone */
888                         PageRepairFragmentation(tempPage);
889                         vpc->vpd_free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
890                         frsize += vpc->vpd_free;
891                         vc_reappage(Vvpl, vpc);
892                         pfree(tempPage);
893                         tempPage = (Page) NULL;
894                 }
895                 else if (vpc->vpd_noff > 0)
896                 {                                               /* there are only ~LP_USED line pointers */
897                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
898                         frsize += vpc->vpd_free;
899                         vc_reappage(Vvpl, vpc);
900                 }
901                 if (dobufrel)
902                         ReleaseBuffer(buf);
903                 if (notup)
904                         nemend++;
905                 else
906                         nemend = 0;
907         }
908
909         pfree(vpc);
910
911         /* save stats in the rel list for use later */
912         vacrelstats->ntups = ntups;
913         vacrelstats->npages = nblocks;
914 /*        vacrelstats->natts = attr_cnt;*/
915         if (ntups == 0)
916                 min_tlen = max_tlen = 0;
917         vacrelstats->min_tlen = min_tlen;
918         vacrelstats->max_tlen = max_tlen;
919
920         Vvpl->vpl_nemend = nemend;
921         Fvpl->vpl_nemend = nemend;
922
923         /*
924          * Try to make Fvpl keeping in mind that we can't use free space of
925          * "empty" end-pages and last page if it reapped.
926          */
927         if (do_shrinking && Vvpl->vpl_npages - nemend > 0)
928         {
929                 int                     nusf;           /* blocks usefull for re-using */
930
931                 nusf = Vvpl->vpl_npages - nemend;
932                 if ((Vvpl->vpl_pgdesc[nusf - 1])->vpd_blkno == nblocks - nemend - 1)
933                         nusf--;
934
935                 for (i = 0; i < nusf; i++)
936                 {
937                         vp = Vvpl->vpl_pgdesc[i];
938                         if (vc_enough_space(vp, min_tlen))
939                         {
940                                 vc_vpinsert(Fvpl, vp);
941                                 frsusf += vp->vpd_free;
942                         }
943                 }
944         }
945
946         getrusage(RUSAGE_SELF, &ru1);
947
948         elog(MESSAGE_LEVEL, "Rel %s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
949 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
950                  relname,
951                  nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
952                  ntups, nvac, ncrash, nunused, min_tlen, max_tlen,
953                  frsize, frsusf, nemend, Fvpl->vpl_npages,
954                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
955                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
956
957 }                                                               /* vc_scanheap */
958
959
960 /*
961  *      vc_rpfheap() -- try to repair the relation's fragmentation
962  *
963  *              This routine marks dead tuples as unused and tries to re-use dead
964  *              space by moving tuples (and inserting index entries if needed). It
965  *              builds the Nvpl list of pages freed by moving tuples away and cleans
966  *              the indices for them after committing (in hack-manner - without
967  *              losing locks and freeing memory!) the current transaction. It
968  *              truncates the relation if some end-blocks are gone away.
     *
     *              vacrelstats - relation statistics: npages, ntups and min_tlen are
     *                            read here; npages is overwritten after truncation
     *              onerel      - the heap relation being processed
     *              Vvpl        - list of reaped pages; its last vpl_nemend entries
     *                            are the "empty" end-pages
     *              Fvpl        - list of pages whose free space may receive moved
     *                            tuples; entries are removed from it as pages fill
     *              nindices,
     *              Irel        - indices of onerel; index work is done only when
     *                            Irel is not NULL, and in that case Irel is handed
     *                            to vc_clsindices() before returning
     *
     *              NOTE(review): Fvpl->vpl_pgdesc[Fvpl->vpl_npages - 1] is read
     *              unconditionally below, so this presumably must only be called
     *              with a non-empty Fvpl - confirm against the caller.
969  */
970 static void
971 vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
972                    VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
973 {
974         TransactionId myXID;
975         CommandId       myCID;
976         AbsoluteTime myCTM = 0;
977         Buffer          buf,
978                                 ToBuf;
979         int                     nblocks,
980                                 blkno;
981         Page            page,
982                                 ToPage = NULL;
983         OffsetNumber offnum = 0,
984                                 maxoff = 0,
985                                 newoff,
986                                 moff;
987         ItemId          itemid,
988                                 newitemid;
989         HeapTuple       htup,
990                                 newtup;
991         TupleDesc       tupdesc = NULL;
992         Datum      *idatum = NULL;
993         char       *inulls = NULL;
994         InsertIndexResult iresult;
995         VPageListData Nvpl;
996         VPageDescr      ToVpd = NULL,
997                                 Fvplast,
998                                 Vvplast,
999                                 vpc,
1000                            *vpp;
1001         int                     ToVpI = 0;
1002         IndDesc    *Idesc,
1003                            *idcur;
1004         int                     Fblklast,
1005                                 Vblklast,
1006                                 i;
1007         Size            tlen;
1008         int                     nmoved,
1009                                 Fnpages,
1010                                 Vnpages;
1011         int                     nchkmvd,
1012                                 ntups;
1013         bool            isempty,
1014                                 dowrite;
1015         Relation        archrel;
1016         struct rusage ru0,
1017                                 ru1;
1018
1019         getrusage(RUSAGE_SELF, &ru0);
1020
             /* moved tuples are stamped with this transaction and command id */
1021         myXID = GetCurrentTransactionId();
1022         myCID = GetCurrentCommandId();
1023
1024         if (Irel != (Relation *) NULL)          /* preparation for index' inserts */
1025         {
1026                 vc_mkindesc(onerel, nindices, Irel, &Idesc);
1027                 tupdesc = RelationGetTupleDescriptor(onerel);
1028                 idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof(*idatum));
1029                 inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof(*inulls));
1030         }
1031
1032         /* if the relation has an archive, open it */
1033         if (onerel->rd_rel->relarch != 'n')
1034         {
1035                 archrel = vc_getarchrel(onerel);
1036                 /* Archive tuples from "empty" end-pages */
                     /* (walks the last vpl_nemend entries of Vvpl, from the end backwards) */
1037                 for (vpp = Vvpl->vpl_pgdesc + Vvpl->vpl_npages - 1,
1038                          i = Vvpl->vpl_nemend; i > 0; i--, vpp--)
1039                 {
1040                         if ((*vpp)->vpd_noff > 0)
1041                         {
1042                                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1043                                 page = BufferGetPage(buf);
1044                                 Assert(!PageIsEmpty(page));
1045                                 vc_vacpage(page, *vpp, archrel);
1046                                 WriteBuffer(buf);
1047                         }
1048                 }
1049         }
1050         else
1051                 archrel = (Relation) NULL;
1052
             /*
              * Nvpl collects the pages that tuples get moved away from.
              * Fvplast/Fblklast and Vvplast/Vblklast are cursors to the last
              * not-yet-passed entry of Fvpl and of the non-"empty" part of Vvpl;
              * they step towards the list heads as the backward scan passes
              * the corresponding blocks.
              */
1053         Nvpl.vpl_npages = 0;
1054         Fnpages = Fvpl->vpl_npages;
1055         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1056         Fblklast = Fvplast->vpd_blkno;
1057         Assert(Vvpl->vpl_npages > Vvpl->vpl_nemend);
1058         Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
1059         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1060         Vblklast = Vvplast->vpd_blkno;
1061         Assert(Vblklast >= Fblklast);
1062         ToBuf = InvalidBuffer;
1063         nmoved = 0;
1064
             /* vpc records the offsets of the tuples moved away from the page being scanned */
1065         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
1066         vpc->vpd_nusd = vpc->vpd_noff = 0;
1067
1068         nblocks = vacrelstats->npages;
             /*
              * Walk the relation backwards, starting at the last block that is not
              * an "empty" end-page.  The loop has no exit condition of its own; it
              * is left only through the break statements inside.
              */
1069         for (blkno = nblocks - Vvpl->vpl_nemend - 1;; blkno--)
1070         {
1071                 /* if it's reapped page and it was used by me - quit */
1072                 if (blkno == Fblklast && Fvplast->vpd_nusd > 0)
1073                         break;
1074
1075                 buf = ReadBuffer(onerel, blkno);
1076                 page = BufferGetPage(buf);
1077
1078                 vpc->vpd_noff = 0;
1079
1080                 isempty = PageIsEmpty(page);
1081
                     /* dowrite: the page was changed by vc_vacpage, so it must be written even if nothing is moved */
1082                 dowrite = false;
1083                 if (blkno == Vblklast)  /* it's reapped page */
1084                 {
1085                         if (Vvplast->vpd_noff > 0)      /* there are dead tuples */
1086                         {                                       /* on this page - clean */
1087                                 Assert(!isempty);
1088                                 vc_vacpage(page, Vvplast, archrel);
1089                                 dowrite = true;
1090                         }
1091                         else
1092                         {
1093                                 Assert(isempty);
1094                         }
1095                         --Vnpages;
1096                         Assert(Vnpages > 0);
1097                         /* get prev reapped page from Vvpl */
1098                         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1099                         Vblklast = Vvplast->vpd_blkno;
1100                         if (blkno == Fblklast)          /* this page in Fvpl too */
1101                         {
1102                                 --Fnpages;
1103                                 Assert(Fnpages > 0);
1104                                 Assert(Fvplast->vpd_nusd == 0);
1105                                 /* get prev reapped page from Fvpl */
1106                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1107                                 Fblklast = Fvplast->vpd_blkno;
1108                         }
1109                         Assert(Fblklast <= Vblklast);
1110                         if (isempty)
1111                         {
1112                                 ReleaseBuffer(buf);
1113                                 continue;
1114                         }
1115                 }
1116                 else
1117                 {
1118                         Assert(!isempty);
1119                 }
1120
1121                 vpc->vpd_blkno = blkno;
1122                 maxoff = PageGetMaxOffsetNumber(page);
1123                 for (offnum = FirstOffsetNumber;
1124                          offnum <= maxoff;
1125                          offnum = OffsetNumberNext(offnum))
1126                 {
1127                         itemid = PageGetItemId(page, offnum);
1128
1129                         if (!ItemIdIsUsed(itemid))
1130                                 continue;
1131
1132                         htup = (HeapTuple) PageGetItem(page, itemid);
1133                         tlen = htup->t_len;
1134
1135                         /* try to find new page for this tuple */
1136                         if (ToBuf == InvalidBuffer ||
1137                                 !vc_enough_space(ToVpd, tlen))
1138                         {
1139                                 if (ToBuf != InvalidBuffer)
1140                                 {
1141                                         WriteBuffer(ToBuf);
1142                                         ToBuf = InvalidBuffer;
1143
1144                                         /*
1145                                          * If not even one tuple can be added to this page -
1146                                          * remove page from Fvpl. - vadim 11/27/96
1147                                          */
1148                                         if (!vc_enough_space(ToVpd, vacrelstats->min_tlen))
1149                                         {
1150                                                 if (ToVpd != Fvplast)
1151                                                 {
1152                                                         Assert(Fnpages > ToVpI + 1);
                                                             /*
                                                              * Close the gap left by entry ToVpI.
                                                              * NOTE(review): the array elements are
                                                              * VPageDescr, but the size is computed
                                                              * with sizeof(VPageDescr *); both look
                                                              * like pointer types here - confirm.
                                                              */
1153                                                         memmove(Fvpl->vpl_pgdesc + ToVpI,
1154                                                                         Fvpl->vpl_pgdesc + ToVpI + 1,
1155                                                         sizeof(VPageDescr *) * (Fnpages - ToVpI - 1));
1156                                                 }
1157                                                 Assert(Fnpages >= 1);
1158                                                 Fnpages--;
1159                                                 if (Fnpages == 0)
1160                                                         break;
1161                                                 /* get prev reapped page from Fvpl */
1162                                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1163                                                 Fblklast = Fvplast->vpd_blkno;
1164                                         }
1165                                 }
                                     /* take the first remaining Fvpl page with enough room for this tuple */
1166                                 for (i = 0; i < Fnpages; i++)
1167                                 {
1168                                         if (vc_enough_space(Fvpl->vpl_pgdesc[i], tlen))
1169                                                 break;
1170                                 }
1171                                 if (i == Fnpages)
1172                                         break;          /* can't move item anywhere */
1173                                 ToVpI = i;
1174                                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1175                                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1176                                 ToPage = BufferGetPage(ToBuf);
1177                                 /* if this page was not used before - clean it */
1178                                 if (!PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0)
1179                                         vc_vacpage(ToPage, ToVpd, archrel);
1180                         }
1181
1182                         /* copy tuple */
1183                         newtup = (HeapTuple) palloc(tlen);
1184                         memmove((char *) newtup, (char *) htup, tlen);
1185
1186                         /* store transaction information */
1187                         TransactionIdStore(myXID, &(newtup->t_xmin));
1188                         newtup->t_cmin = myCID;
1189                         StoreInvalidTransactionId(&(newtup->t_xmax));
1190                         newtup->t_tmin = INVALID_ABSTIME;
1191                         newtup->t_tmax = CURRENT_ABSTIME;
1192                         ItemPointerSetInvalid(&newtup->t_chain);
1193
1194                         /* add tuple to the page */
1195                         newoff = PageAddItem(ToPage, (Item) newtup, tlen,
1196                                                                  InvalidOffsetNumber, LP_USED);
1197                         if (newoff == InvalidOffsetNumber)
1198                         {
                                     /* NOTE(review): tlen is a Size but is printed with %u - confirm the widths match */
1199                                 elog(WARN, "\
1200 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1201                                          tlen, ToVpd->vpd_blkno, ToVpd->vpd_free,
1202                                          ToVpd->vpd_nusd, ToVpd->vpd_noff);
1203                         }
1204                         newitemid = PageGetItemId(ToPage, newoff);
                             /* drop the palloc'd copy; from here on newtup is the tuple as stored on ToPage */
1205                         pfree(newtup);
1206                         newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1207                         ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1208
1209                         /* now logically delete end-tuple */
1210                         TransactionIdStore(myXID, &(htup->t_xmax));
1211                         htup->t_cmax = myCID;
                             /* the old tuple's t_chain receives the TID of the new copy */
1212                         memmove((char *) &(htup->t_chain), (char *) &(newtup->t_ctid), sizeof(newtup->t_ctid));
1213
1214                         ToVpd->vpd_nusd++;
1215                         nmoved++;
1216                         ToVpd->vpd_free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1217                         vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1218
1219                         /* insert index' tuples if needed */
1220                         if (Irel != (Relation *) NULL)
1221                         {
1222                                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1223                                 {
1224                                         FormIndexDatum(
1225                                                                    idcur->natts,
1226                                                            (AttrNumber *) &(idcur->tform->indkey[0]),
1227                                                                    newtup,
1228                                                                    tupdesc,
1229                                                                    InvalidBuffer,
1230                                                                    idatum,
1231                                                                    inulls,
1232                                                                    idcur->finfoP);
1233                                         iresult = index_insert(
1234                                                                                    Irel[i],
1235                                                                                    idatum,
1236                                                                                    inulls,
1237                                                                                    &(newtup->t_ctid),
1238                                                                                    onerel);
1239                                         if (iresult)
1240                                                 pfree(iresult);
1241                                 }
1242                         }
1243
1244                 }                                               /* walk along page */
1245
1246                 if (vpc->vpd_noff > 0)  /* some tuples were moved */
1247                 {
1248                         vc_reappage(&Nvpl, vpc);
1249                         WriteBuffer(buf);
1250                 }
1251                 else if (dowrite)
1252                         WriteBuffer(buf);
1253                 else
1254                         ReleaseBuffer(buf);
1255
                     /* offnum <= maxoff here means the page walk above was left by a break */
1256                 if (offnum <= maxoff)
1257                         break;                          /* some item(s) left */
1258
1259         }                                                       /* walk along relation */
1260
1261         blkno++;                                        /* new number of blocks */
1262
1263         if (ToBuf != InvalidBuffer)
1264         {
1265                 Assert(nmoved > 0);
1266                 WriteBuffer(ToBuf);
1267         }
1268
1269         if (nmoved > 0)
1270         {
1271
1272                 /*
1273                  * We have to commit our tuple movements before we truncate the
1274                  * relation, but we shouldn't lose our locks. And so - quick hack:
1275                  * flush buffers and record status of current transaction as
1276                  * committed, and continue. - vadim 11/13/96
1277                  */
1278                 FlushBufferPool(!TransactionFlushEnabled());
1279                 TransactionIdCommit(myXID);
1280                 FlushBufferPool(!TransactionFlushEnabled());
1281                 myCTM = TransactionIdGetCommitTime(myXID);
1282         }
1283
1284         /*
1285          * Clean uncleaned reapped pages from Vvpl list and set commit' times
1286          * for inserted tuples
1287          */
1288         nchkmvd = 0;
1289         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1290         {
1291                 Assert((*vpp)->vpd_blkno < blkno);
1292                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1293                 page = BufferGetPage(buf);
1294                 if ((*vpp)->vpd_nusd == 0)              /* this page was not used */
1295                 {
1296
1297                         /*
1298                          * noff == 0 in empty pages only - such pages should be
1299                          * re-used
1300                          */
1301                         Assert((*vpp)->vpd_noff > 0);
1302                         vc_vacpage(page, *vpp, archrel);
1303                 }
1304                 else
1305 /* this page was used */
1306                 {
                             /* stamp every tuple inserted by this transaction with its commit time, counting them */
1307                         ntups = 0;
1308                         moff = PageGetMaxOffsetNumber(page);
1309                         for (newoff = FirstOffsetNumber;
1310                                  newoff <= moff;
1311                                  newoff = OffsetNumberNext(newoff))
1312                         {
1313                                 itemid = PageGetItemId(page, newoff);
1314                                 if (!ItemIdIsUsed(itemid))
1315                                         continue;
1316                                 htup = (HeapTuple) PageGetItem(page, itemid);
1317                                 if (TransactionIdEquals((TransactionId) htup->t_xmin, myXID))
1318                                 {
1319                                         htup->t_tmin = myCTM;
1320                                         ntups++;
1321                                 }
1322                         }
1323                         Assert((*vpp)->vpd_nusd == ntups);
1324                         nchkmvd += ntups;
1325                 }
1326                 WriteBuffer(buf);
1327         }
1328         Assert(nmoved == nchkmvd);
1329
1330         getrusage(RUSAGE_SELF, &ru1);
1331
             /*
              * NOTE(review): the tv_sec differences are printed with %u; tv_sec is
              * a time_t per POSIX, which may be wider than unsigned int - confirm.
              */
1332         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. \
1333 Elapsed %u/%u sec.",
1334                  (RelationGetRelationName(onerel))->data,
1335                  nblocks, blkno, nmoved,
1336                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1337                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1338
1339         if (Nvpl.vpl_npages > 0)
1340         {
1341                 /* vacuum indices again if needed */
1342                 if (Irel != (Relation *) NULL)
1343                 {
1344                         VPageDescr *vpleft,
1345                                            *vpright,
1346                                                 vpsave;
1347
1348                         /* re-sort Nvpl.vpl_pgdesc */
                             /* (entries were appended during the backward scan, so reverse the array in place) */
1349                         for (vpleft = Nvpl.vpl_pgdesc,
1350                                  vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1351                                  vpleft < vpright; vpleft++, vpright--)
1352                         {
1353                                 vpsave = *vpleft;
1354                                 *vpleft = *vpright;
1355                                 *vpright = vpsave;
1356                         }
1357                         for (i = 0; i < nindices; i++)
1358                                 vc_vaconeind(&Nvpl, Irel[i], vacrelstats->ntups);
1359                 }
1360
1361                 /*
1362                  * clean moved tuples from last page in Nvpl list if some tuples
1363                  * left there
1364                  */
1365                 if (vpc->vpd_noff > 0 && offnum <= maxoff)
1366                 {
1367                         Assert(vpc->vpd_blkno == blkno - 1);
1368                         buf = ReadBuffer(onerel, vpc->vpd_blkno);
1369                         page = BufferGetPage(buf);
1370                         ntups = 0;
                             /* offnum is where the page walk stopped; only items before it were moved */
1371                         maxoff = offnum;
1372                         for (offnum = FirstOffsetNumber;
1373                                  offnum < maxoff;
1374                                  offnum = OffsetNumberNext(offnum))
1375                         {
1376                                 itemid = PageGetItemId(page, offnum);
1377                                 if (!ItemIdIsUsed(itemid))
1378                                         continue;
1379                                 htup = (HeapTuple) PageGetItem(page, itemid);
1380                                 Assert(TransactionIdEquals((TransactionId) htup->t_xmax, myXID));
1381                                 itemid->lp_flags &= ~LP_USED;
1382                                 ntups++;
1383                         }
1384                         Assert(vpc->vpd_noff == ntups);
1385                         PageRepairFragmentation(page);
1386                         WriteBuffer(buf);
1387                 }
1388
1389                 /* now - free new list of reapped pages */
1390                 vpp = Nvpl.vpl_pgdesc;
1391                 for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1392                         pfree(*vpp);
1393                 pfree(Nvpl.vpl_pgdesc);
1394         }
1395
1396         /* truncate relation */
1397         if (blkno < nblocks)
1398         {
                     /*
                      * BlowawayRelationBuffers() is called with the new block count
                      * before smgrtruncate(); a negative result is treated as fatal.
                      */
1399                 i = BlowawayRelationBuffers(onerel, blkno);
1400                 if (i < 0)
1401                         elog (FATAL, "VACUUM (vc_rpfheap): BlowawayRelationBuffers returned %d", i);
1402                 blkno = smgrtruncate(onerel->rd_rel->relsmgr, onerel, blkno);
1403                 Assert(blkno >= 0);
1404                 vacrelstats->npages = blkno;    /* set new number of blocks */
1405         }
1406
1407         if (archrel != (Relation) NULL)
1408                 heap_close(archrel);
1409
1410         if (Irel != (Relation *) NULL)          /* pfree index' allocations */
1411         {
1412                 pfree(Idesc);
1413                 pfree(idatum);
1414                 pfree(inulls);
1415                 vc_clsindices(nindices, Irel);
1416         }
1417
1418         pfree(vpc);
1419
1420 }                                                               /* vc_rpfheap */
1421
1422 /*
1423  *      vc_vacheap() -- free dead tuples
1424  *
1425  *              This routine marks dead tuples as unused and truncates relation
1426  *              if there are "empty" end-blocks.
1427  */
1428 static void
1429 vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1430 {
1431         Buffer          buf;
1432         Page            page;
1433         VPageDescr *vpp;
1434         Relation        archrel;
1435         int                     nblocks;
1436         int                     i;
1437
1438         nblocks = Vvpl->vpl_npages;
1439         /* if the relation has an archive, open it */
1440         if (onerel->rd_rel->relarch != 'n')
1441                 archrel = vc_getarchrel(onerel);
1442         else
1443         {
1444                 archrel = (Relation) NULL;
1445                 nblocks -= Vvpl->vpl_nemend;    /* nothing to do with them */
1446         }
1447
1448         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1449         {
1450                 if ((*vpp)->vpd_noff > 0)
1451                 {
1452                         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1453                         page = BufferGetPage(buf);
1454                         vc_vacpage(page, *vpp, archrel);
1455                         WriteBuffer(buf);
1456                 }
1457         }
1458
1459         /* truncate relation if there are some empty end-pages */
1460         if (Vvpl->vpl_nemend > 0)
1461         {
1462                 Assert(vacrelstats->npages >= Vvpl->vpl_nemend);
1463                 nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1464                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1465                          (RelationGetRelationName(onerel))->data,
1466                          vacrelstats->npages, nblocks);
1467
1468                 /*
1469                  * we have to flush "empty" end-pages (if changed, but who knows
1470                  * it) before truncation
1471                  */
1472                 FlushBufferPool(!TransactionFlushEnabled());
1473                 
1474                 i = BlowawayRelationBuffers(onerel, nblocks);
1475                 if (i < 0)
1476                         elog (FATAL, "VACUUM (vc_vacheap): BlowawayRelationBuffers returned %d", i);
1477
1478                 nblocks = smgrtruncate(onerel->rd_rel->relsmgr, onerel, nblocks);
1479                 Assert(nblocks >= 0);
1480                 vacrelstats->npages = nblocks;  /* set new number of blocks */
1481         }
1482
1483         if (archrel != (Relation) NULL)
1484                 heap_close(archrel);
1485
1486 }                                                               /* vc_vacheap */
1487
1488 /*
1489  *      vc_vacpage() -- free (and archive if needed) dead tuples on a page
1490  *                                       and repaire its fragmentation.
1491  */
1492 static void
1493 vc_vacpage(Page page, VPageDescr vpd, Relation archrel)
1494 {
1495         ItemId          itemid;
1496         HeapTuple       htup;
1497         int                     i;
1498
1499         Assert(vpd->vpd_nusd == 0);
1500         for (i = 0; i < vpd->vpd_noff; i++)
1501         {
1502                 itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1503                 if (archrel != (Relation) NULL && ItemIdIsUsed(itemid))
1504                 {
1505                         htup = (HeapTuple) PageGetItem(page, itemid);
1506                         vc_archive(archrel, htup);
1507                 }
1508                 itemid->lp_flags &= ~LP_USED;
1509         }
1510         PageRepairFragmentation(page);
1511
1512 }                                                               /* vc_vacpage */
1513
1514 /*
1515  *      _vc_scanoneind() -- scan one index relation to update statistic.
1516  *
1517  */
1518 static void
1519 vc_scanoneind(Relation indrel, int nhtups)
1520 {
1521         RetrieveIndexResult res;
1522         IndexScanDesc iscan;
1523         int                     nitups;
1524         int                     nipages;
1525         struct rusage ru0,
1526                                 ru1;
1527
1528         getrusage(RUSAGE_SELF, &ru0);
1529
1530         /* walk through the entire index */
1531         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1532         nitups = 0;
1533
1534         while ((res = index_getnext(iscan, ForwardScanDirection))
1535                    != (RetrieveIndexResult) NULL)
1536         {
1537                 nitups++;
1538                 pfree(res);
1539         }
1540
1541         index_endscan(iscan);
1542
1543         /* now update statistics in pg_class */
1544         nipages = RelationGetNumberOfBlocks(indrel);
1545         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1546
1547         getrusage(RUSAGE_SELF, &ru1);
1548
1549         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1550                  indrel->rd_rel->relname.data, nipages, nitups,
1551                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1552                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1553
1554         if (nitups != nhtups)
1555                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1556                          indrel->rd_rel->relname.data, nitups, nhtups);
1557
1558 }                                                               /* vc_scanoneind */
1559
1560 /*
1561  *      vc_vaconeind() -- vacuum one index relation.
1562  *
1563  *              Vpl is the VPageList of the heap we're currently vacuuming.
1564  *              It's locked. Indrel is an index relation on the vacuumed heap.
1565  *              We don't set locks on the index relation here, since the indexed
1566  *              access methods support locking at different granularities.
1567  *              We let them handle it.
1568  *
1569  *              Finally, we arrange to update the index relation's statistics in
1570  *              pg_class.
1571  */
1572 static void
1573 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1574 {
1575         RetrieveIndexResult res;
1576         IndexScanDesc iscan;
1577         ItemPointer heapptr;
1578         int                     nvac;
1579         int                     nitups;
1580         int                     nipages;
1581         VPageDescr      vp;
1582         struct rusage ru0,
1583                                 ru1;
1584
1585         getrusage(RUSAGE_SELF, &ru0);
1586
1587         /* walk through the entire index */
1588         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1589         nvac = 0;
1590         nitups = 0;
1591
1592         while ((res = index_getnext(iscan, ForwardScanDirection))
1593                    != (RetrieveIndexResult) NULL)
1594         {
1595                 heapptr = &res->heap_iptr;
1596
1597                 if ((vp = vc_tidreapped(heapptr, vpl)) != (VPageDescr) NULL)
1598                 {
1599 #if 0
1600                         elog(DEBUG, "<%x,%x> -> <%x,%x>",
1601                                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1602                                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1603                                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1604                                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1605 #endif
1606                         if (vp->vpd_noff == 0)
1607                         {                                       /* this is EmptyPage !!! */
1608                                 elog(NOTICE, "Ind %s: pointer to EmptyPage (blk %u off %u) - fixing",
1609                                          indrel->rd_rel->relname.data,
1610                                          vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1611                         }
1612                         ++nvac;
1613                         index_delete(indrel, &res->index_iptr);
1614                 }
1615                 else
1616                 {
1617                         nitups++;
1618                 }
1619
1620                 /* be tidy */
1621                 pfree(res);
1622         }
1623
1624         index_endscan(iscan);
1625
1626         /* now update statistics in pg_class */
1627         nipages = RelationGetNumberOfBlocks(indrel);
1628         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1629
1630         getrusage(RUSAGE_SELF, &ru1);
1631
1632         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1633                  indrel->rd_rel->relname.data, nipages, nitups, nvac,
1634                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1635                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1636
1637         if (nitups != nhtups)
1638                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1639                          indrel->rd_rel->relname.data, nitups, nhtups);
1640
1641 }                                                               /* vc_vaconeind */
1642
1643 /*
1644  *      vc_tidreapped() -- is a particular tid reapped?
1645  *
1646  *              vpl->VPageDescr_array is sorted in right order.
1647  */
1648 static VPageDescr
1649 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1650 {
1651         OffsetNumber ioffno;
1652         OffsetNumber *voff;
1653         VPageDescr      vp,
1654                            *vpp;
1655         VPageDescrData vpd;
1656
1657         vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1658         ioffno = ItemPointerGetOffsetNumber(itemptr);
1659
1660         vp = &vpd;
1661         vpp = (VPageDescr *) vc_find_eq((char *) (vpl->vpl_pgdesc),
1662                                            vpl->vpl_npages, sizeof(VPageDescr), (char *) &vp,
1663                                                                         vc_cmp_blk);
1664
1665         if (vpp == (VPageDescr *) NULL)
1666                 return ((VPageDescr) NULL);
1667         vp = *vpp;
1668
1669         /* ok - we are on true page */
1670
1671         if (vp->vpd_noff == 0)
1672         {                                                       /* this is EmptyPage !!! */
1673                 return (vp);
1674         }
1675
1676         voff = (OffsetNumber *) vc_find_eq((char *) (vp->vpd_voff),
1677                                         vp->vpd_noff, sizeof(OffsetNumber), (char *) &ioffno,
1678                                                                            vc_cmp_offno);
1679
1680         if (voff == (OffsetNumber *) NULL)
1681                 return ((VPageDescr) NULL);
1682
1683         return (vp);
1684
1685 }                                                               /* vc_tidreapped */
1686
1687 /*
1688  *      vc_attrstats() -- compute column statistics used by the optimizer
1689  *
1690  *      We compute the column min, max, null and non-null counts.
1691  *      Plus we attempt to find the count of the value that occurs most
1692  *      frequently in each column
1693  *      These figures are used to compute the selectivity of the column
1694  *
1695  *      We use a three-bucket cache to get the most frequent item
1696  *      The 'guess' buckets count hits.  A cache miss causes guess1
1697  *      to get the most hit 'guess' item in the most recent cycle, and
1698  *      the new item goes into guess2.  Whenever the total count of hits
1699  *      of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1700  *
1701  *      This method works perfectly for columns with unique values, and columns
1702  *      with only two unique values, plus nulls.
1703  *
1704  *      It becomes less perfect as the number of unique values increases and
1705  *      their distribution in the table becomes more random.
1706  *
      *      Called with one heap tuple (htup) at a time; the running counters
      *      live in vacrelstats->vacattrstats[], one entry per tracked column
      *      (vacrelstats->va_natts of them).  onerel supplies the tuple
      *      descriptor used to extract each column.
      *
1707  */
1708 static void
1709 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup)
1710 {
1711         int                     i,
1712                                 attr_cnt = vacrelstats->va_natts;
1713         VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1714         TupleDesc       tupDesc = onerel->rd_att;
1715         Datum           value;
1716         bool            isnull;
1717
1718         for (i = 0; i < attr_cnt; i++)
1719         {
1720                 VacAttrStats *stats = &vacattrstats[i];
1721                 bool            value_hit = true;
1722
                     /* pull this column's value (and null flag) out of the tuple */
1723                 value = heap_getattr(htup, InvalidBuffer,
1724                                                          stats->attr->attnum, tupDesc, &isnull);
1725
                     /*
                      * nothing is collected for this column unless the macro
                      * holds -- presumably "an equality function is available";
                      * TODO confirm against VacAttrStatsEqValid's definition
                      */
1726                 if (!VacAttrStatsEqValid(stats))
1727                         continue;
1728
1729                 if (isnull)
1730                         stats->null_cnt++;
1731                 else
1732                 {
1733                         stats->nonnull_cnt++;
                             /*
                              * first non-null value for this column: seed all
                              * three buckets (and min/max, when the ordering
                              * comparisons are usable) with it
                              */
1734                         if (stats->initialized == false)
1735                         {
1736                                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1737                                 /* best_cnt gets incremented later */
1738                                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1739                                 stats->guess1_cnt = stats->guess1_hits = 1;
1740                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1741                                 stats->guess2_hits = 1;
1742                                 if (VacAttrStatsLtGtValid(stats))
1743                                 {
1744                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1745                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1746                                 }
1747                                 stats->initialized = true;
1748                         }
                             /*
                              * maintain min/max and how often each occurs: a new
                              * extreme restarts its count at zero, and the equality
                              * tests that follow bump the matching count (min is
                              * tested first, so a value equal to both counts as min)
                              */
1749                         if (VacAttrStatsLtGtValid(stats))
1750                         {
1751                                 if ((*(stats->f_cmplt)) (value, stats->min))
1752                                 {
1753                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1754                                         stats->min_cnt = 0;
1755                                 }
1756                                 if ((*(stats->f_cmpgt)) (value, stats->max))
1757                                 {
1758                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1759                                         stats->max_cnt = 0;
1760                                 }
1761                                 if ((*(stats->f_cmpeq)) (value, stats->min))
1762                                         stats->min_cnt++;
1763                                 else if ((*(stats->f_cmpeq)) (value, stats->max))
1764                                         stats->max_cnt++;
1765                         }
                             /*
                              * classify the value against best, guess1 and guess2 in
                              * that order; value_hit becomes false only when it
                              * matches none of them
                              */
1766                         if ((*(stats->f_cmpeq)) (value, stats->best))
1767                                 stats->best_cnt++;
1768                         else if ((*(stats->f_cmpeq)) (value, stats->guess1))
1769                         {
1770                                 stats->guess1_cnt++;
1771                                 stats->guess1_hits++;
1772                         }
1773                         else if ((*(stats->f_cmpeq)) (value, stats->guess2))
1774                                 stats->guess2_hits++;
1775                         else
1776                                 value_hit = false;
1777
                             /*
                              * keep the guess with more hits in guess1; guess1_cnt
                              * is set from guess2's hit count before the hit
                              * counters themselves are exchanged
                              */
1778                         if (stats->guess2_hits > stats->guess1_hits)
1779                         {
1780                                 swapDatum(stats->guess1, stats->guess2);
1781                                 swapInt(stats->guess1_len, stats->guess2_len);
1782                                 stats->guess1_cnt = stats->guess2_hits;
1783                                 swapLong(stats->guess1_hits, stats->guess2_hits);
1784                         }
                             /* guess1 has overtaken best: exchange them, restart hit counters */
1785                         if (stats->guess1_cnt > stats->best_cnt)
1786                         {
1787                                 swapDatum(stats->best, stats->guess1);
1788                                 swapInt(stats->best_len, stats->guess1_len);
1789                                 swapLong(stats->best_cnt, stats->guess1_cnt);
1790                                 stats->guess1_hits = 1;
1791                                 stats->guess2_hits = 1;
1792                         }
                             /* cache miss: the new value replaces guess2, hit counters restart */
1793                         if (!value_hit)
1794                         {
1795                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1796                                 stats->guess1_hits = 1;
1797                                 stats->guess2_hits = 1;
1798                         }
1799                 }
1800         }
1801         return;
1802 }
1803
1804 /*
1805  *      vc_bucketcpy() -- update pg_class statistics for one relation
1806  *
1807  */
1808 static void
1809 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1810 {
1811         if (attr->attbyval && attr->attlen != -1)
1812                 *bucket = value;
1813         else
1814         {
1815                 int                     len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1816
1817                 if (len > *bucket_len)
1818                 {
1819                         if (*bucket_len != 0)
1820                                 pfree(DatumGetPointer(*bucket));
1821                         *bucket = PointerGetDatum(palloc(len));
1822                         *bucket_len = len;
1823                 }
1824                 memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1825         }
1826 }
1827
1828 /*
1829  *      vc_updstats() -- update pg_class statistics for one relation
1830  *
1831  *              This routine works for both index and heap relation entries in
1832  *              pg_class.  We violate no-overwrite semantics here by storing new
1833  *              values for ntups, npages, and hasindex directly in the pg_class
1834  *              tuple that's already on the page.  The reason for this is that if
1835  *              we updated these tuples in the usual way, then every tuple in pg_class
1836  *              would be replaced every day.  This would make planning and executing
1837  *              historical queries very expensive.
      *
      *              relid selects the pg_class row (matched on its oid); npages,
      *              ntups and hasindex are stored into relpages, reltuples and
      *              relhasindex.  vacrelstats may be NULL; when it is non-NULL
      *              and va_natts > 0, the matching pg_attribute rows also get a
      *              new attdisbursion written in place, and a pg_statistic row
      *              holding the textual min/max is inserted for each column
      *              whose ordering comparisons are usable and which saw at
      *              least one non-null value.
1838  */
1839 static void
1840 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats)
1841 {
1842         Relation        rd,
1843                                 ad,
1844                                 sd;
1845         HeapScanDesc rsdesc,
1846                                 asdesc;
1847         TupleDesc       sdesc;
1848         HeapTuple       rtup,
1849                                 atup,
1850                                 stup;
1851         Buffer          rbuf,
1852                                 abuf;
1853         Form_pg_class pgcform;
1854         ScanKeyData rskey,
1855                                 askey;
1856         AttributeTupleForm attp;
1857
1858         /*
1859          * update number of tuples and number of pages in pg_class
1860          */
1861         ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1862                                                    ObjectIdEqualRegProcedure,
1863                                                    ObjectIdGetDatum(relid));
1864
1865         rd = heap_openr(RelationRelationName);
1866         rsdesc = heap_beginscan(rd, false, NowTimeQual, 1, &rskey);
1867
1868         if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1869                 elog(WARN, "pg_class entry for relid %d vanished during vacuuming",
1870                          relid);
1871
1872         /* overwrite the existing statistics in the tuple */
             /* (the page holding the tuple is write-locked first) */
1873         vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1874         pgcform = (Form_pg_class) GETSTRUCT(rtup);
1875         pgcform->reltuples = ntups;
1876         pgcform->relpages = npages;
1877         pgcform->relhasindex = hasindex;
1878
             /* per-column statistics, only when the caller collected any */
1879         if (vacrelstats != NULL && vacrelstats->va_natts > 0)
1880         {
1881                 VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1882                 int                     natts = vacrelstats->va_natts;
1883
1884                 ad = heap_openr(AttributeRelationName);
1885                 sd = heap_openr(StatisticRelationName);
1886                 ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1887                                                            F_INT4EQ, relid);
1888
                     /* visit every pg_attribute row belonging to relid */
1889                 asdesc = heap_beginscan(ad, false, NowTimeQual, 1, &askey);
1890
1891                 while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1892                 {
1893                         int                     i;
1894                         float32data selratio;           /* average ratio of rows selected
1895                                                                                  * for a random constant */
1896                         VacAttrStats *stats;
1897                         Datum           values[Natts_pg_statistic];
1898                         char            nulls[Natts_pg_statistic];
1899
1900                         attp = (AttributeTupleForm) GETSTRUCT(atup);
1901                         if (attp->attnum <= 0)          /* skip system attributes for now, */
1902                                 /* they are unique anyway */
1903                                 continue;
1904
                             /* find the collected stats for this attribute; skip it if none */
1905                         for (i = 0; i < natts; i++)
1906                         {
1907                                 if (attp->attnum == vacattrstats[i].attr->attnum)
1908                                         break;
1909                         }
1910                         if (i >= natts)
1911                                 continue;
1912                         stats = &(vacattrstats[i]);
1913
1914                         /* overwrite the existing statistics in the tuple */
1915                         if (VacAttrStatsEqValid(stats))
1916                         {
1917
1918                                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1919
                                     /*
                                      * selratio is 0 when no values were seen at all, or
                                      * when the most frequent value occurred once and
                                      * there is at most one null
                                      */
1920                                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1921                                         (stats->null_cnt <= 1 && stats->best_cnt == 1))
1922                                         selratio = 0;
                                     /*
                                      * every non-null value equals min or max: use the
                                      * sum of squared group sizes (min, max, null) over
                                      * the squared total
                                      */
1923                                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1924                                 {
1925                                         double          min_cnt_d = stats->min_cnt,
1926                                                                 max_cnt_d = stats->max_cnt,
1927                                                                 null_cnt_d = stats->null_cnt,
1928                                                                 nonnullcnt_d = stats->nonnull_cnt;              /* prevent overflow */
1929
1930                                         selratio = (min_cnt_d * min_cnt_d + max_cnt_d * max_cnt_d + null_cnt_d * null_cnt_d) /
1931                                                 (nonnullcnt_d + null_cnt_d) / (nonnullcnt_d + null_cnt_d);
1932                                 }
1933                                 else
1934                                 {
                                             /* "most" is the larger of the best-value count and the null count */
1935                                         double          most = (double) (stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1936                                         double          total = ((double) stats->nonnull_cnt) + ((double) stats->null_cnt);
1937
1938                                         /*
1939                                          * we assume count of other values are 20% of best
1940                                          * count in table
1941                                          */
1942                                         selratio = (most * most + 0.20 * most * (total - most)) / total / total;
1943                                 }
1944                                 if (selratio > 1.0)
1945                                         selratio = 1.0;
                                     /* store the result in the pg_attribute tuple in place and write its buffer */
1946                                 attp->attdisbursion = selratio;
1947                                 WriteNoReleaseBuffer(abuf);
1948
1949                                 /* DO PG_STATISTIC INSERTS */
1950
1951                                 /*
1952                                  * doing system relations, especially pg_statistic is a
1953                                  * problem
1954                                  */
1955                                 if (VacAttrStatsLtGtValid(stats) && stats->initialized  /* &&
1956                                                                                                                                                  * !IsSystemRelationName(
1957                                                                                                                                                  *
1958                                          pgcform->relname.data) */ )
1959                                 {
1960                                         func_ptr        out_function;
1961                                         char       *out_string;
1962                                         int                     dummy;
1963
                                             /*
                                              * ' ' in every slot -- presumably heap_formtuple's
                                              * "not null" marker; TODO confirm
                                              */
1964                                         for (i = 0; i < Natts_pg_statistic; ++i)
1965                                                 nulls[i] = ' ';
1966
1967                                         /* ----------------
1968                                          *      initialize values[]
1969                                          * ----------------
1970                                          */
                                             /*
                                              * values[3] and values[4] are the min and max: each
                                              * is run through the type's output function and
                                              * the resulting string through TextInRegProcedure
                                              */
1971                                         i = 0;
1972                                         values[i++] = (Datum) relid;            /* 1 */
1973                                         values[i++] = (Datum) attp->attnum; /* 2 */
1974                                         values[i++] = (Datum) InvalidOid;       /* 3 */
1975                                         fmgr_info(stats->outfunc, &out_function, &dummy);
1976                                         out_string = (*out_function) (stats->min, stats->attr->atttypid);
1977                                         values[i++] = (Datum) fmgr(TextInRegProcedure, out_string);
1978                                         pfree(out_string);
1979                                         out_string = (char *) (*out_function) (stats->max, stats->attr->atttypid);
1980                                         values[i++] = (Datum) fmgr(TextInRegProcedure, out_string);
1981                                         pfree(out_string);
1982
1983                                         sdesc = sd->rd_att;
1984
1985                                         stup = heap_formtuple(sdesc, values, nulls);
1986
1987                                         /* ----------------
1988                                          *      insert the tuple in the relation and get the tuple's oid.
1989                                          * ----------------
1990                                          */
1991                                         heap_insert(sd, stup);
                                             /* release the two converted min/max datums and the formed tuple */
1992                                         pfree(DatumGetPointer(values[3]));
1993                                         pfree(DatumGetPointer(values[4]));
1994                                         pfree(stup);
1995                                 }
1996                         }
1997                 }
1998                 heap_endscan(asdesc);
1999                 heap_close(ad);
2000                 heap_close(sd);
2001         }
2002
2003         /* XXX -- after write, should invalidate relcache in other backends */
2004         WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
2005
2006         /*
2007          * invalidating system relations confuses the function cache of
2008          * pg_operator and pg_opclass
2009          */
2010         if (!IsSystemRelationName(pgcform->relname.data))
2011                 RelationInvalidateHeapTuple(rd, rtup);
2012
2013         /* that's all, folks */
2014         heap_endscan(rsdesc);
2015         heap_close(rd);
2016 }
2017
2018 /*
2019  *      vc_delhilowstats() -- delete pg_statistics rows
2020  *
2021  */
2022 static void
2023 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
2024 {
2025         Relation        pgstatistic;
2026         HeapScanDesc pgsscan;
2027         HeapTuple       pgstup;
2028         ScanKeyData pgskey;
2029
2030         pgstatistic = heap_openr(StatisticRelationName);
2031
2032         if (relid != InvalidOid)
2033         {
2034                 ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
2035                                                            ObjectIdEqualRegProcedure,
2036                                                            ObjectIdGetDatum(relid));
2037                 pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 1, &pgskey);
2038         }
2039         else
2040                 pgsscan = heap_beginscan(pgstatistic, false, NowTimeQual, 0, NULL);
2041
2042         while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
2043         {
2044                 if (attcnt > 0)
2045                 {
2046                         Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(pgstup);
2047                         int                     i;
2048
2049                         for (i = 0; i < attcnt; i++)
2050                         {
2051                                 if (pgs->staattnum == attnums[i] + 1)
2052                                         break;
2053                         }
2054                         if (i >= attcnt)
2055                                 continue;               /* don't delete it */
2056                 }
2057                 heap_delete(pgstatistic, &pgstup->t_ctid);
2058         }
2059
2060         heap_endscan(pgsscan);
2061         heap_close(pgstatistic);
2062 }
2063
2064 static void
2065 vc_setpagelock(Relation rel, BlockNumber blkno)
2066 {
2067         ItemPointerData itm;
2068
2069         ItemPointerSet(&itm, blkno, 1);
2070
2071         RelationSetLockForWritePage(rel, &itm);
2072 }
2073
2074 /*
2075  *      vc_reappage() -- save a page on the array of reapped pages.
2076  *
2077  *              As a side effect of the way that the vacuuming loop for a given
2078  *              relation works, higher pages come after lower pages in the array
2079  *              (and highest tid on a page is last).
2080  */
2081 static void
2082 vc_reappage(VPageList vpl, VPageDescr vpc)
2083 {
2084         VPageDescr      newvpd;
2085
2086         /* allocate a VPageDescrData entry */
2087         newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff * sizeof(OffsetNumber));
2088
2089         /* fill it in */
2090         if (vpc->vpd_noff > 0)
2091                 memmove(newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff * sizeof(OffsetNumber));
2092         newvpd->vpd_blkno = vpc->vpd_blkno;
2093         newvpd->vpd_free = vpc->vpd_free;
2094         newvpd->vpd_nusd = vpc->vpd_nusd;
2095         newvpd->vpd_noff = vpc->vpd_noff;
2096
2097         /* insert this page into vpl list */
2098         vc_vpinsert(vpl, newvpd);
2099
2100 }                                                               /* vc_reappage */
2101
2102 static void
2103 vc_vpinsert(VPageList vpl, VPageDescr vpnew)
2104 {
2105
2106         /* allocate a VPageDescr entry if needed */
2107         if (vpl->vpl_npages == 0)
2108                 vpl->vpl_pgdesc = (VPageDescr *) palloc(100 * sizeof(VPageDescr));
2109         else if (vpl->vpl_npages % 100 == 0)
2110                 vpl->vpl_pgdesc = (VPageDescr *) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages + 100) * sizeof(VPageDescr));
2111         vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
2112         (vpl->vpl_npages)++;
2113
2114 }
2115
2116 static void
2117 vc_free(VRelList vrl)
2118 {
2119         VRelList        p_vrl;
2120         MemoryContext old;
2121         PortalVariableMemory pmem;
2122
2123         pmem = PortalGetVariableMemory(vc_portal);
2124         old = MemoryContextSwitchTo((MemoryContext) pmem);
2125
2126         while (vrl != (VRelList) NULL)
2127         {
2128
2129                 /* free rel list entry */
2130                 p_vrl = vrl;
2131                 vrl = vrl->vrl_next;
2132                 pfree(p_vrl);
2133         }
2134
2135         MemoryContextSwitchTo(old);
2136 }
2137
2138 /*
2139  *      vc_getarchrel() -- open the archive relation for a heap relation
2140  *
2141  *              The archive relation is named 'a,XXXXX' for the heap relation
2142  *              whose relid is XXXXX.
2143  */
2144
2145 #define ARCHIVE_PREFIX  "a,"
2146
2147 static Relation
2148 vc_getarchrel(Relation heaprel)
2149 {
2150         Relation        archrel;
2151         char       *archrelname;
2152
2153         archrelname = palloc(sizeof(ARCHIVE_PREFIX) + NAMEDATALEN); /* bogus */
2154         sprintf(archrelname, "%s%d", ARCHIVE_PREFIX, heaprel->rd_id);
2155
2156         archrel = heap_openr(archrelname);
2157
2158         pfree(archrelname);
2159         return (archrel);
2160 }
2161
2162 /*
2163  *      vc_archive() -- write a tuple to an archive relation
2164  *
2165  *              In the future, this will invoke the archived accessd method.  For
2166  *              now, archive relations are on mag disk.
2167  */
2168 static void
2169 vc_archive(Relation archrel, HeapTuple htup)
2170 {
2171         doinsert(archrel, htup);
2172 }
2173
2174 static bool
2175 vc_isarchrel(char *rname)
2176 {
2177         if (strncmp(ARCHIVE_PREFIX, rname, strlen(ARCHIVE_PREFIX)) == 0)
2178                 return (true);
2179
2180         return (false);
2181 }
2182
/*
 *	vc_find_eq() -- binary search for an element equal to *elm.
 *
 *		bot		address of the first element of the array to search; the
 *				array must be sorted in ascending order according to compar
 *		nelem	number of elements in the array
 *		size	size of one element, in bytes
 *		elm		address of the key to look for
 *		compar	three-way comparison: negative, zero or positive when its
 *				first argument sorts before, equal to or after its second
 *
 *		Returns the address of an element that compares equal to *elm, or
 *		NULL when there is none.  An empty array (nelem <= 0) yields NULL
 *		without calling compar at all.
 *
 *		Before probing the middle element the key is checked against the
 *		two ends of the current window, so keys outside the window are
 *		rejected at once.  first_move / last_move record which end still
 *		has to be checked: the lower end after the window's start moved,
 *		the upper end after its end moved, both on the first iteration.
 */
static char *
vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *))
{
	int			res;
	int			last;
	int			celm;
	bool		last_move,
				first_move;

	/* nothing to search: never let compar look at a non-existent element */
	if (nelem <= 0)
		return (NULL);

	last = nelem - 1;			/* index of the window's last element */
	celm = nelem / 2;			/* index of the element probed next */

	last_move = first_move = true;
	for (;;)
	{
		if (first_move)
		{
			/* key below the lowest element: it cannot be in the window */
			res = compar(bot, elm);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot);
			first_move = false;
		}
		if (last_move)
		{
			/* key above the highest element: it cannot be in the window */
			res = compar(elm, bot + last * size);
			if (res > 0)
				return (NULL);
			if (res == 0)
				return (bot + last * size);
			last_move = false;
		}

		res = compar(elm, bot + celm * size);
		if (res == 0)
			return (bot + celm * size);

		if (res < 0)
		{
			/* continue in the part below the probe, if there is one */
			if (celm == 0)
				return (NULL);
			last = celm - 1;
			celm = celm / 2;
			last_move = true;
			continue;
		}

		/* continue in the part above the probe, if there is one */
		if (celm == last)
			return (NULL);

		last = last - celm - 1;
		bot = bot + (celm + 1) * size;
		celm = (last + 1) / 2;
		first_move = true;
	}

}								/* vc_find_eq */
2236
2237 static int
2238 vc_cmp_blk(char *left, char *right)
2239 {
2240         BlockNumber lblk,
2241                                 rblk;
2242
2243         lblk = (*((VPageDescr *) left))->vpd_blkno;
2244         rblk = (*((VPageDescr *) right))->vpd_blkno;
2245
2246         if (lblk < rblk)
2247                 return (-1);
2248         if (lblk == rblk)
2249                 return (0);
2250         return (1);
2251
2252 }                                                               /* vc_cmp_blk */
2253
2254 static int
2255 vc_cmp_offno(char *left, char *right)
2256 {
2257
2258         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2259                 return (-1);
2260         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2261                 return (0);
2262         return (1);
2263
2264 }                                                               /* vc_cmp_offno */
2265
2266
2267 static void
2268 vc_getindices(Oid relid, int *nindices, Relation **Irel)
2269 {
2270         Relation        pgindex;
2271         Relation        irel;
2272         TupleDesc       pgidesc;
2273         HeapTuple       pgitup;
2274         HeapScanDesc pgiscan;
2275         Datum           d;
2276         int                     i,
2277                                 k;
2278         bool            n;
2279         ScanKeyData pgikey;
2280         Oid                *ioid;
2281
2282         *nindices = i = 0;
2283
2284         ioid = (Oid *) palloc(10 * sizeof(Oid));
2285
2286         /* prepare a heap scan on the pg_index relation */
2287         pgindex = heap_openr(IndexRelationName);
2288         pgidesc = RelationGetTupleDescriptor(pgindex);
2289
2290         ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2291                                                    ObjectIdEqualRegProcedure,
2292                                                    ObjectIdGetDatum(relid));
2293
2294         pgiscan = heap_beginscan(pgindex, false, NowTimeQual, 1, &pgikey);
2295
2296         while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL)))
2297         {
2298                 d = heap_getattr(pgitup, InvalidBuffer, Anum_pg_index_indexrelid,
2299                                                  pgidesc, &n);
2300                 i++;
2301                 if (i % 10 == 0)
2302                         ioid = (Oid *) repalloc(ioid, (i + 10) * sizeof(Oid));
2303                 ioid[i - 1] = DatumGetObjectId(d);
2304         }
2305
2306         heap_endscan(pgiscan);
2307         heap_close(pgindex);
2308
2309         if (i == 0)
2310         {                                                       /* No one index found */
2311                 pfree(ioid);
2312                 return;
2313         }
2314
2315         if (Irel != (Relation **) NULL)
2316                 *Irel = (Relation *) palloc(i * sizeof(Relation));
2317
2318         for (k = 0; i > 0;)
2319         {
2320                 irel = index_open(ioid[--i]);
2321                 if (irel != (Relation) NULL)
2322                 {
2323                         if (Irel != (Relation **) NULL)
2324                                 (*Irel)[k] = irel;
2325                         else
2326                                 index_close(irel);
2327                         k++;
2328                 }
2329                 else
2330                         elog(NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2331         }
2332         *nindices = k;
2333         pfree(ioid);
2334
2335         if (Irel != (Relation **) NULL && *nindices == 0)
2336         {
2337                 pfree(*Irel);
2338                 *Irel = (Relation *) NULL;
2339         }
2340
2341 }                                                               /* vc_getindices */
2342
2343
2344 static void
2345 vc_clsindices(int nindices, Relation *Irel)
2346 {
2347
2348         if (Irel == (Relation *) NULL)
2349                 return;
2350
2351         while (nindices--)
2352         {
2353                 index_close(Irel[nindices]);
2354         }
2355         pfree(Irel);
2356
2357 }                                                               /* vc_clsindices */
2358
2359
2360 static void
2361 vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2362 {
2363         IndDesc    *idcur;
2364         HeapTuple       pgIndexTup;
2365         AttrNumber *attnumP;
2366         int                     natts;
2367         int                     i;
2368
2369         *Idesc = (IndDesc *) palloc(nindices * sizeof(IndDesc));
2370
2371         for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++)
2372         {
2373                 pgIndexTup =
2374                         SearchSysCacheTuple(INDEXRELID,
2375                                                                 ObjectIdGetDatum(Irel[i]->rd_id),
2376                                                                 0, 0, 0);
2377                 Assert(pgIndexTup);
2378                 idcur->tform = (IndexTupleForm) GETSTRUCT(pgIndexTup);
2379                 for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2380                          *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2381                          attnumP++, natts++);
2382                 if (idcur->tform->indproc != InvalidOid)
2383                 {
2384                         idcur->finfoP = &(idcur->finfo);
2385                         FIgetnArgs(idcur->finfoP) = natts;
2386                         natts = 1;
2387                         FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2388                         *(FIgetname(idcur->finfoP)) = '\0';
2389                 }
2390                 else
2391                         idcur->finfoP = (FuncIndexInfo *) NULL;
2392
2393                 idcur->natts = natts;
2394         }
2395
2396 }                                                               /* vc_mkindesc */
2397
2398
2399 static bool
2400 vc_enough_space(VPageDescr vpd, Size len)
2401 {
2402
2403         len = DOUBLEALIGN(len);
2404
2405         if (len > vpd->vpd_free)
2406                 return (false);
2407
2408         if (vpd->vpd_nusd < vpd->vpd_noff)      /* there are free itemid(s) */
2409                 return (true);                  /* and len <= free_space */
2410
2411         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2412         if (len <= vpd->vpd_free - sizeof(ItemIdData))
2413                 return (true);
2414
2415         return (false);
2416
2417 }                                                               /* vc_enough_space */