]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Restructure local-buffer handling per recent pghackers discussion.
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.233 2002/08/06 02:36:34 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <unistd.h>
23
24 #include "access/clog.h"
25 #include "access/genam.h"
26 #include "access/heapam.h"
27 #include "access/xlog.h"
28 #include "catalog/catalog.h"
29 #include "catalog/catname.h"
30 #include "catalog/namespace.h"
31 #include "catalog/pg_database.h"
32 #include "catalog/pg_index.h"
33 #include "commands/vacuum.h"
34 #include "executor/executor.h"
35 #include "miscadmin.h"
36 #include "storage/freespace.h"
37 #include "storage/sinval.h"
38 #include "storage/smgr.h"
39 #include "tcop/pquery.h"
40 #include "utils/acl.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/inval.h"
44 #include "utils/lsyscache.h"
45 #include "utils/relcache.h"
46 #include "utils/syscache.h"
47 #include "pgstat.h"
48
49
50 typedef struct VacPageData
51 {
52         BlockNumber blkno;                      /* BlockNumber of this Page */
53         Size            free;                   /* FreeSpace on this Page */
54         uint16          offsets_used;   /* Number of OffNums used by vacuum */
55         uint16          offsets_free;   /* Number of OffNums free or to be free */
56         OffsetNumber offsets[1];        /* Array of free OffNums */
57 } VacPageData;
58
59 typedef VacPageData *VacPage;
60
61 typedef struct VacPageListData
62 {
63         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
64         int                     num_pages;              /* Number of pages in pagedesc */
65         int                     num_allocated_pages;    /* Number of allocated pages in
66                                                                                  * pagedesc */
67         VacPage    *pagedesc;           /* Descriptions of pages */
68 } VacPageListData;
69
70 typedef VacPageListData *VacPageList;
71
72 typedef struct VTupleLinkData
73 {
74         ItemPointerData new_tid;
75         ItemPointerData this_tid;
76 } VTupleLinkData;
77
78 typedef VTupleLinkData *VTupleLink;
79
80 typedef struct VTupleMoveData
81 {
82         ItemPointerData tid;            /* tuple ID */
83         VacPage         vacpage;                /* where to move */
84         bool            cleanVpd;               /* clean vacpage before using */
85 } VTupleMoveData;
86
87 typedef VTupleMoveData *VTupleMove;
88
89 typedef struct VRelStats
90 {
91         BlockNumber rel_pages;
92         double          rel_tuples;
93         Size            min_tlen;
94         Size            max_tlen;
95         bool            hasindex;
96         int                     num_vtlinks;
97         VTupleLink      vtlinks;
98 } VRelStats;
99
100
101 static MemoryContext vac_context = NULL;
102
103 static int elevel = -1;
104
105 static TransactionId OldestXmin;
106 static TransactionId FreezeLimit;
107
108 static TransactionId initialOldestXmin;
109 static TransactionId initialFreezeLimit;
110
111
112 /* non-export function prototypes */
113 static List *getrels(const RangeVar *vacrel, const char *stmttype);
114 static void vac_update_dbstats(Oid dbid,
115                                    TransactionId vacuumXID,
116                                    TransactionId frozenXID);
117 static void vac_truncate_clog(TransactionId vacuumXID,
118                                   TransactionId frozenXID);
119 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
120 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
121 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
122                   VacPageList vacuum_pages, VacPageList fraged_pages);
123 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
124                         VacPageList vacuum_pages, VacPageList fraged_pages,
125                         int nindexes, Relation *Irel);
126 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
127                         VacPageList vacpagelist);
128 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
129 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
130                          double num_tuples, int keep_tuples);
131 static void scan_index(Relation indrel, double num_tuples);
132 static bool tid_reaped(ItemPointer itemptr, void *state);
133 static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
134 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
135                            BlockNumber rel_pages);
136 static VacPage copy_vac_page(VacPage vacpage);
137 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
138 static void *vac_bsearch(const void *key, const void *base,
139                         size_t nelem, size_t size,
140                         int (*compar) (const void *, const void *));
141 static int      vac_cmp_blk(const void *left, const void *right);
142 static int      vac_cmp_offno(const void *left, const void *right);
143 static int      vac_cmp_vtlinks(const void *left, const void *right);
144 static bool enough_space(VacPage vacpage, Size len);
145
146
147 /****************************************************************************
148  *                                                                                                                                                      *
149  *                      Code common to all flavors of VACUUM and ANALYZE                                *
150  *                                                                                                                                                      *
151  ****************************************************************************
152  */
153
154
155 /*
156  * Primary entry point for VACUUM and ANALYZE commands.
157  */
158 void
159 vacuum(VacuumStmt *vacstmt)
160 {
161         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
162         MemoryContext anl_context = NULL;
163         List       *vrl,
164                            *cur;
165
166         if (vacstmt->verbose)
167                 elevel = INFO;
168         else
169                 elevel = DEBUG1;
170
171         /*
172          * We cannot run VACUUM inside a user transaction block; if we were
173          * inside a transaction, then our commit- and
174          * start-transaction-command calls would not have the intended effect!
175          * Furthermore, the forced commit that occurs before truncating the
176          * relation's file would have the effect of committing the rest of the
177          * user's transaction too, which would certainly not be the desired
178          * behavior.
179          */
180         if (vacstmt->vacuum && IsTransactionBlock())
181                 elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
182
183         /* Running VACUUM from a function would free the function context */
184         if (vacstmt->vacuum && !MemoryContextContains(QueryContext, vacstmt))
185                 elog(ERROR, "%s cannot be executed from a function", stmttype);
186
187         /*
188          * Send info about dead objects to the statistics collector
189          */
190         if (vacstmt->vacuum)
191                 pgstat_vacuum_tabstat();
192
193         /*
194          * Create special memory context for cross-transaction storage.
195          *
196          * Since it is a child of QueryContext, it will go away eventually even
197          * if we suffer an error; there's no need for special abort cleanup
198          * logic.
199          */
200         vac_context = AllocSetContextCreate(QueryContext,
201                                                                                 "Vacuum",
202                                                                                 ALLOCSET_DEFAULT_MINSIZE,
203                                                                                 ALLOCSET_DEFAULT_INITSIZE,
204                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
205
206         /*
207          * If we are running only ANALYZE, we don't need per-table transactions,
208          * but we still need a memory context with table lifetime.
209          */
210         if (vacstmt->analyze && !vacstmt->vacuum)
211                 anl_context = AllocSetContextCreate(QueryContext,
212                                                                                         "Analyze",
213                                                                                         ALLOCSET_DEFAULT_MINSIZE,
214                                                                                         ALLOCSET_DEFAULT_INITSIZE,
215                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
216
217         /* Build list of relations to process (note this lives in vac_context) */
218         vrl = getrels(vacstmt->relation, stmttype);
219
220         /*
221          * Formerly, there was code here to prevent more than one VACUUM from
222          * executing concurrently in the same database.  However, there's no
223          * good reason to prevent that, and manually removing lockfiles after
224          * a vacuum crash was a pain for dbadmins.  So, forget about lockfiles,
225          * and just rely on the locks we grab on each target table
226          * to ensure that there aren't two VACUUMs running on the same table
227          * at the same time.
228          */
229
230         /*
231          * The strangeness with committing and starting transactions here is due
232          * to wanting to run each table's VACUUM as a separate transaction, so
233          * that we don't hold locks unnecessarily long.  Also, if we are doing
234          * VACUUM ANALYZE, the ANALYZE part runs as a separate transaction from
235          * the VACUUM to further reduce locking.
236          *
237          * vacuum_rel expects to be entered with no transaction active; it will
238          * start and commit its own transaction.  But we are called by an SQL
239          * command, and so we are executing inside a transaction already.  We
240          * commit the transaction started in PostgresMain() here, and start
241          * another one before exiting to match the commit waiting for us back in
242          * PostgresMain().
243          *
244          * In the case of an ANALYZE statement (no vacuum, just analyze) it's
245          * okay to run the whole thing in the outer transaction, and so we skip
246          * transaction start/stop operations.
247          */
248         if (vacstmt->vacuum)
249         {
250                 if (vacstmt->relation == NULL)
251                 {
252                         /*
253                          * It's a database-wide VACUUM.
254                          *
255                          * Compute the initially applicable OldestXmin and FreezeLimit
256                          * XIDs, so that we can record these values at the end of the
257                          * VACUUM. Note that individual tables may well be processed with
258                          * newer values, but we can guarantee that no (non-shared)
259                          * relations are processed with older ones.
260                          *
261                          * It is okay to record non-shared values in pg_database, even though
262                          * we may vacuum shared relations with older cutoffs, because only
263                          * the minimum of the values present in pg_database matters.  We
264                          * can be sure that shared relations have at some time been
265                          * vacuumed with cutoffs no worse than the global minimum; for, if
266                          * there is a backend in some other DB with xmin = OLDXMIN that's
267                          * determining the cutoff with which we vacuum shared relations,
268                          * it is not possible for that database to have a cutoff newer
269                          * than OLDXMIN recorded in pg_database.
270                          */
271                         vacuum_set_xid_limits(vacstmt, false,
272                                                                   &initialOldestXmin, &initialFreezeLimit);
273                 }
274
275                 /* matches the StartTransaction in PostgresMain() */
276                 CommitTransactionCommand();
277         }
278
279         /*
280          * Loop to process each selected relation.
281          */
282         foreach(cur, vrl)
283         {
284                 Oid             relid = (Oid) lfirsti(cur);
285
286                 if (vacstmt->vacuum)
287                         vacuum_rel(relid, vacstmt, RELKIND_RELATION);
288                 if (vacstmt->analyze)
289                 {
290                         MemoryContext old_context = NULL;
291
292                         /*
293                          * If we vacuumed, use new transaction for analyze.  Otherwise,
294                          * we can use the outer transaction, but we still need to call
295                          * analyze_rel in a memory context that will be cleaned up on
296                          * return (else we leak memory while processing multiple tables).
297                          */
298                         if (vacstmt->vacuum)
299                                 StartTransactionCommand();
300                         else
301                                 old_context = MemoryContextSwitchTo(anl_context);
302
303                         analyze_rel(relid, vacstmt);
304
305                         if (vacstmt->vacuum)
306                                 CommitTransactionCommand();
307                         else
308                         {
309                                 MemoryContextSwitchTo(old_context);
310                                 MemoryContextResetAndDeleteChildren(anl_context);
311                         }
312                 }
313         }
314
315         /*
316          * Finish up processing.
317          */
318         if (vacstmt->vacuum)
319         {
320                 /* here, we are not in a transaction */
321
322                 /* matches the CommitTransaction in PostgresMain() */
323                 StartTransactionCommand();
324
325                 /*
326                  * If we did a database-wide VACUUM, update the database's pg_database
327                  * row with info about the transaction IDs used, and try to truncate
328                  * pg_clog.
329                  */
330                 if (vacstmt->relation == NULL)
331                 {
332                         vac_update_dbstats(MyDatabaseId,
333                                                            initialOldestXmin, initialFreezeLimit);
334                         vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
335                 }
336         }
337
338         /*
339          * Clean up working storage --- note we must do this after
340          * StartTransactionCommand, else we might be trying to delete the
341          * active context!
342          */
343         MemoryContextDelete(vac_context);
344         vac_context = NULL;
345
346         if (anl_context)
347                 MemoryContextDelete(anl_context);
348 }
349
350 /*
351  * Build a list of Oids for each relation to be processed
352  *
353  * The list is built in vac_context so that it will survive across our
354  * per-relation transactions.
355  */
356 static List *
357 getrels(const RangeVar *vacrel, const char *stmttype)
358 {
359         List       *vrl = NIL;
360         MemoryContext oldcontext;
361
362         if (vacrel)
363         {
364                 /* Process specific relation */
365                 Oid             relid;
366
367                 relid = RangeVarGetRelid(vacrel, false);
368
369                 /* Make a relation list entry for this guy */
370                 oldcontext = MemoryContextSwitchTo(vac_context);
371                 vrl = lappendi(vrl, relid);
372                 MemoryContextSwitchTo(oldcontext);
373         }
374         else
375         {
376                 /* Process all plain relations listed in pg_class */
377                 Relation        pgclass;
378                 HeapScanDesc scan;
379                 HeapTuple       tuple;
380                 ScanKeyData key;
381
382                 ScanKeyEntryInitialize(&key, 0x0,
383                                                            Anum_pg_class_relkind,
384                                                            F_CHAREQ,
385                                                            CharGetDatum(RELKIND_RELATION));
386
387                 pgclass = heap_openr(RelationRelationName, AccessShareLock);
388
389                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
390
391                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
392                 {
393                         /* Make a relation list entry for this guy */
394                         oldcontext = MemoryContextSwitchTo(vac_context);
395                         AssertTupleDescHasOid(pgclass->rd_att);
396                         vrl = lappendi(vrl, HeapTupleGetOid(tuple));
397                         MemoryContextSwitchTo(oldcontext);
398                 }
399
400                 heap_endscan(scan);
401                 heap_close(pgclass, AccessShareLock);
402         }
403
404         return vrl;
405 }
406
407 /*
408  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
409  */
410 void
411 vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
412                                           TransactionId *oldestXmin,
413                                           TransactionId *freezeLimit)
414 {
415         TransactionId limit;
416
417         *oldestXmin = GetOldestXmin(sharedRel);
418
419         Assert(TransactionIdIsNormal(*oldestXmin));
420
421         if (vacstmt->freeze)
422         {
423                 /* FREEZE option: use oldest Xmin as freeze cutoff too */
424                 limit = *oldestXmin;
425         }
426         else
427         {
428                 /*
429                  * Normal case: freeze cutoff is well in the past, to wit, about
430                  * halfway to the wrap horizon
431                  */
432                 limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
433         }
434
435         /*
436          * Be careful not to generate a "permanent" XID
437          */
438         if (!TransactionIdIsNormal(limit))
439                 limit = FirstNormalTransactionId;
440
441         /*
442          * Ensure sane relationship of limits
443          */
444         if (TransactionIdFollows(limit, *oldestXmin))
445         {
446                 elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
447                 limit = *oldestXmin;
448         }
449
450         *freezeLimit = limit;
451 }
452
453
454 /*
455  *      vac_update_relstats() -- update statistics for one relation
456  *
457  *              Update the whole-relation statistics that are kept in its pg_class
458  *              row.  There are additional stats that will be updated if we are
459  *              doing ANALYZE, but we always update these stats.  This routine works
460  *              for both index and heap relation entries in pg_class.
461  *
462  *              We violate no-overwrite semantics here by storing new values for the
463  *              statistics columns directly into the pg_class tuple that's already on
464  *              the page.  The reason for this is that if we updated these tuples in
465  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
466  *              by the time we got done with a vacuum cycle, most of the tuples in
467  *              pg_class would've been obsoleted.  Of course, this only works for
468  *              fixed-size never-null columns, but these are.
469  *
470  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
471  *              ANALYZE.
472  */
473 void
474 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
475                                         bool hasindex)
476 {
477         Relation        rd;
478         HeapTupleData rtup;
479         HeapTuple       ctup;
480         Form_pg_class pgcform;
481         Buffer          buffer;
482
483         /*
484          * update number of tuples and number of pages in pg_class
485          */
486         rd = heap_openr(RelationRelationName, RowExclusiveLock);
487
488         ctup = SearchSysCache(RELOID,
489                                                   ObjectIdGetDatum(relid),
490                                                   0, 0, 0);
491         if (!HeapTupleIsValid(ctup))
492                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
493                          relid);
494
495         /* get the buffer cache tuple */
496         rtup.t_self = ctup->t_self;
497         ReleaseSysCache(ctup);
498         if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
499                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
500                          relid);
501
502         /* overwrite the existing statistics in the tuple */
503         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
504         pgcform->relpages = (int32) num_pages;
505         pgcform->reltuples = num_tuples;
506         pgcform->relhasindex = hasindex;
507
508         /*
509          * If we have discovered that there are no indexes, then there's no
510          * primary key either.  This could be done more thoroughly...
511          */
512         if (!hasindex)
513                 pgcform->relhaspkey = false;
514
515         /*
516          * Invalidate the tuple in the catcaches; this also arranges to flush
517          * the relation's relcache entry.  (If we fail to commit for some reason,
518          * no flush will occur, but no great harm is done since there are no
519          * noncritical state updates here.)
520          */
521         CacheInvalidateHeapTuple(rd, &rtup);
522
523         /* Write the buffer */
524         WriteBuffer(buffer);
525
526         heap_close(rd, RowExclusiveLock);
527 }
528
529
530 /*
531  *      vac_update_dbstats() -- update statistics for one database
532  *
533  *              Update the whole-database statistics that are kept in its pg_database
534  *              row.
535  *
536  *              We violate no-overwrite semantics here by storing new values for the
537  *              statistics columns directly into the tuple that's already on the page.
538  *              As with vac_update_relstats, this avoids leaving dead tuples behind
539  *              after a VACUUM; which is good since GetRawDatabaseInfo
540  *              can get confused by finding dead tuples in pg_database.
541  *
542  *              This routine is shared by full and lazy VACUUM.  Note that it is only
543  *              applied after a database-wide VACUUM operation.
544  */
545 static void
546 vac_update_dbstats(Oid dbid,
547                                    TransactionId vacuumXID,
548                                    TransactionId frozenXID)
549 {
550         Relation        relation;
551         ScanKeyData entry[1];
552         HeapScanDesc scan;
553         HeapTuple       tuple;
554         Form_pg_database dbform;
555
556         relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
557
558         /* Must use a heap scan, since there's no syscache for pg_database */
559         ScanKeyEntryInitialize(&entry[0], 0x0,
560                                                    ObjectIdAttributeNumber, F_OIDEQ,
561                                                    ObjectIdGetDatum(dbid));
562
563         scan = heap_beginscan(relation, SnapshotNow, 1, entry);
564
565         tuple = heap_getnext(scan, ForwardScanDirection);
566
567         if (!HeapTupleIsValid(tuple))
568                 elog(ERROR, "database %u does not exist", dbid);
569
570         dbform = (Form_pg_database) GETSTRUCT(tuple);
571
572         /* overwrite the existing statistics in the tuple */
573         dbform->datvacuumxid = vacuumXID;
574         dbform->datfrozenxid = frozenXID;
575
576         /* invalidate the tuple in the cache and write the buffer */
577         CacheInvalidateHeapTuple(relation, tuple);
578         WriteNoReleaseBuffer(scan->rs_cbuf);
579
580         heap_endscan(scan);
581
582         heap_close(relation, RowExclusiveLock);
583 }
584
585
586 /*
587  *      vac_truncate_clog() -- attempt to truncate the commit log
588  *
589  *              Scan pg_database to determine the system-wide oldest datvacuumxid,
590  *              and use it to truncate the transaction commit log (pg_clog).
591  *              Also generate a warning if the system-wide oldest datfrozenxid
592  *              seems to be in danger of wrapping around.
593  *
594  *              The passed XIDs are simply the ones I just wrote into my pg_database
595  *              entry.  They're used to initialize the "min" calculations.
596  *
597  *              This routine is shared by full and lazy VACUUM.  Note that it is only
598  *              applied after a database-wide VACUUM operation.
599  */
600 static void
601 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
602 {
603         TransactionId myXID;
604         Relation        relation;
605         HeapScanDesc scan;
606         HeapTuple       tuple;
607         int32           age;
608         bool            vacuumAlreadyWrapped = false;
609         bool            frozenAlreadyWrapped = false;
610
611         myXID = GetCurrentTransactionId();
612
613         relation = heap_openr(DatabaseRelationName, AccessShareLock);
614
615         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
616
617         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
618         {
619                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
620
621                 /* Ignore non-connectable databases (eg, template0) */
622                 /* It's assumed that these have been frozen correctly */
623                 if (!dbform->datallowconn)
624                         continue;
625
626                 if (TransactionIdIsNormal(dbform->datvacuumxid))
627                 {
628                         if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
629                                 vacuumAlreadyWrapped = true;
630                         else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
631                                 vacuumXID = dbform->datvacuumxid;
632                 }
633                 if (TransactionIdIsNormal(dbform->datfrozenxid))
634                 {
635                         if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
636                                 frozenAlreadyWrapped = true;
637                         else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
638                                 frozenXID = dbform->datfrozenxid;
639                 }
640         }
641
642         heap_endscan(scan);
643
644         heap_close(relation, AccessShareLock);
645
646         /*
647          * Do not truncate CLOG if we seem to have suffered wraparound already;
648          * the computed minimum XID might be bogus.
649          */
650         if (vacuumAlreadyWrapped)
651         {
652                 elog(WARNING, "Some databases have not been vacuumed in over 2 billion transactions."
653                          "\n\tYou may have already suffered transaction-wraparound data loss.");
654                 return;
655         }
656
657         /* Truncate CLOG to the oldest vacuumxid */
658         TruncateCLOG(vacuumXID);
659
660         /* Give warning about impending wraparound problems */
661         if (frozenAlreadyWrapped)
662         {
663                 elog(WARNING, "Some databases have not been vacuumed in over 1 billion transactions."
664                          "\n\tBetter vacuum them soon, or you may have a wraparound failure.");
665         }
666         else
667         {
668                 age = (int32) (myXID - frozenXID);
669                 if (age > (int32) ((MaxTransactionId >> 3) * 3))
670                         elog(WARNING, "Some databases have not been vacuumed in %d transactions."
671                                  "\n\tBetter vacuum them within %d transactions,"
672                                  "\n\tor you may have a wraparound failure.",
673                                  age, (int32) (MaxTransactionId >> 1) - age);
674         }
675 }
676
677
678 /****************************************************************************
679  *                                                                                                                                                      *
680  *                      Code common to both flavors of VACUUM                                                   *
681  *                                                                                                                                                      *
682  ****************************************************************************
683  */
684
685
686 /*
687  *      vacuum_rel() -- vacuum one heap relation
688  *
689  *              Doing one heap at a time incurs extra overhead, since we need to
690  *              check that the heap exists again just before we vacuum it.      The
691  *              reason that we do this is so that vacuuming can be spread across
692  *              many small transactions.  Otherwise, two-phase locking would require
693  *              us to lock the entire database during one pass of the vacuum cleaner.
694  *
695  *              At entry and exit, we are not inside a transaction.
696  */
697 static void
698 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
699 {
700         LOCKMODE        lmode;
701         Relation        onerel;
702         LockRelId       onerelid;
703         Oid                     toast_relid;
704
705         /* Begin a transaction for vacuuming this relation */
706         StartTransactionCommand();
707
708         /*
709          * Check for user-requested abort.      Note we want this to be inside a
710          * transaction, so xact.c doesn't issue useless WARNING.
711          */
712         CHECK_FOR_INTERRUPTS();
713
714         /*
715          * Race condition -- if the pg_class tuple has gone away since the
716          * last time we saw it, we don't need to vacuum it.
717          */
718         if (!SearchSysCacheExists(RELOID,
719                                                           ObjectIdGetDatum(relid),
720                                                           0, 0, 0))
721         {
722                 CommitTransactionCommand();
723                 return;
724         }
725
726         /*
727          * Determine the type of lock we want --- hard exclusive lock for a
728          * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
729          * vacuum.      Either way, we can be sure that no other backend is
730          * vacuuming the same table.
731          */
732         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
733
734         /*
735          * Open the class, get an appropriate lock on it, and check
736          * permissions.
737          *
738          * We allow the user to vacuum a table if he is superuser, the table
739          * owner, or the database owner (but in the latter case, only if it's
740          * not a shared relation).      pg_class_ownercheck includes the superuser case.
741          *
742          * Note we choose to treat permissions failure as a WARNING and keep
743          * trying to vacuum the rest of the DB --- is this appropriate?
744          */
745         onerel = relation_open(relid, lmode);
746
747         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
748                   (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
749         {
750                 elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
751                          RelationGetRelationName(onerel));
752                 relation_close(onerel, lmode);
753                 CommitTransactionCommand();
754                 return;
755         }
756
757         /*
758          * Check that it's a plain table; we used to do this in getrels() but
759          * seems safer to check after we've locked the relation.
760          */
761         if (onerel->rd_rel->relkind != expected_relkind)
762         {
763                 elog(WARNING, "Skipping \"%s\" --- can not process indexes, views or special system tables",
764                          RelationGetRelationName(onerel));
765                 relation_close(onerel, lmode);
766                 CommitTransactionCommand();
767                 return;
768         }
769
770         /*
771          * Get a session-level lock too. This will protect our access to the
772          * relation across multiple transactions, so that we can vacuum the
773          * relation's TOAST table (if any) secure in the knowledge that no one
774          * is deleting the parent relation.
775          *
776          * NOTE: this cannot block, even if someone else is waiting for access,
777          * because the lock manager knows that both lock requests are from the
778          * same process.
779          */
780         onerelid = onerel->rd_lockInfo.lockRelId;
781         LockRelationForSession(&onerelid, lmode);
782
783         /*
784          * Remember the relation's TOAST relation for later
785          */
786         toast_relid = onerel->rd_rel->reltoastrelid;
787
788         /*
789          * Do the actual work --- either FULL or "lazy" vacuum
790          */
791         if (vacstmt->full)
792                 full_vacuum_rel(onerel, vacstmt);
793         else
794                 lazy_vacuum_rel(onerel, vacstmt);
795
796         /* all done with this class, but hold lock until commit */
797         relation_close(onerel, NoLock);
798
799         /*
800          * Complete the transaction and free all temporary memory used.
801          */
802         CommitTransactionCommand();
803
804         /*
805          * If the relation has a secondary toast rel, vacuum that too while we
806          * still hold the session lock on the master table.  Note however that
807          * "analyze" will not get done on the toast table.      This is good,
808          * because the toaster always uses hardcoded index access and
809          * statistics are totally unimportant for toast relations.
810          */
811         if (toast_relid != InvalidOid)
812                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);
813
814         /*
815          * Now release the session-level lock on the master table.
816          */
817         UnlockRelationForSession(&onerelid, lmode);
818 }
819
820
821 /****************************************************************************
822  *                                                                                                                                                      *
823  *                      Code for VACUUM FULL (only)                                                                             *
824  *                                                                                                                                                      *
825  ****************************************************************************
826  */
827
828
829 /*
830  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
831  *
832  *              This routine vacuums a single heap, cleans out its indexes, and
833  *              updates its num_pages and num_tuples statistics.
834  *
835  *              At entry, we have already established a transaction and opened
836  *              and locked the relation.
837  */
838 static void
839 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
840 {
841         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
842                                                                                  * clean indexes */
843         VacPageListData fraged_pages;           /* List of pages with space enough
844                                                                                  * for re-using */
845         Relation   *Irel;
846         int                     nindexes,
847                                 i;
848         VRelStats  *vacrelstats;
849         bool            reindex = false;
850
851         if (IsIgnoringSystemIndexes() &&
852                 IsSystemRelation(onerel))
853                 reindex = true;
854
855         vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
856                                                   &OldestXmin, &FreezeLimit);
857
858         /*
859          * Set up statistics-gathering machinery.
860          */
861         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
862         vacrelstats->rel_pages = 0;
863         vacrelstats->rel_tuples = 0;
864         vacrelstats->hasindex = false;
865
866         /* scan the heap */
867         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
868         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
869
870         /* Now open all indexes of the relation */
871         vac_open_indexes(onerel, &nindexes, &Irel);
872         if (!Irel)
873                 reindex = false;
874         else if (!RelationGetForm(onerel)->relhasindex)
875                 reindex = true;
876         if (nindexes > 0)
877                 vacrelstats->hasindex = true;
878
879 #ifdef NOT_USED
880
881         /*
882          * reindex in VACUUM is dangerous under WAL. ifdef out until it
883          * becomes safe.
884          */
885         if (reindex)
886         {
887                 vac_close_indexes(nindexes, Irel);
888                 Irel = (Relation *) NULL;
889                 activate_indexes_of_a_table(RelationGetRelid(onerel), false);
890         }
891 #endif   /* NOT_USED */
892
893         /* Clean/scan index relation(s) */
894         if (Irel != (Relation *) NULL)
895         {
896                 if (vacuum_pages.num_pages > 0)
897                 {
898                         for (i = 0; i < nindexes; i++)
899                                 vacuum_index(&vacuum_pages, Irel[i],
900                                                          vacrelstats->rel_tuples, 0);
901                 }
902                 else
903                 {
904                         /* just scan indexes to update statistic */
905                         for (i = 0; i < nindexes; i++)
906                                 scan_index(Irel[i], vacrelstats->rel_tuples);
907                 }
908         }
909
910         if (fraged_pages.num_pages > 0)
911         {
912                 /* Try to shrink heap */
913                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
914                                         nindexes, Irel);
915                 vac_close_indexes(nindexes, Irel);
916         }
917         else
918         {
919                 vac_close_indexes(nindexes, Irel);
920                 if (vacuum_pages.num_pages > 0)
921                 {
922                         /* Clean pages from vacuum_pages list */
923                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
924                 }
925                 else
926                 {
927                         /*
928                          * Flush dirty pages out to disk.  We must do this even if we
929                          * didn't do anything else, because we want to ensure that all
930                          * tuples have correct on-row commit status on disk (see
931                          * bufmgr.c's comments for FlushRelationBuffers()).
932                          */
933                         i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
934                         if (i < 0)
935                                 elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
936                                          i);
937                 }
938         }
939
940 #ifdef NOT_USED
941         if (reindex)
942                 activate_indexes_of_a_table(RelationGetRelid(onerel), true);
943 #endif   /* NOT_USED */
944
945         /* update shared free space map with final free space info */
946         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
947
948         /* update statistics in pg_class */
949         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
950                                                 vacrelstats->rel_tuples, vacrelstats->hasindex);
951 }
952
953
954 /*
955  *      scan_heap() -- scan an open heap relation
956  *
957  *              This routine sets commit status bits, constructs vacuum_pages (list
958  *              of pages we need to compact free space on and/or clean indexes of
959  *              deleted tuples), constructs fraged_pages (list of pages with free
960  *              space that tuples could be moved into), and calculates statistics
961  *              on the number of live tuples in the heap.
962  */
963 static void
964 scan_heap(VRelStats *vacrelstats, Relation onerel,
965                   VacPageList vacuum_pages, VacPageList fraged_pages)
966 {
967         BlockNumber nblocks,
968                                 blkno;
969         ItemId          itemid;
970         Buffer          buf;
971         HeapTupleData tuple;
972         OffsetNumber offnum,
973                                 maxoff;
974         bool            pgchanged,
975                                 tupgone,
976                                 notup;
977         char       *relname;
978         VacPage         vacpage,
979                                 vacpagecopy;
980         BlockNumber empty_pages,
981                                 new_pages,
982                                 changed_pages,
983                                 empty_end_pages;
984         double          num_tuples,
985                                 tups_vacuumed,
986                                 nkeep,
987                                 nunused;
988         double          free_size,
989                                 usable_free_size;
990         Size            min_tlen = MaxTupleSize;
991         Size            max_tlen = 0;
992         int                     i;
993         bool            do_shrinking = true;
994         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
995         int                     num_vtlinks = 0;
996         int                     free_vtlinks = 100;
997         VacRUsage       ru0;
998
999         vac_init_rusage(&ru0);
1000
1001         relname = RelationGetRelationName(onerel);
1002         elog(elevel, "--Relation %s.%s--",
1003                  get_namespace_name(RelationGetNamespace(onerel)),
1004                  relname);
1005
1006         empty_pages = new_pages = changed_pages = empty_end_pages = 0;
1007         num_tuples = tups_vacuumed = nkeep = nunused = 0;
1008         free_size = 0;
1009
1010         nblocks = RelationGetNumberOfBlocks(onerel);
1011
1012         /*
1013          * We initially create each VacPage item in a maximal-sized workspace,
1014          * then copy the workspace into a just-large-enough copy.
1015          */
1016         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1017
1018         for (blkno = 0; blkno < nblocks; blkno++)
1019         {
1020                 Page            page,
1021                                         tempPage = NULL;
1022                 bool            do_reap,
1023                                         do_frag;
1024
1025                 CHECK_FOR_INTERRUPTS();
1026
1027                 buf = ReadBuffer(onerel, blkno);
1028                 page = BufferGetPage(buf);
1029
1030                 vacpage->blkno = blkno;
1031                 vacpage->offsets_used = 0;
1032                 vacpage->offsets_free = 0;
1033
1034                 if (PageIsNew(page))
1035                 {
1036                         elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
1037                                  relname, blkno);
1038                         PageInit(page, BufferGetPageSize(buf), 0);
1039                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1040                         free_size += (vacpage->free - sizeof(ItemIdData));
1041                         new_pages++;
1042                         empty_end_pages++;
1043                         vacpagecopy = copy_vac_page(vacpage);
1044                         vpage_insert(vacuum_pages, vacpagecopy);
1045                         vpage_insert(fraged_pages, vacpagecopy);
1046                         WriteBuffer(buf);
1047                         continue;
1048                 }
1049
1050                 if (PageIsEmpty(page))
1051                 {
1052                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1053                         free_size += (vacpage->free - sizeof(ItemIdData));
1054                         empty_pages++;
1055                         empty_end_pages++;
1056                         vacpagecopy = copy_vac_page(vacpage);
1057                         vpage_insert(vacuum_pages, vacpagecopy);
1058                         vpage_insert(fraged_pages, vacpagecopy);
1059                         ReleaseBuffer(buf);
1060                         continue;
1061                 }
1062
1063                 pgchanged = false;
1064                 notup = true;
1065                 maxoff = PageGetMaxOffsetNumber(page);
1066                 for (offnum = FirstOffsetNumber;
1067                          offnum <= maxoff;
1068                          offnum = OffsetNumberNext(offnum))
1069                 {
1070                         uint16          sv_infomask;
1071
1072                         itemid = PageGetItemId(page, offnum);
1073
1074                         /*
1075                          * Collect un-used items too - it's possible to have indexes
1076                          * pointing here after crash.
1077                          */
1078                         if (!ItemIdIsUsed(itemid))
1079                         {
1080                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1081                                 nunused += 1;
1082                                 continue;
1083                         }
1084
1085                         tuple.t_datamcxt = NULL;
1086                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1087                         tuple.t_len = ItemIdGetLength(itemid);
1088                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1089
1090                         tupgone = false;
1091                         sv_infomask = tuple.t_data->t_infomask;
1092
1093                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
1094                         {
1095                                 case HEAPTUPLE_DEAD:
1096                                         tupgone = true;         /* we can delete the tuple */
1097                                         break;
1098                                 case HEAPTUPLE_LIVE:
1099
1100                                         /*
1101                                          * Tuple is good.  Consider whether to replace its
1102                                          * xmin value with FrozenTransactionId.
1103                                          */
1104                                         if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
1105                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1106                                                                                           FreezeLimit))
1107                                         {
1108                                                 HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
1109                                                 /* infomask should be okay already */
1110                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1111                                                 pgchanged = true;
1112                                         }
1113                                         break;
1114                                 case HEAPTUPLE_RECENTLY_DEAD:
1115
1116                                         /*
1117                                          * If tuple is recently deleted then we must not
1118                                          * remove it from relation.
1119                                          */
1120                                         nkeep += 1;
1121
1122                                         /*
1123                                          * If we do shrinking and this tuple is updated one
1124                                          * then remember it to construct updated tuple
1125                                          * dependencies.
1126                                          */
1127                                         if (do_shrinking &&
1128                                                 !(ItemPointerEquals(&(tuple.t_self),
1129                                                                                         &(tuple.t_data->t_ctid))))
1130                                         {
1131                                                 if (free_vtlinks == 0)
1132                                                 {
1133                                                         free_vtlinks = 1000;
1134                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1135                                                                                    (free_vtlinks + num_vtlinks) *
1136                                                                                                  sizeof(VTupleLinkData));
1137                                                 }
1138                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1139                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1140                                                 free_vtlinks--;
1141                                                 num_vtlinks++;
1142                                         }
1143                                         break;
1144                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1145
1146                                         /*
1147                                          * This should not happen, since we hold exclusive
1148                                          * lock on the relation; shouldn't we raise an error?
1149                                          */
1150                                         elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
1151                                                  relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data));
1152                                         do_shrinking = false;
1153                                         break;
1154                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1155
1156                                         /*
1157                                          * This should not happen, since we hold exclusive
1158                                          * lock on the relation; shouldn't we raise an error?
1159                                          */
1160                                         elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
1161                                                  relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data));
1162                                         do_shrinking = false;
1163                                         break;
1164                                 default:
1165                                         elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
1166                                         break;
1167                         }
1168
1169                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1170                         if (sv_infomask != tuple.t_data->t_infomask)
1171                                 pgchanged = true;
1172
1173                         /*
1174                          * Other checks...
1175                          */
1176                         if (onerel->rd_rel->relhasoids &&
1177                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1178                                 elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
1179                                          relname, blkno, offnum, (int) tupgone);
1180
1181                         if (tupgone)
1182                         {
1183                                 ItemId          lpp;
1184
1185                                 /*
1186                                  * Here we are building a temporary copy of the page with
1187                                  * dead tuples removed.  Below we will apply
1188                                  * PageRepairFragmentation to the copy, so that we can
1189                                  * determine how much space will be available after
1190                                  * removal of dead tuples.      But note we are NOT changing
1191                                  * the real page yet...
1192                                  */
1193                                 if (tempPage == (Page) NULL)
1194                                 {
1195                                         Size            pageSize;
1196
1197                                         pageSize = PageGetPageSize(page);
1198                                         tempPage = (Page) palloc(pageSize);
1199                                         memcpy(tempPage, page, pageSize);
1200                                 }
1201
1202                                 /* mark it unused on the temp page */
1203                                 lpp = PageGetItemId(tempPage, offnum);
1204                                 lpp->lp_flags &= ~LP_USED;
1205
1206                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1207                                 tups_vacuumed += 1;
1208                         }
1209                         else
1210                         {
1211                                 num_tuples += 1;
1212                                 notup = false;
1213                                 if (tuple.t_len < min_tlen)
1214                                         min_tlen = tuple.t_len;
1215                                 if (tuple.t_len > max_tlen)
1216                                         max_tlen = tuple.t_len;
1217                         }
1218                 }                                               /* scan along page */
1219
1220                 if (tempPage != (Page) NULL)
1221                 {
1222                         /* Some tuples are removable; figure free space after removal */
1223                         PageRepairFragmentation(tempPage, NULL);
1224                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1225                         pfree(tempPage);
1226                         do_reap = true;
1227                 }
1228                 else
1229                 {
1230                         /* Just use current available space */
1231                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1232                         /* Need to reap the page if it has ~LP_USED line pointers */
1233                         do_reap = (vacpage->offsets_free > 0);
1234                 }
1235
1236                 free_size += vacpage->free;
1237
1238                 /*
1239                  * Add the page to fraged_pages if it has a useful amount of free
1240                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1241                  * don't know that accurately near the start of the relation, so
1242                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1243                  */
1244                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1245
1246                 if (do_reap || do_frag)
1247                 {
1248                         vacpagecopy = copy_vac_page(vacpage);
1249                         if (do_reap)
1250                                 vpage_insert(vacuum_pages, vacpagecopy);
1251                         if (do_frag)
1252                                 vpage_insert(fraged_pages, vacpagecopy);
1253                 }
1254
1255                 if (notup)
1256                         empty_end_pages++;
1257                 else
1258                         empty_end_pages = 0;
1259
1260                 if (pgchanged)
1261                 {
1262                         WriteBuffer(buf);
1263                         changed_pages++;
1264                 }
1265                 else
1266                         ReleaseBuffer(buf);
1267         }
1268
1269         pfree(vacpage);
1270
1271         /* save stats in the rel list for use later */
1272         vacrelstats->rel_tuples = num_tuples;
1273         vacrelstats->rel_pages = nblocks;
1274         if (num_tuples == 0)
1275                 min_tlen = max_tlen = 0;
1276         vacrelstats->min_tlen = min_tlen;
1277         vacrelstats->max_tlen = max_tlen;
1278
1279         vacuum_pages->empty_end_pages = empty_end_pages;
1280         fraged_pages->empty_end_pages = empty_end_pages;
1281
1282         /*
1283          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1284          * remove any "empty" end-pages from the list, and compute usable free
1285          * space = free space in remaining pages.
1286          */
1287         if (do_shrinking)
1288         {
1289                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1290                 fraged_pages->num_pages -= empty_end_pages;
1291                 usable_free_size = 0;
1292                 for (i = 0; i < fraged_pages->num_pages; i++)
1293                         usable_free_size += fraged_pages->pagedesc[i]->free;
1294         }
1295         else
1296         {
1297                 fraged_pages->num_pages = 0;
1298                 usable_free_size = 0;
1299         }
1300
1301         if (usable_free_size > 0 && num_vtlinks > 0)
1302         {
1303                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1304                           vac_cmp_vtlinks);
1305                 vacrelstats->vtlinks = vtlinks;
1306                 vacrelstats->num_vtlinks = num_vtlinks;
1307         }
1308         else
1309         {
1310                 vacrelstats->vtlinks = NULL;
1311                 vacrelstats->num_vtlinks = 0;
1312                 pfree(vtlinks);
1313         }
1314
1315         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
1316 Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
1317 Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
1318                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1319                  new_pages, num_tuples, tups_vacuumed,
1320                  nkeep, vacrelstats->num_vtlinks,
1321                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1322                  free_size, usable_free_size,
1323                  empty_end_pages, fraged_pages->num_pages,
1324                  vac_show_rusage(&ru0));
1325
1326 }
1327
1328
1329 /*
1330  *      repair_frag() -- try to repair relation's fragmentation
1331  *
1332  *              This routine marks dead tuples as unused and tries re-use dead space
1333  *              by moving tuples (and inserting indexes if needed). It constructs
1334  *              Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
1335  *              for them after committing (in hack-manner - without losing locks
1336  *              and freeing memory!) current transaction. It truncates relation
1337  *              if some end-blocks are gone away.
1338  */
1339 static void
1340 repair_frag(VRelStats *vacrelstats, Relation onerel,
1341                         VacPageList vacuum_pages, VacPageList fraged_pages,
1342                         int nindexes, Relation *Irel)
1343 {
1344         TransactionId myXID;
1345         CommandId       myCID;
1346         Buffer          buf,
1347                                 cur_buffer;
1348         BlockNumber nblocks,
1349                                 blkno;
1350         BlockNumber last_move_dest_block = 0,
1351                                 last_vacuum_block;
1352         Page            page,
1353                                 ToPage = NULL;
1354         OffsetNumber offnum,
1355                                 maxoff,
1356                                 newoff,
1357                                 max_offset;
1358         ItemId          itemid,
1359                                 newitemid;
1360         HeapTupleData tuple,
1361                                 newtup;
1362         TupleDesc       tupdesc;
1363         ResultRelInfo *resultRelInfo;
1364         EState     *estate;
1365         TupleTable      tupleTable;
1366         TupleTableSlot *slot;
1367         VacPageListData Nvacpagelist;
1368         VacPage         cur_page = NULL,
1369                                 last_vacuum_page,
1370                                 vacpage,
1371                            *curpage;
1372         int                     cur_item = 0;
1373         int                     i;
1374         Size            tuple_len;
1375         int                     num_moved,
1376                                 num_fraged_pages,
1377                                 vacuumed_pages;
1378         int                     checked_moved,
1379                                 num_tuples,
1380                                 keep_tuples = 0;
1381         bool            isempty,
1382                                 dowrite,
1383                                 chain_tuple_moved;
1384         VacRUsage       ru0;
1385
1386         vac_init_rusage(&ru0);
1387
1388         myXID = GetCurrentTransactionId();
1389         myCID = GetCurrentCommandId();
1390
1391         tupdesc = RelationGetDescr(onerel);
1392
1393         /*
1394          * We need a ResultRelInfo and an EState so we can use the regular
1395          * executor's index-entry-making machinery.
1396          */
1397         resultRelInfo = makeNode(ResultRelInfo);
1398         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1399         resultRelInfo->ri_RelationDesc = onerel;
1400         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1401
1402         ExecOpenIndices(resultRelInfo);
1403
1404         estate = CreateExecutorState();
1405         estate->es_result_relations = resultRelInfo;
1406         estate->es_num_result_relations = 1;
1407         estate->es_result_relation_info = resultRelInfo;
1408
1409         /* Set up a dummy tuple table too */
1410         tupleTable = ExecCreateTupleTable(1);
1411         slot = ExecAllocTableSlot(tupleTable);
1412         ExecSetSlotDescriptor(slot, tupdesc, false);
1413
1414         Nvacpagelist.num_pages = 0;
1415         num_fraged_pages = fraged_pages->num_pages;
1416         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1417         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1418         if (vacuumed_pages > 0)
1419         {
1420                 /* get last reaped page from vacuum_pages */
1421                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1422                 last_vacuum_block = last_vacuum_page->blkno;
1423         }
1424         else
1425         {
1426                 last_vacuum_page = NULL;
1427                 last_vacuum_block = InvalidBlockNumber;
1428         }
1429         cur_buffer = InvalidBuffer;
1430         num_moved = 0;
1431
1432         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1433         vacpage->offsets_used = vacpage->offsets_free = 0;
1434
1435         /*
1436          * Scan pages backwards from the last nonempty page, trying to move
1437          * tuples down to lower pages.  Quit when we reach a page that we have
1438          * moved any tuples onto, or the first page if we haven't moved
1439          * anything, or when we find a page we cannot completely empty (this
1440          * last condition is handled by "break" statements within the loop).
1441          *
1442          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1443          * in order by blkno.
1444          */
1445         nblocks = vacrelstats->rel_pages;
1446         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1447                  blkno > last_move_dest_block;
1448                  blkno--)
1449         {
1450                 CHECK_FOR_INTERRUPTS();
1451
1452                 /*
1453                  * Forget fraged_pages pages at or after this one; they're no
1454                  * longer useful as move targets, since we only want to move down.
1455                  * Note that since we stop the outer loop at last_move_dest_block,
1456                  * pages removed here cannot have had anything moved onto them
1457                  * already.
1458                  *
1459                  * Also note that we don't change the stored fraged_pages list, only
1460                  * our local variable num_fraged_pages; so the forgotten pages are
1461                  * still available to be loaded into the free space map later.
1462                  */
1463                 while (num_fraged_pages > 0 &&
1464                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1465                 {
1466                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1467                         --num_fraged_pages;
1468                 }
1469
1470                 /*
1471                  * Process this page of relation.
1472                  */
1473                 buf = ReadBuffer(onerel, blkno);
1474                 page = BufferGetPage(buf);
1475
1476                 vacpage->offsets_free = 0;
1477
1478                 isempty = PageIsEmpty(page);
1479
1480                 dowrite = false;
1481
1482                 /* Is the page in the vacuum_pages list? */
1483                 if (blkno == last_vacuum_block)
1484                 {
1485                         if (last_vacuum_page->offsets_free > 0)
1486                         {
1487                                 /* there are dead tuples on this page - clean them */
1488                                 Assert(!isempty);
1489                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1490                                 vacuum_page(onerel, buf, last_vacuum_page);
1491                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1492                                 dowrite = true;
1493                         }
1494                         else
1495                                 Assert(isempty);
1496                         --vacuumed_pages;
1497                         if (vacuumed_pages > 0)
1498                         {
1499                                 /* get prev reaped page from vacuum_pages */
1500                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1501                                 last_vacuum_block = last_vacuum_page->blkno;
1502                         }
1503                         else
1504                         {
1505                                 last_vacuum_page = NULL;
1506                                 last_vacuum_block = InvalidBlockNumber;
1507                         }
1508                         if (isempty)
1509                         {
1510                                 ReleaseBuffer(buf);
1511                                 continue;
1512                         }
1513                 }
1514                 else
1515                         Assert(!isempty);
1516
1517                 chain_tuple_moved = false;              /* no one chain-tuple was moved
1518                                                                                  * off this page, yet */
1519                 vacpage->blkno = blkno;
1520                 maxoff = PageGetMaxOffsetNumber(page);
1521                 for (offnum = FirstOffsetNumber;
1522                          offnum <= maxoff;
1523                          offnum = OffsetNumberNext(offnum))
1524                 {
1525                         itemid = PageGetItemId(page, offnum);
1526
1527                         if (!ItemIdIsUsed(itemid))
1528                                 continue;
1529
1530                         tuple.t_datamcxt = NULL;
1531                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1532                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1533                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1534
1535                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1536                         {
1537                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1538                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1539
1540                                 /*
1541                                  * If this (chain) tuple is moved by me already then I
1542                                  * have to check is it in vacpage or not - i.e. is it
1543                                  * moved while cleaning this page or some previous one.
1544                                  */
1545                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1546                                 {
1547                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1548                                                 elog(ERROR, "Invalid XVAC in tuple header");
1549                                         if (keep_tuples == 0)
1550                                                 continue;
1551                                         if (chain_tuple_moved)          /* some chains was moved
1552                                                                                                  * while */
1553                                         {                       /* cleaning this page */
1554                                                 Assert(vacpage->offsets_free > 0);
1555                                                 for (i = 0; i < vacpage->offsets_free; i++)
1556                                                 {
1557                                                         if (vacpage->offsets[i] == offnum)
1558                                                                 break;
1559                                                 }
1560                                                 if (i >= vacpage->offsets_free) /* not found */
1561                                                 {
1562                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1563                                                         keep_tuples--;
1564                                                 }
1565                                         }
1566                                         else
1567                                         {
1568                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1569                                                 keep_tuples--;
1570                                         }
1571                                         continue;
1572                                 }
1573                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1574                         }
1575
1576                         /*
1577                          * If this tuple is in the chain of tuples created in updates
1578                          * by "recent" transactions then we have to move all chain of
1579                          * tuples to another places.
1580                          */
1581                         if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
1582                          !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1583                                                 OldestXmin)) ||
1584                                 (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1585                                  !(ItemPointerEquals(&(tuple.t_self),
1586                                                                          &(tuple.t_data->t_ctid)))))
1587                         {
1588                                 Buffer          Cbuf = buf;
1589                                 Page            Cpage;
1590                                 ItemId          Citemid;
1591                                 ItemPointerData Ctid;
1592                                 HeapTupleData tp = tuple;
1593                                 Size            tlen = tuple_len;
1594                                 VTupleMove      vtmove = (VTupleMove)
1595                                 palloc(100 * sizeof(VTupleMoveData));
1596                                 int                     num_vtmove = 0;
1597                                 int                     free_vtmove = 100;
1598                                 VacPage         to_vacpage = NULL;
1599                                 int                     to_item = 0;
1600                                 bool            freeCbuf = false;
1601                                 int                     ti;
1602
1603                                 if (vacrelstats->vtlinks == NULL)
1604                                         elog(ERROR, "No one parent tuple was found");
1605                                 if (cur_buffer != InvalidBuffer)
1606                                 {
1607                                         WriteBuffer(cur_buffer);
1608                                         cur_buffer = InvalidBuffer;
1609                                 }
1610
1611                                 /*
1612                                  * If this tuple is in the begin/middle of the chain then
1613                                  * we have to move to the end of chain.
1614                                  */
1615                                 while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
1616                                            !(ItemPointerEquals(&(tp.t_self),
1617                                                                                    &(tp.t_data->t_ctid))))
1618                                 {
1619                                         Ctid = tp.t_data->t_ctid;
1620                                         if (freeCbuf)
1621                                                 ReleaseBuffer(Cbuf);
1622                                         freeCbuf = true;
1623                                         Cbuf = ReadBuffer(onerel,
1624                                                                           ItemPointerGetBlockNumber(&Ctid));
1625                                         Cpage = BufferGetPage(Cbuf);
1626                                         Citemid = PageGetItemId(Cpage,
1627                                                                           ItemPointerGetOffsetNumber(&Ctid));
1628                                         if (!ItemIdIsUsed(Citemid))
1629                                         {
1630                                                 /*
1631                                                  * This means that in the middle of chain there
1632                                                  * was tuple updated by older (than OldestXmin)
1633                                                  * xaction and this tuple is already deleted by
1634                                                  * me. Actually, upper part of chain should be
1635                                                  * removed and seems that this should be handled
1636                                                  * in scan_heap(), but it's not implemented at the
1637                                                  * moment and so we just stop shrinking here.
1638                                                  */
1639                                                 ReleaseBuffer(Cbuf);
1640                                                 pfree(vtmove);
1641                                                 vtmove = NULL;
1642                                                 elog(WARNING, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1643                                                 break;
1644                                         }
1645                                         tp.t_datamcxt = NULL;
1646                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1647                                         tp.t_self = Ctid;
1648                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1649                                 }
1650                                 if (vtmove == NULL)
1651                                         break;
1652                                 /* first, can chain be moved ? */
1653                                 for (;;)
1654                                 {
1655                                         if (to_vacpage == NULL ||
1656                                                 !enough_space(to_vacpage, tlen))
1657                                         {
1658                                                 for (i = 0; i < num_fraged_pages; i++)
1659                                                 {
1660                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1661                                                                 break;
1662                                                 }
1663
1664                                                 if (i == num_fraged_pages)
1665                                                 {
1666                                                         /* can't move item anywhere */
1667                                                         for (i = 0; i < num_vtmove; i++)
1668                                                         {
1669                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1670                                                                 (vtmove[i].vacpage->offsets_used)--;
1671                                                         }
1672                                                         num_vtmove = 0;
1673                                                         break;
1674                                                 }
1675                                                 to_item = i;
1676                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1677                                         }
1678                                         to_vacpage->free -= MAXALIGN(tlen);
1679                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1680                                                 to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
1681                                         (to_vacpage->offsets_used)++;
1682                                         if (free_vtmove == 0)
1683                                         {
1684                                                 free_vtmove = 1000;
1685                                                 vtmove = (VTupleMove) repalloc(vtmove,
1686                                                                                          (free_vtmove + num_vtmove) *
1687                                                                                                  sizeof(VTupleMoveData));
1688                                         }
1689                                         vtmove[num_vtmove].tid = tp.t_self;
1690                                         vtmove[num_vtmove].vacpage = to_vacpage;
1691                                         if (to_vacpage->offsets_used == 1)
1692                                                 vtmove[num_vtmove].cleanVpd = true;
1693                                         else
1694                                                 vtmove[num_vtmove].cleanVpd = false;
1695                                         free_vtmove--;
1696                                         num_vtmove++;
1697
1698                                         /* All done ? */
1699                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1700                                             TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
1701                                                                   OldestXmin))
1702                                                 break;
1703
1704                                         /* Well, try to find tuple with old row version */
1705                                         for (;;)
1706                                         {
1707                                                 Buffer          Pbuf;
1708                                                 Page            Ppage;
1709                                                 ItemId          Pitemid;
1710                                                 HeapTupleData Ptp;
1711                                                 VTupleLinkData vtld,
1712                                                                    *vtlp;
1713
1714                                                 vtld.new_tid = tp.t_self;
1715                                                 vtlp = (VTupleLink)
1716                                                         vac_bsearch((void *) &vtld,
1717                                                                                 (void *) (vacrelstats->vtlinks),
1718                                                                                 vacrelstats->num_vtlinks,
1719                                                                                 sizeof(VTupleLinkData),
1720                                                                                 vac_cmp_vtlinks);
1721                                                 if (vtlp == NULL)
1722                                                         elog(ERROR, "Parent tuple was not found");
1723                                                 tp.t_self = vtlp->this_tid;
1724                                                 Pbuf = ReadBuffer(onerel,
1725                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1726                                                 Ppage = BufferGetPage(Pbuf);
1727                                                 Pitemid = PageGetItemId(Ppage,
1728                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1729                                                 if (!ItemIdIsUsed(Pitemid))
1730                                                         elog(ERROR, "Parent itemid marked as unused");
1731                                                 Ptp.t_datamcxt = NULL;
1732                                                 Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1733                                                 Assert(ItemPointerEquals(&(vtld.new_tid),
1734                                                                                                  &(Ptp.t_data->t_ctid)));
1735
1736                                                 /*
1737                                                  * Read above about cases when
1738                                                  * !ItemIdIsUsed(Citemid) (child item is
1739                                                  * removed)... Due to the fact that at the moment
1740                                                  * we don't remove unuseful part of update-chain,
1741                                                  * it's possible to get too old parent row here.
1742                                                  * Like as in the case which caused this problem,
1743                                                  * we stop shrinking here. I could try to find
1744                                                  * real parent row but want not to do it because
1745                                                  * of real solution will be implemented anyway,
1746                                                  * latter, and we are too close to 6.5 release. -
1747                                                  * vadim 06/11/99
1748                                                  */
1749                                                 if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
1750                                                                                                   HeapTupleHeaderGetXmin(tp.t_data))))
1751                                                 {
1752                                                         if (freeCbuf)
1753                                                                 ReleaseBuffer(Cbuf);
1754                                                         freeCbuf = false;
1755                                                         ReleaseBuffer(Pbuf);
1756                                                         for (i = 0; i < num_vtmove; i++)
1757                                                         {
1758                                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1759                                                                 (vtmove[i].vacpage->offsets_used)--;
1760                                                         }
1761                                                         num_vtmove = 0;
1762                                                         elog(WARNING, "Too old parent tuple found - can't continue repair_frag");
1763                                                         break;
1764                                                 }
1765 #ifdef NOT_USED                                 /* I'm not sure that this will wotk
1766                                                                  * properly... */
1767
1768                                                 /*
1769                                                  * If this tuple is updated version of row and it
1770                                                  * was created by the same transaction then no one
1771                                                  * is interested in this tuple - mark it as
1772                                                  * removed.
1773                                                  */
1774                                                 if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
1775                                                         TransactionIdEquals(HeapTupleHeaderGetXmin(Ptp.t_data),
1776                                                                                                 HeapTupleHeaderGetXmax(Ptp.t_data)))
1777                                                 {
1778                                                         Ptp.t_data->t_infomask &=
1779                                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1780                                                         Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
1781                                                         HeapTupleHeaderSetXvac(Ptp.t_data, myXID);
1782                                                         WriteBuffer(Pbuf);
1783                                                         continue;
1784                                                 }
1785 #endif
1786                                                 tp.t_datamcxt = Ptp.t_datamcxt;
1787                                                 tp.t_data = Ptp.t_data;
1788                                                 tlen = tp.t_len = ItemIdGetLength(Pitemid);
1789                                                 if (freeCbuf)
1790                                                         ReleaseBuffer(Cbuf);
1791                                                 Cbuf = Pbuf;
1792                                                 freeCbuf = true;
1793                                                 break;
1794                                         }
1795                                         if (num_vtmove == 0)
1796                                                 break;
1797                                 }
1798                                 if (freeCbuf)
1799                                         ReleaseBuffer(Cbuf);
1800                                 if (num_vtmove == 0)    /* chain can't be moved */
1801                                 {
1802                                         pfree(vtmove);
1803                                         break;
1804                                 }
1805                                 ItemPointerSetInvalid(&Ctid);
1806                                 for (ti = 0; ti < num_vtmove; ti++)
1807                                 {
1808                                         VacPage         destvacpage = vtmove[ti].vacpage;
1809
1810                                         /* Get page to move from */
1811                                         tuple.t_self = vtmove[ti].tid;
1812                                         Cbuf = ReadBuffer(onerel,
1813                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1814
1815                                         /* Get page to move to */
1816                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1817
1818                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1819                                         if (cur_buffer != Cbuf)
1820                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1821
1822                                         ToPage = BufferGetPage(cur_buffer);
1823                                         Cpage = BufferGetPage(Cbuf);
1824
1825                                         Citemid = PageGetItemId(Cpage,
1826                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1827                                         tuple.t_datamcxt = NULL;
1828                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1829                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1830
1831                                         /*
1832                                          * make a copy of the source tuple, and then mark the
1833                                          * source tuple MOVED_OFF.
1834                                          */
1835                                         heap_copytuple_with_tuple(&tuple, &newtup);
1836
1837                                         /*
1838                                          * register invalidation of source tuple in catcaches.
1839                                          */
1840                                         CacheInvalidateHeapTuple(onerel, &tuple);
1841
1842                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1843                                         START_CRIT_SECTION();
1844
1845                                         tuple.t_data->t_infomask &=
1846                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
1847                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1848                                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
1849
1850                                         /*
1851                                          * If this page was not used before - clean it.
1852                                          *
1853                                          * NOTE: a nasty bug used to lurk here.  It is possible
1854                                          * for the source and destination pages to be the same
1855                                          * (since this tuple-chain member can be on a page
1856                                          * lower than the one we're currently processing in
1857                                          * the outer loop).  If that's true, then after
1858                                          * vacuum_page() the source tuple will have been
1859                                          * moved, and tuple.t_data will be pointing at
1860                                          * garbage.  Therefore we must do everything that uses
1861                                          * tuple.t_data BEFORE this step!!
1862                                          *
1863                                          * This path is different from the other callers of
1864                                          * vacuum_page, because we have already incremented
1865                                          * the vacpage's offsets_used field to account for the
1866                                          * tuple(s) we expect to move onto the page. Therefore
1867                                          * vacuum_page's check for offsets_used == 0 is wrong.
1868                                          * But since that's a good debugging check for all
1869                                          * other callers, we work around it here rather than
1870                                          * remove it.
1871                                          */
1872                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1873                                         {
1874                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1875
1876                                                 destvacpage->offsets_used = 0;
1877                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1878                                                 destvacpage->offsets_used = sv_offsets_used;
1879                                         }
1880
1881                                         /*
1882                                          * Update the state of the copied tuple, and store it
1883                                          * on the destination page.
1884                                          */
1885                                         newtup.t_data->t_infomask &=
1886                                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
1887                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1888                                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
1889                                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
1890                                                                                  InvalidOffsetNumber, LP_USED);
1891                                         if (newoff == InvalidOffsetNumber)
1892                                         {
1893                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1894                                                   (unsigned long) tuple_len, destvacpage->blkno);
1895                                         }
1896                                         newitemid = PageGetItemId(ToPage, newoff);
1897                                         pfree(newtup.t_data);
1898                                         newtup.t_datamcxt = NULL;
1899                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1900                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1901
1902                                         /* XLOG stuff */
1903                                         if (!onerel->rd_istemp)
1904                                         {
1905                                                 XLogRecPtr      recptr =
1906                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1907                                                                           cur_buffer, &newtup);
1908
1909                                                 if (Cbuf != cur_buffer)
1910                                                 {
1911                                                         PageSetLSN(Cpage, recptr);
1912                                                         PageSetSUI(Cpage, ThisStartUpID);
1913                                                 }
1914                                                 PageSetLSN(ToPage, recptr);
1915                                                 PageSetSUI(ToPage, ThisStartUpID);
1916                                         }
1917                                         else
1918                                         {
1919                                                 /* No XLOG record, but still need to flag that XID exists on disk */
1920                                                 MyXactMadeTempRelUpdate = true;
1921                                         }
1922
1923                                         END_CRIT_SECTION();
1924
1925                                         if (destvacpage->blkno > last_move_dest_block)
1926                                                 last_move_dest_block = destvacpage->blkno;
1927
1928                                         /*
1929                                          * Set new tuple's t_ctid pointing to itself for last
1930                                          * tuple in chain, and to next tuple in chain
1931                                          * otherwise.
1932                                          */
1933                                         if (!ItemPointerIsValid(&Ctid))
1934                                                 newtup.t_data->t_ctid = newtup.t_self;
1935                                         else
1936                                                 newtup.t_data->t_ctid = Ctid;
1937                                         Ctid = newtup.t_self;
1938
1939                                         num_moved++;
1940
1941                                         /*
1942                                          * Remember that we moved tuple from the current page
1943                                          * (corresponding index tuple will be cleaned).
1944                                          */
1945                                         if (Cbuf == buf)
1946                                                 vacpage->offsets[vacpage->offsets_free++] =
1947                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
1948                                         else
1949                                                 keep_tuples++;
1950
1951                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
1952                                         if (cur_buffer != Cbuf)
1953                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
1954
1955                                         /* Create index entries for the moved tuple */
1956                                         if (resultRelInfo->ri_NumIndices > 0)
1957                                         {
1958                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
1959                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
1960                                                                                           estate, true);
1961                                         }
1962
1963                                         WriteBuffer(cur_buffer);
1964                                         WriteBuffer(Cbuf);
1965                                 }
1966                                 cur_buffer = InvalidBuffer;
1967                                 pfree(vtmove);
1968                                 chain_tuple_moved = true;
1969                                 continue;
1970                         }
1971
1972                         /* try to find new page for this tuple */
1973                         if (cur_buffer == InvalidBuffer ||
1974                                 !enough_space(cur_page, tuple_len))
1975                         {
1976                                 if (cur_buffer != InvalidBuffer)
1977                                 {
1978                                         WriteBuffer(cur_buffer);
1979                                         cur_buffer = InvalidBuffer;
1980                                 }
1981                                 for (i = 0; i < num_fraged_pages; i++)
1982                                 {
1983                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
1984                                                 break;
1985                                 }
1986                                 if (i == num_fraged_pages)
1987                                         break;          /* can't move item anywhere */
1988                                 cur_item = i;
1989                                 cur_page = fraged_pages->pagedesc[cur_item];
1990                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
1991                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1992                                 ToPage = BufferGetPage(cur_buffer);
1993                                 /* if this page was not used before - clean it */
1994                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
1995                                         vacuum_page(onerel, cur_buffer, cur_page);
1996                         }
1997                         else
1998                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1999
2000                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2001
2002                         /* copy tuple */
2003                         heap_copytuple_with_tuple(&tuple, &newtup);
2004
2005                         /*
2006                          * register invalidation of source tuple in catcaches.
2007                          *
2008                          * (Note: we do not need to register the copied tuple,
2009                          * because we are not changing the tuple contents and
2010                          * so there cannot be any need to flush negative
2011                          * catcache entries.)
2012                          */
2013                         CacheInvalidateHeapTuple(onerel, &tuple);
2014
2015                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
2016                         START_CRIT_SECTION();
2017
2018                         /*
2019                          * Mark new tuple as moved_in by vacuum and store vacuum XID
2020                          * in t_cid !!!
2021                          */
2022                         newtup.t_data->t_infomask &=
2023                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
2024                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2025                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2026
2027                         /* add tuple to the page */
2028                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
2029                                                                  InvalidOffsetNumber, LP_USED);
2030                         if (newoff == InvalidOffsetNumber)
2031                         {
2032                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2033                                          (unsigned long) tuple_len,
2034                                          cur_page->blkno, (unsigned long) cur_page->free,
2035                                          cur_page->offsets_used, cur_page->offsets_free);
2036                         }
2037                         newitemid = PageGetItemId(ToPage, newoff);
2038                         pfree(newtup.t_data);
2039                         newtup.t_datamcxt = NULL;
2040                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
2041                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
2042                         newtup.t_self = newtup.t_data->t_ctid;
2043
2044                         /*
2045                          * Mark old tuple as moved_off by vacuum and store vacuum XID
2046                          * in t_cid !!!
2047                          */
2048                         tuple.t_data->t_infomask &=
2049                                 ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
2050                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
2051                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
2052
2053                         /* XLOG stuff */
2054                         if (!onerel->rd_istemp)
2055                         {
2056                                 XLogRecPtr      recptr =
2057                                 log_heap_move(onerel, buf, tuple.t_self,
2058                                                           cur_buffer, &newtup);
2059
2060                                 PageSetLSN(page, recptr);
2061                                 PageSetSUI(page, ThisStartUpID);
2062                                 PageSetLSN(ToPage, recptr);
2063                                 PageSetSUI(ToPage, ThisStartUpID);
2064                         }
2065                         else
2066                         {
2067                                 /* No XLOG record, but still need to flag that XID exists on disk */
2068                                 MyXactMadeTempRelUpdate = true;
2069                         }
2070
2071                         END_CRIT_SECTION();
2072
2073                         cur_page->offsets_used++;
2074                         num_moved++;
2075                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2076                         if (cur_page->blkno > last_move_dest_block)
2077                                 last_move_dest_block = cur_page->blkno;
2078
2079                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2080
2081                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2082                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2083
2084                         /* insert index' tuples if needed */
2085                         if (resultRelInfo->ri_NumIndices > 0)
2086                         {
2087                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2088                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2089                         }
2090                 }                                               /* walk along page */
2091
2092                 if (offnum < maxoff && keep_tuples > 0)
2093                 {
2094                         OffsetNumber off;
2095
2096                         for (off = OffsetNumberNext(offnum);
2097                                  off <= maxoff;
2098                                  off = OffsetNumberNext(off))
2099                         {
2100                                 itemid = PageGetItemId(page, off);
2101                                 if (!ItemIdIsUsed(itemid))
2102                                         continue;
2103                                 tuple.t_datamcxt = NULL;
2104                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2105                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2106                                         continue;
2107                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2108                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2109                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2110                                 {
2111                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2112                                                 elog(ERROR, "Invalid XVAC in tuple header (4)");
2113                                         /* some chains was moved while */
2114                                         if (chain_tuple_moved)
2115                                         {                       /* cleaning this page */
2116                                                 Assert(vacpage->offsets_free > 0);
2117                                                 for (i = 0; i < vacpage->offsets_free; i++)
2118                                                 {
2119                                                         if (vacpage->offsets[i] == off)
2120                                                                 break;
2121                                                 }
2122                                                 if (i >= vacpage->offsets_free) /* not found */
2123                                                 {
2124                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2125                                                         Assert(keep_tuples > 0);
2126                                                         keep_tuples--;
2127                                                 }
2128                                         }
2129                                         else
2130                                         {
2131                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2132                                                 Assert(keep_tuples > 0);
2133                                                 keep_tuples--;
2134                                         }
2135                                 }
2136                                 else
2137                                         elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2138                         }
2139                 }
2140
2141                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2142                 {
2143                         if (chain_tuple_moved)          /* else - they are ordered */
2144                         {
2145                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2146                                           sizeof(OffsetNumber), vac_cmp_offno);
2147                         }
2148                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2149                         WriteBuffer(buf);
2150                 }
2151                 else if (dowrite)
2152                         WriteBuffer(buf);
2153                 else
2154                         ReleaseBuffer(buf);
2155
2156                 if (offnum <= maxoff)
2157                         break;                          /* some item(s) left */
2158
2159         }                                                       /* walk along relation */
2160
2161         blkno++;                                        /* new number of blocks */
2162
2163         if (cur_buffer != InvalidBuffer)
2164         {
2165                 Assert(num_moved > 0);
2166                 WriteBuffer(cur_buffer);
2167         }
2168
2169         if (num_moved > 0)
2170         {
2171                 /*
2172                  * We have to commit our tuple movings before we truncate the
2173                  * relation.  Ideally we should do Commit/StartTransactionCommand
2174                  * here, relying on the session-level table lock to protect our
2175                  * exclusive access to the relation.  However, that would require
2176                  * a lot of extra code to close and re-open the relation, indexes,
2177                  * etc.  For now, a quick hack: record status of current
2178                  * transaction as committed, and continue.
2179                  */
2180                 RecordTransactionCommit();
2181         }
2182
2183         /*
2184          * We are not going to move any more tuples across pages, but we still
2185          * need to apply vacuum_page to compact free space in the remaining
2186          * pages in vacuum_pages list.  Note that some of these pages may also
2187          * be in the fraged_pages list, and may have had tuples moved onto
2188          * them; if so, we already did vacuum_page and needn't do it again.
2189          */
2190         for (i = 0, curpage = vacuum_pages->pagedesc;
2191                  i < vacuumed_pages;
2192                  i++, curpage++)
2193         {
2194                 CHECK_FOR_INTERRUPTS();
2195                 Assert((*curpage)->blkno < blkno);
2196                 if ((*curpage)->offsets_used == 0)
2197                 {
2198                         /* this page was not used as a move target, so must clean it */
2199                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2200                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2201                         page = BufferGetPage(buf);
2202                         if (!PageIsEmpty(page))
2203                                 vacuum_page(onerel, buf, *curpage);
2204                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2205                         WriteBuffer(buf);
2206                 }
2207         }
2208
2209         /*
2210          * Now scan all the pages that we moved tuples onto and update tuple
2211          * status bits.  This is not really necessary, but will save time for
2212          * future transactions examining these tuples.
2213          *
2214          * XXX WARNING that this code fails to clear HEAP_MOVED_OFF tuples from
2215          * pages that were move source pages but not move dest pages.  One
2216          * also wonders whether it wouldn't be better to skip this step and
2217          * let the tuple status updates happen someplace that's not holding an
2218          * exclusive lock on the relation.
2219          */
2220         checked_moved = 0;
2221         for (i = 0, curpage = fraged_pages->pagedesc;
2222                  i < num_fraged_pages;
2223                  i++, curpage++)
2224         {
2225                 CHECK_FOR_INTERRUPTS();
2226                 Assert((*curpage)->blkno < blkno);
2227                 if ((*curpage)->blkno > last_move_dest_block)
2228                         break;                          /* no need to scan any further */
2229                 if ((*curpage)->offsets_used == 0)
2230                         continue;                       /* this page was never used as a move dest */
2231                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2232                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2233                 page = BufferGetPage(buf);
2234                 num_tuples = 0;
2235                 max_offset = PageGetMaxOffsetNumber(page);
2236                 for (newoff = FirstOffsetNumber;
2237                          newoff <= max_offset;
2238                          newoff = OffsetNumberNext(newoff))
2239                 {
2240                         itemid = PageGetItemId(page, newoff);
2241                         if (!ItemIdIsUsed(itemid))
2242                                 continue;
2243                         tuple.t_datamcxt = NULL;
2244                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2245                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2246                         {
2247                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED))
2248                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2249                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2250                                         elog(ERROR, "Invalid XVAC in tuple header (2)");
2251                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2252                                 {
2253                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2254                                         tuple.t_data->t_infomask &= ~HEAP_MOVED;
2255                                         num_tuples++;
2256                                 }
2257                                 else
2258                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2259                         }
2260                 }
2261                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2262                 WriteBuffer(buf);
2263                 Assert((*curpage)->offsets_used == num_tuples);
2264                 checked_moved += num_tuples;
2265         }
2266         Assert(num_moved == checked_moved);
2267
2268         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2269                  RelationGetRelationName(onerel),
2270                  nblocks, blkno, num_moved,
2271                  vac_show_rusage(&ru0));
2272
2273         /*
2274          * Reflect the motion of system tuples to catalog cache here.
2275          */
2276         CommandCounterIncrement();
2277
2278         if (Nvacpagelist.num_pages > 0)
2279         {
2280                 /* vacuum indexes again if needed */
2281                 if (Irel != (Relation *) NULL)
2282                 {
2283                         VacPage    *vpleft,
2284                                            *vpright,
2285                                                 vpsave;
2286
2287                         /* re-sort Nvacpagelist.pagedesc */
2288                         for (vpleft = Nvacpagelist.pagedesc,
2289                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2290                                  vpleft < vpright; vpleft++, vpright--)
2291                         {
2292                                 vpsave = *vpleft;
2293                                 *vpleft = *vpright;
2294                                 *vpright = vpsave;
2295                         }
2296                         Assert(keep_tuples >= 0);
2297                         for (i = 0; i < nindexes; i++)
2298                                 vacuum_index(&Nvacpagelist, Irel[i],
2299                                                          vacrelstats->rel_tuples, keep_tuples);
2300                 }
2301
2302                 /* clean moved tuples from last page in Nvacpagelist list */
2303                 if (vacpage->blkno == (blkno - 1) &&
2304                         vacpage->offsets_free > 0)
2305                 {
2306                         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2307                         OffsetNumber *unused = unbuf;
2308                         int                     uncnt;
2309
2310                         buf = ReadBuffer(onerel, vacpage->blkno);
2311                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2312                         page = BufferGetPage(buf);
2313                         num_tuples = 0;
2314                         maxoff = PageGetMaxOffsetNumber(page);
2315                         for (offnum = FirstOffsetNumber;
2316                                  offnum <= maxoff;
2317                                  offnum = OffsetNumberNext(offnum))
2318                         {
2319                                 itemid = PageGetItemId(page, offnum);
2320                                 if (!ItemIdIsUsed(itemid))
2321                                         continue;
2322                                 tuple.t_datamcxt = NULL;
2323                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2324
2325                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2326                                 {
2327                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2328                                         {
2329                                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2330                                                         elog(ERROR, "Invalid XVAC in tuple header (3)");
2331                                                 itemid->lp_flags &= ~LP_USED;
2332                                                 num_tuples++;
2333                                         }
2334                                         else
2335                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (3)");
2336                                 }
2337
2338                         }
2339                         Assert(vacpage->offsets_free == num_tuples);
2340
2341                         START_CRIT_SECTION();
2342
2343                         uncnt = PageRepairFragmentation(page, unused);
2344
2345                         /* XLOG stuff */
2346                         if (!onerel->rd_istemp)
2347                         {
2348                                 XLogRecPtr      recptr;
2349
2350                                 recptr = log_heap_clean(onerel, buf, (char *) unused,
2351                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2352                                 PageSetLSN(page, recptr);
2353                                 PageSetSUI(page, ThisStartUpID);
2354                         }
2355                         else
2356                         {
2357                                 /* No XLOG record, but still need to flag that XID exists on disk */
2358                                 MyXactMadeTempRelUpdate = true;
2359                         }
2360
2361                         END_CRIT_SECTION();
2362
2363                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2364                         WriteBuffer(buf);
2365                 }
2366
2367                 /* now - free new list of reaped pages */
2368                 curpage = Nvacpagelist.pagedesc;
2369                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2370                         pfree(*curpage);
2371                 pfree(Nvacpagelist.pagedesc);
2372         }
2373
2374         /*
2375          * Flush dirty pages out to disk.  We do this unconditionally, even if
2376          * we don't need to truncate, because we want to ensure that all
2377          * tuples have correct on-row commit status on disk (see bufmgr.c's
2378          * comments for FlushRelationBuffers()).
2379          */
2380         i = FlushRelationBuffers(onerel, blkno);
2381         if (i < 0)
2382                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2383                          i);
2384
2385         /* truncate relation, if needed */
2386         if (blkno < nblocks)
2387         {
2388                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2389                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2390                 onerel->rd_targblock = InvalidBlockNumber;
2391                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2392         }
2393
2394         /* clean up */
2395         pfree(vacpage);
2396         if (vacrelstats->vtlinks != NULL)
2397                 pfree(vacrelstats->vtlinks);
2398
2399         ExecDropTupleTable(tupleTable, true);
2400
2401         ExecCloseIndices(resultRelInfo);
2402 }
2403
2404 /*
2405  *      vacuum_heap() -- free dead tuples
2406  *
2407  *              This routine marks dead tuples as unused and truncates relation
2408  *              if there are "empty" end-blocks.
2409  */
2410 static void
2411 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2412 {
2413         Buffer          buf;
2414         VacPage    *vacpage;
2415         BlockNumber relblocks;
2416         int                     nblocks;
2417         int                     i;
2418
2419         nblocks = vacuum_pages->num_pages;
2420         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2421
2422         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2423         {
2424                 CHECK_FOR_INTERRUPTS();
2425                 if ((*vacpage)->offsets_free > 0)
2426                 {
2427                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2428                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2429                         vacuum_page(onerel, buf, *vacpage);
2430                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2431                         WriteBuffer(buf);
2432                 }
2433         }
2434
2435         /*
2436          * Flush dirty pages out to disk.  We do this unconditionally, even if
2437          * we don't need to truncate, because we want to ensure that all
2438          * tuples have correct on-row commit status on disk (see bufmgr.c's
2439          * comments for FlushRelationBuffers()).
2440          */
2441         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2442         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2443
2444         i = FlushRelationBuffers(onerel, relblocks);
2445         if (i < 0)
2446                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2447                          i);
2448
2449         /* truncate relation if there are some empty end-pages */
2450         if (vacuum_pages->empty_end_pages > 0)
2451         {
2452                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2453                          RelationGetRelationName(onerel),
2454                          vacrelstats->rel_pages, relblocks);
2455                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2456                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2457                 onerel->rd_targblock = InvalidBlockNumber;
2458                 vacrelstats->rel_pages = relblocks;             /* set new number of
2459                                                                                                  * blocks */
2460         }
2461 }
2462
2463 /*
2464  *      vacuum_page() -- free dead tuples on a page
2465  *                                       and repair its fragmentation.
2466  */
2467 static void
2468 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2469 {
2470         OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
2471         OffsetNumber *unused = unbuf;
2472         int                     uncnt;
2473         Page            page = BufferGetPage(buffer);
2474         ItemId          itemid;
2475         int                     i;
2476
2477         /* There shouldn't be any tuples moved onto the page yet! */
2478         Assert(vacpage->offsets_used == 0);
2479
2480         START_CRIT_SECTION();
2481
2482         for (i = 0; i < vacpage->offsets_free; i++)
2483         {
2484                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2485                 itemid->lp_flags &= ~LP_USED;
2486         }
2487
2488         uncnt = PageRepairFragmentation(page, unused);
2489
2490         /* XLOG stuff */
2491         if (!onerel->rd_istemp)
2492         {
2493                 XLogRecPtr      recptr;
2494
2495                 recptr = log_heap_clean(onerel, buffer, (char *) unused,
2496                                                   (char *) (&(unused[uncnt])) - (char *) unused);
2497                 PageSetLSN(page, recptr);
2498                 PageSetSUI(page, ThisStartUpID);
2499         }
2500         else
2501         {
2502                 /* No XLOG record, but still need to flag that XID exists on disk */
2503                 MyXactMadeTempRelUpdate = true;
2504         }
2505
2506         END_CRIT_SECTION();
2507 }
2508
2509 /*
2510  *      scan_index() -- scan one index relation to update statistic.
2511  *
2512  * We use this when we have no deletions to do.
2513  */
2514 static void
2515 scan_index(Relation indrel, double num_tuples)
2516 {
2517         IndexBulkDeleteResult *stats;
2518         VacRUsage       ru0;
2519
2520         vac_init_rusage(&ru0);
2521
2522         /*
2523          * Even though we're not planning to delete anything, use the
2524          * ambulkdelete call, so that the scan happens within the index AM for
2525          * more speed.
2526          */
2527         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2528
2529         if (!stats)
2530                 return;
2531
2532         /* now update statistics in pg_class */
2533         vac_update_relstats(RelationGetRelid(indrel),
2534                                                 stats->num_pages, stats->num_index_tuples,
2535                                                 false);
2536
2537         elog(elevel, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
2538                  RelationGetRelationName(indrel),
2539                  stats->num_pages, stats->num_index_tuples,
2540                  vac_show_rusage(&ru0));
2541
2542         /*
2543          * Check for tuple count mismatch.      If the index is partial, then it's
2544          * OK for it to have fewer tuples than the heap; else we got trouble.
2545          */
2546         if (stats->num_index_tuples != num_tuples)
2547         {
2548                 if (stats->num_index_tuples > num_tuples ||
2549                         !vac_is_partial_index(indrel))
2550                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
2551 \n\tRecreate the index.",
2552                                  RelationGetRelationName(indrel),
2553                                  stats->num_index_tuples, num_tuples);
2554         }
2555
2556         pfree(stats);
2557 }
2558
2559 /*
2560  *      vacuum_index() -- vacuum one index relation.
2561  *
2562  *              Vpl is the VacPageList of the heap we're currently vacuuming.
2563  *              It's locked. Indrel is an index relation on the vacuumed heap.
2564  *
2565  *              We don't bother to set locks on the index relation here, since
2566  *              the parent table is exclusive-locked already.
2567  *
2568  *              Finally, we arrange to update the index relation's statistics in
2569  *              pg_class.
2570  */
2571 static void
2572 vacuum_index(VacPageList vacpagelist, Relation indrel,
2573                          double num_tuples, int keep_tuples)
2574 {
2575         IndexBulkDeleteResult *stats;
2576         VacRUsage       ru0;
2577
2578         vac_init_rusage(&ru0);
2579
2580         /* Do bulk deletion */
2581         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2582
2583         if (!stats)
2584                 return;
2585
2586         /* now update statistics in pg_class */
2587         vac_update_relstats(RelationGetRelid(indrel),
2588                                                 stats->num_pages, stats->num_index_tuples,
2589                                                 false);
2590
2591         elog(elevel, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
2592                  RelationGetRelationName(indrel), stats->num_pages,
2593                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2594                  vac_show_rusage(&ru0));
2595
2596         /*
2597          * Check for tuple count mismatch.      If the index is partial, then it's
2598          * OK for it to have fewer tuples than the heap; else we got trouble.
2599          */
2600         if (stats->num_index_tuples != num_tuples + keep_tuples)
2601         {
2602                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2603                         !vac_is_partial_index(indrel))
2604                         elog(WARNING, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
2605 \n\tRecreate the index.",
2606                                  RelationGetRelationName(indrel),
2607                                  stats->num_index_tuples, num_tuples);
2608         }
2609
2610         pfree(stats);
2611 }
2612
2613 /*
2614  *      tid_reaped() -- is a particular tid reaped?
2615  *
2616  *              This has the right signature to be an IndexBulkDeleteCallback.
2617  *
2618  *              vacpagelist->VacPage_array is sorted in right order.
2619  */
2620 static bool
2621 tid_reaped(ItemPointer itemptr, void *state)
2622 {
2623         VacPageList vacpagelist = (VacPageList) state;
2624         OffsetNumber ioffno;
2625         OffsetNumber *voff;
2626         VacPage         vp,
2627                            *vpp;
2628         VacPageData vacpage;
2629
2630         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2631         ioffno = ItemPointerGetOffsetNumber(itemptr);
2632
2633         vp = &vacpage;
2634         vpp = (VacPage *) vac_bsearch((void *) &vp,
2635                                                                   (void *) (vacpagelist->pagedesc),
2636                                                                   vacpagelist->num_pages,
2637                                                                   sizeof(VacPage),
2638                                                                   vac_cmp_blk);
2639
2640         if (vpp == NULL)
2641                 return false;
2642
2643         /* ok - we are on a partially or fully reaped page */
2644         vp = *vpp;
2645
2646         if (vp->offsets_free == 0)
2647         {
2648                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
2649                 return true;
2650         }
2651
2652         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2653                                                                                 (void *) (vp->offsets),
2654                                                                                 vp->offsets_free,
2655                                                                                 sizeof(OffsetNumber),
2656                                                                                 vac_cmp_offno);
2657
2658         if (voff == NULL)
2659                 return false;
2660
2661         /* tid is reaped */
2662         return true;
2663 }
2664
2665 /*
2666  * Dummy version for scan_index.
2667  */
2668 static bool
2669 dummy_tid_reaped(ItemPointer itemptr, void *state)
2670 {
2671         return false;
2672 }
2673
2674 /*
2675  * Update the shared Free Space Map with the info we now have about
2676  * free space in the relation, discarding any old info the map may have.
2677  */
2678 static void
2679 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2680                            BlockNumber rel_pages)
2681 {
2682         int                     nPages = fraged_pages->num_pages;
2683         int                     i;
2684         BlockNumber *pages;
2685         Size       *spaceAvail;
2686
2687         /* +1 to avoid palloc(0) */
2688         pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
2689         spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
2690
2691         for (i = 0; i < nPages; i++)
2692         {
2693                 pages[i] = fraged_pages->pagedesc[i]->blkno;
2694                 spaceAvail[i] = fraged_pages->pagedesc[i]->free;
2695
2696                 /*
2697                  * fraged_pages may contain entries for pages that we later
2698                  * decided to truncate from the relation; don't enter them into
2699                  * the map!
2700                  */
2701                 if (pages[i] >= rel_pages)
2702                 {
2703                         nPages = i;
2704                         break;
2705                 }
2706         }
2707
2708         MultiRecordFreeSpace(&onerel->rd_node,
2709                                                  0, MaxBlockNumber,
2710                                                  nPages, pages, spaceAvail);
2711         pfree(pages);
2712         pfree(spaceAvail);
2713 }
2714
2715 /* Copy a VacPage structure */
2716 static VacPage
2717 copy_vac_page(VacPage vacpage)
2718 {
2719         VacPage         newvacpage;
2720
2721         /* allocate a VacPageData entry */
2722         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2723                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2724
2725         /* fill it in */
2726         if (vacpage->offsets_free > 0)
2727                 memcpy(newvacpage->offsets, vacpage->offsets,
2728                            vacpage->offsets_free * sizeof(OffsetNumber));
2729         newvacpage->blkno = vacpage->blkno;
2730         newvacpage->free = vacpage->free;
2731         newvacpage->offsets_used = vacpage->offsets_used;
2732         newvacpage->offsets_free = vacpage->offsets_free;
2733
2734         return newvacpage;
2735 }
2736
2737 /*
2738  * Add a VacPage pointer to a VacPageList.
2739  *
2740  *              As a side effect of the way that scan_heap works,
2741  *              higher pages come after lower pages in the array
2742  *              (and highest tid on a page is last).
2743  */
2744 static void
2745 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2746 {
2747 #define PG_NPAGEDESC 1024
2748
2749         /* allocate a VacPage entry if needed */
2750         if (vacpagelist->num_pages == 0)
2751         {
2752                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2753                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2754         }
2755         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2756         {
2757                 vacpagelist->num_allocated_pages *= 2;
2758                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2759         }
2760         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2761         (vacpagelist->num_pages)++;
2762 }
2763
2764 /*
2765  * vac_bsearch: just like standard C library routine bsearch(),
2766  * except that we first test to see whether the target key is outside
2767  * the range of the table entries.      This case is handled relatively slowly
2768  * by the normal binary search algorithm (ie, no faster than any other key)
2769  * but it occurs often enough in VACUUM to be worth optimizing.
2770  */
2771 static void *
2772 vac_bsearch(const void *key, const void *base,
2773                         size_t nelem, size_t size,
2774                         int (*compar) (const void *, const void *))
2775 {
2776         int                     res;
2777         const void *last;
2778
2779         if (nelem == 0)
2780                 return NULL;
2781         res = compar(key, base);
2782         if (res < 0)
2783                 return NULL;
2784         if (res == 0)
2785                 return (void *) base;
2786         if (nelem > 1)
2787         {
2788                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2789                 res = compar(key, last);
2790                 if (res > 0)
2791                         return NULL;
2792                 if (res == 0)
2793                         return (void *) last;
2794         }
2795         if (nelem <= 2)
2796                 return NULL;                    /* already checked 'em all */
2797         return bsearch(key, base, nelem, size, compar);
2798 }
2799
2800 /*
2801  * Comparator routines for use with qsort() and bsearch().
2802  */
2803 static int
2804 vac_cmp_blk(const void *left, const void *right)
2805 {
2806         BlockNumber lblk,
2807                                 rblk;
2808
2809         lblk = (*((VacPage *) left))->blkno;
2810         rblk = (*((VacPage *) right))->blkno;
2811
2812         if (lblk < rblk)
2813                 return -1;
2814         if (lblk == rblk)
2815                 return 0;
2816         return 1;
2817 }
2818
2819 static int
2820 vac_cmp_offno(const void *left, const void *right)
2821 {
2822         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2823                 return -1;
2824         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2825                 return 0;
2826         return 1;
2827 }
2828
2829 static int
2830 vac_cmp_vtlinks(const void *left, const void *right)
2831 {
2832         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2833                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2834                 return -1;
2835         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2836                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2837                 return 1;
2838         /* bi_hi-es are equal */
2839         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2840                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2841                 return -1;
2842         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2843                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2844                 return 1;
2845         /* bi_lo-es are equal */
2846         if (((VTupleLink) left)->new_tid.ip_posid <
2847                 ((VTupleLink) right)->new_tid.ip_posid)
2848                 return -1;
2849         if (((VTupleLink) left)->new_tid.ip_posid >
2850                 ((VTupleLink) right)->new_tid.ip_posid)
2851                 return 1;
2852         return 0;
2853 }
2854
2855
2856 void
2857 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2858 {
2859         List       *indexoidlist,
2860                            *indexoidscan;
2861         int                     i;
2862
2863         indexoidlist = RelationGetIndexList(relation);
2864
2865         *nindexes = length(indexoidlist);
2866
2867         if (*nindexes > 0)
2868                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2869         else
2870                 *Irel = NULL;
2871
2872         i = 0;
2873         foreach(indexoidscan, indexoidlist)
2874         {
2875                 Oid                     indexoid = lfirsti(indexoidscan);
2876
2877                 (*Irel)[i] = index_open(indexoid);
2878                 i++;
2879         }
2880
2881         freeList(indexoidlist);
2882 }
2883
2884
2885 void
2886 vac_close_indexes(int nindexes, Relation *Irel)
2887 {
2888         if (Irel == (Relation *) NULL)
2889                 return;
2890
2891         while (nindexes--)
2892                 index_close(Irel[nindexes]);
2893         pfree(Irel);
2894 }
2895
2896
2897 /*
2898  * Is an index partial (ie, could it contain fewer tuples than the heap?)
2899  */
2900 bool
2901 vac_is_partial_index(Relation indrel)
2902 {
2903         /*
2904          * If the index's AM doesn't support nulls, it's partial for our
2905          * purposes
2906          */
2907         if (!indrel->rd_am->amindexnulls)
2908                 return true;
2909
2910         /* Otherwise, look to see if there's a partial-index predicate */
2911         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
2912 }
2913
2914
2915 static bool
2916 enough_space(VacPage vacpage, Size len)
2917 {
2918         len = MAXALIGN(len);
2919
2920         if (len > vacpage->free)
2921                 return false;
2922
2923         /* if there are free itemid(s) and len <= free_space... */
2924         if (vacpage->offsets_used < vacpage->offsets_free)
2925                 return true;
2926
2927         /* noff_used >= noff_free and so we'll have to allocate new itemid */
2928         if (len + sizeof(ItemIdData) <= vacpage->free)
2929                 return true;
2930
2931         return false;
2932 }
2933
2934
2935 /*
2936  * Initialize usage snapshot.
2937  */
2938 void
2939 vac_init_rusage(VacRUsage *ru0)
2940 {
2941         struct timezone tz;
2942
2943         getrusage(RUSAGE_SELF, &ru0->ru);
2944         gettimeofday(&ru0->tv, &tz);
2945 }
2946
2947 /*
2948  * Compute elapsed time since ru0 usage snapshot, and format into
2949  * a displayable string.  Result is in a static string, which is
2950  * tacky, but no one ever claimed that the Postgres backend is
2951  * threadable...
2952  */
2953 const char *
2954 vac_show_rusage(VacRUsage *ru0)
2955 {
2956         static char result[100];
2957         VacRUsage       ru1;
2958
2959         vac_init_rusage(&ru1);
2960
2961         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
2962         {
2963                 ru1.tv.tv_sec--;
2964                 ru1.tv.tv_usec += 1000000;
2965         }
2966         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
2967         {
2968                 ru1.ru.ru_stime.tv_sec--;
2969                 ru1.ru.ru_stime.tv_usec += 1000000;
2970         }
2971         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
2972         {
2973                 ru1.ru.ru_utime.tv_sec--;
2974                 ru1.ru.ru_utime.tv_usec += 1000000;
2975         }
2976
2977         snprintf(result, sizeof(result),
2978                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
2979                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
2980           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
2981                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
2982           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
2983                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
2984                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
2985
2986         return result;
2987 }