]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Remove un-needed braces around single statements.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c--
4  *        the postgres vacuum cleaner
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.65 1998/06/15 19:28:16 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <sys/types.h>
15 #include <sys/file.h>
16 #include <string.h>
17 #include <sys/stat.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20
21 #include "postgres.h"
22
23 #include "access/genam.h"
24 #include "access/heapam.h"
25 #include "access/transam.h"
26 #include "access/xact.h"
27 #include "catalog/catalog.h"
28 #include "catalog/catname.h"
29 #include "catalog/index.h"
30 #include "catalog/pg_class.h"
31 #include "catalog/pg_index.h"
32 #include "catalog/pg_operator.h"
33 #include "catalog/pg_statistic.h"
34 #include "catalog/pg_type.h"
35 #include "commands/vacuum.h"
36 #include "fmgr.h"
37 #include "parser/parse_oper.h"
38 #include "storage/bufmgr.h"
39 #include "storage/bufpage.h"
40 #include "storage/shmem.h"
41 #include "storage/smgr.h"
42 #include "storage/lmgr.h"
43 #include "utils/builtins.h"
44 #include "utils/inval.h"
45 #include "utils/mcxt.h"
46 #include "utils/portal.h"
47 #include "utils/syscache.h"
48
49 #ifndef HAVE_GETRUSAGE
50 #include <rusagestub.h>
51 #else
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #endif
55
56  /* #include <port-protos.h> *//* Why? */
57
58 extern int      BlowawayRelationBuffers(Relation rdesc, BlockNumber block);
59
60 bool            VacuumRunning = false;
61
62 static Portal vc_portal;
63
64 static int      MESSAGE_LEVEL;          /* message level */
65
/*
 * Swap the values of two lvalues of the named type.
 *
 * The bodies are wrapped in do { } while (0) so that an invocation followed
 * by a semicolon is exactly one statement (safe in an unbraced if/else), and
 * the arguments are parenthesized so that expressions such as array elements
 * or dereferences expand correctly.  Each argument is evaluated twice, so do
 * not pass expressions with side effects.
 */
#define swapLong(a,b)   do { long swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_; } while (0)
#define swapInt(a,b)    do { int swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_; } while (0)
#define swapDatum(a,b)  do { Datum swap_tmp_ = (a); (a) = (b); (b) = swap_tmp_; } while (0)

/* true if an "=" comparison function was found for the attribute */
#define VacAttrStatsEqValid(stats) ( (stats)->f_cmpeq.fn_addr != NULL )

/* true if "<" and ">" comparison functions and an output function are all set */
#define VacAttrStatsLtGtValid(stats) ( (stats)->f_cmplt.fn_addr != NULL && \
                                       (stats)->f_cmpgt.fn_addr != NULL && \
                                       RegProcedureIsValid((stats)->outfunc) )
73
74
75 /* non-export function prototypes */
76 static void vc_init(void);
77 static void vc_shutdown(void);
78 static void vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols);
79 static VRelList vc_getrels(NameData *VacRelP);
80 static void vc_vacone(Oid relid, bool analyze, List *va_cols);
81 static void vc_scanheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl);
82 static void vc_rpfheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel);
83 static void vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList vpl);
84 static void vc_vacpage(Page page, VPageDescr vpd);
85 static void vc_vaconeind(VPageList vpl, Relation indrel, int nhtups);
86 static void vc_scanoneind(Relation indrel, int nhtups);
87 static void vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup);
88 static void vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len);
89 static void vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats);
90 static void vc_delhilowstats(Oid relid, int attcnt, int *attnums);
91 static void vc_setpagelock(Relation rel, BlockNumber blkno);
92 static VPageDescr vc_tidreapped(ItemPointer itemptr, VPageList vpl);
93 static void vc_reappage(VPageList vpl, VPageDescr vpc);
94 static void vc_vpinsert(VPageList vpl, VPageDescr vpnew);
95 static void vc_free(VRelList vrl);
96 static void vc_getindices(Oid relid, int *nindices, Relation **Irel);
97 static void vc_clsindices(int nindices, Relation *Irel);
98 static void vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc);
99 static char *vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *));
100 static int      vc_cmp_blk(char *left, char *right);
101 static int      vc_cmp_offno(char *left, char *right);
102 static bool vc_enough_space(VPageDescr vpd, Size len);
103
104 void
105 vacuum(char *vacrel, bool verbose, bool analyze, List *va_spec)
106 {
107         char       *pname;
108         MemoryContext old;
109         PortalVariableMemory pmem;
110         NameData        VacRel;
111         List       *le;
112         List       *va_cols = NIL;
113
114         /*
115          * Create a portal for safe memory across transctions.  We need to
116          * palloc the name space for it because our hash function expects the
117          * name to be on a longword boundary.  CreatePortal copies the name to
118          * safe storage for us.
119          */
120         pname = (char *) palloc(strlen(VACPNAME) + 1);
121         strcpy(pname, VACPNAME);
122         vc_portal = CreatePortal(pname);
123         pfree(pname);
124
125         if (verbose)
126                 MESSAGE_LEVEL = NOTICE;
127         else
128                 MESSAGE_LEVEL = DEBUG;
129
130         /* vacrel gets de-allocated on transaction commit */
131         if (vacrel)
132                 strcpy(VacRel.data, vacrel);
133
134         pmem = PortalGetVariableMemory(vc_portal);
135         old = MemoryContextSwitchTo((MemoryContext) pmem);
136
137         if (va_spec != NIL && !analyze)
138                 elog(ERROR, "Can't vacuum columns, only tables.  You can 'vacuum analyze' columns.");
139
140         foreach(le, va_spec)
141         {
142                 char       *col = (char *) lfirst(le);
143                 char       *dest;
144
145                 dest = (char *) palloc(strlen(col) + 1);
146                 strcpy(dest, col);
147                 va_cols = lappend(va_cols, dest);
148         }
149         MemoryContextSwitchTo(old);
150
151         /* initialize vacuum cleaner */
152         vc_init();
153
154         /* vacuum the database */
155         if (vacrel)
156                 vc_vacuum(&VacRel, analyze, va_cols);
157         else
158                 vc_vacuum(NULL, analyze, NIL);
159
160         PortalDestroy(&vc_portal);
161
162         /* clean up */
163         vc_shutdown();
164 }
165
166 /*
167  *      vc_init(), vc_shutdown() -- start up and shut down the vacuum cleaner.
168  *
169  *              We run exactly one vacuum cleaner at a time.  We use the file system
170  *              to guarantee an exclusive lock on vacuuming, since a single vacuum
171  *              cleaner instantiation crosses transaction boundaries, and we'd lose
172  *              postgres-style locks at the end of every transaction.
173  *
174  *              The strangeness with committing and starting transactions in the
175  *              init and shutdown routines is due to the fact that the vacuum cleaner
176  *              is invoked via a sql command, and so is already executing inside
177  *              a transaction.  We need to leave ourselves in a predictable state
178  *              on entry and exit to the vacuum cleaner.  We commit the transaction
179  *              started in PostgresMain() inside vc_init(), and start one in
180  *              vc_shutdown() to match the commit waiting for us back in
181  *              PostgresMain().
182  */
183 static void
184 vc_init()
185 {
186         int                     fd;
187
188         if ((fd = open("pg_vlock", O_CREAT | O_EXCL, 0600)) < 0)
189                 elog(ERROR, "can't create lock file -- another vacuum cleaner running?");
190
191         close(fd);
192
193         /*
194          * By here, exclusive open on the lock file succeeded.  If we abort
195          * for any reason during vacuuming, we need to remove the lock file.
196          * This global variable is checked in the transaction manager on xact
197          * abort, and the routine vc_abort() is called if necessary.
198          */
199
200         VacuumRunning = true;
201
202         /* matches the StartTransaction in PostgresMain() */
203         CommitTransactionCommand();
204 }
205
206 static void
207 vc_shutdown()
208 {
209         /* on entry, not in a transaction */
210         if (unlink("pg_vlock") < 0)
211                 elog(ERROR, "vacuum: can't destroy lock file!");
212
213         /* okay, we're done */
214         VacuumRunning = false;
215
216         /* matches the CommitTransaction in PostgresMain() */
217         StartTransactionCommand();
218
219 }
220
221 void
222 vc_abort()
223 {
224         /* on abort, remove the vacuum cleaner lock file */
225         unlink("pg_vlock");
226
227         VacuumRunning = false;
228 }
229
230 /*
231  *      vc_vacuum() -- vacuum the database.
232  *
233  *              This routine builds a list of relations to vacuum, and then calls
234  *              code that vacuums them one at a time.  We are careful to vacuum each
235  *              relation in a separate transaction in order to avoid holding too many
236  *              locks at one time.
237  */
238 static void
239 vc_vacuum(NameData *VacRelP, bool analyze, List *va_cols)
240 {
241         VRelList        vrl,
242                                 cur;
243
244         /* get list of relations */
245         vrl = vc_getrels(VacRelP);
246
247         if (analyze && VacRelP == NULL && vrl != NULL)
248                 vc_delhilowstats(InvalidOid, 0, NULL);
249
250         /* vacuum each heap relation */
251         for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
252                 vc_vacone(cur->vrl_relid, analyze, va_cols);
253
254         vc_free(vrl);
255 }
256
257 static VRelList
258 vc_getrels(NameData *VacRelP)
259 {
260         Relation        pgclass;
261         TupleDesc       pgcdesc;
262         HeapScanDesc pgcscan;
263         HeapTuple       pgctup;
264         Buffer          buf;
265         PortalVariableMemory portalmem;
266         MemoryContext old;
267         VRelList        vrl,
268                                 cur;
269         Datum           d;
270         char       *rname;
271         char            rkind;
272         bool            n;
273         ScanKeyData pgckey;
274         bool            found = false;
275
276         StartTransactionCommand();
277
278         if (VacRelP->data)
279         {
280                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relname,
281                                                            F_NAMEEQ,
282                                                            PointerGetDatum(VacRelP->data));
283         }
284         else
285         {
286                 ScanKeyEntryInitialize(&pgckey, 0x0, Anum_pg_class_relkind,
287                                                   F_CHAREQ, CharGetDatum('r'));
288         }
289
290         portalmem = PortalGetVariableMemory(vc_portal);
291         vrl = cur = (VRelList) NULL;
292
293         pgclass = heap_openr(RelationRelationName);
294         pgcdesc = RelationGetTupleDescriptor(pgclass);
295
296         pgcscan = heap_beginscan(pgclass, false, false, 1, &pgckey);
297
298         while (HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &buf)))
299         {
300
301                 found = true;
302
303                 d = heap_getattr(pgctup, Anum_pg_class_relname, pgcdesc, &n);
304                 rname = (char *) d;
305
306                 /*
307                  * don't vacuum large objects for now - something breaks when we
308                  * do
309                  */
310                 if ((strlen(rname) >= 5) && rname[0] == 'x' &&
311                         rname[1] == 'i' && rname[2] == 'n' &&
312                         (rname[3] == 'v' || rname[3] == 'x') &&
313                         rname[4] >= '0' && rname[4] <= '9')
314                 {
315                         elog(NOTICE, "Rel %s: can't vacuum LargeObjects now",
316                                  rname);
317                         ReleaseBuffer(buf);
318                         continue;
319                 }
320
321                 d = heap_getattr(pgctup, Anum_pg_class_relkind, pgcdesc, &n);
322
323                 rkind = DatumGetChar(d);
324
325                 /* skip system relations */
326                 if (rkind != 'r')
327                 {
328                         ReleaseBuffer(buf);
329                         elog(NOTICE, "Vacuum: can not process index and certain system tables");
330                         continue;
331                 }
332
333                 /* get a relation list entry for this guy */
334                 old = MemoryContextSwitchTo((MemoryContext) portalmem);
335                 if (vrl == (VRelList) NULL)
336                         vrl = cur = (VRelList) palloc(sizeof(VRelListData));
337                 else
338                 {
339                         cur->vrl_next = (VRelList) palloc(sizeof(VRelListData));
340                         cur = cur->vrl_next;
341                 }
342                 MemoryContextSwitchTo(old);
343
344                 cur->vrl_relid = pgctup->t_oid;
345                 cur->vrl_next = (VRelList) NULL;
346
347                 /* wei hates it if you forget to do this */
348                 ReleaseBuffer(buf);
349         }
350         if (found == false)
351                 elog(NOTICE, "Vacuum: table not found");
352
353
354         heap_endscan(pgcscan);
355         heap_close(pgclass);
356
357         CommitTransactionCommand();
358
359         return (vrl);
360 }
361
362 /*
363  *      vc_vacone() -- vacuum one heap relation
364  *
365  *              This routine vacuums a single heap, cleans out its indices, and
366  *              updates its npages and ntups statistics.
367  *
368  *              Doing one heap at a time incurs extra overhead, since we need to
369  *              check that the heap exists again just before we vacuum it.      The
370  *              reason that we do this is so that vacuuming can be spread across
371  *              many small transactions.  Otherwise, two-phase locking would require
372  *              us to lock the entire database during one pass of the vacuum cleaner.
373  */
374 static void
375 vc_vacone(Oid relid, bool analyze, List *va_cols)
376 {
377         Relation        pgclass;
378         TupleDesc       pgcdesc;
379         HeapTuple       pgctup,
380                                 pgttup;
381         Buffer          pgcbuf;
382         HeapScanDesc pgcscan;
383         Relation        onerel;
384         ScanKeyData pgckey;
385         VPageListData Vvpl;                     /* List of pages to vacuum and/or clean
386                                                                  * indices */
387         VPageListData Fvpl;                     /* List of pages with space enough for
388                                                                  * re-using */
389         VPageDescr *vpp;
390         Relation   *Irel;
391         int32           nindices,
392                                 i;
393         VRelStats  *vacrelstats;
394
395         StartTransactionCommand();
396
397         ScanKeyEntryInitialize(&pgckey, 0x0, ObjectIdAttributeNumber,
398                                                    F_OIDEQ,
399                                                    ObjectIdGetDatum(relid));
400
401         pgclass = heap_openr(RelationRelationName);
402         pgcdesc = RelationGetTupleDescriptor(pgclass);
403         pgcscan = heap_beginscan(pgclass, false, false, 1, &pgckey);
404
405         /*
406          * Race condition -- if the pg_class tuple has gone away since the
407          * last time we saw it, we don't need to vacuum it.
408          */
409
410         if (!HeapTupleIsValid(pgctup = heap_getnext(pgcscan, 0, &pgcbuf)))
411         {
412                 heap_endscan(pgcscan);
413                 heap_close(pgclass);
414                 CommitTransactionCommand();
415                 return;
416         }
417
418         /* now open the class and vacuum it */
419         onerel = heap_open(relid);
420
421         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
422         vacrelstats->relid = relid;
423         vacrelstats->npages = vacrelstats->ntups = 0;
424         vacrelstats->hasindex = false;
425         if (analyze && !IsSystemRelationName((RelationGetRelationName(onerel))->data))
426         {
427                 int                     attr_cnt,
428                                    *attnums = NULL;
429                 AttributeTupleForm *attr;
430
431                 attr_cnt = onerel->rd_att->natts;
432                 attr = onerel->rd_att->attrs;
433
434                 if (va_cols != NIL)
435                 {
436                         int                     tcnt = 0;
437                         List       *le;
438
439                         if (length(va_cols) > attr_cnt)
440                                 elog(ERROR, "vacuum: too many attributes specified for relation %s",
441                                          (RelationGetRelationName(onerel))->data);
442                         attnums = (int *) palloc(attr_cnt * sizeof(int));
443                         foreach(le, va_cols)
444                         {
445                                 char       *col = (char *) lfirst(le);
446
447                                 for (i = 0; i < attr_cnt; i++)
448                                 {
449                                         if (namestrcmp(&(attr[i]->attname), col) == 0)
450                                                 break;
451                                 }
452                                 if (i < attr_cnt)               /* found */
453                                         attnums[tcnt++] = i;
454                                 else
455                                 {
456                                         elog(ERROR, "vacuum: there is no attribute %s in %s",
457                                                  col, (RelationGetRelationName(onerel))->data);
458                                 }
459                         }
460                         attr_cnt = tcnt;
461                 }
462
463                 vacrelstats->vacattrstats =
464                         (VacAttrStats *) palloc(attr_cnt * sizeof(VacAttrStats));
465
466                 for (i = 0; i < attr_cnt; i++)
467                 {
468                         Operator        func_operator;
469                         OperatorTupleForm pgopform;
470                         VacAttrStats *stats;
471
472                         stats = &vacrelstats->vacattrstats[i];
473                         stats->attr = palloc(ATTRIBUTE_TUPLE_SIZE);
474                         memmove(stats->attr, attr[((attnums) ? attnums[i] : i)], ATTRIBUTE_TUPLE_SIZE);
475                         stats->best = stats->guess1 = stats->guess2 = 0;
476                         stats->max = stats->min = 0;
477                         stats->best_len = stats->guess1_len = stats->guess2_len = 0;
478                         stats->max_len = stats->min_len = 0;
479                         stats->initialized = false;
480                         stats->best_cnt = stats->guess1_cnt = stats->guess1_hits = stats->guess2_hits = 0;
481                         stats->max_cnt = stats->min_cnt = stats->null_cnt = stats->nonnull_cnt = 0;
482
483                         func_operator = oper("=", stats->attr->atttypid, stats->attr->atttypid, true);
484                         if (func_operator != NULL)
485                         {
486                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
487                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpeq));
488                         }
489                         else
490                                 stats->f_cmpeq.fn_addr = NULL;
491
492                         func_operator = oper("<", stats->attr->atttypid, stats->attr->atttypid, true);
493                         if (func_operator != NULL)
494                         {
495                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
496                                 fmgr_info(pgopform->oprcode, &(stats->f_cmplt));
497                         }
498                         else
499                                 stats->f_cmplt.fn_addr = NULL;
500
501                         func_operator = oper(">", stats->attr->atttypid, stats->attr->atttypid, true);
502                         if (func_operator != NULL)
503                         {
504                                 pgopform = (OperatorTupleForm) GETSTRUCT(func_operator);
505                                 fmgr_info(pgopform->oprcode, &(stats->f_cmpgt));
506                         }
507                         else
508                                 stats->f_cmpgt.fn_addr = NULL;
509
510                         pgttup = SearchSysCacheTuple(TYPOID,
511                                                                  ObjectIdGetDatum(stats->attr->atttypid),
512                                                                                  0, 0, 0);
513                         if (HeapTupleIsValid(pgttup))
514                                 stats->outfunc = ((TypeTupleForm) GETSTRUCT(pgttup))->typoutput;
515                         else
516                                 stats->outfunc = InvalidOid;
517                 }
518                 vacrelstats->va_natts = attr_cnt;
519                 vc_delhilowstats(relid, ((attnums) ? attr_cnt : 0), attnums);
520                 if (attnums)
521                         pfree(attnums);
522         }
523         else
524         {
525                 vacrelstats->va_natts = 0;
526                 vacrelstats->vacattrstats = (VacAttrStats *) NULL;
527         }
528
529         /* we require the relation to be locked until the indices are cleaned */
530         RelationSetLockForWrite(onerel);
531
532         /* scan it */
533         Vvpl.vpl_npages = Fvpl.vpl_npages = 0;
534         vc_scanheap(vacrelstats, onerel, &Vvpl, &Fvpl);
535
536         /* Now open indices */
537         Irel = (Relation *) NULL;
538         vc_getindices(vacrelstats->relid, &nindices, &Irel);
539
540         if (nindices > 0)
541                 vacrelstats->hasindex = true;
542         else
543                 vacrelstats->hasindex = false;
544
545         /* Clean/scan index relation(s) */
546         if (Irel != (Relation *) NULL)
547         {
548                 if (Vvpl.vpl_npages > 0)
549                 {
550                         for (i = 0; i < nindices; i++)
551                                 vc_vaconeind(&Vvpl, Irel[i], vacrelstats->ntups);
552                 }
553                 else
554 /* just scan indices to update statistic */
555                 {
556                         for (i = 0; i < nindices; i++)
557                                 vc_scanoneind(Irel[i], vacrelstats->ntups);
558                 }
559         }
560
561         if (Fvpl.vpl_npages > 0)        /* Try to shrink heap */
562                 vc_rpfheap(vacrelstats, onerel, &Vvpl, &Fvpl, nindices, Irel);
563         else
564         {
565                 if (Irel != (Relation *) NULL)
566                         vc_clsindices(nindices, Irel);
567                 if (Vvpl.vpl_npages > 0)/* Clean pages from Vvpl list */
568                         vc_vacheap(vacrelstats, onerel, &Vvpl);
569         }
570
571         /* ok - free Vvpl list of reapped pages */
572         if (Vvpl.vpl_npages > 0)
573         {
574                 vpp = Vvpl.vpl_pgdesc;
575                 for (i = 0; i < Vvpl.vpl_npages; i++, vpp++)
576                         pfree(*vpp);
577                 pfree(Vvpl.vpl_pgdesc);
578                 if (Fvpl.vpl_npages > 0)
579                         pfree(Fvpl.vpl_pgdesc);
580         }
581
582         /* all done with this class */
583         heap_close(onerel);
584         heap_endscan(pgcscan);
585         heap_close(pgclass);
586
587         /* update statistics in pg_class */
588         vc_updstats(vacrelstats->relid, vacrelstats->npages, vacrelstats->ntups,
589                                 vacrelstats->hasindex, vacrelstats);
590
591         /* next command frees attribute stats */
592
593         CommitTransactionCommand();
594 }
595
596 /*
597  *      vc_scanheap() -- scan an open heap relation
598  *
599  *              This routine sets commit times, constructs Vvpl list of
600  *              empty/uninitialized pages and pages with dead tuples and
601  *              ~LP_USED line pointers, constructs Fvpl list of pages
602  *              appropriate for purposes of shrinking and maintains statistics
603  *              on the number of live tuples in a heap.
604  */
605 static void
606 vc_scanheap(VRelStats *vacrelstats, Relation onerel,
607                         VPageList Vvpl, VPageList Fvpl)
608 {
609         int                     nblocks,
610                                 blkno;
611         ItemId          itemid;
612         ItemPointer itemptr;
613         HeapTuple       htup;
614         Buffer          buf;
615         Page            page,
616                                 tempPage = NULL;
617         OffsetNumber offnum,
618                                 maxoff;
619         bool            pgchanged,
620                                 tupgone,
621                                 dobufrel,
622                                 notup;
623         char       *relname;
624         VPageDescr      vpc,
625                                 vp;
626         uint32          nvac,
627                                 ntups,
628                                 nunused,
629                                 ncrash,
630                                 nempg,
631                                 nnepg,
632                                 nchpg,
633                                 nemend;
634         Size            frsize,
635                                 frsusf;
636         Size            min_tlen = MAXTUPLEN;
637         Size            max_tlen = 0;
638         int32           i /* , attr_cnt */ ;
639         struct rusage ru0,
640                                 ru1;
641         bool            do_shrinking = true;
642
643         getrusage(RUSAGE_SELF, &ru0);
644
645         nvac = ntups = nunused = ncrash = nempg = nnepg = nchpg = nemend = 0;
646         frsize = frsusf = 0;
647
648         relname = (RelationGetRelationName(onerel))->data;
649
650         nblocks = RelationGetNumberOfBlocks(onerel);
651
652         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
653         vpc->vpd_nusd = 0;
654
655         for (blkno = 0; blkno < nblocks; blkno++)
656         {
657                 buf = ReadBuffer(onerel, blkno);
658                 page = BufferGetPage(buf);
659                 vpc->vpd_blkno = blkno;
660                 vpc->vpd_noff = 0;
661
662                 if (PageIsNew(page))
663                 {
664                         elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
665                                  relname, blkno);
666                         PageInit(page, BufferGetPageSize(buf), 0);
667                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
668                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
669                         nnepg++;
670                         nemend++;
671                         vc_reappage(Vvpl, vpc);
672                         WriteBuffer(buf);
673                         continue;
674                 }
675
676                 if (PageIsEmpty(page))
677                 {
678                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
679                         frsize += (vpc->vpd_free - sizeof(ItemIdData));
680                         nempg++;
681                         nemend++;
682                         vc_reappage(Vvpl, vpc);
683                         ReleaseBuffer(buf);
684                         continue;
685                 }
686
687                 pgchanged = false;
688                 notup = true;
689                 maxoff = PageGetMaxOffsetNumber(page);
690                 for (offnum = FirstOffsetNumber;
691                          offnum <= maxoff;
692                          offnum = OffsetNumberNext(offnum))
693                 {
694                         itemid = PageGetItemId(page, offnum);
695
696                         /*
697                          * Collect un-used items too - it's possible to have indices
698                          * pointing here after crash.
699                          */
700                         if (!ItemIdIsUsed(itemid))
701                         {
702                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
703                                 nunused++;
704                                 continue;
705                         }
706
707                         htup = (HeapTuple) PageGetItem(page, itemid);
708                         tupgone = false;
709
710                         if (!(htup->t_infomask & HEAP_XMIN_COMMITTED))
711                         {
712                                 if (htup->t_infomask & HEAP_XMIN_INVALID)
713                                         tupgone = true;
714                                 else
715                                 {
716                                         if (TransactionIdDidAbort(htup->t_xmin))
717                                                 tupgone = true;
718                                         else if (TransactionIdDidCommit(htup->t_xmin))
719                                         {
720                                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
721                                                 pgchanged = true;
722                                         }
723                                         else if (!TransactionIdIsInProgress(htup->t_xmin))
724                                         {
725
726                                                 /*
727                                                  * Not Aborted, Not Committed, Not in Progress -
728                                                  * so it's from crashed process. - vadim 11/26/96
729                                                  */
730                                                 ncrash++;
731                                                 tupgone = true;
732                                         }
733                                         else
734                                         {
735                                                 elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
736                                                          relname, blkno, offnum, htup->t_xmin);
737                                                 do_shrinking = false;
738                                         }
739                                 }
740                         }
741
742                         /*
743                          * here we are concerned about tuples with xmin committed and
744                          * xmax unknown or committed
745                          */
746                         if (htup->t_infomask & HEAP_XMIN_COMMITTED &&
747                                 !(htup->t_infomask & HEAP_XMAX_INVALID))
748                         {
749                                 if (htup->t_infomask & HEAP_XMAX_COMMITTED)
750                                         tupgone = true;
751                                 else if (TransactionIdDidAbort(htup->t_xmax))
752                                 {
753                                         htup->t_infomask |= HEAP_XMAX_INVALID;
754                                         pgchanged = true;
755                                 }
756                                 else if (TransactionIdDidCommit(htup->t_xmax))
757                                         tupgone = true;
758                                 else if (!TransactionIdIsInProgress(htup->t_xmax))
759                                 {
760
761                                         /*
762                                          * Not Aborted, Not Committed, Not in Progress - so it
763                                          * from crashed process. - vadim 06/02/97
764                                          */
765                                         htup->t_infomask |= HEAP_XMAX_INVALID;;
766                                         pgchanged = true;
767                                 }
768                                 else
769                                 {
770                                         elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
771                                                  relname, blkno, offnum, htup->t_xmax);
772                                         do_shrinking = false;
773                                 }
774                         }
775
776                         /*
777                          * It's possibly! But from where it comes ? And should we fix
778                          * it ?  - vadim 11/28/96
779                          */
780                         itemptr = &(htup->t_ctid);
781                         if (!ItemPointerIsValid(itemptr) ||
782                                 BlockIdGetBlockNumber(&(itemptr->ip_blkid)) != blkno)
783                         {
784                                 elog(NOTICE, "Rel %s: TID %u/%u: TID IN TUPLEHEADER %u/%u IS NOT THE SAME. TUPGONE %d.",
785                                          relname, blkno, offnum,
786                                          BlockIdGetBlockNumber(&(itemptr->ip_blkid)),
787                                          itemptr->ip_posid, tupgone);
788                         }
789
790                         /*
791                          * Other checks...
792                          */
793                         if (htup->t_len != itemid->lp_len)
794                         {
795                                 elog(NOTICE, "Rel %s: TID %u/%u: TUPLE_LEN IN PAGEHEADER %u IS NOT THE SAME AS IN TUPLEHEADER %u. TUPGONE %d.",
796                                          relname, blkno, offnum,
797                                          itemid->lp_len, htup->t_len, tupgone);
798                         }
799                         if (!OidIsValid(htup->t_oid))
800                         {
801                                 elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
802                                          relname, blkno, offnum, tupgone);
803                         }
804
805                         if (tupgone)
806                         {
807                                 ItemId          lpp;
808
809                                 if (tempPage == (Page) NULL)
810                                 {
811                                         Size            pageSize;
812
813                                         pageSize = PageGetPageSize(page);
814                                         tempPage = (Page) palloc(pageSize);
815                                         memmove(tempPage, page, pageSize);
816                                 }
817
818                                 lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
819
820                                 /* mark it unused */
821                                 lpp->lp_flags &= ~LP_USED;
822
823                                 vpc->vpd_voff[vpc->vpd_noff++] = offnum;
824                                 nvac++;
825
826                         }
827                         else
828                         {
829                                 ntups++;
830                                 notup = false;
831                                 if (htup->t_len < min_tlen)
832                                         min_tlen = htup->t_len;
833                                 if (htup->t_len > max_tlen)
834                                         max_tlen = htup->t_len;
835                                 vc_attrstats(onerel, vacrelstats, htup);
836                         }
837                 }
838
839                 if (pgchanged)
840                 {
841                         WriteBuffer(buf);
842                         dobufrel = false;
843                         nchpg++;
844                 }
845                 else
846                         dobufrel = true;
847                 if (tempPage != (Page) NULL)
848                 {                                               /* Some tuples are gone */
849                         PageRepairFragmentation(tempPage);
850                         vpc->vpd_free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
851                         frsize += vpc->vpd_free;
852                         vc_reappage(Vvpl, vpc);
853                         pfree(tempPage);
854                         tempPage = (Page) NULL;
855                 }
856                 else if (vpc->vpd_noff > 0)
857                 {                                               /* there are only ~LP_USED line pointers */
858                         vpc->vpd_free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
859                         frsize += vpc->vpd_free;
860                         vc_reappage(Vvpl, vpc);
861                 }
862                 if (dobufrel)
863                         ReleaseBuffer(buf);
864                 if (notup)
865                         nemend++;
866                 else
867                         nemend = 0;
868         }
869
870         pfree(vpc);
871
872         /* save stats in the rel list for use later */
873         vacrelstats->ntups = ntups;
874         vacrelstats->npages = nblocks;
875 /*        vacrelstats->natts = attr_cnt;*/
876         if (ntups == 0)
877                 min_tlen = max_tlen = 0;
878         vacrelstats->min_tlen = min_tlen;
879         vacrelstats->max_tlen = max_tlen;
880
881         Vvpl->vpl_nemend = nemend;
882         Fvpl->vpl_nemend = nemend;
883
884         /*
885          * Try to make Fvpl keeping in mind that we can't use free space of
886          * "empty" end-pages and last page if it reapped.
887          */
888         if (do_shrinking && Vvpl->vpl_npages - nemend > 0)
889         {
890                 int                     nusf;           /* blocks usefull for re-using */
891
892                 nusf = Vvpl->vpl_npages - nemend;
893                 if ((Vvpl->vpl_pgdesc[nusf - 1])->vpd_blkno == nblocks - nemend - 1)
894                         nusf--;
895
896                 for (i = 0; i < nusf; i++)
897                 {
898                         vp = Vvpl->vpl_pgdesc[i];
899                         if (vc_enough_space(vp, min_tlen))
900                         {
901                                 vc_vpinsert(Fvpl, vp);
902                                 frsusf += vp->vpd_free;
903                         }
904                 }
905         }
906
907         getrusage(RUSAGE_SELF, &ru1);
908
909         elog(MESSAGE_LEVEL, "Rel %s: Pages %u: Changed %u, Reapped %u, Empty %u, New %u; \
910 Tup %u: Vac %u, Crash %u, UnUsed %u, MinLen %u, MaxLen %u; Re-using: Free/Avail. Space %u/%u; EndEmpty/Avail. Pages %u/%u. Elapsed %u/%u sec.",
911                  relname,
912                  nblocks, nchpg, Vvpl->vpl_npages, nempg, nnepg,
913                  ntups, nvac, ncrash, nunused, min_tlen, max_tlen,
914                  frsize, frsusf, nemend, Fvpl->vpl_npages,
915                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
916                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
917
918 }       /* vc_scanheap */
919
920
921 /*
922  *      vc_rpfheap() -- try to repair a relation's fragmentation
923  *
924  *              This routine marks dead tuples as unused and tries to re-use dead
925  *              space by moving tuples from the end of the relation into free
926  *              space on earlier pages (inserting index entries for the moved
927  *              tuples if needed). It constructs the Nvpl list of pages that
928  *              tuples were moved off and cleans the indices for them after
929  *              committing the current transaction (in hack-manner - without
      *              losing locks and freeing memory!). It truncates the relation
      *              if some end-blocks are gone away.
      *
      *              vacrelstats - per-relation statistics; npages, min_tlen and
      *                            ntups are read here, and npages is overwritten
      *                            with the new block count after truncation
      *              onerel      - the heap relation being processed
      *              Vvpl        - list of reaped pages; its last vpl_nemend
      *                            entries are "empty" end-pages and are not
      *                            visited here
      *              Fvpl        - list of pages with free space, used as targets
      *                            for moved tuples; its vpl_pgdesc array may be
      *                            compacted in place
      *              nindices    - number of entries in Irel
      *              Irel        - index relations on onerel, or NULL if no index
      *                            maintenance is wanted; passed to
      *                            vc_clsindices() before returning
930  */
931 static void
932 vc_rpfheap(VRelStats *vacrelstats, Relation onerel,
933                    VPageList Vvpl, VPageList Fvpl, int nindices, Relation *Irel)
934 {
935         TransactionId myXID;
936         CommandId       myCID;
937         Buffer          buf,
938                                 ToBuf;
939         int                     nblocks,
940                                 blkno;
941         Page            page,
942                                 ToPage = NULL;
943         OffsetNumber offnum = 0,
944                                 maxoff = 0,
945                                 newoff,
946                                 moff;
947         ItemId          itemid,
948                                 newitemid;
949         HeapTuple       htup,
950                                 newtup;
951         TupleDesc       tupdesc = NULL;
952         Datum      *idatum = NULL;
953         char       *inulls = NULL;
954         InsertIndexResult iresult;
955         VPageListData Nvpl;
956         VPageDescr      ToVpd = NULL,
957                                 Fvplast,
958                                 Vvplast,
959                                 vpc,
960                            *vpp;
961         int                     ToVpI = 0;
962         IndDesc    *Idesc,
963                            *idcur;
964         int                     Fblklast,
965                                 Vblklast,
966                                 i;
967         Size            tlen;
968         int                     nmoved,
969                                 Fnpages,
970                                 Vnpages;
971         int                     nchkmvd,
972                                 ntups;
973         bool            isempty,
974                                 dowrite;
975         struct rusage ru0,
976                                 ru1;
977
            /* resource usage at entry; compared with ru1 for the "Elapsed" report */
978         getrusage(RUSAGE_SELF, &ru0);
979
            /* moved tuples are stamped with our own transaction and command ids */
980         myXID = GetCurrentTransactionId();
981         myCID = GetCurrentCommandId();
982
983         if (Irel != (Relation *) NULL)          /* preparation for index' inserts */
984         {
985                 vc_mkindesc(onerel, nindices, Irel, &Idesc);
986                 tupdesc = RelationGetTupleDescriptor(onerel);
987                 idatum = (Datum *) palloc(INDEX_MAX_KEYS * sizeof(*idatum));
988                 inulls = (char *) palloc(INDEX_MAX_KEYS * sizeof(*inulls));
989         }
990
            /*
             * Nvpl collects descriptors of the pages that tuples get moved off.
             * Fvplast/Fblklast and Vvplast/Vblklast track the last not yet
             * visited entry of Fvpl and of the non-"empty-end" part of Vvpl.
             * NOTE(review): Fvpl->vpl_npages and Vnpages are assumed to be > 0
             * here (only the latter is Assert'ed) - confirm against the caller.
             */
991         Nvpl.vpl_npages = 0;
992         Fnpages = Fvpl->vpl_npages;
993         Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
994         Fblklast = Fvplast->vpd_blkno;
995         Assert(Vvpl->vpl_npages > Vvpl->vpl_nemend);
996         Vnpages = Vvpl->vpl_npages - Vvpl->vpl_nemend;
997         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
998         Vblklast = Vvplast->vpd_blkno;
999         Assert(Vblklast >= Fblklast);
1000         ToBuf = InvalidBuffer;
1001         nmoved = 0;
1002
             /* scratch descriptor: offsets of the tuples moved off the current page */
1003         vpc = (VPageDescr) palloc(sizeof(VPageDescrData) + MaxOffsetNumber * sizeof(OffsetNumber));
1004         vpc->vpd_nusd = vpc->vpd_noff = 0;
1005
             /*
              * Walk the relation backward, starting at the last block in front of
              * the "empty" end-pages. The loop has no exit condition of its own;
              * it is left only through the break statements below.
              */
1006         nblocks = vacrelstats->npages;
1007         for (blkno = nblocks - Vvpl->vpl_nemend - 1;; blkno--)
1008         {
1009                 /* if it's a reaped page that already received moved tuples - quit */
1010                 if (blkno == Fblklast && Fvplast->vpd_nusd > 0)
1011                         break;
1012
1013                 buf = ReadBuffer(onerel, blkno);
1014                 page = BufferGetPage(buf);
1015
1016                 vpc->vpd_noff = 0;
1017
1018                 isempty = PageIsEmpty(page);
1019
1020                 dowrite = false;
1021                 if (blkno == Vblklast)  /* it's a reaped page */
1022                 {
1023                         if (Vvplast->vpd_noff > 0)      /* there are dead tuples */
1024                         {                                       /* on this page - clean */
1025                                 Assert(!isempty);
1026                                 vc_vacpage(page, Vvplast);
1027                                 dowrite = true;
1028                         }
1029                         else
1030                                 Assert(isempty);
1031                         --Vnpages;
1032                         Assert(Vnpages > 0);
1033                         /* get prev reaped page from Vvpl */
1034                         Vvplast = Vvpl->vpl_pgdesc[Vnpages - 1];
1035                         Vblklast = Vvplast->vpd_blkno;
1036                         if (blkno == Fblklast)          /* this page in Fvpl too */
1037                         {
1038                                 --Fnpages;
1039                                 Assert(Fnpages > 0);
1040                                 Assert(Fvplast->vpd_nusd == 0);
1041                                 /* get prev reaped page from Fvpl */
1042                                 Fvplast = Fvpl->vpl_pgdesc[Fnpages - 1];
1043                                 Fblklast = Fvplast->vpd_blkno;
1044                         }
1045                         Assert(Fblklast <= Vblklast);
                             /* nothing to move off an empty page - go on with the previous block */
1046                         if (isempty)
1047                         {
1048                                 ReleaseBuffer(buf);
1049                                 continue;
1050                         }
1051                 }
1052                 else
1053                         Assert(!isempty);
1054
                     /* try to move every used item of this page to a page taken from Fvpl */
1055                 vpc->vpd_blkno = blkno;
1056                 maxoff = PageGetMaxOffsetNumber(page);
1057                 for (offnum = FirstOffsetNumber;
1058                          offnum <= maxoff;
1059                          offnum = OffsetNumberNext(offnum))
1060                 {
1061                         itemid = PageGetItemId(page, offnum);
1062
1063                         if (!ItemIdIsUsed(itemid))
1064                                 continue;
1065
1066                         htup = (HeapTuple) PageGetItem(page, itemid);
1067                         tlen = htup->t_len;
1068
1069                         /* try to find new page for this tuple */
1070                         if (ToBuf == InvalidBuffer ||
1071                                 !vc_enough_space(ToVpd, tlen))
1072                         {
                                     /* done with the current target page: write it out */
1073                                 if (ToBuf != InvalidBuffer)
1074                                 {
1075                                         WriteBuffer(ToBuf);
1076                                         ToBuf = InvalidBuffer;
1077
1078                                         /*
1079                                          * If not even a tuple of the minimal length
1080                                          * (vacrelstats->min_tlen) can be added to this
1081                                          * page - remove page from Fvpl. - vadim 11/27/96
1082                                          *
1083                                          * But we can't remove last page - this is our
1084                                          * "show-stopper" !!!   - vadim 02/25/98
                                              */
1085                                         if (ToVpd != Fvplast &&
1086                                                 !vc_enough_space(ToVpd, vacrelstats->min_tlen))
1087                                         {
1088                                                 Assert(Fnpages > ToVpI + 1);
1089                                                 memmove(Fvpl->vpl_pgdesc + ToVpI,
1090                                                                 Fvpl->vpl_pgdesc + ToVpI + 1,
1091                                                    sizeof(VPageDescr *) * (Fnpages - ToVpI - 1));
1092                                                 Fnpages--;
1093                                                 Assert(Fvplast == Fvpl->vpl_pgdesc[Fnpages - 1]);
1094                                         }
1095                                 }
                                     /* take the first remaining Fvpl page that can hold tlen bytes */
1096                                 for (i = 0; i < Fnpages; i++)
1097                                 {
1098                                         if (vc_enough_space(Fvpl->vpl_pgdesc[i], tlen))
1099                                                 break;
1100                                 }
1101                                 if (i == Fnpages)
1102                                         break;          /* can't move item anywhere */
1103                                 ToVpI = i;
1104                                 ToVpd = Fvpl->vpl_pgdesc[ToVpI];
1105                                 ToBuf = ReadBuffer(onerel, ToVpd->vpd_blkno);
1106                                 ToPage = BufferGetPage(ToBuf);
1107                                 /* if this page was not used before - clean it */
1108                                 if (!PageIsEmpty(ToPage) && ToVpd->vpd_nusd == 0)
1109                                         vc_vacpage(ToPage, ToVpd);
1110                         }
1111
1112                         /* copy tuple */
1113                         newtup = (HeapTuple) palloc(tlen);
1114                         memmove((char *) newtup, (char *) htup, tlen);
1115
1116                         /* store transaction information */
1117                         TransactionIdStore(myXID, &(newtup->t_xmin));
1118                         newtup->t_cmin = myCID;
1119                         StoreInvalidTransactionId(&(newtup->t_xmax));
1120                         /* set xmin to unknown and xmax to invalid */
1121                         newtup->t_infomask &= ~(HEAP_XACT_MASK);
1122                         newtup->t_infomask |= HEAP_XMAX_INVALID;
1123
1124                         /* add tuple to the page */
1125                         newoff = PageAddItem(ToPage, (Item) newtup, tlen,
1126                                                                  InvalidOffsetNumber, LP_USED);
1127                         if (newoff == InvalidOffsetNumber)
1128                         {
1129                                 elog(ERROR, "\
1130 failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
1131                                          tlen, ToVpd->vpd_blkno, ToVpd->vpd_free,
1132                                          ToVpd->vpd_nusd, ToVpd->vpd_noff);
1133                         }
                             /* from here on newtup is the copy stored in ToPage, not the palloc'ed one */
1134                         newitemid = PageGetItemId(ToPage, newoff);
1135                         pfree(newtup);
1136                         newtup = (HeapTuple) PageGetItem(ToPage, newitemid);
1137                         ItemPointerSet(&(newtup->t_ctid), ToVpd->vpd_blkno, newoff);
1138
1139                         /* now logically delete end-tuple */
1140                         TransactionIdStore(myXID, &(htup->t_xmax));
1141                         htup->t_cmax = myCID;
1142                         /* set xmax to unknown */
1143                         htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED);
1144
                             /* account for the move on the target page and on the source page */
1145                         ToVpd->vpd_nusd++;
1146                         nmoved++;
1147                         ToVpd->vpd_free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
1148                         vpc->vpd_voff[vpc->vpd_noff++] = offnum;
1149
1150                         /* insert index' tuples if needed */
1151                         if (Irel != (Relation *) NULL)
1152                         {
1153                                 for (i = 0, idcur = Idesc; i < nindices; i++, idcur++)
1154                                 {
1155                                         FormIndexDatum(
1156                                                                    idcur->natts,
1157                                                            (AttrNumber *) &(idcur->tform->indkey[0]),
1158                                                                    newtup,
1159                                                                    tupdesc,
1160                                                                    InvalidBuffer,
1161                                                                    idatum,
1162                                                                    inulls,
1163                                                                    idcur->finfoP);
1164                                         iresult = index_insert(
1165                                                                                    Irel[i],
1166                                                                                    idatum,
1167                                                                                    inulls,
1168                                                                                    &(newtup->t_ctid),
1169                                                                                    onerel);
1170                                         if (iresult)
1171                                                 pfree(iresult);
1172                                 }
1173                         }
1174
1175                 }                                               /* walk along page */
1176
                     /*
                      * NOTE(review): vpc is reused for every page and pfree'd separately
                      * at the end, so vc_reappage() presumably stores a copy of it in
                      * Nvpl - confirm.
                      */
1177                 if (vpc->vpd_noff > 0)  /* some tuples were moved */
1178                 {
1179                         vc_reappage(&Nvpl, vpc);
1180                         WriteBuffer(buf);
1181                 }
1182                 else if (dowrite)
1183                         WriteBuffer(buf);
1184                 else
1185                         ReleaseBuffer(buf);
1186
                     /* the item loop was left early: this block has to stay, stop here */
1187                 if (offnum <= maxoff)
1188                         break;                          /* some item(s) left */
1189
1190         }                                                       /* walk along relation */
1191
             /* blkno is the last block that must be kept, so the block count is one more */
1192         blkno++;                                        /* new number of blocks */
1193
             /* write out the target page that was still in use when the walk stopped */
1194         if (ToBuf != InvalidBuffer)
1195         {
1196                 Assert(nmoved > 0);
1197                 WriteBuffer(ToBuf);
1198         }
1199
1200         if (nmoved > 0)
1201         {
1202
1203                 /*
1204                  * We have to commit our tuple movings before we truncate the
1205                  * relation, but we shouldn't lose our locks. And so - quick hack:
1206                  * flush buffers and record status of current transaction as
1207                  * committed, and continue. - vadim 11/13/96
1208                  */
1209                 FlushBufferPool(!TransactionFlushEnabled());
1210                 TransactionIdCommit(myXID);
1211                 FlushBufferPool(!TransactionFlushEnabled());
1212         }
1213
1214         /*
1215          * Clean uncleaned reaped pages from Vvpl list and set xmin committed
1216          * for inserted tuples. Only the first Vnpages entries are visited,
              * i.e. the reaped pages the backward walk above did not reach.
1217          */
1218         nchkmvd = 0;
1219         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < Vnpages; i++, vpp++)
1220         {
1221                 Assert((*vpp)->vpd_blkno < blkno);
1222                 buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1223                 page = BufferGetPage(buf);
1224                 if ((*vpp)->vpd_nusd == 0)              /* this page was not used */
1225                 {
1226
1227                         /*
1228                          * noff == 0 in empty pages only - such pages should be
1229                          * re-used
1230                          */
1231                         Assert((*vpp)->vpd_noff > 0);
1232                         vc_vacpage(page, *vpp);
1233                 }
1234                 else
1235 /* this page was used */
1236                 {
                             /* mark every tuple we inserted here (xmin == myXID) as committed */
1237                         ntups = 0;
1238                         moff = PageGetMaxOffsetNumber(page);
1239                         for (newoff = FirstOffsetNumber;
1240                                  newoff <= moff;
1241                                  newoff = OffsetNumberNext(newoff))
1242                         {
1243                                 itemid = PageGetItemId(page, newoff);
1244                                 if (!ItemIdIsUsed(itemid))
1245                                         continue;
1246                                 htup = (HeapTuple) PageGetItem(page, itemid);
1247                                 if (TransactionIdEquals((TransactionId) htup->t_xmin, myXID))
1248                                 {
1249                                         htup->t_infomask |= HEAP_XMIN_COMMITTED;
1250                                         ntups++;
1251                                 }
1252                         }
1253                         Assert((*vpp)->vpd_nusd == ntups);
1254                         nchkmvd += ntups;
1255                 }
1256                 WriteBuffer(buf);
1257         }
             /* cross-check: tuples found on target pages == tuples moved */
1258         Assert(nmoved == nchkmvd);
1259
1260         getrusage(RUSAGE_SELF, &ru1);
1261
             /*
              * NOTE(review): the tv_sec differences below are passed for %u
              * conversions - confirm that their type matches on all platforms.
              */
1262         elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. \
1263 Elapsed %u/%u sec.",
1264                  (RelationGetRelationName(onerel))->data,
1265                  nblocks, blkno, nmoved,
1266                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1267                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1268
1269         if (Nvpl.vpl_npages > 0)
1270         {
1271                 /* vacuum indices again if needed */
1272                 if (Irel != (Relation *) NULL)
1273                 {
1274                         VPageDescr *vpleft,
1275                                            *vpright,
1276                                                 vpsave;
1277
1278                         /*
                              * re-sort Nvpl.vpl_pgdesc: reverse the array in place.
                              * NOTE(review): presumably the entries were added in
                              * descending block order by the backward walk and
                              * vc_vaconeind() wants them ascending - confirm.
                              */
1279                         for (vpleft = Nvpl.vpl_pgdesc,
1280                                  vpright = Nvpl.vpl_pgdesc + Nvpl.vpl_npages - 1;
1281                                  vpleft < vpright; vpleft++, vpright--)
1282                         {
1283                                 vpsave = *vpleft;
1284                                 *vpleft = *vpright;
1285                                 *vpright = vpsave;
1286                         }
1287                         for (i = 0; i < nindices; i++)
1288                                 vc_vaconeind(&Nvpl, Irel[i], vacrelstats->ntups);
1289                 }
1290
1291                 /*
1292                  * clean moved tuples from last page in Nvpl list if some tuples
1293                  * left there. This is the page on which the item loop stopped
                      * early: offnum still holds the offset of the first item that
                      * could not be moved, so every used item in front of it was
                      * moved by us and its line pointer can be released.
1294                  */
1295                 if (vpc->vpd_noff > 0 && offnum <= maxoff)
1296                 {
1297                         Assert(vpc->vpd_blkno == blkno - 1);
1298                         buf = ReadBuffer(onerel, vpc->vpd_blkno);
1299                         page = BufferGetPage(buf);
1300                         ntups = 0;
1301                         maxoff = offnum;
1302                         for (offnum = FirstOffsetNumber;
1303                                  offnum < maxoff;
1304                                  offnum = OffsetNumberNext(offnum))
1305                         {
1306                                 itemid = PageGetItemId(page, offnum);
1307                                 if (!ItemIdIsUsed(itemid))
1308                                         continue;
1309                                 htup = (HeapTuple) PageGetItem(page, itemid);
1310                                 Assert(TransactionIdEquals((TransactionId) htup->t_xmax, myXID));
1311                                 itemid->lp_flags &= ~LP_USED;
1312                                 ntups++;
1313                         }
1314                         Assert(vpc->vpd_noff == ntups);
1315                         PageRepairFragmentation(page);
1316                         WriteBuffer(buf);
1317                 }
1318
1319                 /* now - free new list of reaped pages */
1320                 vpp = Nvpl.vpl_pgdesc;
1321                 for (i = 0; i < Nvpl.vpl_npages; i++, vpp++)
1322                         pfree(*vpp);
1323                 pfree(Nvpl.vpl_pgdesc);
1324         }
1325
1326         /* truncate relation: drop its buffers from block blkno on, then cut the file */
1327         if (blkno < nblocks)
1328         {
1329                 i = BlowawayRelationBuffers(onerel, blkno);
1330                 if (i < 0)
1331                         elog(FATAL, "VACUUM (vc_rpfheap): BlowawayRelationBuffers returned %d", i);
1332                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
1333                 Assert(blkno >= 0);
1334                 vacrelstats->npages = blkno;    /* set new number of blocks */
1335         }
1336
1337         if (Irel != (Relation *) NULL)          /* pfree index' allocations */
1338         {
1339                 pfree(Idesc);
1340                 pfree(idatum);
1341                 pfree(inulls);
1342                 vc_clsindices(nindices, Irel);
1343         }
1344
1345         pfree(vpc);
1346
1347 }       /* vc_rpfheap */
1348
1349 /*
1350  *      vc_vacheap() -- free dead tuples
1351  *
1352  *              This routine marks dead tuples as unused and truncates relation
1353  *              if there are "empty" end-blocks.
1354  */
1355 static void
1356 vc_vacheap(VRelStats *vacrelstats, Relation onerel, VPageList Vvpl)
1357 {
1358         Buffer          buf;
1359         Page            page;
1360         VPageDescr *vpp;
1361         int                     nblocks;
1362         int                     i;
1363
1364         nblocks = Vvpl->vpl_npages;
1365         nblocks -= Vvpl->vpl_nemend;/* nothing to do with them */
1366
1367         for (i = 0, vpp = Vvpl->vpl_pgdesc; i < nblocks; i++, vpp++)
1368         {
1369                 if ((*vpp)->vpd_noff > 0)
1370                 {
1371                         buf = ReadBuffer(onerel, (*vpp)->vpd_blkno);
1372                         page = BufferGetPage(buf);
1373                         vc_vacpage(page, *vpp);
1374                         WriteBuffer(buf);
1375                 }
1376         }
1377
1378         /* truncate relation if there are some empty end-pages */
1379         if (Vvpl->vpl_nemend > 0)
1380         {
1381                 Assert(vacrelstats->npages >= Vvpl->vpl_nemend);
1382                 nblocks = vacrelstats->npages - Vvpl->vpl_nemend;
1383                 elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
1384                          (RelationGetRelationName(onerel))->data,
1385                          vacrelstats->npages, nblocks);
1386
1387                 /*
1388                  * we have to flush "empty" end-pages (if changed, but who knows
1389                  * it) before truncation
1390                  */
1391                 FlushBufferPool(!TransactionFlushEnabled());
1392
1393                 i = BlowawayRelationBuffers(onerel, nblocks);
1394                 if (i < 0)
1395                         elog(FATAL, "VACUUM (vc_vacheap): BlowawayRelationBuffers returned %d", i);
1396
1397                 nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
1398                 Assert(nblocks >= 0);
1399                 vacrelstats->npages = nblocks;  /* set new number of blocks */
1400         }
1401
1402 }       /* vc_vacheap */
1403
1404 /*
1405  *      vc_vacpage() -- free dead tuples on a page
1406  *                                       and repaire its fragmentation.
1407  */
1408 static void
1409 vc_vacpage(Page page, VPageDescr vpd)
1410 {
1411         ItemId          itemid;
1412         int                     i;
1413
1414         Assert(vpd->vpd_nusd == 0);
1415         for (i = 0; i < vpd->vpd_noff; i++)
1416         {
1417                 itemid = &(((PageHeader) page)->pd_linp[vpd->vpd_voff[i] - 1]);
1418                 itemid->lp_flags &= ~LP_USED;
1419         }
1420         PageRepairFragmentation(page);
1421
1422 }       /* vc_vacpage */
1423
1424 /*
1425  *      _vc_scanoneind() -- scan one index relation to update statistic.
1426  *
1427  */
1428 static void
1429 vc_scanoneind(Relation indrel, int nhtups)
1430 {
1431         RetrieveIndexResult res;
1432         IndexScanDesc iscan;
1433         int                     nitups;
1434         int                     nipages;
1435         struct rusage ru0,
1436                                 ru1;
1437
1438         getrusage(RUSAGE_SELF, &ru0);
1439
1440         /* walk through the entire index */
1441         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1442         nitups = 0;
1443
1444         while ((res = index_getnext(iscan, ForwardScanDirection))
1445                    != (RetrieveIndexResult) NULL)
1446         {
1447                 nitups++;
1448                 pfree(res);
1449         }
1450
1451         index_endscan(iscan);
1452
1453         /* now update statistics in pg_class */
1454         nipages = RelationGetNumberOfBlocks(indrel);
1455         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1456
1457         getrusage(RUSAGE_SELF, &ru1);
1458
1459         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u. Elapsed %u/%u sec.",
1460                  indrel->rd_rel->relname.data, nipages, nitups,
1461                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1462                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1463
1464         if (nitups != nhtups)
1465                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1466                          indrel->rd_rel->relname.data, nitups, nhtups);
1467
1468 }       /* vc_scanoneind */
1469
1470 /*
1471  *      vc_vaconeind() -- vacuum one index relation.
1472  *
1473  *              Vpl is the VPageList of the heap we're currently vacuuming.
1474  *              It's locked. Indrel is an index relation on the vacuumed heap.
1475  *              We don't set locks on the index relation here, since the indexed
1476  *              access methods support locking at different granularities.
1477  *              We let them handle it.
1478  *
1479  *              Finally, we arrange to update the index relation's statistics in
1480  *              pg_class.
1481  */
1482 static void
1483 vc_vaconeind(VPageList vpl, Relation indrel, int nhtups)
1484 {
1485         RetrieveIndexResult res;
1486         IndexScanDesc iscan;
1487         ItemPointer heapptr;
1488         int                     nvac;
1489         int                     nitups;
1490         int                     nipages;
1491         VPageDescr      vp;
1492         struct rusage ru0,
1493                                 ru1;
1494
1495         getrusage(RUSAGE_SELF, &ru0);
1496
1497         /* walk through the entire index */
1498         iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
1499         nvac = 0;
1500         nitups = 0;
1501
1502         while ((res = index_getnext(iscan, ForwardScanDirection))
1503                    != (RetrieveIndexResult) NULL)
1504         {
1505                 heapptr = &res->heap_iptr;
1506
1507                 if ((vp = vc_tidreapped(heapptr, vpl)) != (VPageDescr) NULL)
1508                 {
1509 #if 0
1510                         elog(DEBUG, "<%x,%x> -> <%x,%x>",
1511                                  ItemPointerGetBlockNumber(&(res->index_iptr)),
1512                                  ItemPointerGetOffsetNumber(&(res->index_iptr)),
1513                                  ItemPointerGetBlockNumber(&(res->heap_iptr)),
1514                                  ItemPointerGetOffsetNumber(&(res->heap_iptr)));
1515 #endif
1516                         if (vp->vpd_noff == 0)
1517                         {                                       /* this is EmptyPage !!! */
1518                                 elog(NOTICE, "Ind %s: pointer to EmptyPage (blk %u off %u) - fixing",
1519                                          indrel->rd_rel->relname.data,
1520                                          vp->vpd_blkno, ItemPointerGetOffsetNumber(heapptr));
1521                         }
1522                         ++nvac;
1523                         index_delete(indrel, &res->index_iptr);
1524                 }
1525                 else
1526                         nitups++;
1527
1528                 /* be tidy */
1529                 pfree(res);
1530         }
1531
1532         index_endscan(iscan);
1533
1534         /* now update statistics in pg_class */
1535         nipages = RelationGetNumberOfBlocks(indrel);
1536         vc_updstats(indrel->rd_id, nipages, nitups, false, NULL);
1537
1538         getrusage(RUSAGE_SELF, &ru1);
1539
1540         elog(MESSAGE_LEVEL, "Ind %s: Pages %u; Tuples %u: Deleted %u. Elapsed %u/%u sec.",
1541                  indrel->rd_rel->relname.data, nipages, nitups, nvac,
1542                  ru1.ru_stime.tv_sec - ru0.ru_stime.tv_sec,
1543                  ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec);
1544
1545         if (nitups != nhtups)
1546                 elog(NOTICE, "Ind %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u)",
1547                          indrel->rd_rel->relname.data, nitups, nhtups);
1548
1549 }       /* vc_vaconeind */
1550
1551 /*
1552  *      vc_tidreapped() -- is a particular tid reapped?
1553  *
1554  *              vpl->VPageDescr_array is sorted in right order.
1555  */
1556 static VPageDescr
1557 vc_tidreapped(ItemPointer itemptr, VPageList vpl)
1558 {
1559         OffsetNumber ioffno;
1560         OffsetNumber *voff;
1561         VPageDescr      vp,
1562                            *vpp;
1563         VPageDescrData vpd;
1564
1565         vpd.vpd_blkno = ItemPointerGetBlockNumber(itemptr);
1566         ioffno = ItemPointerGetOffsetNumber(itemptr);
1567
1568         vp = &vpd;
1569         vpp = (VPageDescr *) vc_find_eq((char *) (vpl->vpl_pgdesc),
1570                                            vpl->vpl_npages, sizeof(VPageDescr), (char *) &vp,
1571                                                                         vc_cmp_blk);
1572
1573         if (vpp == (VPageDescr *) NULL)
1574                 return ((VPageDescr) NULL);
1575         vp = *vpp;
1576
1577         /* ok - we are on true page */
1578
1579         if (vp->vpd_noff == 0)
1580         {                                                       /* this is EmptyPage !!! */
1581                 return (vp);
1582         }
1583
1584         voff = (OffsetNumber *) vc_find_eq((char *) (vp->vpd_voff),
1585                                         vp->vpd_noff, sizeof(OffsetNumber), (char *) &ioffno,
1586                                                                            vc_cmp_offno);
1587
1588         if (voff == (OffsetNumber *) NULL)
1589                 return ((VPageDescr) NULL);
1590
1591         return (vp);
1592
1593 }       /* vc_tidreapped */
1594
1595 /*
1596  *      vc_attrstats() -- compute column statistics used by the optimzer
1597  *
1598  *      We compute the column min, max, null and non-null counts.
1599  *      Plus we attempt to find the count of the value that occurs most
1600  *      frequently in each column
1601  *      These figures are used to compute the selectivity of the column
1602  *
1603  *      We use a three-bucked cache to get the most frequent item
1604  *      The 'guess' buckets count hits.  A cache miss causes guess1
1605  *      to get the most hit 'guess' item in the most recent cycle, and
1606  *      the new item goes into guess2.  Whenever the total count of hits
1607  *      of a 'guess' entry is larger than 'best', 'guess' becomes 'best'.
1608  *
1609  *      This method works perfectly for columns with unique values, and columns
1610  *      with only two unique values, plus nulls.
1611  *
1612  *      It becomes less perfect as the number of unique values increases and
1613  *      their distribution in the table becomes more random.
1614  *
1615  */
1616 static void
1617 vc_attrstats(Relation onerel, VRelStats *vacrelstats, HeapTuple htup)
1618 {
1619         int                     i,
1620                                 attr_cnt = vacrelstats->va_natts;
1621         VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1622         TupleDesc       tupDesc = onerel->rd_att;
1623         Datum           value;
1624         bool            isnull;
1625
1626         for (i = 0; i < attr_cnt; i++)
1627         {
1628                 VacAttrStats *stats = &vacattrstats[i];
1629                 bool            value_hit = true;
1630
1631                 value = heap_getattr(htup,
1632                                                          stats->attr->attnum, tupDesc, &isnull);
1633
1634                 if (!VacAttrStatsEqValid(stats))
1635                         continue;
1636
1637                 if (isnull)
1638                         stats->null_cnt++;
1639                 else
1640                 {
1641                         stats->nonnull_cnt++;
1642                         if (stats->initialized == false)
1643                         {
1644                                 vc_bucketcpy(stats->attr, value, &stats->best, &stats->best_len);
1645                                 /* best_cnt gets incremented later */
1646                                 vc_bucketcpy(stats->attr, value, &stats->guess1, &stats->guess1_len);
1647                                 stats->guess1_cnt = stats->guess1_hits = 1;
1648                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1649                                 stats->guess2_hits = 1;
1650                                 if (VacAttrStatsLtGtValid(stats))
1651                                 {
1652                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1653                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1654                                 }
1655                                 stats->initialized = true;
1656                         }
1657                         if (VacAttrStatsLtGtValid(stats))
1658                         {
1659                                 if ((*fmgr_faddr(&stats->f_cmplt)) (value, stats->min))
1660                                 {
1661                                         vc_bucketcpy(stats->attr, value, &stats->min, &stats->min_len);
1662                                         stats->min_cnt = 0;
1663                                 }
1664                                 if ((*fmgr_faddr(&stats->f_cmpgt)) (value, stats->max))
1665                                 {
1666                                         vc_bucketcpy(stats->attr, value, &stats->max, &stats->max_len);
1667                                         stats->max_cnt = 0;
1668                                 }
1669                                 if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->min))
1670                                         stats->min_cnt++;
1671                                 else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->max))
1672                                         stats->max_cnt++;
1673                         }
1674                         if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->best))
1675                                 stats->best_cnt++;
1676                         else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess1))
1677                         {
1678                                 stats->guess1_cnt++;
1679                                 stats->guess1_hits++;
1680                         }
1681                         else if ((*fmgr_faddr(&stats->f_cmpeq)) (value, stats->guess2))
1682                                 stats->guess2_hits++;
1683                         else
1684                                 value_hit = false;
1685
1686                         if (stats->guess2_hits > stats->guess1_hits)
1687                         {
1688                                 swapDatum(stats->guess1, stats->guess2);
1689                                 swapInt(stats->guess1_len, stats->guess2_len);
1690                                 stats->guess1_cnt = stats->guess2_hits;
1691                                 swapLong(stats->guess1_hits, stats->guess2_hits);
1692                         }
1693                         if (stats->guess1_cnt > stats->best_cnt)
1694                         {
1695                                 swapDatum(stats->best, stats->guess1);
1696                                 swapInt(stats->best_len, stats->guess1_len);
1697                                 swapLong(stats->best_cnt, stats->guess1_cnt);
1698                                 stats->guess1_hits = 1;
1699                                 stats->guess2_hits = 1;
1700                         }
1701                         if (!value_hit)
1702                         {
1703                                 vc_bucketcpy(stats->attr, value, &stats->guess2, &stats->guess2_len);
1704                                 stats->guess1_hits = 1;
1705                                 stats->guess2_hits = 1;
1706                         }
1707                 }
1708         }
1709         return;
1710 }
1711
1712 /*
1713  *      vc_bucketcpy() -- update pg_class statistics for one relation
1714  *
1715  */
1716 static void
1717 vc_bucketcpy(AttributeTupleForm attr, Datum value, Datum *bucket, int16 *bucket_len)
1718 {
1719         if (attr->attbyval && attr->attlen != -1)
1720                 *bucket = value;
1721         else
1722         {
1723                 int                     len = (attr->attlen != -1 ? attr->attlen : VARSIZE(value));
1724
1725                 if (len > *bucket_len)
1726                 {
1727                         if (*bucket_len != 0)
1728                                 pfree(DatumGetPointer(*bucket));
1729                         *bucket = PointerGetDatum(palloc(len));
1730                         *bucket_len = len;
1731                 }
1732                 memmove(DatumGetPointer(*bucket), DatumGetPointer(value), len);
1733         }
1734 }
1735
1736 /*
1737  *      vc_updstats() -- update pg_class statistics for one relation
1738  *
1739  *              This routine works for both index and heap relation entries in
1740  *              pg_class.  We violate no-overwrite semantics here by storing new
1741  *              values for ntups, npages, and hasindex directly in the pg_class
1742  *              tuple that's already on the page.  The reason for this is that if
1743  *              we updated these tuples in the usual way, then every tuple in pg_class
1744  *              would be replaced every day.  This would make planning and executing
1745  *              historical queries very expensive.
1746  */
1747 static void
1748 vc_updstats(Oid relid, int npages, int ntups, bool hasindex, VRelStats *vacrelstats)
1749 {
1750         Relation        rd,
1751                                 ad,
1752                                 sd;
1753         HeapScanDesc rsdesc,
1754                                 asdesc;
1755         TupleDesc       sdesc;
1756         HeapTuple       rtup,
1757                                 atup,
1758                                 stup;
1759         Buffer          rbuf,
1760                                 abuf;
1761         Form_pg_class pgcform;
1762         ScanKeyData rskey,
1763                                 askey;
1764         AttributeTupleForm attp;
1765
1766         /*
1767          * update number of tuples and number of pages in pg_class
1768          */
1769         ScanKeyEntryInitialize(&rskey, 0x0, ObjectIdAttributeNumber,
1770                                                    F_OIDEQ,
1771                                                    ObjectIdGetDatum(relid));
1772
1773         rd = heap_openr(RelationRelationName);
1774         rsdesc = heap_beginscan(rd, false, false, 1, &rskey);
1775
1776         if (!HeapTupleIsValid(rtup = heap_getnext(rsdesc, 0, &rbuf)))
1777                 elog(ERROR, "pg_class entry for relid %d vanished during vacuuming",
1778                          relid);
1779
1780         /* overwrite the existing statistics in the tuple */
1781         vc_setpagelock(rd, BufferGetBlockNumber(rbuf));
1782         pgcform = (Form_pg_class) GETSTRUCT(rtup);
1783         pgcform->reltuples = ntups;
1784         pgcform->relpages = npages;
1785         pgcform->relhasindex = hasindex;
1786
1787         if (vacrelstats != NULL && vacrelstats->va_natts > 0)
1788         {
1789                 VacAttrStats *vacattrstats = vacrelstats->vacattrstats;
1790                 int                     natts = vacrelstats->va_natts;
1791
1792                 ad = heap_openr(AttributeRelationName);
1793                 sd = heap_openr(StatisticRelationName);
1794                 ScanKeyEntryInitialize(&askey, 0, Anum_pg_attribute_attrelid,
1795                                                            F_INT4EQ, relid);
1796
1797                 asdesc = heap_beginscan(ad, false, false, 1, &askey);
1798
1799                 while (HeapTupleIsValid(atup = heap_getnext(asdesc, 0, &abuf)))
1800                 {
1801                         int                     i;
1802                         float32data selratio;           /* average ratio of rows selected
1803                                                                                  * for a random constant */
1804                         VacAttrStats *stats;
1805                         Datum           values[Natts_pg_statistic];
1806                         char            nulls[Natts_pg_statistic];
1807
1808                         attp = (AttributeTupleForm) GETSTRUCT(atup);
1809                         if (attp->attnum <= 0)          /* skip system attributes for now, */
1810                                 /* they are unique anyway */
1811                                 continue;
1812
1813                         for (i = 0; i < natts; i++)
1814                         {
1815                                 if (attp->attnum == vacattrstats[i].attr->attnum)
1816                                         break;
1817                         }
1818                         if (i >= natts)
1819                                 continue;
1820                         stats = &(vacattrstats[i]);
1821
1822                         /* overwrite the existing statistics in the tuple */
1823                         if (VacAttrStatsEqValid(stats))
1824                         {
1825
1826                                 vc_setpagelock(ad, BufferGetBlockNumber(abuf));
1827
1828                                 if (stats->nonnull_cnt + stats->null_cnt == 0 ||
1829                                         (stats->null_cnt <= 1 && stats->best_cnt == 1))
1830                                         selratio = 0;
1831                                 else if (VacAttrStatsLtGtValid(stats) && stats->min_cnt + stats->max_cnt == stats->nonnull_cnt)
1832                                 {
1833                                         double          min_cnt_d = stats->min_cnt,
1834                                                                 max_cnt_d = stats->max_cnt,
1835                                                                 null_cnt_d = stats->null_cnt,
1836                                                                 nonnullcnt_d = stats->nonnull_cnt;              /* prevent overflow */
1837
1838                                         selratio = (min_cnt_d * min_cnt_d + max_cnt_d * max_cnt_d + null_cnt_d * null_cnt_d) /
1839                                                 (nonnullcnt_d + null_cnt_d) / (nonnullcnt_d + null_cnt_d);
1840                                 }
1841                                 else
1842                                 {
1843                                         double          most = (double) (stats->best_cnt > stats->null_cnt ? stats->best_cnt : stats->null_cnt);
1844                                         double          total = ((double) stats->nonnull_cnt) + ((double) stats->null_cnt);
1845
1846                                         /*
1847                                          * we assume count of other values are 20% of best
1848                                          * count in table
1849                                          */
1850                                         selratio = (most * most + 0.20 * most * (total - most)) / total / total;
1851                                 }
1852                                 if (selratio > 1.0)
1853                                         selratio = 1.0;
1854                                 attp->attdisbursion = selratio;
1855                                 WriteNoReleaseBuffer(abuf);
1856
1857                                 /* DO PG_STATISTIC INSERTS */
1858
1859                                 /*
1860                                  * doing system relations, especially pg_statistic is a
1861                                  * problem
1862                                  */
1863                                 if (VacAttrStatsLtGtValid(stats) && stats->initialized  /* &&
1864                                                                                                                                                  * !IsSystemRelationName(
1865                                                                                                                                                  *
1866                                          pgcform->relname.data) */ )
1867                                 {
1868                                         FmgrInfo        out_function;
1869                                         char       *out_string;
1870
1871                                         for (i = 0; i < Natts_pg_statistic; ++i)
1872                                                 nulls[i] = ' ';
1873
1874                                         /* ----------------
1875                                          *      initialize values[]
1876                                          * ----------------
1877                                          */
1878                                         i = 0;
1879                                         values[i++] = (Datum) relid;            /* 1 */
1880                                         values[i++] = (Datum) attp->attnum; /* 2 */
1881                                         values[i++] = (Datum) InvalidOid;       /* 3 */
1882                                         fmgr_info(stats->outfunc, &out_function);
1883                                         out_string = (*fmgr_faddr(&out_function)) (stats->min, stats->attr->atttypid);
1884                                         values[i++] = (Datum) fmgr(F_TEXTIN, out_string);
1885                                         pfree(out_string);
1886                                         out_string = (char *) (*fmgr_faddr(&out_function)) (stats->max, stats->attr->atttypid);
1887                                         values[i++] = (Datum) fmgr(F_TEXTIN, out_string);
1888                                         pfree(out_string);
1889
1890                                         sdesc = sd->rd_att;
1891
1892                                         stup = heap_formtuple(sdesc, values, nulls);
1893
1894                                         /* ----------------
1895                                          *      insert the tuple in the relation and get the tuple's oid.
1896                                          * ----------------
1897                                          */
1898                                         heap_insert(sd, stup);
1899                                         pfree(DatumGetPointer(values[3]));
1900                                         pfree(DatumGetPointer(values[4]));
1901                                         pfree(stup);
1902                                 }
1903                         }
1904                 }
1905                 heap_endscan(asdesc);
1906                 heap_close(ad);
1907                 heap_close(sd);
1908         }
1909
1910         /* XXX -- after write, should invalidate relcache in other backends */
1911         WriteNoReleaseBuffer(rbuf); /* heap_endscan release scan' buffers ? */
1912
1913         /*
1914          * invalidating system relations confuses the function cache of
1915          * pg_operator and pg_opclass
1916          */
1917         if (!IsSystemRelationName(pgcform->relname.data))
1918                 RelationInvalidateHeapTuple(rd, rtup);
1919
1920         /* that's all, folks */
1921         heap_endscan(rsdesc);
1922         heap_close(rd);
1923 }
1924
1925 /*
1926  *      vc_delhilowstats() -- delete pg_statistics rows
1927  *
1928  */
1929 static void
1930 vc_delhilowstats(Oid relid, int attcnt, int *attnums)
1931 {
1932         Relation        pgstatistic;
1933         HeapScanDesc pgsscan;
1934         HeapTuple       pgstup;
1935         ScanKeyData pgskey;
1936
1937         pgstatistic = heap_openr(StatisticRelationName);
1938
1939         if (relid != InvalidOid)
1940         {
1941                 ScanKeyEntryInitialize(&pgskey, 0x0, Anum_pg_statistic_starelid,
1942                                                            F_OIDEQ,
1943                                                            ObjectIdGetDatum(relid));
1944                 pgsscan = heap_beginscan(pgstatistic, false, false, 1, &pgskey);
1945         }
1946         else
1947                 pgsscan = heap_beginscan(pgstatistic, false, false, 0, NULL);
1948
1949         while (HeapTupleIsValid(pgstup = heap_getnext(pgsscan, 0, NULL)))
1950         {
1951                 if (attcnt > 0)
1952                 {
1953                         Form_pg_statistic pgs = (Form_pg_statistic) GETSTRUCT(pgstup);
1954                         int                     i;
1955
1956                         for (i = 0; i < attcnt; i++)
1957                         {
1958                                 if (pgs->staattnum == attnums[i] + 1)
1959                                         break;
1960                         }
1961                         if (i >= attcnt)
1962                                 continue;               /* don't delete it */
1963                 }
1964                 heap_delete(pgstatistic, &pgstup->t_ctid);
1965         }
1966
1967         heap_endscan(pgsscan);
1968         heap_close(pgstatistic);
1969 }
1970
1971 static void
1972 vc_setpagelock(Relation rel, BlockNumber blkno)
1973 {
1974         ItemPointerData itm;
1975
1976         ItemPointerSet(&itm, blkno, 1);
1977
1978         RelationSetLockForWritePage(rel, &itm);
1979 }
1980
1981 /*
1982  *      vc_reappage() -- save a page on the array of reapped pages.
1983  *
1984  *              As a side effect of the way that the vacuuming loop for a given
1985  *              relation works, higher pages come after lower pages in the array
1986  *              (and highest tid on a page is last).
1987  */
1988 static void
1989 vc_reappage(VPageList vpl, VPageDescr vpc)
1990 {
1991         VPageDescr      newvpd;
1992
1993         /* allocate a VPageDescrData entry */
1994         newvpd = (VPageDescr) palloc(sizeof(VPageDescrData) + vpc->vpd_noff * sizeof(OffsetNumber));
1995
1996         /* fill it in */
1997         if (vpc->vpd_noff > 0)
1998                 memmove(newvpd->vpd_voff, vpc->vpd_voff, vpc->vpd_noff * sizeof(OffsetNumber));
1999         newvpd->vpd_blkno = vpc->vpd_blkno;
2000         newvpd->vpd_free = vpc->vpd_free;
2001         newvpd->vpd_nusd = vpc->vpd_nusd;
2002         newvpd->vpd_noff = vpc->vpd_noff;
2003
2004         /* insert this page into vpl list */
2005         vc_vpinsert(vpl, newvpd);
2006
2007 }       /* vc_reappage */
2008
2009 static void
2010 vc_vpinsert(VPageList vpl, VPageDescr vpnew)
2011 {
2012
2013         /* allocate a VPageDescr entry if needed */
2014         if (vpl->vpl_npages == 0)
2015                 vpl->vpl_pgdesc = (VPageDescr *) palloc(100 * sizeof(VPageDescr));
2016         else if (vpl->vpl_npages % 100 == 0)
2017                 vpl->vpl_pgdesc = (VPageDescr *) repalloc(vpl->vpl_pgdesc, (vpl->vpl_npages + 100) * sizeof(VPageDescr));
2018         vpl->vpl_pgdesc[vpl->vpl_npages] = vpnew;
2019         (vpl->vpl_npages)++;
2020
2021 }
2022
2023 static void
2024 vc_free(VRelList vrl)
2025 {
2026         VRelList        p_vrl;
2027         MemoryContext old;
2028         PortalVariableMemory pmem;
2029
2030         pmem = PortalGetVariableMemory(vc_portal);
2031         old = MemoryContextSwitchTo((MemoryContext) pmem);
2032
2033         while (vrl != (VRelList) NULL)
2034         {
2035
2036                 /* free rel list entry */
2037                 p_vrl = vrl;
2038                 vrl = vrl->vrl_next;
2039                 pfree(p_vrl);
2040         }
2041
2042         MemoryContextSwitchTo(old);
2043 }
2044
/*
 *	vc_find_eq() -- binary search for an element equal to *elm
 *
 *		bot		first byte of an array sorted ascending according to compar
 *		nelem	number of elements in the array
 *		size	size of one element in bytes
 *		elm		the key, laid out like an array element
 *		compar	returns <0, 0 or >0 for its first argument being less
 *				than, equal to or greater than its second
 *
 *		Returns a pointer to a matching element, or NULL when there is
 *		none.  An empty array (nelem <= 0) never matches and is not
 *		touched at all.
 */
static char *
vc_find_eq(char *bot, int nelem, int size, char *elm, int (*compar) (char *, char *))
{
	int			res;
	int			last;
	int			celm;
	bool		last_move,
				first_move;

	/* nothing to search: without this check bot[0] would be examined */
	if (nelem <= 0)
		return NULL;

	last = nelem - 1;
	celm = nelem / 2;

	last_move = first_move = true;
	for (;;)
	{
		/* lower bound moved: the key must not sort before the first element */
		if (first_move == true)
		{
			res = compar(bot, elm);
			if (res > 0)
				return NULL;
			if (res == 0)
				return bot;
			first_move = false;
		}

		/* upper bound moved: the key must not sort after the last element */
		if (last_move == true)
		{
			res = compar(elm, bot + last * size);
			if (res > 0)
				return NULL;
			if (res == 0)
				return bot + last * size;
			last_move = false;
		}

		/* probe the middle element */
		res = compar(elm, bot + celm * size);
		if (res == 0)
			return bot + celm * size;
		if (res < 0)
		{
			/* continue in the lower part */
			if (celm == 0)
				return NULL;
			last = celm - 1;
			celm = celm / 2;
			last_move = true;
			continue;
		}

		if (celm == last)
			return NULL;

		/* continue in the upper part; bot is rebased, so last shrinks too */
		last = last - celm - 1;
		bot = bot + (celm + 1) * size;
		celm = (last + 1) / 2;
		first_move = true;
	}

}	/* vc_find_eq */
2098
2099 static int
2100 vc_cmp_blk(char *left, char *right)
2101 {
2102         BlockNumber lblk,
2103                                 rblk;
2104
2105         lblk = (*((VPageDescr *) left))->vpd_blkno;
2106         rblk = (*((VPageDescr *) right))->vpd_blkno;
2107
2108         if (lblk < rblk)
2109                 return (-1);
2110         if (lblk == rblk)
2111                 return (0);
2112         return (1);
2113
2114 }       /* vc_cmp_blk */
2115
2116 static int
2117 vc_cmp_offno(char *left, char *right)
2118 {
2119
2120         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2121                 return (-1);
2122         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2123                 return (0);
2124         return (1);
2125
2126 }       /* vc_cmp_offno */
2127
2128
2129 static void
2130 vc_getindices(Oid relid, int *nindices, Relation **Irel)
2131 {
2132         Relation        pgindex;
2133         Relation        irel;
2134         TupleDesc       pgidesc;
2135         HeapTuple       pgitup;
2136         HeapScanDesc pgiscan;
2137         Datum           d;
2138         int                     i,
2139                                 k;
2140         bool            n;
2141         ScanKeyData pgikey;
2142         Oid                *ioid;
2143
2144         *nindices = i = 0;
2145
2146         ioid = (Oid *) palloc(10 * sizeof(Oid));
2147
2148         /* prepare a heap scan on the pg_index relation */
2149         pgindex = heap_openr(IndexRelationName);
2150         pgidesc = RelationGetTupleDescriptor(pgindex);
2151
2152         ScanKeyEntryInitialize(&pgikey, 0x0, Anum_pg_index_indrelid,
2153                                                    F_OIDEQ,
2154                                                    ObjectIdGetDatum(relid));
2155
2156         pgiscan = heap_beginscan(pgindex, false, false, 1, &pgikey);
2157
2158         while (HeapTupleIsValid(pgitup = heap_getnext(pgiscan, 0, NULL)))
2159         {
2160                 d = heap_getattr(pgitup, Anum_pg_index_indexrelid,
2161                                                  pgidesc, &n);
2162                 i++;
2163                 if (i % 10 == 0)
2164                         ioid = (Oid *) repalloc(ioid, (i + 10) * sizeof(Oid));
2165                 ioid[i - 1] = DatumGetObjectId(d);
2166         }
2167
2168         heap_endscan(pgiscan);
2169         heap_close(pgindex);
2170
2171         if (i == 0)
2172         {                                                       /* No one index found */
2173                 pfree(ioid);
2174                 return;
2175         }
2176
2177         if (Irel != (Relation **) NULL)
2178                 *Irel = (Relation *) palloc(i * sizeof(Relation));
2179
2180         for (k = 0; i > 0;)
2181         {
2182                 irel = index_open(ioid[--i]);
2183                 if (irel != (Relation) NULL)
2184                 {
2185                         if (Irel != (Relation **) NULL)
2186                                 (*Irel)[k] = irel;
2187                         else
2188                                 index_close(irel);
2189                         k++;
2190                 }
2191                 else
2192                         elog(NOTICE, "CAN't OPEN INDEX %u - SKIP IT", ioid[i]);
2193         }
2194         *nindices = k;
2195         pfree(ioid);
2196
2197         if (Irel != (Relation **) NULL && *nindices == 0)
2198         {
2199                 pfree(*Irel);
2200                 *Irel = (Relation *) NULL;
2201         }
2202
2203 }       /* vc_getindices */
2204
2205
2206 static void
2207 vc_clsindices(int nindices, Relation *Irel)
2208 {
2209
2210         if (Irel == (Relation *) NULL)
2211                 return;
2212
2213         while (nindices--)
2214                 index_close(Irel[nindices]);
2215         pfree(Irel);
2216
2217 }       /* vc_clsindices */
2218
2219
2220 static void
2221 vc_mkindesc(Relation onerel, int nindices, Relation *Irel, IndDesc **Idesc)
2222 {
2223         IndDesc    *idcur;
2224         HeapTuple       pgIndexTup;
2225         AttrNumber *attnumP;
2226         int                     natts;
2227         int                     i;
2228
2229         *Idesc = (IndDesc *) palloc(nindices * sizeof(IndDesc));
2230
2231         for (i = 0, idcur = *Idesc; i < nindices; i++, idcur++)
2232         {
2233                 pgIndexTup =
2234                         SearchSysCacheTuple(INDEXRELID,
2235                                                                 ObjectIdGetDatum(Irel[i]->rd_id),
2236                                                                 0, 0, 0);
2237                 Assert(pgIndexTup);
2238                 idcur->tform = (IndexTupleForm) GETSTRUCT(pgIndexTup);
2239                 for (attnumP = &(idcur->tform->indkey[0]), natts = 0;
2240                          *attnumP != InvalidAttrNumber && natts != INDEX_MAX_KEYS;
2241                          attnumP++, natts++);
2242                 if (idcur->tform->indproc != InvalidOid)
2243                 {
2244                         idcur->finfoP = &(idcur->finfo);
2245                         FIgetnArgs(idcur->finfoP) = natts;
2246                         natts = 1;
2247                         FIgetProcOid(idcur->finfoP) = idcur->tform->indproc;
2248                         *(FIgetname(idcur->finfoP)) = '\0';
2249                 }
2250                 else
2251                         idcur->finfoP = (FuncIndexInfo *) NULL;
2252
2253                 idcur->natts = natts;
2254         }
2255
2256 }       /* vc_mkindesc */
2257
2258
2259 static bool
2260 vc_enough_space(VPageDescr vpd, Size len)
2261 {
2262
2263         len = DOUBLEALIGN(len);
2264
2265         if (len > vpd->vpd_free)
2266                 return (false);
2267
2268         if (vpd->vpd_nusd < vpd->vpd_noff)      /* there are free itemid(s) */
2269                 return (true);                  /* and len <= free_space */
2270
2271         /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
2272         if (len <= vpd->vpd_free - sizeof(ItemIdData))
2273                 return (true);
2274
2275         return (false);
2276
2277 }       /* vc_enough_space */