/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *    The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.331 2006/07/02 02:23:19 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <sys/time.h>
#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/subtrans.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/dbcommands.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "storage/freespace.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/flatfiles.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "pgstat.h"


/*
 * VacPage structures keep track of each page on which we find useful
 * amounts of free space.
 */
typedef struct VacPageData
{
    BlockNumber blkno;          /* BlockNumber of this Page */
    Size        free;           /* FreeSpace on this Page */
    uint16      offsets_used;   /* Number of OffNums used by vacuum */
    uint16      offsets_free;   /* Number of OffNums free or to be freed */
    OffsetNumber offsets[1];    /* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
    BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
    int         num_pages;          /* Number of pages in pagedesc */
    int         num_allocated_pages;    /* Number of allocated pages in
                                         * pagedesc */
    VacPage    *pagedesc;           /* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

/*
 * The "vtlinks" array keeps information about each recently-updated tuple
 * ("recent" meaning its XMAX is too new to let us recycle the tuple).
 * We store the tuple's own TID as well as its t_ctid (its link to the next
 * newer tuple version).  Searching in this array allows us to follow update
 * chains backwards from newer to older tuples.  When we move a member of an
 * update chain, we must move *all* the live members of the chain, so that we
 * can maintain their t_ctid link relationships (we must not just overwrite
 * t_ctid in an existing tuple).
 *
 * Note: because t_ctid links can be stale (this would only occur if a prior
 * VACUUM crashed partway through), it is possible that new_tid points to an
 * empty slot or unrelated tuple.  We have to check the linkage as we follow
 * it, just as is done in EvalPlanQual.
 */
typedef struct VTupleLinkData
{
    ItemPointerData new_tid;    /* t_ctid of an updated tuple */
    ItemPointerData this_tid;   /* t_self of the tuple */
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;
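
/*
 * Illustrative sketch of how the vtlinks array is meant to be consulted
 * (variable names here are hypothetical): once the array has been sorted
 * by new_tid, a chain member's older version can be located with the
 * generic search helpers declared later in this file, roughly:
 *
 *      VTupleLinkData  key;
 *      VTupleLink      vtlp;
 *
 *      ItemPointerCopy(&tuple.t_data->t_ctid, &key.new_tid);
 *      vtlp = (VTupleLink) vac_bsearch((void *) &key,
 *                                      (void *) vacrelstats->vtlinks,
 *                                      vacrelstats->num_vtlinks,
 *                                      sizeof(VTupleLinkData),
 *                                      vac_cmp_vtlinks);
 *
 * i.e. a binary search keyed on new_tid, which is what lets an update chain
 * be walked backwards from a newer tuple to its older versions.
 */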

/*
 * We use an array of VTupleMoveData to plan a chain tuple move fully
 * before we do it.
 */
typedef struct VTupleMoveData
{
    ItemPointerData tid;        /* tuple ID */
    VacPage     vacpage;        /* where to move it to */
    bool        cleanVpd;       /* clean vacpage before using? */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

/*
 * VRelStats contains the data acquired by scan_heap for use later
 */
typedef struct VRelStats
{
    /* miscellaneous statistics */
    BlockNumber rel_pages;
    double      rel_tuples;
    Size        min_tlen;
    Size        max_tlen;
    bool        hasindex;
    /* vtlinks array for tuple chain following - sorted by new_tid */
    int         num_vtlinks;
    VTupleLink  vtlinks;
} VRelStats;

/*----------------------------------------------------------------------
 * ExecContext:
 *
 * As these variables always appear together, we put them into one struct
 * and pull initialization and cleanup into separate routines.
 * ExecContext is used by repair_frag() and move_xxx_tuple().  More
 * accurately:  It is *used* only in move_xxx_tuple(), but because this
 * routine is called many times, we initialize the struct just once in
 * repair_frag() and pass it on to move_xxx_tuple().
 */
typedef struct ExecContextData
{
    ResultRelInfo *resultRelInfo;
    EState     *estate;
    TupleTableSlot *slot;
} ExecContextData;

typedef ExecContextData *ExecContext;

static void
ExecContext_Init(ExecContext ec, Relation rel)
{
    TupleDesc   tupdesc = RelationGetDescr(rel);

    /*
     * We need a ResultRelInfo and an EState so we can use the regular
     * executor's index-entry-making machinery.
     */
    ec->estate = CreateExecutorState();

    ec->resultRelInfo = makeNode(ResultRelInfo);
    ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
    ec->resultRelInfo->ri_RelationDesc = rel;
    ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */

    ExecOpenIndices(ec->resultRelInfo);

    ec->estate->es_result_relations = ec->resultRelInfo;
    ec->estate->es_num_result_relations = 1;
    ec->estate->es_result_relation_info = ec->resultRelInfo;

    /* Set up a tuple slot too */
    ec->slot = MakeSingleTupleTableSlot(tupdesc);
}

static void
ExecContext_Finish(ExecContext ec)
{
    ExecDropSingleTupleTableSlot(ec->slot);
    ExecCloseIndices(ec->resultRelInfo);
    FreeExecutorState(ec->estate);
}

/*
 * End of ExecContext Implementation
 *----------------------------------------------------------------------
 */
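
/*
 * Illustrative usage sketch (hypothetical variable names): as described in
 * the comment above, repair_frag() is expected to set the context up once
 * and reuse it for every tuple move, along the lines of:
 *
 *      ExecContextData ec;
 *
 *      ExecContext_Init(&ec, onerel);
 *      ...
 *      ... repeated calls to move_chain_tuple()/move_plain_tuple(), each
 *      ... receiving &ec and using its estate/resultRelInfo/slot to make
 *      ... index entries for the moved tuples ...
 *      ...
 *      ExecContext_Finish(&ec);
 */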

static MemoryContext vac_context = NULL;

static int  elevel = -1;

static TransactionId OldestXmin;
static TransactionId FreezeLimit;


/* non-export function prototypes */
static List *get_rel_oids(List *relids, const RangeVar *vacrel,
             const char *stmttype);
static void vac_update_dbstats(Oid dbid,
                   TransactionId vacuumXID,
                   TransactionId frozenXID);
static void vac_truncate_clog(TransactionId vacuumXID,
                  TransactionId frozenXID);
static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
          VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
            VacPageList vacuum_pages, VacPageList fraged_pages,
            int nindexes, Relation *Irel);
static void move_chain_tuple(Relation rel,
                 Buffer old_buf, Page old_page, HeapTuple old_tup,
                 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
                 ExecContext ec, ItemPointer ctid, bool cleanVpd);
static void move_plain_tuple(Relation rel,
                 Buffer old_buf, Page old_page, HeapTuple old_tup,
                 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
                 ExecContext ec);
static void update_hint_bits(Relation rel, VacPageList fraged_pages,
                 int num_fraged_pages, BlockNumber last_move_dest_block,
                 int num_moved);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
            VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
             double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
               BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
            size_t nelem, size_t size,
            int (*compar) (const void *, const void *));
static int  vac_cmp_blk(const void *left, const void *right);
static int  vac_cmp_offno(const void *left, const void *right);
static int  vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);
static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);


/****************************************************************************
 *                                                                          *
 *          Code common to all flavors of VACUUM and ANALYZE                *
 *                                                                          *
 ****************************************************************************
 */


/*
 * Primary entry point for VACUUM and ANALYZE commands.
 *
 * relids is normally NIL; if it is not, then it provides the list of
 * relation OIDs to be processed, and vacstmt->relation is ignored.
 * (The non-NIL case is currently only used by autovacuum.)
 *
 * It is the caller's responsibility that both vacstmt and relids
 * (if given) be allocated in a memory context that won't disappear
 * at transaction commit.  In fact this context must be QueryContext
 * to avoid complaints from PreventTransactionChain.
 */
void
vacuum(VacuumStmt *vacstmt, List *relids)
{
    const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
    TransactionId initialOldestXmin = InvalidTransactionId;
    TransactionId initialFreezeLimit = InvalidTransactionId;
    volatile MemoryContext anl_context = NULL;
    volatile bool all_rels,
                in_outer_xact,
                use_own_xacts;
    List       *relations;

    if (vacstmt->verbose)
        elevel = INFO;
    else
        elevel = DEBUG2;

    /*
     * We cannot run VACUUM inside a user transaction block; if we were inside
     * a transaction, then our commit- and start-transaction-command calls
     * would not have the intended effect! Furthermore, the forced commit that
     * occurs before truncating the relation's file would have the effect of
     * committing the rest of the user's transaction too, which would
     * certainly not be the desired behavior.  (This only applies to VACUUM
     * FULL, though.  We could in theory run lazy VACUUM inside a transaction
     * block, but we choose to disallow that case because we'd rather commit
     * as soon as possible after finishing the vacuum.  This is mainly so that
     * we can let go the AccessExclusiveLock that we may be holding.)
     *
     * ANALYZE (without VACUUM) can run either way.
     */
    if (vacstmt->vacuum)
    {
        PreventTransactionChain((void *) vacstmt, stmttype);
        in_outer_xact = false;
    }
    else
        in_outer_xact = IsInTransactionChain((void *) vacstmt);

    /*
     * Disallow the combination VACUUM FULL FREEZE; although it would mostly
     * work, VACUUM FULL's ability to move tuples around means that it is
     * injecting its own XID into tuple visibility checks.  We'd have to
     * guarantee that every moved tuple is properly marked XMIN_COMMITTED or
     * XMIN_INVALID before the end of the operation.  There are corner cases
     * where this does not happen, and getting rid of them all seems hard (not
     * to mention fragile to maintain).  On the whole it's not worth it
     * compared to telling people to use two operations.  See pgsql-hackers
     * discussion of 27-Nov-2004, and comments below for update_hint_bits().
     *
     * Note: this is enforced here, and not in the grammar, since (a) we can
     * give a better error message, and (b) we might want to allow it again
     * someday.
     */
    if (vacstmt->vacuum && vacstmt->full && vacstmt->freeze)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("VACUUM FULL FREEZE is not supported"),
                 errhint("Use VACUUM FULL, then VACUUM FREEZE.")));

    /*
     * Send info about dead objects to the statistics collector, unless
     * we are in autovacuum --- autovacuum.c does this for itself.
     */
    if (vacstmt->vacuum && !IsAutoVacuumProcess())
        pgstat_vacuum_tabstat();

    /*
     * Create special memory context for cross-transaction storage.
     *
     * Since it is a child of PortalContext, it will go away eventually even
     * if we suffer an error; there's no need for special abort cleanup logic.
     */
    vac_context = AllocSetContextCreate(PortalContext,
                                        "Vacuum",
                                        ALLOCSET_DEFAULT_MINSIZE,
                                        ALLOCSET_DEFAULT_INITSIZE,
                                        ALLOCSET_DEFAULT_MAXSIZE);

    /* Remember whether we are processing everything in the DB */
    all_rels = (relids == NIL && vacstmt->relation == NULL);

    /*
     * Build list of relations to process, unless caller gave us one. (If we
     * build one, we put it in vac_context for safekeeping.)
     */
    relations = get_rel_oids(relids, vacstmt->relation, stmttype);

    if (vacstmt->vacuum && all_rels)
    {
        /*
         * It's a database-wide VACUUM.
         *
         * Compute the initially applicable OldestXmin and FreezeLimit XIDs,
         * so that we can record these values at the end of the VACUUM. Note
         * that individual tables may well be processed with newer values, but
         * we can guarantee that no (non-shared) relations are processed with
         * older ones.
         *
         * It is okay to record non-shared values in pg_database, even though
         * we may vacuum shared relations with older cutoffs, because only the
         * minimum of the values present in pg_database matters.  We can be
         * sure that shared relations have at some time been vacuumed with
         * cutoffs no worse than the global minimum; for, if there is a
         * backend in some other DB with xmin = OLDXMIN that's determining the
         * cutoff with which we vacuum shared relations, it is not possible
         * for that database to have a cutoff newer than OLDXMIN recorded in
         * pg_database.
         */
        vacuum_set_xid_limits(vacstmt, false,
                              &initialOldestXmin,
                              &initialFreezeLimit);
    }

    /*
     * Decide whether we need to start/commit our own transactions.
     *
     * For VACUUM (with or without ANALYZE): always do so, so that we can
     * release locks as soon as possible.  (We could possibly use the outer
     * transaction for a one-table VACUUM, but handling TOAST tables would be
     * problematic.)
     *
     * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
     * start/commit our own transactions.  Also, there's no need to do so if
     * only processing one relation.  For multiple relations when not within a
     * transaction block, use own transactions so we can release locks sooner.
     */
    if (vacstmt->vacuum)
        use_own_xacts = true;
    else
    {
        Assert(vacstmt->analyze);
        if (in_outer_xact)
            use_own_xacts = false;
        else if (list_length(relations) > 1)
            use_own_xacts = true;
        else
            use_own_xacts = false;
    }

    /*
     * If we are running ANALYZE without per-table transactions, we'll need a
     * memory context with table lifetime.
     */
    if (!use_own_xacts)
        anl_context = AllocSetContextCreate(PortalContext,
                                            "Analyze",
                                            ALLOCSET_DEFAULT_MINSIZE,
                                            ALLOCSET_DEFAULT_INITSIZE,
                                            ALLOCSET_DEFAULT_MAXSIZE);

    /*
     * vacuum_rel expects to be entered with no transaction active; it will
     * start and commit its own transaction.  But we are called by an SQL
     * command, and so we are executing inside a transaction already. We
     * commit the transaction started in PostgresMain() here, and start
     * another one before exiting to match the commit waiting for us back in
     * PostgresMain().
     */
    if (use_own_xacts)
    {
        /* matches the StartTransaction in PostgresMain() */
        CommitTransactionCommand();
    }

    /* Turn vacuum cost accounting on or off */
    PG_TRY();
    {
        ListCell   *cur;

        VacuumCostActive = (VacuumCostDelay > 0);
        VacuumCostBalance = 0;

        /*
         * Loop to process each selected relation.
         */
        foreach(cur, relations)
        {
            Oid         relid = lfirst_oid(cur);

            if (vacstmt->vacuum)
            {
                if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
                    all_rels = false;   /* forget about updating dbstats */
            }
            if (vacstmt->analyze)
            {
                MemoryContext old_context = NULL;

                /*
                 * If using separate xacts, start one for analyze. Otherwise,
                 * we can use the outer transaction, but we still need to call
                 * analyze_rel in a memory context that will be cleaned up on
                 * return (else we leak memory while processing multiple
                 * tables).
                 */
                if (use_own_xacts)
                {
                    StartTransactionCommand();
                    /* functions in indexes may want a snapshot set */
                    ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
                }
                else
                    old_context = MemoryContextSwitchTo(anl_context);

                /*
                 * Tell the buffer replacement strategy that vacuum is causing
                 * the IO
                 */
                StrategyHintVacuum(true);

                analyze_rel(relid, vacstmt);

                StrategyHintVacuum(false);

                if (use_own_xacts)
                    CommitTransactionCommand();
                else
                {
                    MemoryContextSwitchTo(old_context);
                    MemoryContextResetAndDeleteChildren(anl_context);
                }
            }
        }
    }
    PG_CATCH();
    {
        /* Make sure cost accounting is turned off after error */
        VacuumCostActive = false;
        PG_RE_THROW();
    }
    PG_END_TRY();

    /* Turn off vacuum cost accounting */
    VacuumCostActive = false;

    /*
     * Finish up processing.
     */
    if (use_own_xacts)
    {
        /* here, we are not in a transaction */

        /*
         * This matches the CommitTransaction waiting for us in
         * PostgresMain().
         */
        StartTransactionCommand();
        /*
         * Re-establish the transaction snapshot.  This is wasted effort
         * when we are called as a normal utility command, because the
         * new transaction will be dropped immediately by PostgresMain();
         * but it's necessary if we are called from autovacuum because
         * autovacuum might continue on to do an ANALYZE-only call.
         */
        ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
    }

    if (vacstmt->vacuum)
    {
        /*
         * If it was a database-wide VACUUM, print FSM usage statistics (we
         * don't make you be superuser to see these).
         */
        if (all_rels)
            PrintFreeSpaceMapStatistics(elevel);

        /*
         * If we completed a database-wide VACUUM without skipping any
         * relations, update the database's pg_database row with info about
         * the transaction IDs used, and try to truncate pg_clog.
         */
        if (all_rels)
        {
            vac_update_dbstats(MyDatabaseId,
                               initialOldestXmin, initialFreezeLimit);
            vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
        }
    }

    /*
     * Clean up working storage --- note we must do this after
     * StartTransactionCommand, else we might be trying to delete the active
     * context!
     */
    MemoryContextDelete(vac_context);
    vac_context = NULL;

    if (anl_context)
        MemoryContextDelete(anl_context);
}
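
/*
 * For orientation (not exhaustive): the VacuumStmt flags tested above map
 * onto the SQL forms of the command.  A plain "VACUUM" sets only
 * vacstmt->vacuum; "VACUUM FULL VERBOSE tab" additionally sets vacstmt->full,
 * vacstmt->verbose, and vacstmt->relation; "VACUUM ANALYZE" sets both
 * vacstmt->vacuum and vacstmt->analyze; "ANALYZE tab" sets only
 * vacstmt->analyze.  The one combination rejected outright above is
 * VACUUM FULL FREEZE.
 */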

/*
 * Build a list of Oids for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static List *
get_rel_oids(List *relids, const RangeVar *vacrel, const char *stmttype)
{
    List       *oid_list = NIL;
    MemoryContext oldcontext;

    /* List supplied by VACUUM's caller? */
    if (relids)
        return relids;

    if (vacrel)
    {
        /* Process a specific relation */
        Oid         relid;

        relid = RangeVarGetRelid(vacrel, false);

        /* Make a relation list entry for this guy */
        oldcontext = MemoryContextSwitchTo(vac_context);
        oid_list = lappend_oid(oid_list, relid);
        MemoryContextSwitchTo(oldcontext);
    }
    else
    {
        /* Process all plain relations listed in pg_class */
        Relation    pgclass;
        HeapScanDesc scan;
        HeapTuple   tuple;
        ScanKeyData key;

        ScanKeyInit(&key,
                    Anum_pg_class_relkind,
                    BTEqualStrategyNumber, F_CHAREQ,
                    CharGetDatum(RELKIND_RELATION));

        pgclass = heap_open(RelationRelationId, AccessShareLock);

        scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);

        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
            /* Make a relation list entry for this guy */
            oldcontext = MemoryContextSwitchTo(vac_context);
            oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
            MemoryContextSwitchTo(oldcontext);
        }

        heap_endscan(scan);
        heap_close(pgclass, AccessShareLock);
    }

    return oid_list;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
                      TransactionId *oldestXmin,
                      TransactionId *freezeLimit)
{
    TransactionId limit;

    *oldestXmin = GetOldestXmin(sharedRel);

    Assert(TransactionIdIsNormal(*oldestXmin));

    if (vacstmt->freeze)
    {
        /* FREEZE option: use oldest Xmin as freeze cutoff too */
        limit = *oldestXmin;
    }
    else
    {
        /*
         * Normal case: freeze cutoff is well in the past, to wit, about
         * halfway to the wrap horizon
         */
        limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
    }

    /*
     * Be careful not to generate a "permanent" XID
     */
    if (!TransactionIdIsNormal(limit))
        limit = FirstNormalTransactionId;

    /*
     * Ensure sane relationship of limits
     */
    if (TransactionIdFollows(limit, *oldestXmin))
    {
        ereport(WARNING,
                (errmsg("oldest xmin is far in the past"),
                 errhint("Close open transactions soon to avoid wraparound problems.")));
        limit = *oldestXmin;
    }

    *freezeLimit = limit;
}
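
/*
 * Worked example for the normal (non-FREEZE) cutoff above: with 32-bit XIDs,
 * MaxTransactionId is about 4 billion, so MaxTransactionId >> 2 is about
 * 1 billion.  The freeze limit therefore trails the current XID by roughly
 * one billion transactions, i.e. about halfway to the roughly-2-billion
 * transaction wraparound horizon, as the comment in the function says.
 */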


/*
 *  vac_update_relstats() -- update statistics for one relation
 *
 *      Update the whole-relation statistics that are kept in its pg_class
 *      row.  There are additional stats that will be updated if we are
 *      doing ANALYZE, but we always update these stats.  This routine works
 *      for both index and heap relation entries in pg_class.
 *
 *      We violate transaction semantics here by overwriting the rel's
 *      existing pg_class tuple with the new values.  This is reasonably
 *      safe since the new values are correct whether or not this transaction
 *      commits.  The reason for this is that if we updated these tuples in
 *      the usual way, vacuuming pg_class itself wouldn't work very well ---
 *      by the time we got done with a vacuum cycle, most of the tuples in
 *      pg_class would've been obsoleted.  Of course, this only works for
 *      fixed-size never-null columns, but these are.
 *
 *      This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *      ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
                    bool hasindex)
{
    Relation    rd;
    HeapTuple   ctup;
    Form_pg_class pgcform;
    bool        dirty;

    rd = heap_open(RelationRelationId, RowExclusiveLock);

    /* Fetch a copy of the tuple to scribble on */
    ctup = SearchSysCacheCopy(RELOID,
                              ObjectIdGetDatum(relid),
                              0, 0, 0);
    if (!HeapTupleIsValid(ctup))
        elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
             relid);
    pgcform = (Form_pg_class) GETSTRUCT(ctup);

    /* Apply required updates, if any, to copied tuple */

    dirty = false;
    if (pgcform->relpages != (int32) num_pages)
    {
        pgcform->relpages = (int32) num_pages;
        dirty = true;
    }
    if (pgcform->reltuples != (float4) num_tuples)
    {
        pgcform->reltuples = (float4) num_tuples;
        dirty = true;
    }
    if (pgcform->relhasindex != hasindex)
    {
        pgcform->relhasindex = hasindex;
        dirty = true;
    }

    /*
     * If we have discovered that there are no indexes, then there's no
     * primary key either.  This could be done more thoroughly...
     */
    if (!hasindex)
    {
        if (pgcform->relhaspkey)
        {
            pgcform->relhaspkey = false;
            dirty = true;
        }
    }

    /*
     * If anything changed, write out the tuple
     */
    if (dirty)
        heap_inplace_update(rd, ctup);

    heap_close(rd, RowExclusiveLock);
}


/*
 *  vac_update_dbstats() -- update statistics for one database
 *
 *      Update the whole-database statistics that are kept in its pg_database
 *      row, and the flat-file copy of pg_database.
 *
 *      We violate transaction semantics here by overwriting the database's
 *      existing pg_database tuple with the new values.  This is reasonably
 *      safe since the new values are correct whether or not this transaction
 *      commits.  As with vac_update_relstats, this avoids leaving dead tuples
 *      behind after a VACUUM.
 *
 *      This routine is shared by full and lazy VACUUM.  Note that it is only
 *      applied after a database-wide VACUUM operation.
 */
static void
vac_update_dbstats(Oid dbid,
                   TransactionId vacuumXID,
                   TransactionId frozenXID)
{
    Relation    relation;
    HeapTuple   tuple;
    Form_pg_database dbform;

    relation = heap_open(DatabaseRelationId, RowExclusiveLock);

    /* Fetch a copy of the tuple to scribble on */
    tuple = SearchSysCacheCopy(DATABASEOID,
                               ObjectIdGetDatum(dbid),
                               0, 0, 0);
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "could not find tuple for database %u", dbid);
    dbform = (Form_pg_database) GETSTRUCT(tuple);

    /* overwrite the existing statistics in the tuple */
    dbform->datvacuumxid = vacuumXID;
    dbform->datfrozenxid = frozenXID;

    heap_inplace_update(relation, tuple);

    heap_close(relation, RowExclusiveLock);

    /* Mark the flat-file copy of pg_database for update at commit */
    database_file_update_needed();
}


/*
 *  vac_truncate_clog() -- attempt to truncate the commit log
 *
 *      Scan pg_database to determine the system-wide oldest datvacuumxid,
 *      and use it to truncate the transaction commit log (pg_clog).
 *      Also update the XID wrap limit point maintained by varsup.c.
 *
 *      We also generate a warning if the system-wide oldest datfrozenxid
 *      seems to be in danger of wrapping around.  This is a long-in-advance
 *      warning; if we start getting uncomfortably close, GetNewTransactionId
 *      will generate more-annoying warnings, and ultimately refuse to issue
 *      any more new XIDs.
 *
 *      The passed XIDs are simply the ones I just wrote into my pg_database
 *      entry.  They're used to initialize the "min" calculations.
 *
 *      This routine is shared by full and lazy VACUUM.  Note that it is only
 *      applied after a database-wide VACUUM operation.
 */
static void
vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
{
    TransactionId myXID = GetCurrentTransactionId();
    Relation    relation;
    HeapScanDesc scan;
    HeapTuple   tuple;
    int32       age;
    NameData    oldest_datname;
    bool        vacuumAlreadyWrapped = false;
    bool        frozenAlreadyWrapped = false;

    /* init oldest_datname to sync with my frozenXID */
    namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));

    /*
     * Note: the "already wrapped" cases should now be impossible due to the
     * defenses in GetNewTransactionId, but we keep them anyway.
     */
    relation = heap_open(DatabaseRelationId, AccessShareLock);

    scan = heap_beginscan(relation, SnapshotNow, 0, NULL);

    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

        /* Ignore non-connectable databases (eg, template0) */
        /* It's assumed that these have been frozen correctly */
        if (!dbform->datallowconn)
            continue;

        if (TransactionIdIsNormal(dbform->datvacuumxid))
        {
            if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
                vacuumAlreadyWrapped = true;
            else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
                vacuumXID = dbform->datvacuumxid;
        }
        if (TransactionIdIsNormal(dbform->datfrozenxid))
        {
            if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
                frozenAlreadyWrapped = true;
            else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
            {
                frozenXID = dbform->datfrozenxid;
                namecpy(&oldest_datname, &dbform->datname);
            }
        }
    }

    heap_endscan(scan);

    heap_close(relation, AccessShareLock);

    /*
     * Do not truncate CLOG if we seem to have suffered wraparound already;
     * the computed minimum XID might be bogus.
     */
    if (vacuumAlreadyWrapped)
    {
        ereport(WARNING,
                (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
                 errdetail("You may have already suffered transaction-wraparound data loss.")));
        return;
    }

    /* Truncate CLOG to the oldest vacuumxid */
    TruncateCLOG(vacuumXID);

    /*
     * Do not update varsup.c if we seem to have suffered wraparound already;
     * the computed XID might be bogus.
     */
    if (frozenAlreadyWrapped)
    {
        ereport(WARNING,
                (errmsg("some databases have not been vacuumed in over 1 billion transactions"),
                 errhint("Better vacuum them soon, or you may have a wraparound failure.")));
        return;
    }

    /* Update the wrap limit for GetNewTransactionId */
    SetTransactionIdLimit(frozenXID, &oldest_datname);

    /* Give warning about impending wraparound problems */
    age = (int32) (myXID - frozenXID);
    if (age > (int32) ((MaxTransactionId >> 3) * 3))
        ereport(WARNING,
           (errmsg("database \"%s\" must be vacuumed within %u transactions",
                   NameStr(oldest_datname),
                   (MaxTransactionId >> 1) - age),
            errhint("To avoid a database shutdown, execute a full-database VACUUM in \"%s\".",
                    NameStr(oldest_datname))));
}
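
/*
 * Worked example for the warning threshold above: (MaxTransactionId >> 3) * 3
 * is roughly 1.6 billion, so the WARNING starts to appear once the oldest
 * datfrozenxid is more than about 1.6 billion transactions old, leaving on
 * the order of half a billion transactions before the (MaxTransactionId >> 1)
 * limit at which GetNewTransactionId refuses to issue new XIDs.
 */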


/****************************************************************************
 *                                                                          *
 *          Code common to both flavors of VACUUM                           *
 *                                                                          *
 ****************************************************************************
 */


/*
 *  vacuum_rel() -- vacuum one heap relation
 *
 *      Returns TRUE if we actually processed the relation (or can ignore it
 *      for some reason), FALSE if we failed to process it due to permissions
 *      or other reasons.  (A FALSE result really means that some data
 *      may have been left unvacuumed, so we can't update XID stats.)
 *
 *      Doing one heap at a time incurs extra overhead, since we need to
 *      check that the heap exists again just before we vacuum it.  The
 *      reason that we do this is so that vacuuming can be spread across
 *      many small transactions.  Otherwise, two-phase locking would require
 *      us to lock the entire database during one pass of the vacuum cleaner.
 *
 *      At entry and exit, we are not inside a transaction.
 */
static bool
vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
{
    LOCKMODE    lmode;
    Relation    onerel;
    LockRelId   onerelid;
    Oid         toast_relid;
    bool        result;

    /* Begin a transaction for vacuuming this relation */
    StartTransactionCommand();
    /* functions in indexes may want a snapshot set */
    ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

    /*
     * Tell the cache replacement strategy that vacuum is causing all
     * following IO
     */
    StrategyHintVacuum(true);

    /*
     * Check for user-requested abort.  Note we want this to be inside a
     * transaction, so xact.c doesn't issue useless WARNING.
     */
    CHECK_FOR_INTERRUPTS();

    /*
     * Race condition -- if the pg_class tuple has gone away since the last
     * time we saw it, we don't need to vacuum it.
     */
    if (!SearchSysCacheExists(RELOID,
                              ObjectIdGetDatum(relid),
                              0, 0, 0))
    {
        StrategyHintVacuum(false);
        CommitTransactionCommand();
        return true;            /* okay 'cause no data there */
    }

    /*
     * Determine the type of lock we want --- hard exclusive lock for a FULL
     * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
     * way, we can be sure that no other backend is vacuuming the same table.
     */
    lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

    /*
     * Open the class, get an appropriate lock on it, and check permissions.
     *
     * We allow the user to vacuum a table if he is superuser, the table
     * owner, or the database owner (but in the latter case, only if it's not
     * a shared relation).  pg_class_ownercheck includes the superuser case.
     *
     * Note we choose to treat permissions failure as a WARNING and keep
     * trying to vacuum the rest of the DB --- is this appropriate?
     */
    onerel = relation_open(relid, lmode);

    if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
          (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
    {
        ereport(WARNING,
                (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
                        RelationGetRelationName(onerel))));
        relation_close(onerel, lmode);
        StrategyHintVacuum(false);
        CommitTransactionCommand();
        return false;
    }

    /*
     * Check that it's a plain table; we used to do this in get_rel_oids() but
     * seems safer to check after we've locked the relation.
     */
    if (onerel->rd_rel->relkind != expected_relkind)
    {
        ereport(WARNING,
                (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
                        RelationGetRelationName(onerel))));
        relation_close(onerel, lmode);
        StrategyHintVacuum(false);
        CommitTransactionCommand();
        return false;
    }

    /*
     * Silently ignore tables that are temp tables of other backends ---
     * trying to vacuum these will lead to great unhappiness, since their
     * contents are probably not up-to-date on disk.  (We don't throw a
     * warning here; it would just lead to chatter during a database-wide
     * VACUUM.)
     */
    if (isOtherTempNamespace(RelationGetNamespace(onerel)))
    {
        relation_close(onerel, lmode);
        StrategyHintVacuum(false);
        CommitTransactionCommand();
        return true;            /* assume no long-lived data in temp tables */
    }

    /*
     * Get a session-level lock too. This will protect our access to the
     * relation across multiple transactions, so that we can vacuum the
     * relation's TOAST table (if any) secure in the knowledge that no one is
     * deleting the parent relation.
     *
     * NOTE: this cannot block, even if someone else is waiting for access,
     * because the lock manager knows that both lock requests are from the
     * same process.
     */
    onerelid = onerel->rd_lockInfo.lockRelId;
    LockRelationForSession(&onerelid, onerel->rd_istemp, lmode);

    /*
     * Remember the relation's TOAST relation for later
     */
    toast_relid = onerel->rd_rel->reltoastrelid;

    /*
     * Do the actual work --- either FULL or "lazy" vacuum
     */
    if (vacstmt->full)
        full_vacuum_rel(onerel, vacstmt);
    else
        lazy_vacuum_rel(onerel, vacstmt);

    result = true;              /* did the vacuum */

    /* all done with this class, but hold lock until commit */
    relation_close(onerel, NoLock);

    /*
     * Complete the transaction and free all temporary memory used.
     */
    StrategyHintVacuum(false);
    CommitTransactionCommand();

    /*
     * If the relation has a secondary toast rel, vacuum that too while we
     * still hold the session lock on the master table.  Note however that
     * "analyze" will not get done on the toast table.  This is good, because
     * the toaster always uses hardcoded index access and statistics are
     * totally unimportant for toast relations.
     */
    if (toast_relid != InvalidOid)
    {
        if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
            result = false;     /* failed to vacuum the TOAST table? */
    }

    /*
     * Now release the session-level lock on the master table.
     */
    UnlockRelationForSession(&onerelid, lmode);

    return result;
}


/****************************************************************************
 *                                                                          *
 *          Code for VACUUM FULL (only)                                     *
 *                                                                          *
 ****************************************************************************
 */


/*
 *  full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *      This routine vacuums a single heap, cleans out its indexes, and
 *      updates its num_pages and num_tuples statistics.
 *
 *      At entry, we have already established a transaction and opened
 *      and locked the relation.
 */
static void
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
    VacPageListData vacuum_pages;       /* List of pages to vacuum and/or
                                         * clean indexes */
    VacPageListData fraged_pages;       /* List of pages with space enough for
                                         * re-using */
    Relation   *Irel;
    int         nindexes,
                i;
    VRelStats  *vacrelstats;

    vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
                          &OldestXmin, &FreezeLimit);

    /*
     * Set up statistics-gathering machinery.
     */
    vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
    vacrelstats->rel_pages = 0;
    vacrelstats->rel_tuples = 0;
    vacrelstats->hasindex = false;

    /* scan the heap */
    vacuum_pages.num_pages = fraged_pages.num_pages = 0;
    scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

    /* Now open all indexes of the relation */
    vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
    if (nindexes > 0)
        vacrelstats->hasindex = true;

    /* Clean/scan index relation(s) */
    if (Irel != NULL)
    {
        if (vacuum_pages.num_pages > 0)
        {
            for (i = 0; i < nindexes; i++)
                vacuum_index(&vacuum_pages, Irel[i],
                             vacrelstats->rel_tuples, 0);
        }
        else
        {
            /* just scan indexes to update statistics */
            for (i = 0; i < nindexes; i++)
                scan_index(Irel[i], vacrelstats->rel_tuples);
        }
    }

    if (fraged_pages.num_pages > 0)
    {
        /* Try to shrink heap */
        repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
                    nindexes, Irel);
        vac_close_indexes(nindexes, Irel, NoLock);
    }
    else
    {
        vac_close_indexes(nindexes, Irel, NoLock);
        if (vacuum_pages.num_pages > 0)
        {
            /* Clean pages from vacuum_pages list */
            vacuum_heap(vacrelstats, onerel, &vacuum_pages);
        }
    }

    /* update shared free space map with final free space info */
    vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

    /* update statistics in pg_class */
    vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
                        vacrelstats->rel_tuples, vacrelstats->hasindex);

    /* report results to the stats collector, too */
    pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
                         vacstmt->analyze, vacrelstats->rel_tuples);
}
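
/*
 * Rough roadmap of the FULL VACUUM phases driven above: scan_heap() collects
 * vacuum_pages/fraged_pages and the tuple-chain links; the indexes are then
 * either vacuumed (vacuum_index) or merely rescanned for statistics
 * (scan_index); repair_frag() moves tuples into fraged_pages so the relation
 * can be shrunk, or else vacuum_heap() just compacts the pages listed in
 * vacuum_pages; finally the free space map and pg_class statistics are
 * updated.
 */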
1190
1191
1192 /*
1193  *      scan_heap() -- scan an open heap relation
1194  *
1195  *              This routine sets commit status bits, constructs vacuum_pages (list
1196  *              of pages we need to compact free space on and/or clean indexes of
1197  *              deleted tuples), constructs fraged_pages (list of pages with free
1198  *              space that tuples could be moved into), and calculates statistics
1199  *              on the number of live tuples in the heap.
1200  */
1201 static void
1202 scan_heap(VRelStats *vacrelstats, Relation onerel,
1203                   VacPageList vacuum_pages, VacPageList fraged_pages)
1204 {
1205         BlockNumber nblocks,
1206                                 blkno;
1207         HeapTupleData tuple;
1208         char       *relname;
1209         VacPage         vacpage;
1210         BlockNumber empty_pages,
1211                                 empty_end_pages;
1212         double          num_tuples,
1213                                 tups_vacuumed,
1214                                 nkeep,
1215                                 nunused;
1216         double          free_space,
1217                                 usable_free_space;
1218         Size            min_tlen = MaxTupleSize;
1219         Size            max_tlen = 0;
1220         bool            do_shrinking = true;
1221         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1222         int                     num_vtlinks = 0;
1223         int                     free_vtlinks = 100;
1224         PGRUsage        ru0;
1225
1226         pg_rusage_init(&ru0);
1227
1228         relname = RelationGetRelationName(onerel);
1229         ereport(elevel,
1230                         (errmsg("vacuuming \"%s.%s\"",
1231                                         get_namespace_name(RelationGetNamespace(onerel)),
1232                                         relname)));
1233
1234         empty_pages = empty_end_pages = 0;
1235         num_tuples = tups_vacuumed = nkeep = nunused = 0;
1236         free_space = 0;
1237
1238         nblocks = RelationGetNumberOfBlocks(onerel);
1239
1240         /*
1241          * We initially create each VacPage item in a maximal-sized workspace,
1242          * then copy the workspace into a just-large-enough copy.
1243          */
1244         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1245
1246         for (blkno = 0; blkno < nblocks; blkno++)
1247         {
1248                 Page            page,
1249                                         tempPage = NULL;
1250                 bool            do_reap,
1251                                         do_frag;
1252                 Buffer          buf;
1253                 OffsetNumber offnum,
1254                                         maxoff;
1255                 bool            pgchanged,
1256                                         notup;
1257
1258                 vacuum_delay_point();
1259
1260                 buf = ReadBuffer(onerel, blkno);
1261                 page = BufferGetPage(buf);
1262
1263                 /*
1264                  * Since we are holding exclusive lock on the relation, no other
1265                  * backend can be accessing the page; however it is possible that the
1266                  * background writer will try to write the page if it's already marked
1267                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1268                  * must take exclusive buffer lock wherever we potentially modify
1269                  * pages.
1270                  */
1271                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1272
1273                 vacpage->blkno = blkno;
1274                 vacpage->offsets_used = 0;
1275                 vacpage->offsets_free = 0;
1276
1277                 if (PageIsNew(page))
1278                 {
1279                         VacPage         vacpagecopy;
1280
1281                         ereport(WARNING,
1282                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1283                                            relname, blkno)));
1284                         PageInit(page, BufferGetPageSize(buf), 0);
1285                         MarkBufferDirty(buf);
1286                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1287                         free_space += vacpage->free;
1288                         empty_pages++;
1289                         empty_end_pages++;
1290                         vacpagecopy = copy_vac_page(vacpage);
1291                         vpage_insert(vacuum_pages, vacpagecopy);
1292                         vpage_insert(fraged_pages, vacpagecopy);
1293                         UnlockReleaseBuffer(buf);
1294                         continue;
1295                 }
1296
1297                 if (PageIsEmpty(page))
1298                 {
1299                         VacPage         vacpagecopy;
1300
1301                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1302                         free_space += vacpage->free;
1303                         empty_pages++;
1304                         empty_end_pages++;
1305                         vacpagecopy = copy_vac_page(vacpage);
1306                         vpage_insert(vacuum_pages, vacpagecopy);
1307                         vpage_insert(fraged_pages, vacpagecopy);
1308                         UnlockReleaseBuffer(buf);
1309                         continue;
1310                 }
1311
1312                 pgchanged = false;
1313                 notup = true;
1314                 maxoff = PageGetMaxOffsetNumber(page);
1315                 for (offnum = FirstOffsetNumber;
1316                          offnum <= maxoff;
1317                          offnum = OffsetNumberNext(offnum))
1318                 {
1319                         ItemId          itemid = PageGetItemId(page, offnum);
1320                         bool            tupgone = false;
1321
1322                         /*
1323                          * Collect unused items too; it's possible to have index entries
1324                          * pointing here after a crash.
1325                          */
1326                         if (!ItemIdIsUsed(itemid))
1327                         {
1328                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1329                                 nunused += 1;
1330                                 continue;
1331                         }
1332
1333                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1334                         tuple.t_len = ItemIdGetLength(itemid);
1335                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1336
1337                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1338                         {
1339                                 case HEAPTUPLE_DEAD:
1340                                         tupgone = true;         /* we can delete the tuple */
1341                                         break;
1342                                 case HEAPTUPLE_LIVE:
1343
1344                                         /*
1345                                          * Tuple is good.  Consider whether to replace its xmin
1346                                          * value with FrozenTransactionId.
1347                                          */
1348                                         if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
1349                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1350                                                                                           FreezeLimit))
1351                                         {
1352                                                 HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
1353                                                 /* infomask should be okay already */
1354                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1355                                                 pgchanged = true;
1356                                         }
1357
1358                                         /*
1359                                          * Other checks...
1360                                          */
1361                                         if (onerel->rd_rel->relhasoids &&
1362                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1363                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1364                                                          relname, blkno, offnum);
1365                                         break;
1366                                 case HEAPTUPLE_RECENTLY_DEAD:
1367
1368                                         /*
1369                                          * If the tuple was deleted only recently, we must not remove
1370                                          * it from the relation.
1371                                          */
1372                                         nkeep += 1;
1373
1374                                         /*
1375                                          * If we are shrinking and this tuple has been updated, remember
1376                                          * it so that we can construct the update-chain dependencies.
1377                                          */
1378                                         if (do_shrinking &&
1379                                                 !(ItemPointerEquals(&(tuple.t_self),
1380                                                                                         &(tuple.t_data->t_ctid))))
1381                                         {
1382                                                 if (free_vtlinks == 0)
1383                                                 {
1384                                                         free_vtlinks = 1000;
1385                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1386                                                                                            (free_vtlinks + num_vtlinks) *
1387                                                                                                          sizeof(VTupleLinkData));
1388                                                 }
1389                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1390                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1391                                                 free_vtlinks--;
1392                                                 num_vtlinks++;
1393                                         }
1394                                         break;
1395                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1396
1397                                         /*
1398                                          * This should not happen, since we hold exclusive lock on
1399                                          * the relation; shouldn't we raise an error? (Actually,
1400                                          * it can happen in system catalogs, since we tend to
1401                                          * release write lock before commit there.)
1402                                          */
1403                                         ereport(NOTICE,
1404                                                         (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
1405                                                                         relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
1406                                         do_shrinking = false;
1407                                         break;
1408                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1409
1410                                         /*
1411                                          * This should not happen, since we hold exclusive lock on
1412                                          * the relation; shouldn't we raise an error? (Actually,
1413                                          * it can happen in system catalogs, since we tend to
1414                                          * release write lock before commit there.)
1415                                          */
1416                                         ereport(NOTICE,
1417                                                         (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
1418                                                                         relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
1419                                         do_shrinking = false;
1420                                         break;
1421                                 default:
1422                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1423                                         break;
1424                         }
1425
1426                         if (tupgone)
1427                         {
1428                                 ItemId          lpp;
1429
1430                                 /*
1431                                  * Here we are building a temporary copy of the page with dead
1432                                  * tuples removed.      Below we will apply
1433                                  * PageRepairFragmentation to the copy, so that we can
1434                                  * determine how much space will be available after removal of
1435                                  * dead tuples.  But note we are NOT changing the real page
1436                                  * yet...
1437                                  */
1438                                 if (tempPage == NULL)
1439                                 {
1440                                         Size            pageSize;
1441
1442                                         pageSize = PageGetPageSize(page);
1443                                         tempPage = (Page) palloc(pageSize);
1444                                         memcpy(tempPage, page, pageSize);
1445                                 }
1446
1447                                 /* mark it unused on the temp page */
1448                                 lpp = PageGetItemId(tempPage, offnum);
1449                                 lpp->lp_flags &= ~LP_USED;
1450
1451                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1452                                 tups_vacuumed += 1;
1453                         }
1454                         else
1455                         {
1456                                 num_tuples += 1;
1457                                 notup = false;
1458                                 if (tuple.t_len < min_tlen)
1459                                         min_tlen = tuple.t_len;
1460                                 if (tuple.t_len > max_tlen)
1461                                         max_tlen = tuple.t_len;
1462                         }
1463                 }                                               /* scan along page */
1464
1465                 if (tempPage != NULL)
1466                 {
1467                         /* Some tuples are removable; figure free space after removal */
1468                         PageRepairFragmentation(tempPage, NULL);
1469                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1470                         pfree(tempPage);
1471                         do_reap = true;
1472                 }
1473                 else
1474                 {
1475                         /* Just use current available space */
1476                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1477                         /* Need to reap the page if it has ~LP_USED line pointers */
1478                         do_reap = (vacpage->offsets_free > 0);
1479                 }
1480
1481                 free_space += vacpage->free;
1482
1483                 /*
1484                  * Add the page to fraged_pages if it has a useful amount of free
1485                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1486                  * don't know that accurately near the start of the relation, so add
1487                  * pages unconditionally if they have >= BLCKSZ/10 free space.
1488                  */
1489                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
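                /*
                 * For example, with the default BLCKSZ of 8192 this means any page
                 * with at least 819 bytes free always qualifies, however little we
                 * know about min_tlen so far.
                 */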
1490
1491                 if (do_reap || do_frag)
1492                 {
1493                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1494
1495                         if (do_reap)
1496                                 vpage_insert(vacuum_pages, vacpagecopy);
1497                         if (do_frag)
1498                                 vpage_insert(fraged_pages, vacpagecopy);
1499                 }
1500
1501                 /*
1502                  * Include the page in empty_end_pages if it will be empty after
1503                  * vacuuming; this is to keep us from using it as a move destination.
1504                  */
1505                 if (notup)
1506                 {
1507                         empty_pages++;
1508                         empty_end_pages++;
1509                 }
1510                 else
1511                         empty_end_pages = 0;
1512
1513                 if (pgchanged)
1514                         MarkBufferDirty(buf);
1515                 UnlockReleaseBuffer(buf);
1516         }
1517
1518         pfree(vacpage);
1519
1520         /* save stats in the rel list for use later */
1521         vacrelstats->rel_tuples = num_tuples;
1522         vacrelstats->rel_pages = nblocks;
1523         if (num_tuples == 0)
1524                 min_tlen = max_tlen = 0;
1525         vacrelstats->min_tlen = min_tlen;
1526         vacrelstats->max_tlen = max_tlen;
1527
1528         vacuum_pages->empty_end_pages = empty_end_pages;
1529         fraged_pages->empty_end_pages = empty_end_pages;
1530
1531         /*
1532          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1533          * remove any "empty" end-pages from the list, and compute usable free
1534          * space = free space in remaining pages.
1535          */
1536         if (do_shrinking)
1537         {
1538                 int                     i;
1539
1540                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1541                 fraged_pages->num_pages -= empty_end_pages;
1542                 usable_free_space = 0;
1543                 for (i = 0; i < fraged_pages->num_pages; i++)
1544                         usable_free_space += fraged_pages->pagedesc[i]->free;
1545         }
1546         else
1547         {
1548                 fraged_pages->num_pages = 0;
1549                 usable_free_space = 0;
1550         }
1551
1552         /* don't bother to save vtlinks if we will not call repair_frag */
1553         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1554         {
1555                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1556                           vac_cmp_vtlinks);
1557                 vacrelstats->vtlinks = vtlinks;
1558                 vacrelstats->num_vtlinks = num_vtlinks;
1559         }
1560         else
1561         {
1562                 vacrelstats->vtlinks = NULL;
1563                 vacrelstats->num_vtlinks = 0;
1564                 pfree(vtlinks);
1565         }
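        /*
         * A note on what the qsort above buys us (a sketch; vac_cmp_vtlinks and
         * vac_bsearch, defined elsewhere in this file, are the authoritative
         * code): the vtlinks array is ordered by new_tid, so repair_frag can
         * take a chain member's own TID and binary-search for the entry whose
         * new_tid matches it, which yields the parent (older) version's
         * this_tid.  Roughly:
         *
         *              vtld.new_tid = tp.t_self;
         *              vtlp = vac_bsearch(&vtld, vacrelstats->vtlinks,
         *                                                 vacrelstats->num_vtlinks,
         *                                                 sizeof(VTupleLinkData), vac_cmp_vtlinks);
         *              parent tid = vtlp->this_tid;
         */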
1566
1567         ereport(elevel,
1568                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1569                                         RelationGetRelationName(onerel),
1570                                         tups_vacuumed, num_tuples, nblocks),
1571                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1572                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1573                                            "There were %.0f unused item pointers.\n"
1574            "Total free space (including removable row versions) is %.0f bytes.\n"
1575                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1576          "%u pages containing %.0f free bytes are potential move destinations.\n"
1577                                            "%s.",
1578                                            nkeep,
1579                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1580                                            nunused,
1581                                            free_space,
1582                                            empty_pages, empty_end_pages,
1583                                            fraged_pages->num_pages, usable_free_space,
1584                                            pg_rusage_show(&ru0))));
1585 }
1586
1587
1588 /*
1589  *      repair_frag() -- try to repair relation's fragmentation
1590  *
1591  *              This routine marks dead tuples as unused and tries to reuse dead space
1592  *              by moving tuples (and inserting index entries if needed). It constructs
1593  *              Nvacpagelist, a list of freed pages (whose tuples were moved), and cleans
1594  *              the index entries for them after committing the current transaction (in a
1595  *              hackish manner, without releasing locks or freeing memory!). It truncates
1596  *              the relation if some end blocks have become empty.
1597  */
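/*
 *              In rough outline (a simplified sketch, not the authoritative logic):
 *
 *                      for blkno from the last nonempty block downwards:
 *                              for each tuple still on the page:
 *                                      if it belongs to a recent update chain,
 *                                              plan the whole chain's moves, then move_chain_tuple() each;
 *                                      else
 *                                              move_plain_tuple() it into a lower fraged page;
 *                              stop once no fraged page has room for the tuple at hand;
 *                      afterwards, the moved-off pages are cleaned and empty tail blocks
 *                      are truncated away.
 */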
1598 static void
1599 repair_frag(VRelStats *vacrelstats, Relation onerel,
1600                         VacPageList vacuum_pages, VacPageList fraged_pages,
1601                         int nindexes, Relation *Irel)
1602 {
1603         TransactionId myXID = GetCurrentTransactionId();
1604         Buffer          dst_buffer = InvalidBuffer;
1605         BlockNumber nblocks,
1606                                 blkno;
1607         BlockNumber last_move_dest_block = 0,
1608                                 last_vacuum_block;
1609         Page            dst_page = NULL;
1610         ExecContextData ec;
1611         VacPageListData Nvacpagelist;
1612         VacPage         dst_vacpage = NULL,
1613                                 last_vacuum_page,
1614                                 vacpage,
1615                            *curpage;
1616         int                     i;
1617         int                     num_moved = 0,
1618                                 num_fraged_pages,
1619                                 vacuumed_pages;
1620         int                     keep_tuples = 0;
1621         PGRUsage        ru0;
1622
1623         pg_rusage_init(&ru0);
1624
1625         ExecContext_Init(&ec, onerel);
1626
1627         Nvacpagelist.num_pages = 0;
1628         num_fraged_pages = fraged_pages->num_pages;
1629         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1630         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1631         if (vacuumed_pages > 0)
1632         {
1633                 /* get last reaped page from vacuum_pages */
1634                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1635                 last_vacuum_block = last_vacuum_page->blkno;
1636         }
1637         else
1638         {
1639                 last_vacuum_page = NULL;
1640                 last_vacuum_block = InvalidBlockNumber;
1641         }
1642
1643         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1644         vacpage->offsets_used = vacpage->offsets_free = 0;
1645
1646         /*
1647          * Scan pages backwards from the last nonempty page, trying to move tuples
1648          * down to lower pages.  Quit when we reach a page that we have moved any
1649          * tuples onto, or the first page if we haven't moved anything, or when we
1650          * find a page we cannot completely empty (this last condition is handled
1651          * by "break" statements within the loop).
1652          *
1653          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1654          * in order by blkno.
1655          */
1656         nblocks = vacrelstats->rel_pages;
1657         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1658                  blkno > last_move_dest_block;
1659                  blkno--)
1660         {
1661                 Buffer          buf;
1662                 Page            page;
1663                 OffsetNumber offnum,
1664                                         maxoff;
1665                 bool            isempty,
1666                                         chain_tuple_moved;
1667
1668                 vacuum_delay_point();
1669
1670                 /*
1671                  * Forget fraged_pages pages at or after this one; they're no longer
1672                  * useful as move targets, since we only want to move down. Note that
1673                  * since we stop the outer loop at last_move_dest_block, pages removed
1674                  * here cannot have had anything moved onto them already.
1675                  *
1676                  * Also note that we don't change the stored fraged_pages list, only
1677                  * our local variable num_fraged_pages; so the forgotten pages are
1678                  * still available to be loaded into the free space map later.
1679                  */
1680                 while (num_fraged_pages > 0 &&
1681                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1682                 {
1683                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1684                         --num_fraged_pages;
1685                 }
1686
1687                 /*
1688                  * Process this page of relation.
1689                  */
1690                 buf = ReadBuffer(onerel, blkno);
1691                 page = BufferGetPage(buf);
1692
1693                 vacpage->offsets_free = 0;
1694
1695                 isempty = PageIsEmpty(page);
1696
1697                 /* Is the page in the vacuum_pages list? */
1698                 if (blkno == last_vacuum_block)
1699                 {
1700                         if (last_vacuum_page->offsets_free > 0)
1701                         {
1702                                 /* there are dead tuples on this page - clean them */
1703                                 Assert(!isempty);
1704                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1705                                 vacuum_page(onerel, buf, last_vacuum_page);
1706                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1707                         }
1708                         else
1709                                 Assert(isempty);
1710                         --vacuumed_pages;
1711                         if (vacuumed_pages > 0)
1712                         {
1713                                 /* get prev reaped page from vacuum_pages */
1714                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1715                                 last_vacuum_block = last_vacuum_page->blkno;
1716                         }
1717                         else
1718                         {
1719                                 last_vacuum_page = NULL;
1720                                 last_vacuum_block = InvalidBlockNumber;
1721                         }
1722                         if (isempty)
1723                         {
1724                                 ReleaseBuffer(buf);
1725                                 continue;
1726                         }
1727                 }
1728                 else
1729                         Assert(!isempty);
1730
1731                 chain_tuple_moved = false;              /* no chain tuple has been moved off
1732                                                                                  * this page yet */
1733                 vacpage->blkno = blkno;
1734                 maxoff = PageGetMaxOffsetNumber(page);
1735                 for (offnum = FirstOffsetNumber;
1736                          offnum <= maxoff;
1737                          offnum = OffsetNumberNext(offnum))
1738                 {
1739                         Size            tuple_len;
1740                         HeapTupleData tuple;
1741                         ItemId          itemid = PageGetItemId(page, offnum);
1742
1743                         if (!ItemIdIsUsed(itemid))
1744                                 continue;
1745
1746                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1747                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1748                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1749
1750                         /* ---
1751                          * VACUUM FULL has an exclusive lock on the relation.  So
1752                          * normally no other transaction can have pending INSERTs or
1753                          * DELETEs in this relation.  A tuple is either:
1754                          *              (a) a tuple in a system catalog, inserted or deleted
1755                          *                      by a not yet committed transaction
1756                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
1757                          *                      is visible to all active transactions)
1758                          *              (c) inserted by a committed xact (XMIN_COMMITTED)
1759                          *              (d) moved by the currently running VACUUM.
1760                          *              (e) deleted (XMAX_COMMITTED) but at least one active
1761                          *                      transaction does not see the deleting transaction
1762                          * In case (a) we wouldn't be in repair_frag() at all.
1763                          * In case (b) we cannot be here, because scan_heap() has
1764                          * already marked the item as unused, see continue above. Case
1765                          * (c) is what is normally to be expected. Case (d) is only
1766                          * possible if a whole tuple chain has been moved while
1767                          * processing this or a higher-numbered block.
1768                          * ---
1769                          */
1770                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1771                         {
1772                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1773                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1774                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
1775                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
1776
1777                                 /*
1778                                  * MOVED_OFF by another VACUUM would have caused the
1779                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
1780                                  */
1781                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1782                                         elog(ERROR, "invalid XVAC in tuple header");
1783
1784                                 /*
1785                                  * If this (chain) tuple has already been moved by this VACUUM, we
1786                                  * have to check whether it is recorded in vacpage or not, i.e.
1787                                  * whether it was moved while cleaning this page or some previous one.
1788                                  */
1789
1790                                 /* Can't we Assert(keep_tuples > 0) here? */
1791                                 if (keep_tuples == 0)
1792                                         continue;
1793                                 if (chain_tuple_moved)
1794                                 {
1795                                         /* some chains were moved while cleaning this page */
1796                                         Assert(vacpage->offsets_free > 0);
1797                                         for (i = 0; i < vacpage->offsets_free; i++)
1798                                         {
1799                                                 if (vacpage->offsets[i] == offnum)
1800                                                         break;
1801                                         }
1802                                         if (i >= vacpage->offsets_free)         /* not found */
1803                                         {
1804                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1805                                                 keep_tuples--;
1806                                         }
1807                                 }
1808                                 else
1809                                 {
1810                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1811                                         keep_tuples--;
1812                                 }
1813                                 continue;
1814                         }
1815
1816                         /*
1817                          * If this tuple is in a chain of tuples created in updates by
1818                          * "recent" transactions then we have to move the whole chain of
1819                          * tuples to other places, so that we can write new t_ctid links
1820                          * that preserve the chain relationship.
1821                          *
1822                          * This test is complicated.  Read it as "if tuple is a recently
1823                          * created updated version, OR if it is an obsoleted version". (In
1824                          * the second half of the test, we needn't make any check on XMAX
1825                          * --- it must be recently obsoleted, else scan_heap would have
1826                          * deemed it removable.)
1827                          *
1828                          * NOTE: this test is not 100% accurate: it is possible for a
1829                          * tuple to be an updated one with recent xmin, and yet not match
1830                          * any new_tid entry in the vtlinks list.  Presumably there was
1831                          * once a parent tuple with xmax matching the xmin, but it's
1832                          * possible that that tuple has been removed --- for example, if
1833                          * it had xmin = xmax and wasn't itself an updated version, then
1834                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
1835                          * xmin xact completes.
1836                          *
1837                          * To be on the safe side, we abandon the repair_frag process if
1838                          * we cannot find the parent tuple in vtlinks.  This may be overly
1839                          * conservative; AFAICS it would be safe to move the chain.
1840                          */
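                        /*
                         * Paraphrasing the test below (no new logic, just a restatement):
                         * the tuple triggers a chain move if it is a recently-created
                         * update child,
                         *
                         *              (t_infomask & HEAP_UPDATED) && !(xmin precedes OldestXmin)
                         *
                         * or if it is an obsoleted version,
                         *
                         *              !(t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED)) &&
                         *              t_ctid != t_self
                         */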
1841                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1842                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1843                                                                                 OldestXmin)) ||
1844                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1845                                                                                            HEAP_IS_LOCKED)) &&
1846                                  !(ItemPointerEquals(&(tuple.t_self),
1847                                                                          &(tuple.t_data->t_ctid)))))
1848                         {
1849                                 Buffer          Cbuf = buf;
1850                                 bool            freeCbuf = false;
1851                                 bool            chain_move_failed = false;
1852                                 ItemPointerData Ctid;
1853                                 HeapTupleData tp = tuple;
1854                                 Size            tlen = tuple_len;
1855                                 VTupleMove      vtmove;
1856                                 int                     num_vtmove;
1857                                 int                     free_vtmove;
1858                                 VacPage         to_vacpage = NULL;
1859                                 int                     to_item = 0;
1860                                 int                     ti;
1861
1862                                 if (dst_buffer != InvalidBuffer)
1863                                 {
1864                                         ReleaseBuffer(dst_buffer);
1865                                         dst_buffer = InvalidBuffer;
1866                                 }
1867
1868                                 /* Quick exit if we have no vtlinks to search in */
1869                                 if (vacrelstats->vtlinks == NULL)
1870                                 {
1871                                         elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
1872                                         break;          /* out of walk-along-page loop */
1873                                 }
1874
1875                                 /*
1876                                  * If this tuple is at the beginning or in the middle of the chain,
1877                                  * then we have to move to the end of the chain.  As with any t_ctid
1878                                  * chase, we have to verify that each new tuple is really the
1879                                  * descendant of the tuple we came from.
1880                                  */
1881                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1882                                                                                                   HEAP_IS_LOCKED)) &&
1883                                            !(ItemPointerEquals(&(tp.t_self),
1884                                                                                    &(tp.t_data->t_ctid))))
1885                                 {
1886                                         ItemPointerData nextTid;
1887                                         TransactionId priorXmax;
1888                                         Buffer          nextBuf;
1889                                         Page            nextPage;
1890                                         OffsetNumber nextOffnum;
1891                                         ItemId          nextItemid;
1892                                         HeapTupleHeader nextTdata;
1893
1894                                         nextTid = tp.t_data->t_ctid;
1895                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
1896                                         /* assume block# is OK (see heap_fetch comments) */
1897                                         nextBuf = ReadBuffer(onerel,
1898                                                                                  ItemPointerGetBlockNumber(&nextTid));
1899                                         nextPage = BufferGetPage(nextBuf);
1900                                         /* If bogus or unused slot, assume tp is end of chain */
1901                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
1902                                         if (nextOffnum < FirstOffsetNumber ||
1903                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
1904                                         {
1905                                                 ReleaseBuffer(nextBuf);
1906                                                 break;
1907                                         }
1908                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
1909                                         if (!ItemIdIsUsed(nextItemid))
1910                                         {
1911                                                 ReleaseBuffer(nextBuf);
1912                                                 break;
1913                                         }
1914                                         /* if not matching XMIN, assume tp is end of chain */
1915                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
1916                                                                                                                           nextItemid);
1917                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
1918                                                                                          priorXmax))
1919                                         {
1920                                                 ReleaseBuffer(nextBuf);
1921                                                 break;
1922                                         }
1923                                         /* OK, switch our attention to the next tuple in chain */
1924                                         tp.t_data = nextTdata;
1925                                         tp.t_self = nextTid;
1926                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
1927                                         if (freeCbuf)
1928                                                 ReleaseBuffer(Cbuf);
1929                                         Cbuf = nextBuf;
1930                                         freeCbuf = true;
1931                                 }
1932
1933                                 /* Set up workspace for planning the chain move */
1934                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
1935                                 num_vtmove = 0;
1936                                 free_vtmove = 100;
1937
1938                                 /*
1939                                  * Now, walk backwards up the chain (towards older tuples) and
1940                                  * check whether all items in the chain can be moved.  We record all
1941                                  * the moves that need to be made in the vtmove array.
1942                                  */
1943                                 for (;;)
1944                                 {
1945                                         Buffer          Pbuf;
1946                                         Page            Ppage;
1947                                         ItemId          Pitemid;
1948                                         HeapTupleHeader PTdata;
1949                                         VTupleLinkData vtld,
1950                                                            *vtlp;
1951
1952                                         /* Identify a target page to move this tuple to */
1953                                         if (to_vacpage == NULL ||
1954                                                 !enough_space(to_vacpage, tlen))
1955                                         {
1956                                                 for (i = 0; i < num_fraged_pages; i++)
1957                                                 {
1958                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1959                                                                 break;
1960                                                 }
1961
1962                                                 if (i == num_fraged_pages)
1963                                                 {
1964                                                         /* can't move item anywhere */
1965                                                         chain_move_failed = true;
1966                                                         break;          /* out of check-all-items loop */
1967                                                 }
1968                                                 to_item = i;
1969                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1970                                         }
1971                                         to_vacpage->free -= MAXALIGN(tlen);
1972                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1973                                                 to_vacpage->free -= sizeof(ItemIdData);
1974                                         (to_vacpage->offsets_used)++;
1975
1976                                         /* Add an entry to vtmove list */
1977                                         if (free_vtmove == 0)
1978                                         {
1979                                                 free_vtmove = 1000;
1980                                                 vtmove = (VTupleMove)
1981                                                         repalloc(vtmove,
1982                                                                          (free_vtmove + num_vtmove) *
1983                                                                          sizeof(VTupleMoveData));
1984                                         }
1985                                         vtmove[num_vtmove].tid = tp.t_self;
1986                                         vtmove[num_vtmove].vacpage = to_vacpage;
1987                                         if (to_vacpage->offsets_used == 1)
1988                                                 vtmove[num_vtmove].cleanVpd = true;
1989                                         else
1990                                                 vtmove[num_vtmove].cleanVpd = false;
1991                                         free_vtmove--;
1992                                         num_vtmove++;
1993
1994                                         /* Done if at beginning of chain */
1995                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1996                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
1997                                                                                    OldestXmin))
1998                                                 break;  /* out of check-all-items loop */
1999
2000                                         /* Move to tuple with prior row version */
2001                                         vtld.new_tid = tp.t_self;
2002                                         vtlp = (VTupleLink)
2003                                                 vac_bsearch((void *) &vtld,
2004                                                                         (void *) (vacrelstats->vtlinks),
2005                                                                         vacrelstats->num_vtlinks,
2006                                                                         sizeof(VTupleLinkData),
2007                                                                         vac_cmp_vtlinks);
2008                                         if (vtlp == NULL)
2009                                         {
2010                                                 /* see discussion above */
2011                                                 elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
2012                                                 chain_move_failed = true;
2013                                                 break;  /* out of check-all-items loop */
2014                                         }
2015                                         tp.t_self = vtlp->this_tid;
2016                                         Pbuf = ReadBuffer(onerel,
2017                                                                         ItemPointerGetBlockNumber(&(tp.t_self)));
2018                                         Ppage = BufferGetPage(Pbuf);
2019                                         Pitemid = PageGetItemId(Ppage,
2020                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2021                                         /* this can't happen since we saw tuple earlier: */
2022                                         if (!ItemIdIsUsed(Pitemid))
2023                                                 elog(ERROR, "parent itemid marked as unused");
2024                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2025
2026                                         /* ctid should not have changed since we saved it */
2027                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2028                                                                                          &(PTdata->t_ctid)));
2029
2030                                         /*
2031                                          * See above for the cases in which !ItemIdIsUsed(nextItemid)
2032                                          * (the child item has been removed)...  Because we don't
2033                                          * currently remove the no-longer-needed part of an update
2034                                          * chain, it's possible to find a non-matching parent row here.
2035                                          * As in the case that caused this problem, we just stop
2036                                          * shrinking here.  We could try to find the real parent row,
2037                                          * but it isn't worth doing, since a real solution will be
2038                                          * implemented later anyway, and we are too close to the 6.5
2039                                          * release. - vadim 06/11/99
2040                                          */
2041                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2042                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2043                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2044                                         {
2045                                                 ReleaseBuffer(Pbuf);
2046                                                 elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
2047                                                 chain_move_failed = true;
2048                                                 break;  /* out of check-all-items loop */
2049                                         }
2050                                         tp.t_data = PTdata;
2051                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2052                                         if (freeCbuf)
2053                                                 ReleaseBuffer(Cbuf);
2054                                         Cbuf = Pbuf;
2055                                         freeCbuf = true;
2056                                 }                               /* end of check-all-items loop */
2057
2058                                 if (freeCbuf)
2059                                         ReleaseBuffer(Cbuf);
2060                                 freeCbuf = false;
2061
2062                                 if (chain_move_failed)
2063                                 {
2064                                         /*
2065                                          * Undo changes to offsets_used state.  We don't bother
2066                                          * cleaning up the amount-free state, since we're not
2067                                          * going to do any further tuple motion.
2068                                          */
2069                                         for (i = 0; i < num_vtmove; i++)
2070                                         {
2071                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2072                                                 (vtmove[i].vacpage->offsets_used)--;
2073                                         }
2074                                         pfree(vtmove);
2075                                         break;          /* out of walk-along-page loop */
2076                                 }
2077
2078                                 /*
2079                                  * Okay, move the whole tuple chain in reverse order.
2080                                  *
2081                                  * Ctid tracks the new location of the previously-moved tuple.
2082                                  */
2083                                 ItemPointerSetInvalid(&Ctid);
2084                                 for (ti = 0; ti < num_vtmove; ti++)
2085                                 {
2086                                         VacPage         destvacpage = vtmove[ti].vacpage;
2087                                         Page            Cpage;
2088                                         ItemId          Citemid;
2089
2090                                         /* Get page to move from */
2091                                         tuple.t_self = vtmove[ti].tid;
2092                                         Cbuf = ReadBuffer(onerel,
2093                                                                  ItemPointerGetBlockNumber(&(tuple.t_self)));
2094
2095                                         /* Get page to move to */
2096                                         dst_buffer = ReadBuffer(onerel, destvacpage->blkno);
2097
2098                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2099                                         if (dst_buffer != Cbuf)
2100                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2101
2102                                         dst_page = BufferGetPage(dst_buffer);
2103                                         Cpage = BufferGetPage(Cbuf);
2104
2105                                         Citemid = PageGetItemId(Cpage,
2106                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2107                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2108                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2109
2110                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2111                                                                          dst_buffer, dst_page, destvacpage,
2112                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2113
2114                                         num_moved++;
2115                                         if (destvacpage->blkno > last_move_dest_block)
2116                                                 last_move_dest_block = destvacpage->blkno;
2117
2118                                         /*
2119                                          * Remember that we moved a tuple off the current page
2120                                          * (the corresponding index entries will be cleaned up).
2121                                          */
2122                                         if (Cbuf == buf)
2123                                                 vacpage->offsets[vacpage->offsets_free++] =
2124                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2125                                         else
2126                                                 keep_tuples++;
2127
2128                                         ReleaseBuffer(dst_buffer);
2129                                         ReleaseBuffer(Cbuf);
2130                                 }                               /* end of move-the-tuple-chain loop */
2131
2132                                 dst_buffer = InvalidBuffer;
2133                                 pfree(vtmove);
2134                                 chain_tuple_moved = true;
2135
2136                                 /* advance to next tuple in walk-along-page loop */
2137                                 continue;
2138                         }                                       /* end of is-tuple-in-chain test */
2139
2140                         /* try to find new page for this tuple */
2141                         if (dst_buffer == InvalidBuffer ||
2142                                 !enough_space(dst_vacpage, tuple_len))
2143                         {
2144                                 if (dst_buffer != InvalidBuffer)
2145                                 {
2146                                         ReleaseBuffer(dst_buffer);
2147                                         dst_buffer = InvalidBuffer;
2148                                 }
2149                                 for (i = 0; i < num_fraged_pages; i++)
2150                                 {
2151                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2152                                                 break;
2153                                 }
2154                                 if (i == num_fraged_pages)
2155                                         break;          /* can't move item anywhere */
2156                                 dst_vacpage = fraged_pages->pagedesc[i];
2157                                 dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
2158                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2159                                 dst_page = BufferGetPage(dst_buffer);
2160                                 /* if this page was not used before - clean it */
2161                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2162                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2163                         }
2164                         else
2165                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2166
2167                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2168
2169                         move_plain_tuple(onerel, buf, page, &tuple,
2170                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2171
2172                         num_moved++;
2173                         if (dst_vacpage->blkno > last_move_dest_block)
2174                                 last_move_dest_block = dst_vacpage->blkno;
2175
2176                         /*
2177                          * Remember that we moved a tuple off the current page
2178                          * (the corresponding index entries will be cleaned up).
2179                          */
2180                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2181                 }                                               /* walk along page */
2182
2183                 /*
2184                  * If we broke out of the walk-along-page loop early (ie, still have
2185                  * offnum <= maxoff), then we failed to move some tuple off this page.
2186                  * No point in shrinking any more, so clean up and exit the per-page
2187                  * loop.
2188                  */
2189                 if (offnum < maxoff && keep_tuples > 0)
2190                 {
2191                         OffsetNumber off;
2192
2193                         /*
2194                          * Fix vacpage state for any unvisited tuples remaining on page
2195                          */
2196                         for (off = OffsetNumberNext(offnum);
2197                                  off <= maxoff;
2198                                  off = OffsetNumberNext(off))
2199                         {
2200                                 ItemId          itemid = PageGetItemId(page, off);
2201                                 HeapTupleHeader htup;
2202
2203                                 if (!ItemIdIsUsed(itemid))
2204                                         continue;
2205                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2206                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2207                                         continue;
2208
2209                                 /*
2210                                  * See comments in the walk-along-page loop above about why
2211                                  * only MOVED_OFF tuples should be found here.
2212                                  */
2213                                 if (htup->t_infomask & HEAP_MOVED_IN)
2214                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2215                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2216                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2217                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2218                                         elog(ERROR, "invalid XVAC in tuple header");
2219
2220                                 if (chain_tuple_moved)
2221                                 {
2222                                         /* some chains were moved while cleaning this page */
2223                                         Assert(vacpage->offsets_free > 0);
2224                                         for (i = 0; i < vacpage->offsets_free; i++)
2225                                         {
2226                                                 if (vacpage->offsets[i] == off)
2227                                                         break;
2228                                         }
2229                                         if (i >= vacpage->offsets_free)         /* not found */
2230                                         {
2231                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2232                                                 Assert(keep_tuples > 0);
2233                                                 keep_tuples--;
2234                                         }
2235                                 }
2236                                 else
2237                                 {
2238                                         vacpage->offsets[vacpage->offsets_free++] = off;
2239                                         Assert(keep_tuples > 0);
2240                                         keep_tuples--;
2241                                 }
2242                         }
2243                 }
2244
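                /*
                 * The offsets[] array must stay sorted by offset number, because
                 * tid_reaped() bsearches it with vac_cmp_offno.  Plain moves record
                 * offsets in increasing order as we walk the page, but chain moves
                 * can append them out of order, hence the qsort below in that case.
                 */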
2245                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2246                 {
2247                         if (chain_tuple_moved)          /* else - they are ordered */
2248                         {
2249                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2250                                           sizeof(OffsetNumber), vac_cmp_offno);
2251                         }
2252                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2253                 }
2254
2255                 ReleaseBuffer(buf);
2256
2257                 if (offnum <= maxoff)
2258                         break;                          /* had to quit early, see above note */
2259
2260         }                                                       /* walk along relation */
2261
2262         blkno++;                                        /* new number of blocks */
2263
2264         if (dst_buffer != InvalidBuffer)
2265         {
2266                 Assert(num_moved > 0);
2267                 ReleaseBuffer(dst_buffer);
2268         }
2269
2270         if (num_moved > 0)
2271         {
2272                 /*
2273                  * We have to commit our tuple movings before we truncate the
2274                  * relation.  Ideally we should do Commit/StartTransactionCommand
2275                  * here, relying on the session-level table lock to protect our
2276                  * exclusive access to the relation.  However, that would require a
2277                  * lot of extra code to close and re-open the relation, indexes, etc.
2278                  * For now, a quick hack: record status of current transaction as
2279                  * committed, and continue.
2280                  */
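                /*
                 * (Committing first means that, even if we crash after the old tuple
                 * versions are removed by the truncation below, the moved-in copies
                 * are already seen as committed rather than being lost along with
                 * their originals.)
                 */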
2281                 RecordTransactionCommit();
2282         }
2283
2284         /*
2285          * We are not going to move any more tuples across pages, but we still
2286          * need to apply vacuum_page to compact free space in the remaining pages
2287          * in vacuum_pages list.  Note that some of these pages may also be in the
2288          * fraged_pages list, and may have had tuples moved onto them; if so, we
2289          * already did vacuum_page and needn't do it again.
2290          */
2291         for (i = 0, curpage = vacuum_pages->pagedesc;
2292                  i < vacuumed_pages;
2293                  i++, curpage++)
2294         {
2295                 vacuum_delay_point();
2296
2297                 Assert((*curpage)->blkno < blkno);
2298                 if ((*curpage)->offsets_used == 0)
2299                 {
2300                         Buffer          buf;
2301                         Page            page;
2302
2303                         /* this page was not used as a move target, so must clean it */
2304                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2305                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2306                         page = BufferGetPage(buf);
2307                         if (!PageIsEmpty(page))
2308                                 vacuum_page(onerel, buf, *curpage);
2309                         UnlockReleaseBuffer(buf);
2310                 }
2311         }
2312
2313         /*
2314          * Now scan all the pages that we moved tuples onto and update tuple
2315          * status bits.  This is not really necessary, but will save time for
2316          * future transactions examining these tuples.
2317          */
2318         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2319                                          last_move_dest_block, num_moved);
2320
2321         /*
2322          * It'd be cleaner to make this report at the bottom of this routine, but
2323          * then the rusage would double-count the second pass of index vacuuming.
2324          * So do it here and ignore the relatively small amount of processing that
2325          * occurs below.
2326          */
2327         ereport(elevel,
2328                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2329                                         RelationGetRelationName(onerel),
2330                                         num_moved, nblocks, blkno),
2331                          errdetail("%s.",
2332                                            pg_rusage_show(&ru0))));
2333
2334         /*
2335          * Reflect the motion of system tuples to the catalog caches here.
2336          */
2337         CommandCounterIncrement();
2338
2339         if (Nvacpagelist.num_pages > 0)
2340         {
2341                 /* vacuum indexes again if needed */
2342                 if (Irel != NULL)
2343                 {
2344                         VacPage    *vpleft,
2345                                            *vpright,
2346                                                 vpsave;
2347
2348                         /* re-sort Nvacpagelist.pagedesc */
2349                         for (vpleft = Nvacpagelist.pagedesc,
2350                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2351                                  vpleft < vpright; vpleft++, vpright--)
2352                         {
2353                                 vpsave = *vpleft;
2354                                 *vpleft = *vpright;
2355                                 *vpright = vpsave;
2356                         }
2357
2358                         /*
2359                          * keep_tuples is the number of tuples that have been moved off a
2360                          * page during chain moves but have not been scanned over subsequently.
2361                          * The tuple ids of these tuples are not recorded as free offsets
2362                          * for any VacPage, so they will not be cleared from the indexes.
2363                          */
2364                         Assert(keep_tuples >= 0);
2365                         for (i = 0; i < nindexes; i++)
2366                                 vacuum_index(&Nvacpagelist, Irel[i],
2367                                                          vacrelstats->rel_tuples, keep_tuples);
2368                 }
2369
2370                 /*
2371                  * Clean moved-off tuples from last page in Nvacpagelist list.
2372                  *
2373                  * We need only do this for this one page, because higher-numbered
2374                  * pages are going to be truncated from the relation entirely. But see
2375                  * comments for update_hint_bits().
2376                  */
2377                 if (vacpage->blkno == (blkno - 1) &&
2378                         vacpage->offsets_free > 0)
2379                 {
2380                         Buffer          buf;
2381                         Page            page;
2382                         OffsetNumber unused[MaxOffsetNumber];
2383                         OffsetNumber offnum,
2384                                                 maxoff;
2385                         int                     uncnt;
2386                         int                     num_tuples = 0;
2387
2388                         buf = ReadBuffer(onerel, vacpage->blkno);
2389                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2390                         page = BufferGetPage(buf);
2391                         maxoff = PageGetMaxOffsetNumber(page);
2392                         for (offnum = FirstOffsetNumber;
2393                                  offnum <= maxoff;
2394                                  offnum = OffsetNumberNext(offnum))
2395                         {
2396                                 ItemId          itemid = PageGetItemId(page, offnum);
2397                                 HeapTupleHeader htup;
2398
2399                                 if (!ItemIdIsUsed(itemid))
2400                                         continue;
2401                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2402                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2403                                         continue;
2404
2405                                 /*
2406                                  * See comments in the walk-along-page loop above about why
2407                                  * only MOVED_OFF tuples should be found here.
2408                                  */
2409                                 if (htup->t_infomask & HEAP_MOVED_IN)
2410                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2411                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2412                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2413                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2414                                         elog(ERROR, "invalid XVAC in tuple header");
2415
2416                                 itemid->lp_flags &= ~LP_USED;
2417                                 num_tuples++;
2418                         }
2419                         Assert(vacpage->offsets_free == num_tuples);
2420
2421                         START_CRIT_SECTION();
2422
2423                         uncnt = PageRepairFragmentation(page, unused);
2424
2425                         MarkBufferDirty(buf);
2426
2427                         /* XLOG stuff */
2428                         if (!onerel->rd_istemp)
2429                         {
2430                                 XLogRecPtr      recptr;
2431
2432                                 recptr = log_heap_clean(onerel, buf, unused, uncnt);
2433                                 PageSetLSN(page, recptr);
2434                                 PageSetTLI(page, ThisTimeLineID);
2435                         }
2436                         else
2437                         {
2438                                 /*
2439                                  * No XLOG record, but still need to flag that XID exists on
2440                                  * disk
2441                                  */
2442                                 MyXactMadeTempRelUpdate = true;
2443                         }
2444
2445                         END_CRIT_SECTION();
2446
2447                         UnlockReleaseBuffer(buf);
2448                 }
2449
2450                 /* now free the new list of reaped pages */
2451                 curpage = Nvacpagelist.pagedesc;
2452                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2453                         pfree(*curpage);
2454                 pfree(Nvacpagelist.pagedesc);
2455         }
2456
2457         /* Truncate relation, if needed */
2458         if (blkno < nblocks)
2459         {
2460                 RelationTruncate(onerel, blkno);
2461                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2462         }
2463
2464         /* clean up */
2465         pfree(vacpage);
2466         if (vacrelstats->vtlinks != NULL)
2467                 pfree(vacrelstats->vtlinks);
2468
2469         ExecContext_Finish(&ec);
2470 }
2471
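/*
 * Note on the tuple-moving protocol used by move_chain_tuple(),
 * move_plain_tuple() and update_hint_bits(): the source tuple is marked
 * HEAP_MOVED_OFF and the destination copy HEAP_MOVED_IN, both carrying this
 * vacuum's XID in their XVAC field.  After the moves have been recorded as
 * committed, update_hint_bits() converts MOVED_IN into XMIN_COMMITTED and
 * MOVED_OFF into XMIN_INVALID on the destination pages, so later visibility
 * (tqual) checks need not consult XVAC for those tuples.
 */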
2472 /*
2473  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2474  *
2475  *              This routine moves old_tup from old_page to dst_page.
2476  *              old_page and dst_page might be the same page.
2477  *              On entry old_buf and dst_buf are locked exclusively, both locks (or
2478  *              the single lock, if this is an intra-page move) are released before
2479  *              exit.
2480  *
2481  *              Yes, a routine with ten parameters is ugly, but it's still better
2482  *              than having these 120 lines of code in repair_frag(), which is
2483  *              already too long and almost unreadable.
2484  */
2485 static void
2486 move_chain_tuple(Relation rel,
2487                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2488                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2489                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2490 {
2491         TransactionId myXID = GetCurrentTransactionId();
2492         HeapTupleData newtup;
2493         OffsetNumber newoff;
2494         ItemId          newitemid;
2495         Size            tuple_len = old_tup->t_len;
2496
2497         /*
2498          * make a modifiable copy of the source tuple.
2499          */
2500         heap_copytuple_with_tuple(old_tup, &newtup);
2501
2502         /*
2503          * register invalidation of source tuple in catcaches.
2504          */
2505         CacheInvalidateHeapTuple(rel, old_tup);
2506
2507         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2508         START_CRIT_SECTION();
2509
2510         /*
2511          * mark the source tuple MOVED_OFF.
2512          */
2513         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2514                                                                          HEAP_XMIN_INVALID |
2515                                                                          HEAP_MOVED_IN);
2516         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2517         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2518
2519         /*
2520          * If this page was not used before - clean it.
2521          *
2522          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2523          * destination pages to be the same (since this tuple-chain member can be
2524          * on a page lower than the one we're currently processing in the outer
2525          * loop).  If that's true, then after vacuum_page() the source tuple will
2526          * have been moved, and tuple.t_data will be pointing at garbage.
2527          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2528          * step!!
2529          *
2530          * This path is different from the other callers of vacuum_page, because
2531          * we have already incremented the vacpage's offsets_used field to account
2532          * for the tuple(s) we expect to move onto the page. Therefore
2533          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2534          * good debugging check for all other callers, we work around it here
2535          * rather than remove it.
2536          */
2537         if (!PageIsEmpty(dst_page) && cleanVpd)
2538         {
2539                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2540
2541                 dst_vacpage->offsets_used = 0;
2542                 vacuum_page(rel, dst_buf, dst_vacpage);
2543                 dst_vacpage->offsets_used = sv_offsets_used;
2544         }
2545
2546         /*
2547          * Update the state of the copied tuple, and store it on the destination
2548          * page.
2549          */
2550         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2551                                                                    HEAP_XMIN_INVALID |
2552                                                                    HEAP_MOVED_OFF);
2553         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2554         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2555         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2556                                                  InvalidOffsetNumber, LP_USED);
2557         if (newoff == InvalidOffsetNumber)
2558                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
2559                          (unsigned long) tuple_len, dst_vacpage->blkno);
2560         newitemid = PageGetItemId(dst_page, newoff);
2561         /* drop temporary copy, and point to the version on the dest page */
2562         pfree(newtup.t_data);
2563         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2564
2565         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
2566
2567         /*
2568          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
2569          * to next tuple in chain otherwise.  (Since we move the chain in reverse
2570          * order, this is actually the previously processed tuple.)
2571          */
2572         if (!ItemPointerIsValid(ctid))
2573                 newtup.t_data->t_ctid = newtup.t_self;
2574         else
2575                 newtup.t_data->t_ctid = *ctid;
2576         *ctid = newtup.t_self;
2577
2578         MarkBufferDirty(dst_buf);
2579         if (dst_buf != old_buf)
2580                 MarkBufferDirty(old_buf);
2581
2582         /* XLOG stuff */
2583         if (!rel->rd_istemp)
2584         {
2585                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2586                                                                                    dst_buf, &newtup);
2587
2588                 if (old_buf != dst_buf)
2589                 {
2590                         PageSetLSN(old_page, recptr);
2591                         PageSetTLI(old_page, ThisTimeLineID);
2592                 }
2593                 PageSetLSN(dst_page, recptr);
2594                 PageSetTLI(dst_page, ThisTimeLineID);
2595         }
2596         else
2597         {
2598                 /*
2599                  * No XLOG record, but still need to flag that XID exists on disk
2600                  */
2601                 MyXactMadeTempRelUpdate = true;
2602         }
2603
2604         END_CRIT_SECTION();
2605
2606         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2607         if (dst_buf != old_buf)
2608                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2609
2610         /* Create index entries for the moved tuple */
2611         if (ec->resultRelInfo->ri_NumIndices > 0)
2612         {
2613                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2614                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2615                 ResetPerTupleExprContext(ec->estate);
2616         }
2617 }
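/*
 * Note: ctid above is an in/out argument.  On entry it holds the new TID of
 * the chain member moved on the previous iteration (the next-newer version),
 * or an invalid pointer when moving the newest member; on exit it holds the
 * new TID of the tuple just moved.  Since the chain is moved in reverse
 * order, this lets successive calls stitch the t_ctid links back together.
 */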
2618
2619 /*
2620  *      move_plain_tuple() -- move one tuple that is not part of a chain
2621  *
2622  *              This routine moves old_tup from old_page to dst_page.
2623  *              On entry old_buf and dst_buf are locked exclusively, both locks are
2624  *              released before exit.
2625  *
2626  *              Yes, a routine with eight parameters is ugly, but it's still better
2627  *              than having these 90 lines of code in repair_frag(), which is already
2628  *              too long and almost unreadable.
2629  */
2630 static void
2631 move_plain_tuple(Relation rel,
2632                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2633                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2634                                  ExecContext ec)
2635 {
2636         TransactionId myXID = GetCurrentTransactionId();
2637         HeapTupleData newtup;
2638         OffsetNumber newoff;
2639         ItemId          newitemid;
2640         Size            tuple_len = old_tup->t_len;
2641
2642         /* copy tuple */
2643         heap_copytuple_with_tuple(old_tup, &newtup);
2644
2645         /*
2646          * register invalidation of source tuple in catcaches.
2647          *
2648          * (Note: we do not need to register the copied tuple, because we are not
2649          * changing the tuple contents and so there cannot be any need to flush
2650          * negative catcache entries.)
2651          */
2652         CacheInvalidateHeapTuple(rel, old_tup);
2653
2654         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2655         START_CRIT_SECTION();
2656
2657         /*
2658          * Mark new tuple as MOVED_IN by me.
2659          */
2660         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2661                                                                    HEAP_XMIN_INVALID |
2662                                                                    HEAP_MOVED_OFF);
2663         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2664         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2665
2666         /* add tuple to the page */
2667         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2668                                                  InvalidOffsetNumber, LP_USED);
2669         if (newoff == InvalidOffsetNumber)
2670                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2671                          (unsigned long) tuple_len,
2672                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
2673                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
2674         newitemid = PageGetItemId(dst_page, newoff);
2675         pfree(newtup.t_data);
2676         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2677         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
2678         newtup.t_self = newtup.t_data->t_ctid;
2679
2680         /*
2681          * Mark old tuple as MOVED_OFF by me.
2682          */
2683         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2684                                                                          HEAP_XMIN_INVALID |
2685                                                                          HEAP_MOVED_IN);
2686         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2687         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2688
2689         MarkBufferDirty(dst_buf);
2690         MarkBufferDirty(old_buf);
2691
2692         /* XLOG stuff */
2693         if (!rel->rd_istemp)
2694         {
2695                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2696                                                                                    dst_buf, &newtup);
2697
2698                 PageSetLSN(old_page, recptr);
2699                 PageSetTLI(old_page, ThisTimeLineID);
2700                 PageSetLSN(dst_page, recptr);
2701                 PageSetTLI(dst_page, ThisTimeLineID);
2702         }
2703         else
2704         {
2705                 /*
2706                  * No XLOG record, but still need to flag that XID exists on disk
2707                  */
2708                 MyXactMadeTempRelUpdate = true;
2709         }
2710
2711         END_CRIT_SECTION();
2712
2713         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
2714         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2715         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2716
2717         dst_vacpage->offsets_used++;
2718
2719         /* insert index tuples if needed */
2720         if (ec->resultRelInfo->ri_NumIndices > 0)
2721         {
2722                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2723                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2724                 ResetPerTupleExprContext(ec->estate);
2725         }
2726 }
2727
2728 /*
2729  *      update_hint_bits() -- update hint bits in destination pages
2730  *
2731  * Scan all the pages that we moved tuples onto and update tuple status bits.
2732  * This is normally not really necessary, but it will save time for future
2733  * transactions examining these tuples.
2734  *
2735  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
2736  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
2737  *
2738  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
2739  * pages that were move source pages but not move dest pages.  The bulk
2740  * of the move source pages will be physically truncated from the relation,
2741  * and the last page remaining in the rel will be fixed separately in
2742  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
2743  * hint bits updated are tuples that are moved as part of a chain and were
2744  * on pages that were neither move destinations nor at the end of the rel.
2745  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
2746  * to remember and revisit those pages too.
2747  *
2748  * Because of this omission, VACUUM FULL FREEZE is not a safe combination;
2749  * it's possible that the VACUUM's own XID remains exposed as something that
2750  * tqual tests would need to check.
2751  *
2752  * For the non-freeze case, one wonders whether it wouldn't be better to skip
2753  * this work entirely, and let the tuple status updates happen someplace
2754  * that's not holding an exclusive lock on the relation.
2755  */
2756 static void
2757 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
2758                                  BlockNumber last_move_dest_block, int num_moved)
2759 {
2760         TransactionId myXID = GetCurrentTransactionId();
2761         int                     checked_moved = 0;
2762         int                     i;
2763         VacPage    *curpage;
2764
2765         for (i = 0, curpage = fraged_pages->pagedesc;
2766                  i < num_fraged_pages;
2767                  i++, curpage++)
2768         {
2769                 Buffer          buf;
2770                 Page            page;
2771                 OffsetNumber max_offset;
2772                 OffsetNumber off;
2773                 int                     num_tuples = 0;
2774
2775                 vacuum_delay_point();
2776
2777                 if ((*curpage)->blkno > last_move_dest_block)
2778                         break;                          /* no need to scan any further */
2779                 if ((*curpage)->offsets_used == 0)
2780                         continue;                       /* this page was never used as a move dest */
2781                 buf = ReadBuffer(rel, (*curpage)->blkno);
2782                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2783                 page = BufferGetPage(buf);
2784                 max_offset = PageGetMaxOffsetNumber(page);
2785                 for (off = FirstOffsetNumber;
2786                          off <= max_offset;
2787                          off = OffsetNumberNext(off))
2788                 {
2789                         ItemId          itemid = PageGetItemId(page, off);
2790                         HeapTupleHeader htup;
2791
2792                         if (!ItemIdIsUsed(itemid))
2793                                 continue;
2794                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
2795                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2796                                 continue;
2797
2798                         /*
2799                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
2800                          */
2801                         if (!(htup->t_infomask & HEAP_MOVED))
2802                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2803                         if (HeapTupleHeaderGetXvac(htup) != myXID)
2804                                 elog(ERROR, "invalid XVAC in tuple header");
2805
2806                         if (htup->t_infomask & HEAP_MOVED_IN)
2807                         {
2808                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
2809                                 htup->t_infomask &= ~HEAP_MOVED;
2810                                 num_tuples++;
2811                         }
2812                         else
2813                                 htup->t_infomask |= HEAP_XMIN_INVALID;
2814                 }
2815                 MarkBufferDirty(buf);
2816                 UnlockReleaseBuffer(buf);
2817                 Assert((*curpage)->offsets_used == num_tuples);
2818                 checked_moved += num_tuples;
2819         }
2820         Assert(num_moved == checked_moved);
2821 }
2822
2823 /*
2824  *      vacuum_heap() -- free dead tuples
2825  *
2826  *              This routine marks dead tuples as unused and truncates the relation
2827  *              if there are "empty" end-blocks.
2828  */
2829 static void
2830 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2831 {
2832         Buffer          buf;
2833         VacPage    *vacpage;
2834         BlockNumber relblocks;
2835         int                     nblocks;
2836         int                     i;
2837
2838         nblocks = vacuum_pages->num_pages;
2839         nblocks -= vacuum_pages->empty_end_pages;       /* they are handled by truncation below */
2840
2841         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2842         {
2843                 vacuum_delay_point();
2844
2845                 if ((*vacpage)->offsets_free > 0)
2846                 {
2847                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2848                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2849                         vacuum_page(onerel, buf, *vacpage);
2850                         UnlockReleaseBuffer(buf);
2851                 }
2852         }
2853
2854         /* Truncate relation if there are some empty end-pages */
2855         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2856         if (vacuum_pages->empty_end_pages > 0)
2857         {
2858                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2859                 ereport(elevel,
2860                                 (errmsg("\"%s\": truncated %u to %u pages",
2861                                                 RelationGetRelationName(onerel),
2862                                                 vacrelstats->rel_pages, relblocks)));
2863                 RelationTruncate(onerel, relblocks);
2864                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
2865         }
2866 }
2867
2868 /*
2869  *      vacuum_page() -- free dead tuples on a page
2870  *                                       and repair its fragmentation.
2871  *
2872  * Caller must hold pin and lock on buffer.
2873  */
2874 static void
2875 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2876 {
2877         OffsetNumber unused[MaxOffsetNumber];
2878         int                     uncnt;
2879         Page            page = BufferGetPage(buffer);
2880         ItemId          itemid;
2881         int                     i;
2882
2883         /* There shouldn't be any tuples moved onto the page yet! */
2884         Assert(vacpage->offsets_used == 0);
2885
2886         START_CRIT_SECTION();
2887
2888         for (i = 0; i < vacpage->offsets_free; i++)
2889         {
2890                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2891                 itemid->lp_flags &= ~LP_USED;
2892         }
2893
2894         uncnt = PageRepairFragmentation(page, unused);
2895
2896         MarkBufferDirty(buffer);
2897
2898         /* XLOG stuff */
2899         if (!onerel->rd_istemp)
2900         {
2901                 XLogRecPtr      recptr;
2902
2903                 recptr = log_heap_clean(onerel, buffer, unused, uncnt);
2904                 PageSetLSN(page, recptr);
2905                 PageSetTLI(page, ThisTimeLineID);
2906         }
2907         else
2908         {
2909                 /* No XLOG record, but still need to flag that XID exists on disk */
2910                 MyXactMadeTempRelUpdate = true;
2911         }
2912
2913         END_CRIT_SECTION();
2914 }
2915
2916 /*
2917  *      scan_index() -- scan one index relation to update pg_class statistics.
2918  *
2919  * We use this when we have no deletions to do.
2920  */
2921 static void
2922 scan_index(Relation indrel, double num_tuples)
2923 {
2924         IndexBulkDeleteResult *stats;
2925         IndexVacuumInfo ivinfo;
2926         PGRUsage        ru0;
2927
2928         pg_rusage_init(&ru0);
2929
2930         ivinfo.index = indrel;
2931         ivinfo.vacuum_full = true;
2932         ivinfo.message_level = elevel;
2933         ivinfo.num_heap_tuples = num_tuples;
2934
2935         stats = index_vacuum_cleanup(&ivinfo, NULL);
2936
2937         if (!stats)
2938                 return;
2939
2940         /* now update statistics in pg_class */
2941         vac_update_relstats(RelationGetRelid(indrel),
2942                                                 stats->num_pages, stats->num_index_tuples,
2943                                                 false);
2944
2945         ereport(elevel,
2946                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
2947                                         RelationGetRelationName(indrel),
2948                                         stats->num_index_tuples,
2949                                         stats->num_pages),
2950         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
2951                           "%s.",
2952                           stats->pages_deleted, stats->pages_free,
2953                           pg_rusage_show(&ru0))));
2954
2955         /*
2956          * Check for tuple count mismatch.      If the index is partial, then it's OK
2957          * for it to have fewer tuples than the heap; otherwise we have a problem.
2958          */
2959         if (stats->num_index_tuples != num_tuples)
2960         {
2961                 if (stats->num_index_tuples > num_tuples ||
2962                         !vac_is_partial_index(indrel))
2963                         ereport(WARNING,
2964                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
2965                                                         RelationGetRelationName(indrel),
2966                                                         stats->num_index_tuples, num_tuples),
2967                                          errhint("Rebuild the index with REINDEX.")));
2968         }
2969
2970         pfree(stats);
2971 }
2972
2973 /*
2974  *      vacuum_index() -- vacuum one index relation.
2975  *
2976  *              vacpagelist is the VacPageList of the heap we're currently vacuuming;
2977  *              the heap is already locked.  indrel is an index relation on the vacuumed heap.
2978  *
2979  *              We don't bother to set locks on the index relation here, since
2980  *              the parent table is exclusive-locked already.
2981  *
2982  *              Finally, we arrange to update the index relation's statistics in
2983  *              pg_class.
2984  */
2985 static void
2986 vacuum_index(VacPageList vacpagelist, Relation indrel,
2987                          double num_tuples, int keep_tuples)
2988 {
2989         IndexBulkDeleteResult *stats;
2990         IndexVacuumInfo ivinfo;
2991         PGRUsage        ru0;
2992
2993         pg_rusage_init(&ru0);
2994
2995         ivinfo.index = indrel;
2996         ivinfo.vacuum_full = true;
2997         ivinfo.message_level = elevel;
2998         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
2999
3000         /* Do bulk deletion */
3001         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3002
3003         /* Do post-VACUUM cleanup */
3004         stats = index_vacuum_cleanup(&ivinfo, stats);
3005
3006         if (!stats)
3007                 return;
3008
3009         /* now update statistics in pg_class */
3010         vac_update_relstats(RelationGetRelid(indrel),
3011                                                 stats->num_pages, stats->num_index_tuples,
3012                                                 false);
3013
3014         ereport(elevel,
3015                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3016                                         RelationGetRelationName(indrel),
3017                                         stats->num_index_tuples,
3018                                         stats->num_pages),
3019                          errdetail("%.0f index row versions were removed.\n"
3020                          "%u index pages have been deleted, %u are currently reusable.\n"
3021                                            "%s.",
3022                                            stats->tuples_removed,
3023                                            stats->pages_deleted, stats->pages_free,
3024                                            pg_rusage_show(&ru0))));
3025
3026         /*
3027          * Check for tuple count mismatch.      If the index is partial, then it's OK
3028  * for it to have fewer tuples than the heap; otherwise we have a problem.
3029          */
3030         if (stats->num_index_tuples != num_tuples + keep_tuples)
3031         {
3032                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3033                         !vac_is_partial_index(indrel))
3034                         ereport(WARNING,
3035                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3036                                                         RelationGetRelationName(indrel),
3037                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3038                                          errhint("Rebuild the index with REINDEX.")));
3039         }
3040
3041         pfree(stats);
3042 }
3043
3044 /*
3045  *      tid_reaped() -- is a particular tid reaped?
3046  *
3047  *              This has the right signature to be an IndexBulkDeleteCallback.
3048  *
3049  *              vacpagelist->pagedesc is sorted in the right order (by block number).
3050  */
3051 static bool
3052 tid_reaped(ItemPointer itemptr, void *state)
3053 {
3054         VacPageList vacpagelist = (VacPageList) state;
3055         OffsetNumber ioffno;
3056         OffsetNumber *voff;
3057         VacPage         vp,
3058                            *vpp;
3059         VacPageData vacpage;
3060
3061         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3062         ioffno = ItemPointerGetOffsetNumber(itemptr);
3063
3064         vp = &vacpage;
3065         vpp = (VacPage *) vac_bsearch((void *) &vp,
3066                                                                   (void *) (vacpagelist->pagedesc),
3067                                                                   vacpagelist->num_pages,
3068                                                                   sizeof(VacPage),
3069                                                                   vac_cmp_blk);
3070
3071         if (vpp == NULL)
3072                 return false;
3073
3074         /* ok - we are on a partially or fully reaped page */
3075         vp = *vpp;
3076
3077         if (vp->offsets_free == 0)
3078         {
3079                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
3080                 return true;
3081         }
3082
3083         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3084                                                                                 (void *) (vp->offsets),
3085                                                                                 vp->offsets_free,
3086                                                                                 sizeof(OffsetNumber),
3087                                                                                 vac_cmp_offno);
3088
3089         if (voff == NULL)
3090                 return false;
3091
3092         /* tid is reaped */
3093         return true;
3094 }
3095
3096 /*
3097  * Update the shared Free Space Map with the info we now have about
3098  * free space in the relation, discarding any old info the map may have.
3099  */
3100 static void
3101 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3102                            BlockNumber rel_pages)
3103 {
3104         int                     nPages = fraged_pages->num_pages;
3105         VacPage    *pagedesc = fraged_pages->pagedesc;
3106         Size            threshold;
3107         PageFreeSpaceInfo *pageSpaces;
3108         int                     outPages;
3109         int                     i;
3110
3111         /*
3112          * We only report pages with free space at least equal to the average
3113          * request size --- this avoids cluttering FSM with uselessly-small bits
3114          * of space.  Although FSM would discard pages with little free space
3115          * anyway, it's important to do this prefiltering because (a) it reduces
3116          * the time spent holding the FSM lock in RecordRelationFreeSpace, and (b)
3117          * FSM uses the number of pages reported as a statistic for guiding space
3118          * management.  If we didn't threshold our reports the same way
3119          * vacuumlazy.c does, we'd be skewing that statistic.
3120          */
3121         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
3122         if (threshold < HeapGetPageFreeSpace(onerel))
3123                 threshold = HeapGetPageFreeSpace(onerel);
3124
3125         pageSpaces = (PageFreeSpaceInfo *)
3126                 palloc(nPages * sizeof(PageFreeSpaceInfo));
3127         outPages = 0;
3128
3129         for (i = 0; i < nPages; i++)
3130         {
3131                 /*
3132                  * fraged_pages may contain entries for pages that we later decided to
3133                  * truncate from the relation; don't enter them into the free space
3134                  * map!
3135                  */
3136                 if (pagedesc[i]->blkno >= rel_pages)
3137                         break;
3138
3139                 if (pagedesc[i]->free >= threshold)
3140                 {
3141                         pageSpaces[outPages].blkno = pagedesc[i]->blkno;
3142                         pageSpaces[outPages].avail = pagedesc[i]->free;
3143                         outPages++;
3144                 }
3145         }
3146
3147         RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
3148
3149         pfree(pageSpaces);
3150 }
3151
3152 /* Copy a VacPage structure */
3153 static VacPage
3154 copy_vac_page(VacPage vacpage)
3155 {
3156         VacPage         newvacpage;
3157
3158         /* allocate a VacPageData entry */
3159         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3160                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3161
3162         /* fill it in */
3163         if (vacpage->offsets_free > 0)
3164                 memcpy(newvacpage->offsets, vacpage->offsets,
3165                            vacpage->offsets_free * sizeof(OffsetNumber));
3166         newvacpage->blkno = vacpage->blkno;
3167         newvacpage->free = vacpage->free;
3168         newvacpage->offsets_used = vacpage->offsets_used;
3169         newvacpage->offsets_free = vacpage->offsets_free;
3170
3171         return newvacpage;
3172 }
3173
3174 /*
3175  * Add a VacPage pointer to a VacPageList.
3176  *
3177  *              As a side effect of the way that scan_heap works,
3178  *              higher pages come after lower pages in the array
3179  *              (and highest tid on a page is last).
3180  */
3181 static void
3182 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3183 {
3184 #define PG_NPAGEDESC 1024
3185
3186         /* allocate a VacPage entry if needed */
3187         if (vacpagelist->num_pages == 0)
3188         {
3189                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3190                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3191         }
3192         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3193         {
3194                 vacpagelist->num_allocated_pages *= 2;
3195                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3196         }
3197         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3198         (vacpagelist->num_pages)++;
3199 }
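/*
 * The pagedesc array starts at PG_NPAGEDESC entries and is doubled whenever
 * it fills up, so repeated vpage_insert() calls need only O(log N) repallocs
 * overall (amortized constant cost per insertion).
 */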
3200
3201 /*
3202  * vac_bsearch: just like standard C library routine bsearch(),
3203  * except that we first test to see whether the target key is outside
3204  * the range of the table entries.      This case is handled relatively slowly
3205  * by the normal binary search algorithm (ie, no faster than any other key)
3206  * but it occurs often enough in VACUUM to be worth optimizing.
3207  */
3208 static void *
3209 vac_bsearch(const void *key, const void *base,
3210                         size_t nelem, size_t size,
3211                         int (*compar) (const void *, const void *))
3212 {
3213         int                     res;
3214         const void *last;
3215
3216         if (nelem == 0)
3217                 return NULL;
3218         res = compar(key, base);
3219         if (res < 0)
3220                 return NULL;
3221         if (res == 0)
3222                 return (void *) base;
3223         if (nelem > 1)
3224         {
3225                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3226                 res = compar(key, last);
3227                 if (res > 0)
3228                         return NULL;
3229                 if (res == 0)
3230                         return (void *) last;
3231         }
3232         if (nelem <= 2)
3233                 return NULL;                    /* already checked 'em all */
3234         return bsearch(key, base, nelem, size, compar);
3235 }
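/*
 * For example, tid_reaped() above uses vac_bsearch() twice per index entry:
 * first over vacpagelist->pagedesc (sorted by block number, compared with
 * vac_cmp_blk), then over the matching page's offsets[] array (sorted by
 * offset number, compared with vac_cmp_offno).  The out-of-range case is
 * common for such probes, which is what the boundary pre-checks above are
 * optimizing.
 */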
3236
3237 /*
3238  * Comparator routines for use with qsort() and bsearch().
3239  */
3240 static int
3241 vac_cmp_blk(const void *left, const void *right)
3242 {
3243         BlockNumber lblk,
3244                                 rblk;
3245
3246         lblk = (*((VacPage *) left))->blkno;
3247         rblk = (*((VacPage *) right))->blkno;
3248
3249         if (lblk < rblk)
3250                 return -1;
3251         if (lblk == rblk)
3252                 return 0;
3253         return 1;
3254 }
3255
3256 static int
3257 vac_cmp_offno(const void *left, const void *right)
3258 {
3259         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3260                 return -1;
3261         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3262                 return 0;
3263         return 1;
3264 }
3265
3266 static int
3267 vac_cmp_vtlinks(const void *left, const void *right)
3268 {
3269         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3270                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3271                 return -1;
3272         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3273                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3274                 return 1;
3275         /* bi_hi-es are equal */
3276         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3277                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3278                 return -1;
3279         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3280                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3281                 return 1;
3282         /* bi_lo-es are equal */
3283         if (((VTupleLink) left)->new_tid.ip_posid <
3284                 ((VTupleLink) right)->new_tid.ip_posid)
3285                 return -1;
3286         if (((VTupleLink) left)->new_tid.ip_posid >
3287                 ((VTupleLink) right)->new_tid.ip_posid)
3288                 return 1;
3289         return 0;
3290 }
3291
3292
3293 /*
3294  * Open all the indexes of the given relation, obtaining the specified kind
3295  * of lock on each.  Return an array of Relation pointers for the indexes
3296  * into *Irel, and the number of indexes into *nindexes.
3297  */
3298 void
3299 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3300                                  int *nindexes, Relation **Irel)
3301 {
3302         List       *indexoidlist;
3303         ListCell   *indexoidscan;
3304         int                     i;
3305
3306         indexoidlist = RelationGetIndexList(relation);
3307
3308         *nindexes = list_length(indexoidlist);
3309
3310         if (*nindexes > 0)
3311                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3312         else
3313                 *Irel = NULL;
3314
3315         i = 0;
3316         foreach(indexoidscan, indexoidlist)
3317         {
3318                 Oid                     indexoid = lfirst_oid(indexoidscan);
3319                 Relation        ind;
3320
3321                 ind = index_open(indexoid);
3322                 (*Irel)[i++] = ind;
3323                 LockRelation(ind, lockmode);
3324         }
3325
3326         list_free(indexoidlist);
3327 }
3328
3329 /*
3330  * Release the resources acquired by vac_open_indexes.  Optionally release
3331  * the locks (say NoLock to keep 'em).
3332  */
3333 void
3334 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3335 {
3336         if (Irel == NULL)
3337                 return;
3338
3339         while (nindexes--)
3340         {
3341                 Relation        ind = Irel[nindexes];
3342
3343                 if (lockmode != NoLock)
3344                         UnlockRelation(ind, lockmode);
3345                 index_close(ind);
3346         }
3347         pfree(Irel);
3348 }
3349
3350
3351 /*
3352  * Is an index partial (ie, could it contain fewer tuples than the heap)?
3353  */
3354 bool
3355 vac_is_partial_index(Relation indrel)
3356 {
3357         /*
3358          * If the index's AM doesn't support nulls, it's partial for our purposes
3359          */
3360         if (!indrel->rd_am->amindexnulls)
3361                 return true;
3362
3363         /* Otherwise, look to see if there's a partial-index predicate */
3364         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3365                 return true;
3366
3367         return false;
3368 }
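/*
 * "Partial" is used loosely here: an index whose AM cannot index NULLs may
 * also legitimately contain fewer entries than the heap has rows.  The only
 * consequence is in scan_index()/vacuum_index() above, where the tuple-count
 * mismatch warning is suppressed for such indexes unless they contain more
 * entries than the heap.
 */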
3369
3370
3371 static bool
3372 enough_space(VacPage vacpage, Size len)
3373 {
3374         len = MAXALIGN(len);
3375
3376         if (len > vacpage->free)
3377                 return false;
3378
3379         /* if there are free itemid(s) and len <= free_space... */
3380         if (vacpage->offsets_used < vacpage->offsets_free)
3381                 return true;
3382
3383         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
3384         if (len + sizeof(ItemIdData) <= vacpage->free)
3385                 return true;
3386
3387         return false;
3388 }
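/*
 * Note: when all of the page's freed line pointers are already spoken for
 * (offsets_used >= offsets_free), placing another tuple also consumes a new
 * line pointer, so we additionally require room for one ItemIdData.
 */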
3389
3390 static Size
3391 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3392 {
3393         PageHeader      pd = (PageHeader) page;
3394         Size            pagefree = HeapGetPageFreeSpace(relation);
3395         Size            freespace = pd->pd_upper - pd->pd_lower;
3396
3397         if (freespace > pagefree)
3398                 return freespace - pagefree;
3399         else
3400                 return 0;
3401 }
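/*
 * Presumably HeapGetPageFreeSpace() reports the per-page free-space target
 * implied by the relation's fillfactor, so this returns only the space on
 * the page that exceeds that reservation (clamped at zero); repair_frag()
 * thus refills pages only up to their fillfactor target.
 */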
3402
3403 /*
3404  * vacuum_delay_point --- check for interrupts and cost-based delay.
3405  *
3406  * This should be called in each major loop of VACUUM processing,
3407  * typically once per page processed.
3408  */
3409 void
3410 vacuum_delay_point(void)
3411 {
3412         /* Always check for interrupts */
3413         CHECK_FOR_INTERRUPTS();
3414
3415         /* Nap if appropriate */
3416         if (VacuumCostActive && !InterruptPending &&
3417                 VacuumCostBalance >= VacuumCostLimit)
3418         {
3419                 int                     msec;
3420
3421                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3422                 if (msec > VacuumCostDelay * 4)
3423                         msec = VacuumCostDelay * 4;
3424
3425                 pg_usleep(msec * 1000L);
3426
3427                 VacuumCostBalance = 0;
3428
3429                 /* Might have gotten an interrupt while sleeping */
3430                 CHECK_FOR_INTERRUPTS();
3431         }
3432 }
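/*
 * A worked example of the delay computation, with hypothetical settings
 * VacuumCostDelay = 10 (ms) and VacuumCostLimit = 200: a balance of 300 at
 * the check sleeps for 10 * 300 / 200 = 15 ms, while a balance of 1000 would
 * compute 50 ms but is capped at 4 * VacuumCostDelay = 40 ms.  The balance is
 * reset to zero after each nap.
 */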