]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Unite ReadBufferWithFork, ReadBufferWithStrategy, and ZeroOrReadBuffer
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.379 2008/10/31 15:05:00 heikki Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/transam.h"
29 #include "access/xact.h"
30 #include "access/xlog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_database.h"
33 #include "catalog/pg_namespace.h"
34 #include "commands/dbcommands.h"
35 #include "commands/vacuum.h"
36 #include "executor/executor.h"
37 #include "miscadmin.h"
38 #include "pgstat.h"
39 #include "postmaster/autovacuum.h"
40 #include "storage/bufmgr.h"
41 #include "storage/freespace.h"
42 #include "storage/lmgr.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "utils/acl.h"
46 #include "utils/builtins.h"
47 #include "utils/flatfiles.h"
48 #include "utils/fmgroids.h"
49 #include "utils/inval.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/pg_rusage.h"
53 #include "utils/relcache.h"
54 #include "utils/snapmgr.h"
55 #include "utils/syscache.h"
56 #include "utils/tqual.h"
57
58
59 /*
60  * GUC parameters
61  */
62 int                     vacuum_freeze_min_age;
63
64 /*
65  * VacPage structures keep track of each page on which we find useful
66  * amounts of free space.
67  */
68 typedef struct VacPageData
69 {
70         BlockNumber blkno;                      /* BlockNumber of this Page */
71         Size            free;                   /* FreeSpace on this Page */
72         uint16          offsets_used;   /* Number of OffNums used by vacuum */
73         uint16          offsets_free;   /* Number of OffNums free or to be free */
74         OffsetNumber offsets[1];        /* Array of free OffNums */
75 } VacPageData;
76
77 typedef VacPageData *VacPage;
78
79 typedef struct VacPageListData
80 {
81         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
82         int                     num_pages;              /* Number of pages in pagedesc */
83         int                     num_allocated_pages;    /* Number of allocated pages in
84                                                                                  * pagedesc */
85         VacPage    *pagedesc;           /* Descriptions of pages */
86 } VacPageListData;
87
88 typedef VacPageListData *VacPageList;
89
90 /*
91  * The "vtlinks" array keeps information about each recently-updated tuple
92  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
93  * We store the tuple's own TID as well as its t_ctid (its link to the next
94  * newer tuple version).  Searching in this array allows us to follow update
95  * chains backwards from newer to older tuples.  When we move a member of an
96  * update chain, we must move *all* the live members of the chain, so that we
97  * can maintain their t_ctid link relationships (we must not just overwrite
98  * t_ctid in an existing tuple).
99  *
100  * Note: because t_ctid links can be stale (this would only occur if a prior
101  * VACUUM crashed partway through), it is possible that new_tid points to an
102  * empty slot or unrelated tuple.  We have to check the linkage as we follow
103  * it, just as is done in EvalPlanQual.
104  */
105 typedef struct VTupleLinkData
106 {
107         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
108         ItemPointerData this_tid;       /* t_self of the tuple */
109 } VTupleLinkData;
110
111 typedef VTupleLinkData *VTupleLink;
112
113 /*
114  * We use an array of VTupleMoveData to plan a chain tuple move fully
115  * before we do it.
116  */
117 typedef struct VTupleMoveData
118 {
119         ItemPointerData tid;            /* tuple ID */
120         VacPage         vacpage;                /* where to move it to */
121         bool            cleanVpd;               /* clean vacpage before using? */
122 } VTupleMoveData;
123
124 typedef VTupleMoveData *VTupleMove;
125
126 /*
127  * VRelStats contains the data acquired by scan_heap for use later
128  */
129 typedef struct VRelStats
130 {
131         /* miscellaneous statistics */
132         BlockNumber rel_pages;          /* pages in relation */
133         double          rel_tuples;             /* tuples that remain after vacuuming */
134         double          rel_indexed_tuples;             /* indexed tuples that remain */
135         Size            min_tlen;               /* min surviving tuple size */
136         Size            max_tlen;               /* max surviving tuple size */
137         bool            hasindex;
138         /* vtlinks array for tuple chain following - sorted by new_tid */
139         int                     num_vtlinks;
140         VTupleLink      vtlinks;
141 } VRelStats;
142
143 /*----------------------------------------------------------------------
144  * ExecContext:
145  *
146  * As these variables always appear together, we put them into one struct
147  * and pull initialization and cleanup into separate routines.
148  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
149  * accurately:  It is *used* only in move_xxx_tuple(), but because this
150  * routine is called many times, we initialize the struct just once in
151  * repair_frag() and pass it on to move_xxx_tuple().
152  */
153 typedef struct ExecContextData
154 {
155         ResultRelInfo *resultRelInfo;
156         EState     *estate;
157         TupleTableSlot *slot;
158 } ExecContextData;
159
160 typedef ExecContextData *ExecContext;
161
162 static void
163 ExecContext_Init(ExecContext ec, Relation rel)
164 {
165         TupleDesc       tupdesc = RelationGetDescr(rel);
166
167         /*
168          * We need a ResultRelInfo and an EState so we can use the regular
169          * executor's index-entry-making machinery.
170          */
171         ec->estate = CreateExecutorState();
172
173         ec->resultRelInfo = makeNode(ResultRelInfo);
174         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
175         ec->resultRelInfo->ri_RelationDesc = rel;
176         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
177
178         ExecOpenIndices(ec->resultRelInfo);
179
180         ec->estate->es_result_relations = ec->resultRelInfo;
181         ec->estate->es_num_result_relations = 1;
182         ec->estate->es_result_relation_info = ec->resultRelInfo;
183
184         /* Set up a tuple slot too */
185         ec->slot = MakeSingleTupleTableSlot(tupdesc);
186 }
187
188 static void
189 ExecContext_Finish(ExecContext ec)
190 {
191         ExecDropSingleTupleTableSlot(ec->slot);
192         ExecCloseIndices(ec->resultRelInfo);
193         FreeExecutorState(ec->estate);
194 }
195
196 /*
197  * End of ExecContext Implementation
198  *----------------------------------------------------------------------
199  */
200
201 /* A few variables that don't seem worth passing around as parameters */
202 static MemoryContext vac_context = NULL;
203
204 static int      elevel = -1;
205
206 static TransactionId OldestXmin;
207 static TransactionId FreezeLimit;
208
209 static BufferAccessStrategy vac_strategy;
210
211
212 /* non-export function prototypes */
213 static List *get_rel_oids(Oid relid, const RangeVar *vacrel,
214                          const char *stmttype);
215 static void vac_truncate_clog(TransactionId frozenXID);
216 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast,
217                    bool for_wraparound);
218 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
219 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
220                   VacPageList vacuum_pages, VacPageList fraged_pages);
221 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
222                         VacPageList vacuum_pages, VacPageList fraged_pages,
223                         int nindexes, Relation *Irel);
224 static void move_chain_tuple(Relation rel,
225                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
226                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
227                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
228 static void move_plain_tuple(Relation rel,
229                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
230                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
231                                  ExecContext ec);
232 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
233                                  int num_fraged_pages, BlockNumber last_move_dest_block,
234                                  int num_moved);
235 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
236                         VacPageList vacpagelist);
237 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
238 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
239                          double num_tuples, int keep_tuples);
240 static void scan_index(Relation indrel, double num_tuples);
241 static bool tid_reaped(ItemPointer itemptr, void *state);
242 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
243                            BlockNumber rel_pages);
244 static VacPage copy_vac_page(VacPage vacpage);
245 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
246 static void *vac_bsearch(const void *key, const void *base,
247                         size_t nelem, size_t size,
248                         int (*compar) (const void *, const void *));
249 static int      vac_cmp_blk(const void *left, const void *right);
250 static int      vac_cmp_offno(const void *left, const void *right);
251 static int      vac_cmp_vtlinks(const void *left, const void *right);
252 static bool enough_space(VacPage vacpage, Size len);
253 static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
254
255
256 /****************************************************************************
257  *                                                                                                                                                      *
258  *                      Code common to all flavors of VACUUM and ANALYZE                                *
259  *                                                                                                                                                      *
260  ****************************************************************************
261  */
262
263
264 /*
265  * Primary entry point for VACUUM and ANALYZE commands.
266  *
267  * relid is normally InvalidOid; if it is not, then it provides the relation
268  * OID to be processed, and vacstmt->relation is ignored.  (The non-invalid
269  * case is currently only used by autovacuum.)
270  *
271  * do_toast is passed as FALSE by autovacuum, because it processes TOAST
272  * tables separately.
273  *
274  * for_wraparound is used by autovacuum to let us know when it's forcing
275  * a vacuum for wraparound, which should not be auto-cancelled.
276  *
277  * bstrategy is normally given as NULL, but in autovacuum it can be passed
278  * in to use the same buffer strategy object across multiple vacuum() calls.
279  *
280  * isTopLevel should be passed down from ProcessUtility.
281  *
282  * It is the caller's responsibility that vacstmt and bstrategy
283  * (if given) be allocated in a memory context that won't disappear
284  * at transaction commit.
285  */
286 void
287 vacuum(VacuumStmt *vacstmt, Oid relid, bool do_toast,
288            BufferAccessStrategy bstrategy, bool for_wraparound, bool isTopLevel)
289 {
290         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
291         volatile MemoryContext anl_context = NULL;
292         volatile bool all_rels,
293                                 in_outer_xact,
294                                 use_own_xacts;
295         List       *relations;
296
297         if (vacstmt->verbose)
298                 elevel = INFO;
299         else
300                 elevel = DEBUG2;
301
302         /*
303          * We cannot run VACUUM inside a user transaction block; if we were inside
304          * a transaction, then our commit- and start-transaction-command calls
305          * would not have the intended effect! Furthermore, the forced commit that
306          * occurs before truncating the relation's file would have the effect of
307          * committing the rest of the user's transaction too, which would
308          * certainly not be the desired behavior.  (This only applies to VACUUM
309          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
310          * block, but we choose to disallow that case because we'd rather commit
311          * as soon as possible after finishing the vacuum.      This is mainly so that
312          * we can let go the AccessExclusiveLock that we may be holding.)
313          *
314          * ANALYZE (without VACUUM) can run either way.
315          */
316         if (vacstmt->vacuum)
317         {
318                 PreventTransactionChain(isTopLevel, stmttype);
319                 in_outer_xact = false;
320         }
321         else
322                 in_outer_xact = IsInTransactionChain(isTopLevel);
323
324         /*
325          * Send info about dead objects to the statistics collector, unless we are
326          * in autovacuum --- autovacuum.c does this for itself.
327          */
328         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
329                 pgstat_vacuum_stat();
330
331         /*
332          * Create special memory context for cross-transaction storage.
333          *
334          * Since it is a child of PortalContext, it will go away eventually even
335          * if we suffer an error; there's no need for special abort cleanup logic.
336          */
337         vac_context = AllocSetContextCreate(PortalContext,
338                                                                                 "Vacuum",
339                                                                                 ALLOCSET_DEFAULT_MINSIZE,
340                                                                                 ALLOCSET_DEFAULT_INITSIZE,
341                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
342
343         /*
344          * If caller didn't give us a buffer strategy object, make one in the
345          * cross-transaction memory context.
346          */
347         if (bstrategy == NULL)
348         {
349                 MemoryContext old_context = MemoryContextSwitchTo(vac_context);
350
351                 bstrategy = GetAccessStrategy(BAS_VACUUM);
352                 MemoryContextSwitchTo(old_context);
353         }
354         vac_strategy = bstrategy;
355
356         /* Remember whether we are processing everything in the DB */
357         all_rels = (!OidIsValid(relid) && vacstmt->relation == NULL);
358
359         /*
360          * Build list of relations to process, unless caller gave us one. (If we
361          * build one, we put it in vac_context for safekeeping.)
362          */
363         relations = get_rel_oids(relid, vacstmt->relation, stmttype);
364
365         /*
366          * Decide whether we need to start/commit our own transactions.
367          *
368          * For VACUUM (with or without ANALYZE): always do so, so that we can
369          * release locks as soon as possible.  (We could possibly use the outer
370          * transaction for a one-table VACUUM, but handling TOAST tables would be
371          * problematic.)
372          *
373          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
374          * start/commit our own transactions.  Also, there's no need to do so if
375          * only processing one relation.  For multiple relations when not within a
376          * transaction block, and also in an autovacuum worker, use own
377          * transactions so we can release locks sooner.
378          */
379         if (vacstmt->vacuum)
380                 use_own_xacts = true;
381         else
382         {
383                 Assert(vacstmt->analyze);
384                 if (IsAutoVacuumWorkerProcess())
385                         use_own_xacts = true;
386                 else if (in_outer_xact)
387                         use_own_xacts = false;
388                 else if (list_length(relations) > 1)
389                         use_own_xacts = true;
390                 else
391                         use_own_xacts = false;
392         }
393
394         /*
395          * If we are running ANALYZE without per-table transactions, we'll need a
396          * memory context with table lifetime.
397          */
398         if (!use_own_xacts)
399                 anl_context = AllocSetContextCreate(PortalContext,
400                                                                                         "Analyze",
401                                                                                         ALLOCSET_DEFAULT_MINSIZE,
402                                                                                         ALLOCSET_DEFAULT_INITSIZE,
403                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
404
405         /*
406          * vacuum_rel expects to be entered with no transaction active; it will
407          * start and commit its own transaction.  But we are called by an SQL
408          * command, and so we are executing inside a transaction already. We
409          * commit the transaction started in PostgresMain() here, and start
410          * another one before exiting to match the commit waiting for us back in
411          * PostgresMain().
412          */
413         if (use_own_xacts)
414         {
415                 /* ActiveSnapshot is not set by autovacuum */
416                 if (ActiveSnapshotSet())
417                         PopActiveSnapshot();
418
419                 /* matches the StartTransaction in PostgresMain() */
420                 CommitTransactionCommand();
421         }
422
423         /* Turn vacuum cost accounting on or off */
424         PG_TRY();
425         {
426                 ListCell   *cur;
427
428                 VacuumCostActive = (VacuumCostDelay > 0);
429                 VacuumCostBalance = 0;
430
431                 /*
432                  * Loop to process each selected relation.
433                  */
434                 foreach(cur, relations)
435                 {
436                         Oid                     relid = lfirst_oid(cur);
437
438                         if (vacstmt->vacuum)
439                                 vacuum_rel(relid, vacstmt, do_toast, for_wraparound);
440
441                         if (vacstmt->analyze)
442                         {
443                                 MemoryContext old_context = NULL;
444
445                                 /*
446                                  * If using separate xacts, start one for analyze. Otherwise,
447                                  * we can use the outer transaction, but we still need to call
448                                  * analyze_rel in a memory context that will be cleaned up on
449                                  * return (else we leak memory while processing multiple
450                                  * tables).
451                                  */
452                                 if (use_own_xacts)
453                                 {
454                                         StartTransactionCommand();
455                                         /* functions in indexes may want a snapshot set */
456                                         PushActiveSnapshot(GetTransactionSnapshot());
457                                 }
458                                 else
459                                         old_context = MemoryContextSwitchTo(anl_context);
460
461                                 analyze_rel(relid, vacstmt, vac_strategy);
462
463                                 if (use_own_xacts)
464                                 {
465                                         PopActiveSnapshot();
466                                         CommitTransactionCommand();
467                                 }
468                                 else
469                                 {
470                                         MemoryContextSwitchTo(old_context);
471                                         MemoryContextResetAndDeleteChildren(anl_context);
472                                 }
473                         }
474                 }
475         }
476         PG_CATCH();
477         {
478                 /* Make sure cost accounting is turned off after error */
479                 VacuumCostActive = false;
480                 PG_RE_THROW();
481         }
482         PG_END_TRY();
483
484         /* Turn off vacuum cost accounting */
485         VacuumCostActive = false;
486
487         /*
488          * Finish up processing.
489          */
490         if (use_own_xacts)
491         {
492                 /* here, we are not in a transaction */
493
494                 /*
495                  * This matches the CommitTransaction waiting for us in
496                  * PostgresMain().
497                  */
498                 StartTransactionCommand();
499         }
500
501         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
502         {
503                 /*
504                  * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
505                  * (autovacuum.c does this for itself.)
506                  */
507                 vac_update_datfrozenxid();
508         }
509
510         /*
511          * Clean up working storage --- note we must do this after
512          * StartTransactionCommand, else we might be trying to delete the active
513          * context!
514          */
515         MemoryContextDelete(vac_context);
516         vac_context = NULL;
517
518         if (anl_context)
519                 MemoryContextDelete(anl_context);
520 }
521
522 /*
523  * Build a list of Oids for each relation to be processed
524  *
525  * The list is built in vac_context so that it will survive across our
526  * per-relation transactions.
527  */
528 static List *
529 get_rel_oids(Oid relid, const RangeVar *vacrel, const char *stmttype)
530 {
531         List       *oid_list = NIL;
532         MemoryContext oldcontext;
533
534         /* OID supplied by VACUUM's caller? */
535         if (OidIsValid(relid))
536         {
537                 oldcontext = MemoryContextSwitchTo(vac_context);
538                 oid_list = lappend_oid(oid_list, relid);
539                 MemoryContextSwitchTo(oldcontext);
540         }
541         else if (vacrel)
542         {
543                 /* Process a specific relation */
544                 Oid                     relid;
545
546                 relid = RangeVarGetRelid(vacrel, false);
547
548                 /* Make a relation list entry for this guy */
549                 oldcontext = MemoryContextSwitchTo(vac_context);
550                 oid_list = lappend_oid(oid_list, relid);
551                 MemoryContextSwitchTo(oldcontext);
552         }
553         else
554         {
555                 /* Process all plain relations listed in pg_class */
556                 Relation        pgclass;
557                 HeapScanDesc scan;
558                 HeapTuple       tuple;
559                 ScanKeyData key;
560
561                 ScanKeyInit(&key,
562                                         Anum_pg_class_relkind,
563                                         BTEqualStrategyNumber, F_CHAREQ,
564                                         CharGetDatum(RELKIND_RELATION));
565
566                 pgclass = heap_open(RelationRelationId, AccessShareLock);
567
568                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
569
570                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
571                 {
572                         /* Make a relation list entry for this guy */
573                         oldcontext = MemoryContextSwitchTo(vac_context);
574                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
575                         MemoryContextSwitchTo(oldcontext);
576                 }
577
578                 heap_endscan(scan);
579                 heap_close(pgclass, AccessShareLock);
580         }
581
582         return oid_list;
583 }
584
585 /*
586  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
587  */
588 void
589 vacuum_set_xid_limits(int freeze_min_age, bool sharedRel,
590                                           TransactionId *oldestXmin,
591                                           TransactionId *freezeLimit)
592 {
593         int                     freezemin;
594         TransactionId limit;
595         TransactionId safeLimit;
596
597         /*
598          * We can always ignore processes running lazy vacuum.  This is because we
599          * use these values only for deciding which tuples we must keep in the
600          * tables.      Since lazy vacuum doesn't write its XID anywhere, it's safe to
601          * ignore it.  In theory it could be problematic to ignore lazy vacuums on
602          * a full vacuum, but keep in mind that only one vacuum process can be
603          * working on a particular table at any time, and that each vacuum is
604          * always an independent transaction.
605          */
606         *oldestXmin = GetOldestXmin(sharedRel, true);
607
608         Assert(TransactionIdIsNormal(*oldestXmin));
609
610         /*
611          * Determine the minimum freeze age to use: as specified by the caller, or
612          * vacuum_freeze_min_age, but in any case not more than half
613          * autovacuum_freeze_max_age, so that autovacuums to prevent XID
614          * wraparound won't occur too frequently.
615          */
616         freezemin = freeze_min_age;
617         if (freezemin < 0)
618                 freezemin = vacuum_freeze_min_age;
619         freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
620         Assert(freezemin >= 0);
621
622         /*
623          * Compute the cutoff XID, being careful not to generate a "permanent" XID
624          */
625         limit = *oldestXmin - freezemin;
626         if (!TransactionIdIsNormal(limit))
627                 limit = FirstNormalTransactionId;
628
629         /*
630          * If oldestXmin is very far back (in practice, more than
631          * autovacuum_freeze_max_age / 2 XIDs old), complain and force a minimum
632          * freeze age of zero.
633          */
634         safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
635         if (!TransactionIdIsNormal(safeLimit))
636                 safeLimit = FirstNormalTransactionId;
637
638         if (TransactionIdPrecedes(limit, safeLimit))
639         {
640                 ereport(WARNING,
641                                 (errmsg("oldest xmin is far in the past"),
642                                  errhint("Close open transactions soon to avoid wraparound problems.")));
643                 limit = *oldestXmin;
644         }
645
646         *freezeLimit = limit;
647 }
648
649
650 /*
651  *      vac_update_relstats() -- update statistics for one relation
652  *
653  *              Update the whole-relation statistics that are kept in its pg_class
654  *              row.  There are additional stats that will be updated if we are
655  *              doing ANALYZE, but we always update these stats.  This routine works
656  *              for both index and heap relation entries in pg_class.
657  *
658  *              We violate transaction semantics here by overwriting the rel's
659  *              existing pg_class tuple with the new values.  This is reasonably
660  *              safe since the new values are correct whether or not this transaction
661  *              commits.  The reason for this is that if we updated these tuples in
662  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
663  *              by the time we got done with a vacuum cycle, most of the tuples in
664  *              pg_class would've been obsoleted.  Of course, this only works for
665  *              fixed-size never-null columns, but these are.
666  *
667  *              Another reason for doing it this way is that when we are in a lazy
668  *              VACUUM and have PROC_IN_VACUUM set, we mustn't do any updates ---
669  *              somebody vacuuming pg_class might think they could delete a tuple
670  *              marked with xmin = our xid.
671  *
672  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
673  *              ANALYZE.
674  */
675 void
676 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
677                                         bool hasindex, TransactionId frozenxid)
678 {
679         Relation        rd;
680         HeapTuple       ctup;
681         Form_pg_class pgcform;
682         bool            dirty;
683
684         rd = heap_open(RelationRelationId, RowExclusiveLock);
685
686         /* Fetch a copy of the tuple to scribble on */
687         ctup = SearchSysCacheCopy(RELOID,
688                                                           ObjectIdGetDatum(relid),
689                                                           0, 0, 0);
690         if (!HeapTupleIsValid(ctup))
691                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
692                          relid);
693         pgcform = (Form_pg_class) GETSTRUCT(ctup);
694
695         /* Apply required updates, if any, to copied tuple */
696
697         dirty = false;
698         if (pgcform->relpages != (int32) num_pages)
699         {
700                 pgcform->relpages = (int32) num_pages;
701                 dirty = true;
702         }
703         if (pgcform->reltuples != (float4) num_tuples)
704         {
705                 pgcform->reltuples = (float4) num_tuples;
706                 dirty = true;
707         }
708         if (pgcform->relhasindex != hasindex)
709         {
710                 pgcform->relhasindex = hasindex;
711                 dirty = true;
712         }
713
714         /*
715          * If we have discovered that there are no indexes, then there's no
716          * primary key either.  This could be done more thoroughly...
717          */
718         if (!hasindex)
719         {
720                 if (pgcform->relhaspkey)
721                 {
722                         pgcform->relhaspkey = false;
723                         dirty = true;
724                 }
725         }
726
727         /*
728          * relfrozenxid should never go backward.  Caller can pass
729          * InvalidTransactionId if it has no new data.
730          */
731         if (TransactionIdIsNormal(frozenxid) &&
732                 TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
733         {
734                 pgcform->relfrozenxid = frozenxid;
735                 dirty = true;
736         }
737
738         /*
739          * If anything changed, write out the tuple.  Even if nothing changed,
740          * force relcache invalidation so all backends reset their rd_targblock
741          * --- otherwise it might point to a page we truncated away.
742          */
743         if (dirty)
744         {
745                 heap_inplace_update(rd, ctup);
746                 /* the above sends a cache inval message */
747         }
748         else
749         {
750                 /* no need to change tuple, but force relcache inval anyway */
751                 CacheInvalidateRelcacheByTuple(ctup);
752         }
753
754         heap_close(rd, RowExclusiveLock);
755 }
756
757
758 /*
759  *      vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
760  *
761  *              Update pg_database's datfrozenxid entry for our database to be the
762  *              minimum of the pg_class.relfrozenxid values.  If we are able to
763  *              advance pg_database.datfrozenxid, also try to truncate pg_clog.
764  *
765  *              We violate transaction semantics here by overwriting the database's
766  *              existing pg_database tuple with the new value.  This is reasonably
767  *              safe since the new value is correct whether or not this transaction
768  *              commits.  As with vac_update_relstats, this avoids leaving dead tuples
769  *              behind after a VACUUM.
770  *
771  *              This routine is shared by full and lazy VACUUM.
772  */
773 void
774 vac_update_datfrozenxid(void)
775 {
776         HeapTuple       tuple;
777         Form_pg_database dbform;
778         Relation        relation;
779         SysScanDesc scan;
780         HeapTuple       classTup;
781         TransactionId newFrozenXid;
782         bool            dirty = false;
783
784         /*
785          * Initialize the "min" calculation with GetOldestXmin, which is a
786          * reasonable approximation to the minimum relfrozenxid for not-yet-
787          * committed pg_class entries for new tables; see AddNewRelationTuple().
788          * Se we cannot produce a wrong minimum by starting with this.
789          */
790         newFrozenXid = GetOldestXmin(true, true);
791
792         /*
793          * We must seqscan pg_class to find the minimum Xid, because there is no
794          * index that can help us here.
795          */
796         relation = heap_open(RelationRelationId, AccessShareLock);
797
798         scan = systable_beginscan(relation, InvalidOid, false,
799                                                           SnapshotNow, 0, NULL);
800
801         while ((classTup = systable_getnext(scan)) != NULL)
802         {
803                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
804
805                 /*
806                  * Only consider heap and TOAST tables (anything else should have
807                  * InvalidTransactionId in relfrozenxid anyway.)
808                  */
809                 if (classForm->relkind != RELKIND_RELATION &&
810                         classForm->relkind != RELKIND_TOASTVALUE)
811                         continue;
812
813                 Assert(TransactionIdIsNormal(classForm->relfrozenxid));
814
815                 if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
816                         newFrozenXid = classForm->relfrozenxid;
817         }
818
819         /* we're done with pg_class */
820         systable_endscan(scan);
821         heap_close(relation, AccessShareLock);
822
823         Assert(TransactionIdIsNormal(newFrozenXid));
824
825         /* Now fetch the pg_database tuple we need to update. */
826         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
827
828         /* Fetch a copy of the tuple to scribble on */
829         tuple = SearchSysCacheCopy(DATABASEOID,
830                                                            ObjectIdGetDatum(MyDatabaseId),
831                                                            0, 0, 0);
832         if (!HeapTupleIsValid(tuple))
833                 elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
834         dbform = (Form_pg_database) GETSTRUCT(tuple);
835
836         /*
837          * Don't allow datfrozenxid to go backward (probably can't happen anyway);
838          * and detect the common case where it doesn't go forward either.
839          */
840         if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
841         {
842                 dbform->datfrozenxid = newFrozenXid;
843                 dirty = true;
844         }
845
846         if (dirty)
847                 heap_inplace_update(relation, tuple);
848
849         heap_freetuple(tuple);
850         heap_close(relation, RowExclusiveLock);
851
852         /*
853          * If we were able to advance datfrozenxid, mark the flat-file copy of
854          * pg_database for update at commit, and see if we can truncate pg_clog.
855          */
856         if (dirty)
857         {
858                 database_file_update_needed();
859                 vac_truncate_clog(newFrozenXid);
860         }
861 }
862
863
864 /*
865  *      vac_truncate_clog() -- attempt to truncate the commit log
866  *
867  *              Scan pg_database to determine the system-wide oldest datfrozenxid,
868  *              and use it to truncate the transaction commit log (pg_clog).
869  *              Also update the XID wrap limit info maintained by varsup.c.
870  *
871  *              The passed XID is simply the one I just wrote into my pg_database
872  *              entry.  It's used to initialize the "min" calculation.
873  *
874  *              This routine is shared by full and lazy VACUUM.  Note that it's
875  *              only invoked when we've managed to change our DB's datfrozenxid
876  *              entry.
877  */
878 static void
879 vac_truncate_clog(TransactionId frozenXID)
880 {
881         TransactionId myXID = GetCurrentTransactionId();
882         Relation        relation;
883         HeapScanDesc scan;
884         HeapTuple       tuple;
885         NameData        oldest_datname;
886         bool            frozenAlreadyWrapped = false;
887
888         /* init oldest_datname to sync with my frozenXID */
889         namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
890
891         /*
892          * Scan pg_database to compute the minimum datfrozenxid
893          *
894          * Note: we need not worry about a race condition with new entries being
895          * inserted by CREATE DATABASE.  Any such entry will have a copy of some
896          * existing DB's datfrozenxid, and that source DB cannot be ours because
897          * of the interlock against copying a DB containing an active backend.
898          * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
899          * concurrently modify the datfrozenxid's of different databases, the
900          * worst possible outcome is that pg_clog is not truncated as aggressively
901          * as it could be.
902          */
903         relation = heap_open(DatabaseRelationId, AccessShareLock);
904
905         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
906
907         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
908         {
909                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
910
911                 Assert(TransactionIdIsNormal(dbform->datfrozenxid));
912
913                 if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
914                         frozenAlreadyWrapped = true;
915                 else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
916                 {
917                         frozenXID = dbform->datfrozenxid;
918                         namecpy(&oldest_datname, &dbform->datname);
919                 }
920         }
921
922         heap_endscan(scan);
923
924         heap_close(relation, AccessShareLock);
925
926         /*
927          * Do not truncate CLOG if we seem to have suffered wraparound already;
928          * the computed minimum XID might be bogus.  This case should now be
929          * impossible due to the defenses in GetNewTransactionId, but we keep the
930          * test anyway.
931          */
932         if (frozenAlreadyWrapped)
933         {
934                 ereport(WARNING,
935                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
936                                  errdetail("You might have already suffered transaction-wraparound data loss.")));
937                 return;
938         }
939
940         /* Truncate CLOG to the oldest frozenxid */
941         TruncateCLOG(frozenXID);
942
943         /*
944          * Update the wrap limit for GetNewTransactionId.  Note: this function
945          * will also signal the postmaster for an(other) autovac cycle if needed.
946          */
947         SetTransactionIdLimit(frozenXID, &oldest_datname);
948 }
949
950
951 /****************************************************************************
952  *                                                                                                                                                      *
953  *                      Code common to both flavors of VACUUM                                                   *
954  *                                                                                                                                                      *
955  ****************************************************************************
956  */
957
958
959 /*
960  *      vacuum_rel() -- vacuum one heap relation
961  *
962  *              Doing one heap at a time incurs extra overhead, since we need to
963  *              check that the heap exists again just before we vacuum it.      The
964  *              reason that we do this is so that vacuuming can be spread across
965  *              many small transactions.  Otherwise, two-phase locking would require
966  *              us to lock the entire database during one pass of the vacuum cleaner.
967  *
968  *              At entry and exit, we are not inside a transaction.
969  */
970 static void
971 vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound)
972 {
973         LOCKMODE        lmode;
974         Relation        onerel;
975         LockRelId       onerelid;
976         Oid                     toast_relid;
977         Oid                     save_userid;
978         bool            save_secdefcxt;
979
980         /* Begin a transaction for vacuuming this relation */
981         StartTransactionCommand();
982
983         /*
984          * Functions in indexes may want a snapshot set.  Also, setting
985          * a snapshot ensures that RecentGlobalXmin is kept truly recent.
986          */
987         PushActiveSnapshot(GetTransactionSnapshot());
988
989         if (!vacstmt->full)
990         {
991                 /*
992                  * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets other
993                  * concurrent VACUUMs know that they can ignore this one while
994                  * determining their OldestXmin.  (The reason we don't set it during a
995                  * full VACUUM is exactly that we may have to run user- defined
996                  * functions for functional indexes, and we want to make sure that if
997                  * they use the snapshot set above, any tuples it requires can't get
998                  * removed from other tables.  An index function that depends on the
999                  * contents of other tables is arguably broken, but we won't break it
1000                  * here by violating transaction semantics.)
1001                  *
1002                  * We also set the VACUUM_FOR_WRAPAROUND flag, which is passed down
1003                  * by autovacuum; it's used to avoid cancelling a vacuum that was
1004                  * invoked in an emergency.
1005                  *
1006                  * Note: these flags remain set until CommitTransaction or
1007                  * AbortTransaction.  We don't want to clear them until we reset
1008                  * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
1009                  * which is probably Not Good.
1010                  */
1011                 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1012                 MyProc->vacuumFlags |= PROC_IN_VACUUM;
1013                 if (for_wraparound)
1014                         MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
1015                 LWLockRelease(ProcArrayLock);
1016         }
1017
1018         /*
1019          * Check for user-requested abort.      Note we want this to be inside a
1020          * transaction, so xact.c doesn't issue useless WARNING.
1021          */
1022         CHECK_FOR_INTERRUPTS();
1023
1024         /*
1025          * Determine the type of lock we want --- hard exclusive lock for a FULL
1026          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1027          * way, we can be sure that no other backend is vacuuming the same table.
1028          */
1029         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
1030
1031         /*
1032          * Open the relation and get the appropriate lock on it.
1033          *
1034          * There's a race condition here: the rel may have gone away since the
1035          * last time we saw it.  If so, we don't need to vacuum it.
1036          */
1037         onerel = try_relation_open(relid, lmode);
1038
1039         if (!onerel)
1040         {
1041                 PopActiveSnapshot();
1042                 CommitTransactionCommand();
1043                 return;
1044         }
1045
1046         /*
1047          * Check permissions.
1048          *
1049          * We allow the user to vacuum a table if he is superuser, the table
1050          * owner, or the database owner (but in the latter case, only if it's not
1051          * a shared relation).  pg_class_ownercheck includes the superuser case.
1052          *
1053          * Note we choose to treat permissions failure as a WARNING and keep
1054          * trying to vacuum the rest of the DB --- is this appropriate?
1055          */
1056         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1057                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1058         {
1059                 if (onerel->rd_rel->relisshared)
1060                         ereport(WARNING,
1061                                         (errmsg("skipping \"%s\" --- only superuser can vacuum it",
1062                                                         RelationGetRelationName(onerel))));
1063                 else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
1064                         ereport(WARNING,
1065                                         (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
1066                                                         RelationGetRelationName(onerel))));
1067                 else
1068                         ereport(WARNING,
1069                                         (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1070                                                         RelationGetRelationName(onerel))));
1071                 relation_close(onerel, lmode);
1072                 PopActiveSnapshot();
1073                 CommitTransactionCommand();
1074                 return;
1075         }
1076
1077         /*
1078          * Check that it's a vacuumable table; we used to do this in get_rel_oids()
1079          * but seems safer to check after we've locked the relation.
1080          */
1081         if (onerel->rd_rel->relkind != RELKIND_RELATION &&
1082                 onerel->rd_rel->relkind != RELKIND_TOASTVALUE)
1083         {
1084                 ereport(WARNING,
1085                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1086                                                 RelationGetRelationName(onerel))));
1087                 relation_close(onerel, lmode);
1088                 PopActiveSnapshot();
1089                 CommitTransactionCommand();
1090                 return;
1091         }
1092
1093         /*
1094          * Silently ignore tables that are temp tables of other backends ---
1095          * trying to vacuum these will lead to great unhappiness, since their
1096          * contents are probably not up-to-date on disk.  (We don't throw a
1097          * warning here; it would just lead to chatter during a database-wide
1098          * VACUUM.)
1099          */
1100         if (isOtherTempNamespace(RelationGetNamespace(onerel)))
1101         {
1102                 relation_close(onerel, lmode);
1103                 PopActiveSnapshot();
1104                 CommitTransactionCommand();
1105                 return;
1106         }
1107
1108         /*
1109          * Get a session-level lock too. This will protect our access to the
1110          * relation across multiple transactions, so that we can vacuum the
1111          * relation's TOAST table (if any) secure in the knowledge that no one is
1112          * deleting the parent relation.
1113          *
1114          * NOTE: this cannot block, even if someone else is waiting for access,
1115          * because the lock manager knows that both lock requests are from the
1116          * same process.
1117          */
1118         onerelid = onerel->rd_lockInfo.lockRelId;
1119         LockRelationIdForSession(&onerelid, lmode);
1120
1121         /*
1122          * Remember the relation's TOAST relation for later, if the caller asked
1123          * us to process it.
1124          */
1125         if (do_toast)
1126                 toast_relid = onerel->rd_rel->reltoastrelid;
1127         else
1128                 toast_relid = InvalidOid;
1129
1130         /*
1131          * Switch to the table owner's userid, so that any index functions are
1132          * run as that user.  (This is unnecessary, but harmless, for lazy
1133          * VACUUM.)
1134          */
1135         GetUserIdAndContext(&save_userid, &save_secdefcxt);
1136         SetUserIdAndContext(onerel->rd_rel->relowner, true);
1137
1138         /*
1139          * Do the actual work --- either FULL or "lazy" vacuum
1140          */
1141         if (vacstmt->full)
1142                 full_vacuum_rel(onerel, vacstmt);
1143         else
1144                 lazy_vacuum_rel(onerel, vacstmt, vac_strategy);
1145
1146         /* Restore userid */
1147         SetUserIdAndContext(save_userid, save_secdefcxt);
1148
1149         /* all done with this class, but hold lock until commit */
1150         relation_close(onerel, NoLock);
1151
1152         /*
1153          * Complete the transaction and free all temporary memory used.
1154          */
1155         PopActiveSnapshot();
1156         CommitTransactionCommand();
1157
1158         /*
1159          * If the relation has a secondary toast rel, vacuum that too while we
1160          * still hold the session lock on the master table.  Note however that
1161          * "analyze" will not get done on the toast table.      This is good, because
1162          * the toaster always uses hardcoded index access and statistics are
1163          * totally unimportant for toast relations.
1164          */
1165         if (toast_relid != InvalidOid)
1166                 vacuum_rel(toast_relid, vacstmt, false, for_wraparound);
1167
1168         /*
1169          * Now release the session-level lock on the master table.
1170          */
1171         UnlockRelationIdForSession(&onerelid, lmode);
1172 }
1173
1174
1175 /****************************************************************************
1176  *                                                                                                                                                      *
1177  *                      Code for VACUUM FULL (only)                                                                             *
1178  *                                                                                                                                                      *
1179  ****************************************************************************
1180  */
1181
1182
1183 /*
1184  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1185  *
1186  *              This routine vacuums a single heap, cleans out its indexes, and
1187  *              updates its num_pages and num_tuples statistics.
1188  *
1189  *              At entry, we have already established a transaction and opened
1190  *              and locked the relation.
1191  */
1192 static void
1193 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1194 {
1195         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1196                                                                                  * clean indexes */
1197         VacPageListData fraged_pages;           /* List of pages with space enough for
1198                                                                                  * re-using */
1199         Relation   *Irel;
1200         int                     nindexes,
1201                                 i;
1202         VRelStats  *vacrelstats;
1203
1204         vacuum_set_xid_limits(vacstmt->freeze_min_age, onerel->rd_rel->relisshared,
1205                                                   &OldestXmin, &FreezeLimit);
1206
1207         /*
1208          * Flush any previous async-commit transactions.  This does not guarantee
1209          * that we will be able to set hint bits for tuples they inserted, but it
1210          * improves the probability, especially in simple sequential-commands
1211          * cases.  See scan_heap() and repair_frag() for more about this.
1212          */
1213         XLogAsyncCommitFlush();
1214
1215         /*
1216          * Set up statistics-gathering machinery.
1217          */
1218         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1219         vacrelstats->rel_pages = 0;
1220         vacrelstats->rel_tuples = 0;
1221         vacrelstats->rel_indexed_tuples = 0;
1222         vacrelstats->hasindex = false;
1223
1224         /* scan the heap */
1225         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1226         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1227
1228         /* Now open all indexes of the relation */
1229         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1230         if (nindexes > 0)
1231                 vacrelstats->hasindex = true;
1232
1233         /* Clean/scan index relation(s) */
1234         if (Irel != NULL)
1235         {
1236                 if (vacuum_pages.num_pages > 0)
1237                 {
1238                         for (i = 0; i < nindexes; i++)
1239                                 vacuum_index(&vacuum_pages, Irel[i],
1240                                                          vacrelstats->rel_indexed_tuples, 0);
1241                 }
1242                 else
1243                 {
1244                         /* just scan indexes to update statistic */
1245                         for (i = 0; i < nindexes; i++)
1246                                 scan_index(Irel[i], vacrelstats->rel_indexed_tuples);
1247                 }
1248         }
1249
1250         if (fraged_pages.num_pages > 0)
1251         {
1252                 /* Try to shrink heap */
1253                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1254                                         nindexes, Irel);
1255                 vac_close_indexes(nindexes, Irel, NoLock);
1256         }
1257         else
1258         {
1259                 vac_close_indexes(nindexes, Irel, NoLock);
1260                 if (vacuum_pages.num_pages > 0)
1261                 {
1262                         /* Clean pages from vacuum_pages list */
1263                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1264                 }
1265         }
1266
1267         /* update thefree space map with final free space info, and vacuum it */
1268         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1269         FreeSpaceMapVacuum(onerel);
1270
1271         /* update statistics in pg_class */
1272         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
1273                                                 vacrelstats->rel_tuples, vacrelstats->hasindex,
1274                                                 FreezeLimit);
1275
1276         /* report results to the stats collector, too */
1277         pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
1278                                                  vacstmt->analyze, vacrelstats->rel_tuples);
1279 }
1280
1281
1282 /*
1283  *      scan_heap() -- scan an open heap relation
1284  *
1285  *              This routine sets commit status bits, constructs vacuum_pages (list
1286  *              of pages we need to compact free space on and/or clean indexes of
1287  *              deleted tuples), constructs fraged_pages (list of pages with free
1288  *              space that tuples could be moved into), and calculates statistics
1289  *              on the number of live tuples in the heap.
1290  */
1291 static void
1292 scan_heap(VRelStats *vacrelstats, Relation onerel,
1293                   VacPageList vacuum_pages, VacPageList fraged_pages)
1294 {
1295         BlockNumber nblocks,
1296                                 blkno;
1297         char       *relname;
1298         VacPage         vacpage;
1299         BlockNumber empty_pages,
1300                                 empty_end_pages;
1301         double          num_tuples,
1302                                 num_indexed_tuples,
1303                                 tups_vacuumed,
1304                                 nkeep,
1305                                 nunused;
1306         double          free_space,
1307                                 usable_free_space;
1308         Size            min_tlen = MaxHeapTupleSize;
1309         Size            max_tlen = 0;
1310         bool            do_shrinking = true;
1311         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1312         int                     num_vtlinks = 0;
1313         int                     free_vtlinks = 100;
1314         PGRUsage        ru0;
1315
1316         pg_rusage_init(&ru0);
1317
1318         relname = RelationGetRelationName(onerel);
1319         ereport(elevel,
1320                         (errmsg("vacuuming \"%s.%s\"",
1321                                         get_namespace_name(RelationGetNamespace(onerel)),
1322                                         relname)));
1323
1324         empty_pages = empty_end_pages = 0;
1325         num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
1326         free_space = 0;
1327
1328         nblocks = RelationGetNumberOfBlocks(onerel);
1329
1330         /*
1331          * We initially create each VacPage item in a maximal-sized workspace,
1332          * then copy the workspace into a just-large-enough copy.
1333          */
1334         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1335
1336         for (blkno = 0; blkno < nblocks; blkno++)
1337         {
1338                 Page            page,
1339                                         tempPage = NULL;
1340                 bool            do_reap,
1341                                         do_frag;
1342                 Buffer          buf;
1343                 OffsetNumber offnum,
1344                                         maxoff;
1345                 bool            notup;
1346                 OffsetNumber frozen[MaxOffsetNumber];
1347                 int                     nfrozen;
1348
1349                 vacuum_delay_point();
1350
1351                 buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
1352                                                                  vac_strategy);
1353                 page = BufferGetPage(buf);
1354
1355                 /*
1356                  * Since we are holding exclusive lock on the relation, no other
1357                  * backend can be accessing the page; however it is possible that the
1358                  * background writer will try to write the page if it's already marked
1359                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1360                  * must take exclusive buffer lock wherever we potentially modify
1361                  * pages.  In fact, we insist on cleanup lock so that we can safely
1362                  * call heap_page_prune().      (This might be overkill, since the
1363                  * bgwriter pays no attention to individual tuples, but on the other
1364                  * hand it's unlikely that the bgwriter has this particular page
1365                  * pinned at this instant.      So violating the coding rule would buy us
1366                  * little anyway.)
1367                  */
1368                 LockBufferForCleanup(buf);
1369
1370                 vacpage->blkno = blkno;
1371                 vacpage->offsets_used = 0;
1372                 vacpage->offsets_free = 0;
1373
1374                 if (PageIsNew(page))
1375                 {
1376                         VacPage         vacpagecopy;
1377
1378                         ereport(WARNING,
1379                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1380                                            relname, blkno)));
1381                         PageInit(page, BufferGetPageSize(buf), 0);
1382                         MarkBufferDirty(buf);
1383                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1384                         free_space += vacpage->free;
1385                         empty_pages++;
1386                         empty_end_pages++;
1387                         vacpagecopy = copy_vac_page(vacpage);
1388                         vpage_insert(vacuum_pages, vacpagecopy);
1389                         vpage_insert(fraged_pages, vacpagecopy);
1390                         UnlockReleaseBuffer(buf);
1391                         continue;
1392                 }
1393
1394                 if (PageIsEmpty(page))
1395                 {
1396                         VacPage         vacpagecopy;
1397
1398                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1399                         free_space += vacpage->free;
1400                         empty_pages++;
1401                         empty_end_pages++;
1402                         vacpagecopy = copy_vac_page(vacpage);
1403                         vpage_insert(vacuum_pages, vacpagecopy);
1404                         vpage_insert(fraged_pages, vacpagecopy);
1405                         UnlockReleaseBuffer(buf);
1406                         continue;
1407                 }
1408
1409                 /*
1410                  * Prune all HOT-update chains in this page.
1411                  *
1412                  * We use the redirect_move option so that redirecting line pointers
1413                  * get collapsed out; this allows us to not worry about them below.
1414                  *
1415                  * We count tuples removed by the pruning step as removed by VACUUM.
1416                  */
1417                 tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
1418                                                                                  true, false);
1419
1420                 /*
1421                  * Now scan the page to collect vacuumable items and check for tuples
1422                  * requiring freezing.
1423                  */
1424                 nfrozen = 0;
1425                 notup = true;
1426                 maxoff = PageGetMaxOffsetNumber(page);
1427                 for (offnum = FirstOffsetNumber;
1428                          offnum <= maxoff;
1429                          offnum = OffsetNumberNext(offnum))
1430                 {
1431                         ItemId          itemid = PageGetItemId(page, offnum);
1432                         bool            tupgone = false;
1433                         HeapTupleData tuple;
1434
1435                         /*
1436                          * Collect un-used items too - it's possible to have indexes
1437                          * pointing here after crash.  (That's an ancient comment and is
1438                          * likely obsolete with WAL, but we might as well continue to
1439                          * check for such problems.)
1440                          */
1441                         if (!ItemIdIsUsed(itemid))
1442                         {
1443                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1444                                 nunused += 1;
1445                                 continue;
1446                         }
1447
1448                         /*
1449                          * DEAD item pointers are to be vacuumed normally; but we don't
1450                          * count them in tups_vacuumed, else we'd be double-counting (at
1451                          * least in the common case where heap_page_prune() just freed up
1452                          * a non-HOT tuple).
1453                          */
1454                         if (ItemIdIsDead(itemid))
1455                         {
1456                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1457                                 continue;
1458                         }
1459
1460                         /* Shouldn't have any redirected items anymore */
1461                         if (!ItemIdIsNormal(itemid))
1462                                 elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
1463                                          relname, blkno, offnum);
1464
1465                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1466                         tuple.t_len = ItemIdGetLength(itemid);
1467                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1468
1469                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1470                         {
1471                                 case HEAPTUPLE_LIVE:
1472                                         /* Tuple is good --- but let's do some validity checks */
1473                                         if (onerel->rd_rel->relhasoids &&
1474                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1475                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1476                                                          relname, blkno, offnum);
1477
1478                                         /*
1479                                          * The shrinkage phase of VACUUM FULL requires that all
1480                                          * live tuples have XMIN_COMMITTED set --- see comments in
1481                                          * repair_frag()'s walk-along-page loop.  Use of async
1482                                          * commit may prevent HeapTupleSatisfiesVacuum from
1483                                          * setting the bit for a recently committed tuple.      Rather
1484                                          * than trying to handle this corner case, we just give up
1485                                          * and don't shrink.
1486                                          */
1487                                         if (do_shrinking &&
1488                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1489                                         {
1490                                                 ereport(LOG,
1491                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1492                                                                                 relname, blkno, offnum,
1493                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1494                                                 do_shrinking = false;
1495                                         }
1496                                         break;
1497                                 case HEAPTUPLE_DEAD:
1498
1499                                         /*
1500                                          * Ordinarily, DEAD tuples would have been removed by
1501                                          * heap_page_prune(), but it's possible that the tuple
1502                                          * state changed since heap_page_prune() looked.  In
1503                                          * particular an INSERT_IN_PROGRESS tuple could have
1504                                          * changed to DEAD if the inserter aborted.  So this
1505                                          * cannot be considered an error condition, though it does
1506                                          * suggest that someone released a lock early.
1507                                          *
1508                                          * If the tuple is HOT-updated then it must only be
1509                                          * removed by a prune operation; so we keep it as if it
1510                                          * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
1511                                          * worth trying to make the shrinking code smart enough to
1512                                          * handle this?  It's an unusual corner case.)
1513                                          *
1514                                          * DEAD heap-only tuples can safely be removed if they
1515                                          * aren't themselves HOT-updated, although this is a bit
1516                                          * inefficient since we'll uselessly try to remove index
1517                                          * entries for them.
1518                                          */
1519                                         if (HeapTupleIsHotUpdated(&tuple))
1520                                         {
1521                                                 nkeep += 1;
1522                                                 if (do_shrinking)
1523                                                         ereport(LOG,
1524                                                                         (errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
1525                                                                                         relname, blkno, offnum)));
1526                                                 do_shrinking = false;
1527                                         }
1528                                         else
1529                                         {
1530                                                 tupgone = true; /* we can delete the tuple */
1531
1532                                                 /*
1533                                                  * We need not require XMIN_COMMITTED or
1534                                                  * XMAX_COMMITTED to be set, since we will remove the
1535                                                  * tuple without any further examination of its hint
1536                                                  * bits.
1537                                                  */
1538                                         }
1539                                         break;
1540                                 case HEAPTUPLE_RECENTLY_DEAD:
1541
1542                                         /*
1543                                          * If tuple is recently deleted then we must not remove it
1544                                          * from relation.
1545                                          */
1546                                         nkeep += 1;
1547
1548                                         /*
1549                                          * As with the LIVE case, shrinkage requires
1550                                          * XMIN_COMMITTED to be set.
1551                                          */
1552                                         if (do_shrinking &&
1553                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1554                                         {
1555                                                 ereport(LOG,
1556                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1557                                                                                 relname, blkno, offnum,
1558                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1559                                                 do_shrinking = false;
1560                                         }
1561
1562                                         /*
1563                                          * If we do shrinking and this tuple is updated one then
1564                                          * remember it to construct updated tuple dependencies.
1565                                          */
1566                                         if (do_shrinking &&
1567                                                 !(ItemPointerEquals(&(tuple.t_self),
1568                                                                                         &(tuple.t_data->t_ctid))))
1569                                         {
1570                                                 if (free_vtlinks == 0)
1571                                                 {
1572                                                         free_vtlinks = 1000;
1573                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1574                                                                                            (free_vtlinks + num_vtlinks) *
1575                                                                                                          sizeof(VTupleLinkData));
1576                                                 }
1577                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1578                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1579                                                 free_vtlinks--;
1580                                                 num_vtlinks++;
1581                                         }
1582                                         break;
1583                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1584
1585                                         /*
1586                                          * This should not happen, since we hold exclusive lock on
1587                                          * the relation; shouldn't we raise an error?  (Actually,
1588                                          * it can happen in system catalogs, since we tend to
1589                                          * release write lock before commit there.)  As above, we
1590                                          * can't apply repair_frag() if the tuple state is
1591                                          * uncertain.
1592                                          */
1593                                         if (do_shrinking)
1594                                                 ereport(LOG,
1595                                                                 (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
1596                                                                                 relname, blkno, offnum,
1597                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1598                                         do_shrinking = false;
1599                                         break;
1600                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1601
1602                                         /*
1603                                          * This should not happen, since we hold exclusive lock on
1604                                          * the relation; shouldn't we raise an error?  (Actually,
1605                                          * it can happen in system catalogs, since we tend to
1606                                          * release write lock before commit there.)  As above, we
1607                                          * can't apply repair_frag() if the tuple state is
1608                                          * uncertain.
1609                                          */
1610                                         if (do_shrinking)
1611                                                 ereport(LOG,
1612                                                                 (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
1613                                                                                 relname, blkno, offnum,
1614                                                                          HeapTupleHeaderGetXmax(tuple.t_data))));
1615                                         do_shrinking = false;
1616                                         break;
1617                                 default:
1618                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1619                                         break;
1620                         }
1621
1622                         if (tupgone)
1623                         {
1624                                 ItemId          lpp;
1625
1626                                 /*
1627                                  * Here we are building a temporary copy of the page with dead
1628                                  * tuples removed.      Below we will apply
1629                                  * PageRepairFragmentation to the copy, so that we can
1630                                  * determine how much space will be available after removal of
1631                                  * dead tuples.  But note we are NOT changing the real page
1632                                  * yet...
1633                                  */
1634                                 if (tempPage == NULL)
1635                                 {
1636                                         Size            pageSize;
1637
1638                                         pageSize = PageGetPageSize(page);
1639                                         tempPage = (Page) palloc(pageSize);
1640                                         memcpy(tempPage, page, pageSize);
1641                                 }
1642
1643                                 /* mark it unused on the temp page */
1644                                 lpp = PageGetItemId(tempPage, offnum);
1645                                 ItemIdSetUnused(lpp);
1646
1647                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1648                                 tups_vacuumed += 1;
1649                         }
1650                         else
1651                         {
1652                                 num_tuples += 1;
1653                                 if (!HeapTupleIsHeapOnly(&tuple))
1654                                         num_indexed_tuples += 1;
1655                                 notup = false;
1656                                 if (tuple.t_len < min_tlen)
1657                                         min_tlen = tuple.t_len;
1658                                 if (tuple.t_len > max_tlen)
1659                                         max_tlen = tuple.t_len;
1660
1661                                 /*
1662                                  * Each non-removable tuple must be checked to see if it needs
1663                                  * freezing.
1664                                  */
1665                                 if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
1666                                                                           InvalidBuffer))
1667                                         frozen[nfrozen++] = offnum;
1668                         }
1669                 }                                               /* scan along page */
1670
1671                 if (tempPage != NULL)
1672                 {
1673                         /* Some tuples are removable; figure free space after removal */
1674                         PageRepairFragmentation(tempPage);
1675                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1676                         pfree(tempPage);
1677                         do_reap = true;
1678                 }
1679                 else
1680                 {
1681                         /* Just use current available space */
1682                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1683                         /* Need to reap the page if it has UNUSED or DEAD line pointers */
1684                         do_reap = (vacpage->offsets_free > 0);
1685                 }
1686
1687                 free_space += vacpage->free;
1688
1689                 /*
1690                  * Add the page to vacuum_pages if it requires reaping, and add it to
1691                  * fraged_pages if it has a useful amount of free space.  "Useful"
1692                  * means enough for a minimal-sized tuple.  But we don't know that
1693                  * accurately near the start of the relation, so add pages
1694                  * unconditionally if they have >= BLCKSZ/10 free space.  Also
1695                  * forcibly add pages with no live tuples, to avoid confusing the
1696                  * empty_end_pages logic.  (In the presence of unreasonably small
1697                  * fillfactor, it seems possible that such pages might not pass
1698                  * the free-space test, but they had better be in the list anyway.)
1699                  */
1700                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10 ||
1701                                    notup);
1702
1703                 if (do_reap || do_frag)
1704                 {
1705                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1706
1707                         if (do_reap)
1708                                 vpage_insert(vacuum_pages, vacpagecopy);
1709                         if (do_frag)
1710                                 vpage_insert(fraged_pages, vacpagecopy);
1711                 }
1712
1713                 /*
1714                  * Include the page in empty_end_pages if it will be empty after
1715                  * vacuuming; this is to keep us from using it as a move destination.
1716                  * Note that such pages are guaranteed to be in fraged_pages.
1717                  */
1718                 if (notup)
1719                 {
1720                         empty_pages++;
1721                         empty_end_pages++;
1722                 }
1723                 else
1724                         empty_end_pages = 0;
1725
1726                 /*
1727                  * If we froze any tuples, mark the buffer dirty, and write a WAL
1728                  * record recording the changes.  We must log the changes to be
1729                  * crash-safe against future truncation of CLOG.
1730                  */
1731                 if (nfrozen > 0)
1732                 {
1733                         MarkBufferDirty(buf);
1734                         /* no XLOG for temp tables, though */
1735                         if (!onerel->rd_istemp)
1736                         {
1737                                 XLogRecPtr      recptr;
1738
1739                                 recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1740                                                                                  frozen, nfrozen);
1741                                 PageSetLSN(page, recptr);
1742                                 PageSetTLI(page, ThisTimeLineID);
1743                         }
1744                 }
1745
1746                 UnlockReleaseBuffer(buf);
1747         }
1748
1749         pfree(vacpage);
1750
1751         /* save stats in the rel list for use later */
1752         vacrelstats->rel_tuples = num_tuples;
1753         vacrelstats->rel_indexed_tuples = num_indexed_tuples;
1754         vacrelstats->rel_pages = nblocks;
1755         if (num_tuples == 0)
1756                 min_tlen = max_tlen = 0;
1757         vacrelstats->min_tlen = min_tlen;
1758         vacrelstats->max_tlen = max_tlen;
1759
1760         vacuum_pages->empty_end_pages = empty_end_pages;
1761         fraged_pages->empty_end_pages = empty_end_pages;
1762
1763         /*
1764          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1765          * remove any "empty" end-pages from the list, and compute usable free
1766          * space = free space in remaining pages.
1767          */
1768         if (do_shrinking)
1769         {
1770                 int                     i;
1771
1772                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1773                 fraged_pages->num_pages -= empty_end_pages;
1774                 usable_free_space = 0;
1775                 for (i = 0; i < fraged_pages->num_pages; i++)
1776                         usable_free_space += fraged_pages->pagedesc[i]->free;
1777         }
1778         else
1779         {
1780                 fraged_pages->num_pages = 0;
1781                 usable_free_space = 0;
1782         }
1783
1784         /* don't bother to save vtlinks if we will not call repair_frag */
1785         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1786         {
1787                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1788                           vac_cmp_vtlinks);
1789                 vacrelstats->vtlinks = vtlinks;
1790                 vacrelstats->num_vtlinks = num_vtlinks;
1791         }
1792         else
1793         {
1794                 vacrelstats->vtlinks = NULL;
1795                 vacrelstats->num_vtlinks = 0;
1796                 pfree(vtlinks);
1797         }
1798
1799         ereport(elevel,
1800                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1801                                         RelationGetRelationName(onerel),
1802                                         tups_vacuumed, num_tuples, nblocks),
1803                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1804                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1805                                            "There were %.0f unused item pointers.\n"
1806            "Total free space (including removable row versions) is %.0f bytes.\n"
1807                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1808          "%u pages containing %.0f free bytes are potential move destinations.\n"
1809                                            "%s.",
1810                                            nkeep,
1811                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1812                                            nunused,
1813                                            free_space,
1814                                            empty_pages, empty_end_pages,
1815                                            fraged_pages->num_pages, usable_free_space,
1816                                            pg_rusage_show(&ru0))));
1817 }
1818
1819
1820 /*
1821  *      repair_frag() -- try to repair relation's fragmentation
1822  *
1823  *              This routine marks dead tuples as unused and tries re-use dead space
1824  *              by moving tuples (and inserting indexes if needed). It constructs
1825  *              Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
1826  *              for them after committing (in hack-manner - without losing locks
1827  *              and freeing memory!) current transaction. It truncates relation
1828  *              if some end-blocks are gone away.
1829  */
1830 static void
1831 repair_frag(VRelStats *vacrelstats, Relation onerel,
1832                         VacPageList vacuum_pages, VacPageList fraged_pages,
1833                         int nindexes, Relation *Irel)
1834 {
1835         TransactionId myXID = GetCurrentTransactionId();
1836         Buffer          dst_buffer = InvalidBuffer;
1837         BlockNumber nblocks,
1838                                 blkno;
1839         BlockNumber last_move_dest_block = 0,
1840                                 last_vacuum_block;
1841         Page            dst_page = NULL;
1842         ExecContextData ec;
1843         VacPageListData Nvacpagelist;
1844         VacPage         dst_vacpage = NULL,
1845                                 last_vacuum_page,
1846                                 vacpage,
1847                            *curpage;
1848         int                     i;
1849         int                     num_moved = 0,
1850                                 num_fraged_pages,
1851                                 vacuumed_pages;
1852         int                     keep_tuples = 0;
1853         int                     keep_indexed_tuples = 0;
1854         PGRUsage        ru0;
1855
1856         pg_rusage_init(&ru0);
1857
1858         ExecContext_Init(&ec, onerel);
1859
1860         Nvacpagelist.num_pages = 0;
1861         num_fraged_pages = fraged_pages->num_pages;
1862         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1863         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1864         if (vacuumed_pages > 0)
1865         {
1866                 /* get last reaped page from vacuum_pages */
1867                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1868                 last_vacuum_block = last_vacuum_page->blkno;
1869         }
1870         else
1871         {
1872                 last_vacuum_page = NULL;
1873                 last_vacuum_block = InvalidBlockNumber;
1874         }
1875
1876         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1877         vacpage->offsets_used = vacpage->offsets_free = 0;
1878
1879         /*
1880          * Scan pages backwards from the last nonempty page, trying to move tuples
1881          * down to lower pages.  Quit when we reach a page that we have moved any
1882          * tuples onto, or the first page if we haven't moved anything, or when we
1883          * find a page we cannot completely empty (this last condition is handled
1884          * by "break" statements within the loop).
1885          *
1886          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1887          * in order by blkno.
1888          */
1889         nblocks = vacrelstats->rel_pages;
1890         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1891                  blkno > last_move_dest_block;
1892                  blkno--)
1893         {
1894                 Buffer          buf;
1895                 Page            page;
1896                 OffsetNumber offnum,
1897                                         maxoff;
1898                 bool            isempty,
1899                                         chain_tuple_moved;
1900
1901                 vacuum_delay_point();
1902
1903                 /*
1904                  * Forget fraged_pages pages at or after this one; they're no longer
1905                  * useful as move targets, since we only want to move down. Note that
1906                  * since we stop the outer loop at last_move_dest_block, pages removed
1907                  * here cannot have had anything moved onto them already.
1908                  *
1909                  * Also note that we don't change the stored fraged_pages list, only
1910                  * our local variable num_fraged_pages; so the forgotten pages are
1911                  * still available to be loaded into the free space map later.
1912                  */
1913                 while (num_fraged_pages > 0 &&
1914                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1915                 {
1916                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1917                         --num_fraged_pages;
1918                 }
1919
1920                 /*
1921                  * Process this page of relation.
1922                  */
1923                 buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
1924                                                                  vac_strategy);
1925                 page = BufferGetPage(buf);
1926
1927                 vacpage->offsets_free = 0;
1928
1929                 isempty = PageIsEmpty(page);
1930
1931                 /* Is the page in the vacuum_pages list? */
1932                 if (blkno == last_vacuum_block)
1933                 {
1934                         if (last_vacuum_page->offsets_free > 0)
1935                         {
1936                                 /* there are dead tuples on this page - clean them */
1937                                 Assert(!isempty);
1938                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1939                                 vacuum_page(onerel, buf, last_vacuum_page);
1940                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1941                         }
1942                         else
1943                                 Assert(isempty);
1944                         --vacuumed_pages;
1945                         if (vacuumed_pages > 0)
1946                         {
1947                                 /* get prev reaped page from vacuum_pages */
1948                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1949                                 last_vacuum_block = last_vacuum_page->blkno;
1950                         }
1951                         else
1952                         {
1953                                 last_vacuum_page = NULL;
1954                                 last_vacuum_block = InvalidBlockNumber;
1955                         }
1956                         if (isempty)
1957                         {
1958                                 ReleaseBuffer(buf);
1959                                 continue;
1960                         }
1961                 }
1962                 else
1963                         Assert(!isempty);
1964
1965                 chain_tuple_moved = false;              /* no one chain-tuple was moved off
1966                                                                                  * this page, yet */
1967                 vacpage->blkno = blkno;
1968                 maxoff = PageGetMaxOffsetNumber(page);
1969                 for (offnum = FirstOffsetNumber;
1970                          offnum <= maxoff;
1971                          offnum = OffsetNumberNext(offnum))
1972                 {
1973                         Size            tuple_len;
1974                         HeapTupleData tuple;
1975                         ItemId          itemid = PageGetItemId(page, offnum);
1976
1977                         if (!ItemIdIsUsed(itemid))
1978                                 continue;
1979
1980                         if (ItemIdIsDead(itemid))
1981                         {
1982                                 /* just remember it for vacuum_page() */
1983                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1984                                 continue;
1985                         }
1986
1987                         /* Shouldn't have any redirected items now */
1988                         Assert(ItemIdIsNormal(itemid));
1989
1990                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1991                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1992                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1993
1994                         /* ---
1995                          * VACUUM FULL has an exclusive lock on the relation.  So
1996                          * normally no other transaction can have pending INSERTs or
1997                          * DELETEs in this relation.  A tuple is either:
1998                          *              (a) live (XMIN_COMMITTED)
1999                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
2000                          *                      is visible to all active transactions)
2001                          *              (c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
2002                          *                      but at least one active transaction does not see the
2003                          *                      deleting transaction (ie, it's RECENTLY_DEAD)
2004                          *              (d) moved by the currently running VACUUM
2005                          *              (e) inserted or deleted by a not yet committed transaction,
2006                          *                      or by a transaction we couldn't set XMIN_COMMITTED for.
2007                          * In case (e) we wouldn't be in repair_frag() at all, because
2008                          * scan_heap() detects those cases and shuts off shrinking.
2009                          * We can't see case (b) here either, because such tuples were
2010                          * already removed by vacuum_page().  Cases (a) and (c) are
2011                          * normal and will have XMIN_COMMITTED set.  Case (d) is only
2012                          * possible if a whole tuple chain has been moved while
2013                          * processing this or a higher numbered block.
2014                          * ---
2015                          */
2016                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2017                         {
2018                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2019                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2020                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
2021                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2022
2023                                 /*
2024                                  * MOVED_OFF by another VACUUM would have caused the
2025                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
2026                                  */
2027                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2028                                         elog(ERROR, "invalid XVAC in tuple header");
2029
2030                                 /*
2031                                  * If this (chain) tuple is moved by me already then I have to
2032                                  * check is it in vacpage or not - i.e. is it moved while
2033                                  * cleaning this page or some previous one.
2034                                  */
2035
2036                                 /* Can't we Assert(keep_tuples > 0) here? */
2037                                 if (keep_tuples == 0)
2038                                         continue;
2039                                 if (chain_tuple_moved)
2040                                 {
2041                                         /* some chains were moved while cleaning this page */
2042                                         Assert(vacpage->offsets_free > 0);
2043                                         for (i = 0; i < vacpage->offsets_free; i++)
2044                                         {
2045                                                 if (vacpage->offsets[i] == offnum)
2046                                                         break;
2047                                         }
2048                                         if (i >= vacpage->offsets_free)         /* not found */
2049                                         {
2050                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2051
2052                                                 /*
2053                                                  * If this is not a heap-only tuple, there must be an
2054                                                  * index entry for this item which will be removed in
2055                                                  * the index cleanup. Decrement the
2056                                                  * keep_indexed_tuples count to remember this.
2057                                                  */
2058                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2059                                                         keep_indexed_tuples--;
2060                                                 keep_tuples--;
2061                                         }
2062                                 }
2063                                 else
2064                                 {
2065                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2066
2067                                         /*
2068                                          * If this is not a heap-only tuple, there must be an
2069                                          * index entry for this item which will be removed in the
2070                                          * index cleanup. Decrement the keep_indexed_tuples count
2071                                          * to remember this.
2072                                          */
2073                                         if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2074                                                 keep_indexed_tuples--;
2075                                         keep_tuples--;
2076                                 }
2077                                 continue;
2078                         }
2079
2080                         /*
2081                          * If this tuple is in a chain of tuples created in updates by
2082                          * "recent" transactions then we have to move the whole chain of
2083                          * tuples to other places, so that we can write new t_ctid links
2084                          * that preserve the chain relationship.
2085                          *
2086                          * This test is complicated.  Read it as "if tuple is a recently
2087                          * created updated version, OR if it is an obsoleted version". (In
2088                          * the second half of the test, we needn't make any check on XMAX
2089                          * --- it must be recently obsoleted, else scan_heap would have
2090                          * deemed it removable.)
2091                          *
2092                          * NOTE: this test is not 100% accurate: it is possible for a
2093                          * tuple to be an updated one with recent xmin, and yet not match
2094                          * any new_tid entry in the vtlinks list.  Presumably there was
2095                          * once a parent tuple with xmax matching the xmin, but it's
2096                          * possible that that tuple has been removed --- for example, if
2097                          * it had xmin = xmax and wasn't itself an updated version, then
2098                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
2099                          * xmin xact completes.
2100                          *
2101                          * To be on the safe side, we abandon the repair_frag process if
2102                          * we cannot find the parent tuple in vtlinks.  This may be overly
2103                          * conservative; AFAICS it would be safe to move the chain.
2104                          *
2105                          * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
2106                          * using OldestXmin, which is a rather coarse test, it is quite
2107                          * possible to have an update chain in which a tuple we think is
2108                          * RECENTLY_DEAD links forward to one that is definitely DEAD. In
2109                          * such a case the RECENTLY_DEAD tuple must actually be dead, but
2110                          * it seems too complicated to try to make VACUUM remove it. We
2111                          * treat each contiguous set of RECENTLY_DEAD tuples as a
2112                          * separately movable chain, ignoring any intervening DEAD ones.
2113                          */
2114                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
2115                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
2116                                                                                 OldestXmin)) ||
2117                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
2118                                                                                            HEAP_IS_LOCKED)) &&
2119                                  !(ItemPointerEquals(&(tuple.t_self),
2120                                                                          &(tuple.t_data->t_ctid)))))
2121                         {
2122                                 Buffer          Cbuf = buf;
2123                                 bool            freeCbuf = false;
2124                                 bool            chain_move_failed = false;
2125                                 bool            moved_target = false;
2126                                 ItemPointerData Ctid;
2127                                 HeapTupleData tp = tuple;
2128                                 Size            tlen = tuple_len;
2129                                 VTupleMove      vtmove;
2130                                 int                     num_vtmove;
2131                                 int                     free_vtmove;
2132                                 VacPage         to_vacpage = NULL;
2133                                 int                     to_item = 0;
2134                                 int                     ti;
2135
2136                                 if (dst_buffer != InvalidBuffer)
2137                                 {
2138                                         ReleaseBuffer(dst_buffer);
2139                                         dst_buffer = InvalidBuffer;
2140                                 }
2141
2142                                 /* Quick exit if we have no vtlinks to search in */
2143                                 if (vacrelstats->vtlinks == NULL)
2144                                 {
2145                                         elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2146                                         break;          /* out of walk-along-page loop */
2147                                 }
2148
2149                                 /*
2150                                  * If this tuple is in the begin/middle of the chain then we
2151                                  * have to move to the end of chain.  As with any t_ctid
2152                                  * chase, we have to verify that each new tuple is really the
2153                                  * descendant of the tuple we came from; however, here we need
2154                                  * even more than the normal amount of paranoia. If t_ctid
2155                                  * links forward to a tuple determined to be DEAD, then
2156                                  * depending on where that tuple is, it might already have
2157                                  * been removed, and perhaps even replaced by a MOVED_IN
2158                                  * tuple.  We don't want to include any DEAD tuples in the
2159                                  * chain, so we have to recheck HeapTupleSatisfiesVacuum.
2160                                  */
2161                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
2162                                                                                                   HEAP_IS_LOCKED)) &&
2163                                            !(ItemPointerEquals(&(tp.t_self),
2164                                                                                    &(tp.t_data->t_ctid))))
2165                                 {
2166                                         ItemPointerData nextTid;
2167                                         TransactionId priorXmax;
2168                                         Buffer          nextBuf;
2169                                         Page            nextPage;
2170                                         OffsetNumber nextOffnum;
2171                                         ItemId          nextItemid;
2172                                         HeapTupleHeader nextTdata;
2173                                         HTSV_Result nextTstatus;
2174
2175                                         nextTid = tp.t_data->t_ctid;
2176                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
2177                                         /* assume block# is OK (see heap_fetch comments) */
2178                                         nextBuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2179                                                                                  ItemPointerGetBlockNumber(&nextTid),
2180                                                                                  RBM_NORMAL, vac_strategy);
2181                                         nextPage = BufferGetPage(nextBuf);
2182                                         /* If bogus or unused slot, assume tp is end of chain */
2183                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
2184                                         if (nextOffnum < FirstOffsetNumber ||
2185                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
2186                                         {
2187                                                 ReleaseBuffer(nextBuf);
2188                                                 break;
2189                                         }
2190                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
2191                                         if (!ItemIdIsNormal(nextItemid))
2192                                         {
2193                                                 ReleaseBuffer(nextBuf);
2194                                                 break;
2195                                         }
2196                                         /* if not matching XMIN, assume tp is end of chain */
2197                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
2198                                                                                                                           nextItemid);
2199                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
2200                                                                                          priorXmax))
2201                                         {
2202                                                 ReleaseBuffer(nextBuf);
2203                                                 break;
2204                                         }
2205
2206                                         /*
2207                                          * Must check for DEAD or MOVED_IN tuple, too.  This could
2208                                          * potentially update hint bits, so we'd better hold the
2209                                          * buffer content lock.
2210                                          */
2211                                         LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
2212                                         nextTstatus = HeapTupleSatisfiesVacuum(nextTdata,
2213                                                                                                                    OldestXmin,
2214                                                                                                                    nextBuf);
2215                                         if (nextTstatus == HEAPTUPLE_DEAD ||
2216                                                 nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
2217                                         {
2218                                                 UnlockReleaseBuffer(nextBuf);
2219                                                 break;
2220                                         }
2221                                         LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
2222                                         /* if it's MOVED_OFF we shoulda moved this one with it */
2223                                         if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
2224                                                 elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
2225                                         /* OK, switch our attention to the next tuple in chain */
2226                                         tp.t_data = nextTdata;
2227                                         tp.t_self = nextTid;
2228                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
2229                                         if (freeCbuf)
2230                                                 ReleaseBuffer(Cbuf);
2231                                         Cbuf = nextBuf;
2232                                         freeCbuf = true;
2233                                 }
2234
2235                                 /* Set up workspace for planning the chain move */
2236                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
2237                                 num_vtmove = 0;
2238                                 free_vtmove = 100;
2239
2240                                 /*
2241                                  * Now, walk backwards up the chain (towards older tuples) and
2242                                  * check if all items in chain can be moved.  We record all
2243                                  * the moves that need to be made in the vtmove array.
2244                                  */
2245                                 for (;;)
2246                                 {
2247                                         Buffer          Pbuf;
2248                                         Page            Ppage;
2249                                         ItemId          Pitemid;
2250                                         HeapTupleHeader PTdata;
2251                                         VTupleLinkData vtld,
2252                                                            *vtlp;
2253
2254                                         /* Identify a target page to move this tuple to */
2255                                         if (to_vacpage == NULL ||
2256                                                 !enough_space(to_vacpage, tlen))
2257                                         {
2258                                                 for (i = 0; i < num_fraged_pages; i++)
2259                                                 {
2260                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
2261                                                                 break;
2262                                                 }
2263
2264                                                 if (i == num_fraged_pages)
2265                                                 {
2266                                                         /* can't move item anywhere */
2267                                                         chain_move_failed = true;
2268                                                         break;          /* out of check-all-items loop */
2269                                                 }
2270                                                 to_item = i;
2271                                                 to_vacpage = fraged_pages->pagedesc[to_item];
2272                                         }
2273                                         to_vacpage->free -= MAXALIGN(tlen);
2274                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
2275                                                 to_vacpage->free -= sizeof(ItemIdData);
2276                                         (to_vacpage->offsets_used)++;
2277
2278                                         /* Add an entry to vtmove list */
2279                                         if (free_vtmove == 0)
2280                                         {
2281                                                 free_vtmove = 1000;
2282                                                 vtmove = (VTupleMove)
2283                                                         repalloc(vtmove,
2284                                                                          (free_vtmove + num_vtmove) *
2285                                                                          sizeof(VTupleMoveData));
2286                                         }
2287                                         vtmove[num_vtmove].tid = tp.t_self;
2288                                         vtmove[num_vtmove].vacpage = to_vacpage;
2289                                         if (to_vacpage->offsets_used == 1)
2290                                                 vtmove[num_vtmove].cleanVpd = true;
2291                                         else
2292                                                 vtmove[num_vtmove].cleanVpd = false;
2293                                         free_vtmove--;
2294                                         num_vtmove++;
2295
2296                                         /* Remember if we reached the original target tuple */
2297                                         if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
2298                                                 ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
2299                                                 moved_target = true;
2300
2301                                         /* Done if at beginning of chain */
2302                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2303                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2304                                                                                    OldestXmin))
2305                                                 break;  /* out of check-all-items loop */
2306
2307                                         /* Move to tuple with prior row version */
2308                                         vtld.new_tid = tp.t_self;
2309                                         vtlp = (VTupleLink)
2310                                                 vac_bsearch((void *) &vtld,
2311                                                                         (void *) (vacrelstats->vtlinks),
2312                                                                         vacrelstats->num_vtlinks,
2313                                                                         sizeof(VTupleLinkData),
2314                                                                         vac_cmp_vtlinks);
2315                                         if (vtlp == NULL)
2316                                         {
2317                                                 /* see discussion above */
2318                                                 elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2319                                                 chain_move_failed = true;
2320                                                 break;  /* out of check-all-items loop */
2321                                         }
2322                                         tp.t_self = vtlp->this_tid;
2323                                         Pbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2324                                                                          ItemPointerGetBlockNumber(&(tp.t_self)),
2325                                                                          RBM_NORMAL, vac_strategy);
2326                                         Ppage = BufferGetPage(Pbuf);
2327                                         Pitemid = PageGetItemId(Ppage,
2328                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2329                                         /* this can't happen since we saw tuple earlier: */
2330                                         if (!ItemIdIsNormal(Pitemid))
2331                                                 elog(ERROR, "parent itemid marked as unused");
2332                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2333
2334                                         /* ctid should not have changed since we saved it */
2335                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2336                                                                                          &(PTdata->t_ctid)));
2337
2338                                         /*
2339                                          * Read above about cases when !ItemIdIsUsed(nextItemid)
2340                                          * (child item is removed)... Due to the fact that at the
2341                                          * moment we don't remove unuseful part of update-chain,
2342                                          * it's possible to get non-matching parent row here. Like
2343                                          * as in the case which caused this problem, we stop
2344                                          * shrinking here. I could try to find real parent row but
2345                                          * want not to do it because of real solution will be
2346                                          * implemented anyway, later, and we are too close to 6.5
2347                                          * release. - vadim 06/11/99
2348                                          */
2349                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2350                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2351                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2352                                         {
2353                                                 ReleaseBuffer(Pbuf);
2354                                                 elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
2355                                                 chain_move_failed = true;
2356                                                 break;  /* out of check-all-items loop */
2357                                         }
2358                                         tp.t_data = PTdata;
2359                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2360                                         if (freeCbuf)
2361                                                 ReleaseBuffer(Cbuf);
2362                                         Cbuf = Pbuf;
2363                                         freeCbuf = true;
2364                                 }                               /* end of check-all-items loop */
2365
2366                                 if (freeCbuf)
2367                                         ReleaseBuffer(Cbuf);
2368                                 freeCbuf = false;
2369
2370                                 /* Double-check that we will move the current target tuple */
2371                                 if (!moved_target && !chain_move_failed)
2372                                 {
2373                                         elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
2374                                         chain_move_failed = true;
2375                                 }
2376
2377                                 if (chain_move_failed)
2378                                 {
2379                                         /*
2380                                          * Undo changes to offsets_used state.  We don't bother
2381                                          * cleaning up the amount-free state, since we're not
2382                                          * going to do any further tuple motion.
2383                                          */
2384                                         for (i = 0; i < num_vtmove; i++)
2385                                         {
2386                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2387                                                 (vtmove[i].vacpage->offsets_used)--;
2388                                         }
2389                                         pfree(vtmove);
2390                                         break;          /* out of walk-along-page loop */
2391                                 }
2392
2393                                 /*
2394                                  * Okay, move the whole tuple chain in reverse order.
2395                                  *
2396                                  * Ctid tracks the new location of the previously-moved tuple.
2397                                  */
2398                                 ItemPointerSetInvalid(&Ctid);
2399                                 for (ti = 0; ti < num_vtmove; ti++)
2400                                 {
2401                                         VacPage         destvacpage = vtmove[ti].vacpage;
2402                                         Page            Cpage;
2403                                         ItemId          Citemid;
2404
2405                                         /* Get page to move from */
2406                                         tuple.t_self = vtmove[ti].tid;
2407                                         Cbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2408                                                                   ItemPointerGetBlockNumber(&(tuple.t_self)),
2409                                                                   RBM_NORMAL, vac_strategy);
2410
2411                                         /* Get page to move to */
2412                                         dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
2413                                                                                                         destvacpage->blkno,
2414                                                                                                         RBM_NORMAL, vac_strategy);
2415
2416                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2417                                         if (dst_buffer != Cbuf)
2418                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2419
2420                                         dst_page = BufferGetPage(dst_buffer);
2421                                         Cpage = BufferGetPage(Cbuf);
2422
2423                                         Citemid = PageGetItemId(Cpage,
2424                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2425                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2426                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2427
2428                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2429                                                                          dst_buffer, dst_page, destvacpage,
2430                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2431
2432                                         /*
2433                                          * If the tuple we are moving is a heap-only tuple, this
2434                                          * move will generate an additional index entry, so
2435                                          * increment the rel_indexed_tuples count.
2436                                          */
2437                                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2438                                                 vacrelstats->rel_indexed_tuples++;
2439
2440                                         num_moved++;
2441                                         if (destvacpage->blkno > last_move_dest_block)
2442                                                 last_move_dest_block = destvacpage->blkno;
2443
2444                                         /*
2445                                          * Remember that we moved tuple from the current page
2446                                          * (corresponding index tuple will be cleaned).
2447                                          */
2448                                         if (Cbuf == buf)
2449                                                 vacpage->offsets[vacpage->offsets_free++] =
2450                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2451                                         else
2452                                         {
2453                                                 /*
2454                                                  * When we move tuple chains, we may need to move
2455                                                  * tuples from a block that we haven't yet scanned in
2456                                                  * the outer walk-along-the-relation loop. Note that
2457                                                  * we can't be moving a tuple from a block that we
2458                                                  * have already scanned because if such a tuple
2459                                                  * exists, then we must have moved the chain along
2460                                                  * with that tuple when we scanned that block. IOW the
2461                                                  * test of (Cbuf != buf) guarantees that the tuple we
2462                                                  * are looking at right now is in a block which is yet
2463                                                  * to be scanned.
2464                                                  *
2465                                                  * We maintain two counters to correctly count the
2466                                                  * moved-off tuples from blocks that are not yet
2467                                                  * scanned (keep_tuples) and how many of them have
2468                                                  * index pointers (keep_indexed_tuples).  The main
2469                                                  * reason to track the latter is to help verify that
2470                                                  * indexes have the expected number of entries when
2471                                                  * all the dust settles.
2472                                                  */
2473                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2474                                                         keep_indexed_tuples++;
2475                                                 keep_tuples++;
2476                                         }
2477
2478                                         ReleaseBuffer(dst_buffer);
2479                                         ReleaseBuffer(Cbuf);
2480                                 }                               /* end of move-the-tuple-chain loop */
2481
2482                                 dst_buffer = InvalidBuffer;
2483                                 pfree(vtmove);
2484                                 chain_tuple_moved = true;
2485
2486                                 /* advance to next tuple in walk-along-page loop */
2487                                 continue;
2488                         }                                       /* end of is-tuple-in-chain test */
2489
2490                         /* try to find new page for this tuple */
2491                         if (dst_buffer == InvalidBuffer ||
2492                                 !enough_space(dst_vacpage, tuple_len))
2493                         {
2494                                 if (dst_buffer != InvalidBuffer)
2495                                 {
2496                                         ReleaseBuffer(dst_buffer);
2497                                         dst_buffer = InvalidBuffer;
2498                                 }
2499                                 for (i = 0; i < num_fraged_pages; i++)
2500                                 {
2501                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2502                                                 break;
2503                                 }
2504                                 if (i == num_fraged_pages)
2505                                         break;          /* can't move item anywhere */
2506                                 dst_vacpage = fraged_pages->pagedesc[i];
2507                                 dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
2508                                                                                                 dst_vacpage->blkno,
2509                                                                                                 RBM_NORMAL, vac_strategy);
2510                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2511                                 dst_page = BufferGetPage(dst_buffer);
2512                                 /* if this page was not used before - clean it */
2513                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2514                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2515                         }
2516                         else
2517                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2518
2519                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2520
2521                         move_plain_tuple(onerel, buf, page, &tuple,
2522                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2523
2524                         /*
2525                          * If the tuple we are moving is a heap-only tuple, this move will
2526                          * generate an additional index entry, so increment the
2527                          * rel_indexed_tuples count.
2528                          */
2529                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2530                                 vacrelstats->rel_indexed_tuples++;
2531
2532                         num_moved++;
2533                         if (dst_vacpage->blkno > last_move_dest_block)
2534                                 last_move_dest_block = dst_vacpage->blkno;
2535
2536                         /*
2537                          * Remember that we moved tuple from the current page
2538                          * (corresponding index tuple will be cleaned).
2539                          */
2540                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2541                 }                                               /* walk along page */
2542
2543                 /*
2544                  * If we broke out of the walk-along-page loop early (ie, still have
2545                  * offnum <= maxoff), then we failed to move some tuple off this page.
2546                  * No point in shrinking any more, so clean up and exit the per-page
2547                  * loop.
2548                  */
2549                 if (offnum < maxoff && keep_tuples > 0)
2550                 {
2551                         OffsetNumber off;
2552
2553                         /*
2554                          * Fix vacpage state for any unvisited tuples remaining on page
2555                          */
2556                         for (off = OffsetNumberNext(offnum);
2557                                  off <= maxoff;
2558                                  off = OffsetNumberNext(off))
2559                         {
2560                                 ItemId          itemid = PageGetItemId(page, off);
2561                                 HeapTupleHeader htup;
2562
2563                                 if (!ItemIdIsUsed(itemid))
2564                                         continue;
2565                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2566                                 Assert(ItemIdIsNormal(itemid));
2567
2568                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2569                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2570                                         continue;
2571
2572                                 /*
2573                                  * See comments in the walk-along-page loop above about why
2574                                  * only MOVED_OFF tuples should be found here.
2575                                  */
2576                                 if (htup->t_infomask & HEAP_MOVED_IN)
2577                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2578                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2579                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2580                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2581                                         elog(ERROR, "invalid XVAC in tuple header");
2582
2583                                 if (chain_tuple_moved)
2584                                 {
2585                                         /* some chains were moved while cleaning this page */
2586                                         Assert(vacpage->offsets_free > 0);
2587                                         for (i = 0; i < vacpage->offsets_free; i++)
2588                                         {
2589                                                 if (vacpage->offsets[i] == off)
2590                                                         break;
2591                                         }
2592                                         if (i >= vacpage->offsets_free)         /* not found */
2593                                         {
2594                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2595                                                 Assert(keep_tuples > 0);
2596
2597                                                 /*
2598                                                  * If this is not a heap-only tuple, there must be an
2599                                                  * index entry for this item which will be removed in
2600                                                  * the index cleanup. Decrement the
2601                                                  * keep_indexed_tuples count to remember this.
2602                                                  */
2603                                                 if (!HeapTupleHeaderIsHeapOnly(htup))
2604                                                         keep_indexed_tuples--;
2605                                                 keep_tuples--;
2606                                         }
2607                                 }
2608                                 else
2609                                 {
2610                                         vacpage->offsets[vacpage->offsets_free++] = off;
2611                                         Assert(keep_tuples > 0);
2612                                         if (!HeapTupleHeaderIsHeapOnly(htup))
2613                                                 keep_indexed_tuples--;
2614                                         keep_tuples--;
2615                                 }
2616                         }
2617                 }
2618
2619                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2620                 {
2621                         if (chain_tuple_moved)          /* else - they are ordered */
2622                         {
2623                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2624                                           sizeof(OffsetNumber), vac_cmp_offno);
2625                         }
2626                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2627                 }
2628
2629                 ReleaseBuffer(buf);
2630
2631                 if (offnum <= maxoff)
2632                         break;                          /* had to quit early, see above note */
2633
2634         }                                                       /* walk along relation */
2635
2636         blkno++;                                        /* new number of blocks */
2637
2638         if (dst_buffer != InvalidBuffer)
2639         {
2640                 Assert(num_moved > 0);
2641                 ReleaseBuffer(dst_buffer);
2642         }
2643
2644         if (num_moved > 0)
2645         {
2646                 /*
2647                  * We have to commit our tuple movings before we truncate the
2648                  * relation.  Ideally we should do Commit/StartTransactionCommand
2649                  * here, relying on the session-level table lock to protect our
2650                  * exclusive access to the relation.  However, that would require a
2651                  * lot of extra code to close and re-open the relation, indexes, etc.
2652                  * For now, a quick hack: record status of current transaction as
2653                  * committed, and continue.  We force the commit to be synchronous so
2654                  * that it's down to disk before we truncate.  (Note: tqual.c knows
2655                  * that VACUUM FULL always uses sync commit, too.)      The transaction
2656                  * continues to be shown as running in the ProcArray.
2657                  *
2658                  * XXX This desperately needs to be revisited.  Any failure after this
2659                  * point will result in a PANIC "cannot abort transaction nnn, it was
2660                  * already committed"!
2661                  */
2662                 ForceSyncCommit();
2663                 (void) RecordTransactionCommit();
2664         }
2665
2666         /*
2667          * We are not going to move any more tuples across pages, but we still
2668          * need to apply vacuum_page to compact free space in the remaining pages
2669          * in vacuum_pages list.  Note that some of these pages may also be in the
2670          * fraged_pages list, and may have had tuples moved onto them; if so, we
2671          * already did vacuum_page and needn't do it again.
2672          */
2673         for (i = 0, curpage = vacuum_pages->pagedesc;
2674                  i < vacuumed_pages;
2675                  i++, curpage++)
2676         {
2677                 vacuum_delay_point();
2678
2679                 Assert((*curpage)->blkno < blkno);
2680                 if ((*curpage)->offsets_used == 0)
2681                 {
2682                         Buffer          buf;
2683                         Page            page;
2684
2685                         /* this page was not used as a move target, so must clean it */
2686                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*curpage)->blkno,
2687                                                                          RBM_NORMAL, vac_strategy);
2688                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2689                         page = BufferGetPage(buf);
2690                         if (!PageIsEmpty(page))
2691                                 vacuum_page(onerel, buf, *curpage);
2692                         UnlockReleaseBuffer(buf);
2693                 }
2694         }
2695
2696         /*
2697          * Now scan all the pages that we moved tuples onto and update tuple
2698          * status bits.  This is not really necessary, but will save time for
2699          * future transactions examining these tuples.
2700          */
2701         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2702                                          last_move_dest_block, num_moved);
2703
2704         /*
2705          * It'd be cleaner to make this report at the bottom of this routine, but
2706          * then the rusage would double-count the second pass of index vacuuming.
2707          * So do it here and ignore the relatively small amount of processing that
2708          * occurs below.
2709          */
2710         ereport(elevel,
2711                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2712                                         RelationGetRelationName(onerel),
2713                                         num_moved, nblocks, blkno),
2714                          errdetail("%s.",
2715                                            pg_rusage_show(&ru0))));
2716
2717         /*
2718          * Reflect the motion of system tuples to catalog cache here.
2719          */
2720         CommandCounterIncrement();
2721
2722         if (Nvacpagelist.num_pages > 0)
2723         {
2724                 /* vacuum indexes again if needed */
2725                 if (Irel != NULL)
2726                 {
2727                         VacPage    *vpleft,
2728                                            *vpright,
2729                                                 vpsave;
2730
2731                         /* re-sort Nvacpagelist.pagedesc */
2732                         for (vpleft = Nvacpagelist.pagedesc,
2733                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2734                                  vpleft < vpright; vpleft++, vpright--)
2735                         {
2736                                 vpsave = *vpleft;
2737                                 *vpleft = *vpright;
2738                                 *vpright = vpsave;
2739                         }
2740
2741                         /*
2742                          * keep_tuples is the number of tuples that have been moved off a
2743                          * page during chain moves but not been scanned over subsequently.
2744                          * The tuple ids of these tuples are not recorded as free offsets
2745                          * for any VacPage, so they will not be cleared from the indexes.
2746                          * keep_indexed_tuples is the portion of these that are expected
2747                          * to have index entries.
2748                          */
2749                         Assert(keep_tuples >= 0);
2750                         for (i = 0; i < nindexes; i++)
2751                                 vacuum_index(&Nvacpagelist, Irel[i],
2752                                                          vacrelstats->rel_indexed_tuples,
2753                                                          keep_indexed_tuples);
2754                 }
2755
2756                 /*
2757                  * Clean moved-off tuples from last page in Nvacpagelist list.
2758                  *
2759                  * We need only do this in this one page, because higher-numbered
2760                  * pages are going to be truncated from the relation entirely. But see
2761                  * comments for update_hint_bits().
2762                  */
2763                 if (vacpage->blkno == (blkno - 1) &&
2764                         vacpage->offsets_free > 0)
2765                 {
2766                         Buffer          buf;
2767                         Page            page;
2768                         OffsetNumber unused[MaxOffsetNumber];
2769                         OffsetNumber offnum,
2770                                                 maxoff;
2771                         int                     uncnt = 0;
2772                         int                     num_tuples = 0;
2773
2774                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, vacpage->blkno,
2775                                                                          RBM_NORMAL, vac_strategy);
2776                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2777                         page = BufferGetPage(buf);
2778                         maxoff = PageGetMaxOffsetNumber(page);
2779                         for (offnum = FirstOffsetNumber;
2780                                  offnum <= maxoff;
2781                                  offnum = OffsetNumberNext(offnum))
2782                         {
2783                                 ItemId          itemid = PageGetItemId(page, offnum);
2784                                 HeapTupleHeader htup;
2785
2786                                 if (!ItemIdIsUsed(itemid))
2787                                         continue;
2788                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2789                                 Assert(ItemIdIsNormal(itemid));
2790
2791                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2792                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2793                                         continue;
2794
2795                                 /*
2796                                  * See comments in the walk-along-page loop above about why
2797                                  * only MOVED_OFF tuples should be found here.
2798                                  */
2799                                 if (htup->t_infomask & HEAP_MOVED_IN)
2800                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2801                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2802                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2803                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2804                                         elog(ERROR, "invalid XVAC in tuple header");
2805
2806                                 ItemIdSetUnused(itemid);
2807                                 num_tuples++;
2808
2809                                 unused[uncnt++] = offnum;
2810                         }
2811                         Assert(vacpage->offsets_free == num_tuples);
2812
2813                         START_CRIT_SECTION();
2814
2815                         PageRepairFragmentation(page);
2816
2817                         MarkBufferDirty(buf);
2818
2819                         /* XLOG stuff */
2820                         if (!onerel->rd_istemp)
2821                         {
2822                                 XLogRecPtr      recptr;
2823
2824                                 recptr = log_heap_clean(onerel, buf,
2825                                                                                 NULL, 0, NULL, 0,
2826                                                                                 unused, uncnt,
2827                                                                                 false);
2828                                 PageSetLSN(page, recptr);
2829                                 PageSetTLI(page, ThisTimeLineID);
2830                         }
2831
2832                         END_CRIT_SECTION();
2833
2834                         UnlockReleaseBuffer(buf);
2835                 }
2836
2837                 /* now - free new list of reaped pages */
2838                 curpage = Nvacpagelist.pagedesc;
2839                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2840                         pfree(*curpage);
2841                 pfree(Nvacpagelist.pagedesc);
2842         }
2843
2844         /* Truncate relation, if needed */
2845         if (blkno < nblocks)
2846         {
2847                 FreeSpaceMapTruncateRel(onerel, blkno);
2848                 RelationTruncate(onerel, blkno);
2849                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2850         }
2851
2852         /* clean up */
2853         pfree(vacpage);
2854         if (vacrelstats->vtlinks != NULL)
2855                 pfree(vacrelstats->vtlinks);
2856
2857         ExecContext_Finish(&ec);
2858 }
2859
2860 /*
2861  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2862  *
2863  *              This routine moves old_tup from old_page to dst_page.
2864  *              old_page and dst_page might be the same page.
2865  *              On entry old_buf and dst_buf are locked exclusively, both locks (or
2866  *              the single lock, if this is a intra-page-move) are released before
2867  *              exit.
2868  *
2869  *              Yes, a routine with ten parameters is ugly, but it's still better
2870  *              than having these 120 lines of code in repair_frag() which is
2871  *              already too long and almost unreadable.
2872  */
2873 static void
2874 move_chain_tuple(Relation rel,
2875                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2876                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2877                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2878 {
2879         TransactionId myXID = GetCurrentTransactionId();
2880         HeapTupleData newtup;
2881         OffsetNumber newoff;
2882         ItemId          newitemid;
2883         Size            tuple_len = old_tup->t_len;
2884
2885         /*
2886          * make a modifiable copy of the source tuple.
2887          */
2888         heap_copytuple_with_tuple(old_tup, &newtup);
2889
2890         /*
2891          * register invalidation of source tuple in catcaches.
2892          */
2893         CacheInvalidateHeapTuple(rel, old_tup);
2894
2895         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2896         START_CRIT_SECTION();
2897
2898         /*
2899          * mark the source tuple MOVED_OFF.
2900          */
2901         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2902                                                                          HEAP_XMIN_INVALID |
2903                                                                          HEAP_MOVED_IN);
2904         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2905         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2906
2907         /*
2908          * If this page was not used before - clean it.
2909          *
2910          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2911          * destination pages to be the same (since this tuple-chain member can be
2912          * on a page lower than the one we're currently processing in the outer
2913          * loop).  If that's true, then after vacuum_page() the source tuple will
2914          * have been moved, and tuple.t_data will be pointing at garbage.
2915          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2916          * step!!
2917          *
2918          * This path is different from the other callers of vacuum_page, because
2919          * we have already incremented the vacpage's offsets_used field to account
2920          * for the tuple(s) we expect to move onto the page. Therefore
2921          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2922          * good debugging check for all other callers, we work around it here
2923          * rather than remove it.
2924          */
2925         if (!PageIsEmpty(dst_page) && cleanVpd)
2926         {
2927                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2928
2929                 dst_vacpage->offsets_used = 0;
2930                 vacuum_page(rel, dst_buf, dst_vacpage);
2931                 dst_vacpage->offsets_used = sv_offsets_used;
2932         }
2933
2934         /*
2935          * Update the state of the copied tuple, and store it on the destination
2936          * page.  The copied tuple is never part of a HOT chain.
2937          */
2938         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2939                                                                    HEAP_XMIN_INVALID |
2940                                                                    HEAP_MOVED_OFF);
2941         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2942         HeapTupleHeaderClearHotUpdated(newtup.t_data);
2943         HeapTupleHeaderClearHeapOnly(newtup.t_data);
2944         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2945         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2946                                                  InvalidOffsetNumber, false, true);
2947         if (newoff == InvalidOffsetNumber)
2948                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
2949                          (unsigned long) tuple_len, dst_vacpage->blkno);
2950         newitemid = PageGetItemId(dst_page, newoff);
2951         /* drop temporary copy, and point to the version on the dest page */
2952         pfree(newtup.t_data);
2953         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2954
2955         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
2956
2957         /*
2958          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
2959          * to next tuple in chain otherwise.  (Since we move the chain in reverse
2960          * order, this is actually the previously processed tuple.)
2961          */
2962         if (!ItemPointerIsValid(ctid))
2963                 newtup.t_data->t_ctid = newtup.t_self;
2964         else
2965                 newtup.t_data->t_ctid = *ctid;
2966         *ctid = newtup.t_self;
2967
2968         MarkBufferDirty(dst_buf);
2969         if (dst_buf != old_buf)
2970                 MarkBufferDirty(old_buf);
2971
2972         /* XLOG stuff */
2973         if (!rel->rd_istemp)
2974         {
2975                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2976                                                                                    dst_buf, &newtup);
2977
2978                 if (old_buf != dst_buf)
2979                 {
2980                         PageSetLSN(old_page, recptr);
2981                         PageSetTLI(old_page, ThisTimeLineID);
2982                 }
2983                 PageSetLSN(dst_page, recptr);
2984                 PageSetTLI(dst_page, ThisTimeLineID);
2985         }
2986
2987         END_CRIT_SECTION();
2988
2989         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2990         if (dst_buf != old_buf)
2991                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2992
2993         /* Create index entries for the moved tuple */
2994         if (ec->resultRelInfo->ri_NumIndices > 0)
2995         {
2996                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2997                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2998                 ResetPerTupleExprContext(ec->estate);
2999         }
3000 }
3001
3002 /*
3003  *      move_plain_tuple() -- move one tuple that is not part of a chain
3004  *
3005  *              This routine moves old_tup from old_page to dst_page.
3006  *              On entry old_buf and dst_buf are locked exclusively, both locks are
3007  *              released before exit.
3008  *
3009  *              Yes, a routine with eight parameters is ugly, but it's still better
3010  *              than having these 90 lines of code in repair_frag() which is already
3011  *              too long and almost unreadable.
3012  */
3013 static void
3014 move_plain_tuple(Relation rel,
3015                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
3016                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
3017                                  ExecContext ec)
3018 {
3019         TransactionId myXID = GetCurrentTransactionId();
3020         HeapTupleData newtup;
3021         OffsetNumber newoff;
3022         ItemId          newitemid;
3023         Size            tuple_len = old_tup->t_len;
3024
3025         /* copy tuple */
3026         heap_copytuple_with_tuple(old_tup, &newtup);
3027
3028         /*
3029          * register invalidation of source tuple in catcaches.
3030          *
3031          * (Note: we do not need to register the copied tuple, because we are not
3032          * changing the tuple contents and so there cannot be any need to flush
3033          * negative catcache entries.)
3034          */
3035         CacheInvalidateHeapTuple(rel, old_tup);
3036
3037         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
3038         START_CRIT_SECTION();
3039
3040         /*
3041          * Mark new tuple as MOVED_IN by me; also mark it not HOT.
3042          */
3043         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3044                                                                    HEAP_XMIN_INVALID |
3045                                                                    HEAP_MOVED_OFF);
3046         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
3047         HeapTupleHeaderClearHotUpdated(newtup.t_data);
3048         HeapTupleHeaderClearHeapOnly(newtup.t_data);
3049         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
3050
3051         /* add tuple to the page */
3052         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3053                                                  InvalidOffsetNumber, false, true);
3054         if (newoff == InvalidOffsetNumber)
3055                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
3056                          (unsigned long) tuple_len,
3057                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
3058                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
3059         newitemid = PageGetItemId(dst_page, newoff);
3060         pfree(newtup.t_data);
3061         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
3062         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
3063         newtup.t_self = newtup.t_data->t_ctid;
3064
3065         /*
3066          * Mark old tuple as MOVED_OFF by me.
3067          */
3068         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3069                                                                          HEAP_XMIN_INVALID |
3070                                                                          HEAP_MOVED_IN);
3071         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
3072         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
3073
3074         MarkBufferDirty(dst_buf);
3075         MarkBufferDirty(old_buf);
3076
3077         /* XLOG stuff */
3078         if (!rel->rd_istemp)
3079         {
3080                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3081                                                                                    dst_buf, &newtup);
3082
3083                 PageSetLSN(old_page, recptr);
3084                 PageSetTLI(old_page, ThisTimeLineID);
3085                 PageSetLSN(dst_page, recptr);
3086                 PageSetTLI(dst_page, ThisTimeLineID);
3087         }
3088
3089         END_CRIT_SECTION();
3090
3091         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
3092         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3093         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3094
3095         dst_vacpage->offsets_used++;
3096
3097         /* insert index' tuples if needed */
3098         if (ec->resultRelInfo->ri_NumIndices > 0)
3099         {
3100                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3101                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3102                 ResetPerTupleExprContext(ec->estate);
3103         }
3104 }
3105
3106 /*
3107  *      update_hint_bits() -- update hint bits in destination pages
3108  *
3109  * Scan all the pages that we moved tuples onto and update tuple status bits.
3110  * This is not really necessary, but it will save time for future transactions
3111  * examining these tuples.
3112  *
3113  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
3114  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
3115  *
3116  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
3117  * pages that were move source pages but not move dest pages.  The bulk
3118  * of the move source pages will be physically truncated from the relation,
3119  * and the last page remaining in the rel will be fixed separately in
3120  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
3121  * hint bits updated are tuples that are moved as part of a chain and were
3122  * on pages that were not either move destinations nor at the end of the rel.
3123  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
3124  * to remember and revisit those pages too.
3125  *
3126  * One wonders whether it wouldn't be better to skip this work entirely,
3127  * and let the tuple status updates happen someplace that's not holding an
3128  * exclusive lock on the relation.
3129  */
3130 static void
3131 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
3132                                  BlockNumber last_move_dest_block, int num_moved)
3133 {
3134         TransactionId myXID = GetCurrentTransactionId();
3135         int                     checked_moved = 0;
3136         int                     i;
3137         VacPage    *curpage;
3138
3139         for (i = 0, curpage = fraged_pages->pagedesc;
3140                  i < num_fraged_pages;
3141                  i++, curpage++)
3142         {
3143                 Buffer          buf;
3144                 Page            page;
3145                 OffsetNumber max_offset;
3146                 OffsetNumber off;
3147                 int                     num_tuples = 0;
3148
3149                 vacuum_delay_point();
3150
3151                 if ((*curpage)->blkno > last_move_dest_block)
3152                         break;                          /* no need to scan any further */
3153                 if ((*curpage)->offsets_used == 0)
3154                         continue;                       /* this page was never used as a move dest */
3155                 buf = ReadBufferExtended(rel, MAIN_FORKNUM, (*curpage)->blkno,
3156                                                                  RBM_NORMAL, vac_strategy);
3157                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3158                 page = BufferGetPage(buf);
3159                 max_offset = PageGetMaxOffsetNumber(page);
3160                 for (off = FirstOffsetNumber;
3161                          off <= max_offset;
3162                          off = OffsetNumberNext(off))
3163                 {
3164                         ItemId          itemid = PageGetItemId(page, off);
3165                         HeapTupleHeader htup;
3166
3167                         if (!ItemIdIsUsed(itemid))
3168                                 continue;
3169                         /* Shouldn't be any DEAD or REDIRECT items anymore */
3170                         Assert(ItemIdIsNormal(itemid));
3171
3172                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
3173                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
3174                                 continue;
3175
3176                         /*
3177                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
3178                          */
3179                         if (!(htup->t_infomask & HEAP_MOVED))
3180                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
3181                         if (HeapTupleHeaderGetXvac(htup) != myXID)
3182                                 elog(ERROR, "invalid XVAC in tuple header");
3183
3184                         if (htup->t_infomask & HEAP_MOVED_IN)
3185                         {
3186                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
3187                                 htup->t_infomask &= ~HEAP_MOVED;
3188                                 num_tuples++;
3189                         }
3190                         else
3191                                 htup->t_infomask |= HEAP_XMIN_INVALID;
3192                 }
3193                 MarkBufferDirty(buf);
3194                 UnlockReleaseBuffer(buf);
3195                 Assert((*curpage)->offsets_used == num_tuples);
3196                 checked_moved += num_tuples;
3197         }
3198         Assert(num_moved == checked_moved);
3199 }
3200
3201 /*
3202  *      vacuum_heap() -- free dead tuples
3203  *
3204  *              This routine marks dead tuples as unused and truncates relation
3205  *              if there are "empty" end-blocks.
3206  */
3207 static void
3208 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
3209 {
3210         Buffer          buf;
3211         VacPage    *vacpage;
3212         BlockNumber relblocks;
3213         int                     nblocks;
3214         int                     i;
3215
3216         nblocks = vacuum_pages->num_pages;
3217         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
3218
3219         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
3220         {
3221                 vacuum_delay_point();
3222
3223                 if ((*vacpage)->offsets_free > 0)
3224                 {
3225                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*vacpage)->blkno,
3226                                                                          RBM_NORMAL, vac_strategy);
3227                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3228                         vacuum_page(onerel, buf, *vacpage);
3229                         UnlockReleaseBuffer(buf);
3230                 }
3231         }
3232
3233         /* Truncate relation if there are some empty end-pages */
3234         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
3235         if (vacuum_pages->empty_end_pages > 0)
3236         {
3237                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
3238                 ereport(elevel,
3239                                 (errmsg("\"%s\": truncated %u to %u pages",
3240                                                 RelationGetRelationName(onerel),
3241                                                 vacrelstats->rel_pages, relblocks)));
3242                 FreeSpaceMapTruncateRel(onerel, relblocks);
3243                 RelationTruncate(onerel, relblocks);
3244                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
3245         }
3246 }
3247
3248 /*
3249  *      vacuum_page() -- free dead tuples on a page
3250  *                                       and repair its fragmentation.
3251  *
3252  * Caller must hold pin and lock on buffer.
3253  */
3254 static void
3255 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
3256 {
3257         Page            page = BufferGetPage(buffer);
3258         int                     i;
3259
3260         /* There shouldn't be any tuples moved onto the page yet! */
3261         Assert(vacpage->offsets_used == 0);
3262
3263         START_CRIT_SECTION();
3264
3265         for (i = 0; i < vacpage->offsets_free; i++)
3266         {
3267                 ItemId          itemid = PageGetItemId(page, vacpage->offsets[i]);
3268
3269                 ItemIdSetUnused(itemid);
3270         }
3271
3272         PageRepairFragmentation(page);
3273
3274         MarkBufferDirty(buffer);
3275
3276         /* XLOG stuff */
3277         if (!onerel->rd_istemp)
3278         {
3279                 XLogRecPtr      recptr;
3280
3281                 recptr = log_heap_clean(onerel, buffer,
3282                                                                 NULL, 0, NULL, 0,
3283                                                                 vacpage->offsets, vacpage->offsets_free,
3284                                                                 false);
3285                 PageSetLSN(page, recptr);
3286                 PageSetTLI(page, ThisTimeLineID);
3287         }
3288
3289         END_CRIT_SECTION();
3290 }
3291
3292 /*
3293  *      scan_index() -- scan one index relation to update pg_class statistics.
3294  *
3295  * We use this when we have no deletions to do.
3296  */
3297 static void
3298 scan_index(Relation indrel, double num_tuples)
3299 {
3300         IndexBulkDeleteResult *stats;
3301         IndexVacuumInfo ivinfo;
3302         PGRUsage        ru0;
3303
3304         pg_rusage_init(&ru0);
3305
3306         ivinfo.index = indrel;
3307         ivinfo.vacuum_full = true;
3308         ivinfo.message_level = elevel;
3309         ivinfo.num_heap_tuples = num_tuples;
3310         ivinfo.strategy = vac_strategy;
3311
3312         stats = index_vacuum_cleanup(&ivinfo, NULL);
3313
3314         if (!stats)
3315                 return;
3316
3317         /* now update statistics in pg_class */
3318         vac_update_relstats(RelationGetRelid(indrel),
3319                                                 stats->num_pages, stats->num_index_tuples,
3320                                                 false, InvalidTransactionId);
3321
3322         ereport(elevel,
3323                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3324                                         RelationGetRelationName(indrel),
3325                                         stats->num_index_tuples,
3326                                         stats->num_pages),
3327         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
3328                           "%s.",
3329                           stats->pages_deleted, stats->pages_free,
3330                           pg_rusage_show(&ru0))));
3331
3332         /*
3333          * Check for tuple count mismatch.      If the index is partial, then it's OK
3334          * for it to have fewer tuples than the heap; else we got trouble.
3335          */
3336         if (stats->num_index_tuples != num_tuples)
3337         {
3338                 if (stats->num_index_tuples > num_tuples ||
3339                         !vac_is_partial_index(indrel))
3340                         ereport(WARNING,
3341                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3342                                                         RelationGetRelationName(indrel),
3343                                                         stats->num_index_tuples, num_tuples),
3344                                          errhint("Rebuild the index with REINDEX.")));
3345         }
3346
3347         pfree(stats);
3348 }
3349
3350 /*
3351  *      vacuum_index() -- vacuum one index relation.
3352  *
3353  *              Vpl is the VacPageList of the heap we're currently vacuuming.
3354  *              It's locked. Indrel is an index relation on the vacuumed heap.
3355  *
3356  *              We don't bother to set locks on the index relation here, since
3357  *              the parent table is exclusive-locked already.
3358  *
3359  *              Finally, we arrange to update the index relation's statistics in
3360  *              pg_class.
3361  */
3362 static void
3363 vacuum_index(VacPageList vacpagelist, Relation indrel,
3364                          double num_tuples, int keep_tuples)
3365 {
3366         IndexBulkDeleteResult *stats;
3367         IndexVacuumInfo ivinfo;
3368         PGRUsage        ru0;
3369
3370         pg_rusage_init(&ru0);
3371
3372         ivinfo.index = indrel;
3373         ivinfo.vacuum_full = true;
3374         ivinfo.message_level = elevel;
3375         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
3376         ivinfo.strategy = vac_strategy;
3377
3378         /* Do bulk deletion */
3379         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3380
3381         /* Do post-VACUUM cleanup */
3382         stats = index_vacuum_cleanup(&ivinfo, stats);
3383
3384         if (!stats)
3385                 return;
3386
3387         /* now update statistics in pg_class */
3388         vac_update_relstats(RelationGetRelid(indrel),
3389                                                 stats->num_pages, stats->num_index_tuples,
3390                                                 false, InvalidTransactionId);
3391
3392         ereport(elevel,
3393                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3394                                         RelationGetRelationName(indrel),
3395                                         stats->num_index_tuples,
3396                                         stats->num_pages),
3397                          errdetail("%.0f index row versions were removed.\n"
3398                          "%u index pages have been deleted, %u are currently reusable.\n"
3399                                            "%s.",
3400                                            stats->tuples_removed,
3401                                            stats->pages_deleted, stats->pages_free,
3402                                            pg_rusage_show(&ru0))));
3403
3404         /*
3405          * Check for tuple count mismatch.      If the index is partial, then it's OK
3406          * for it to have fewer tuples than the heap; else we got trouble.
3407          */
3408         if (stats->num_index_tuples != num_tuples + keep_tuples)
3409         {
3410                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3411                         !vac_is_partial_index(indrel))
3412                         ereport(WARNING,
3413                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3414                                                         RelationGetRelationName(indrel),
3415                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3416                                          errhint("Rebuild the index with REINDEX.")));
3417         }
3418
3419         pfree(stats);
3420 }
3421
3422 /*
3423  *      tid_reaped() -- is a particular tid reaped?
3424  *
3425  *              This has the right signature to be an IndexBulkDeleteCallback.
3426  *
3427  *              vacpagelist->VacPage_array is sorted in right order.
3428  */
3429 static bool
3430 tid_reaped(ItemPointer itemptr, void *state)
3431 {
3432         VacPageList vacpagelist = (VacPageList) state;
3433         OffsetNumber ioffno;
3434         OffsetNumber *voff;
3435         VacPage         vp,
3436                            *vpp;
3437         VacPageData vacpage;
3438
3439         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3440         ioffno = ItemPointerGetOffsetNumber(itemptr);
3441
3442         vp = &vacpage;
3443         vpp = (VacPage *) vac_bsearch((void *) &vp,
3444                                                                   (void *) (vacpagelist->pagedesc),
3445                                                                   vacpagelist->num_pages,
3446                                                                   sizeof(VacPage),
3447                                                                   vac_cmp_blk);
3448
3449         if (vpp == NULL)
3450                 return false;
3451
3452         /* ok - we are on a partially or fully reaped page */
3453         vp = *vpp;
3454
3455         if (vp->offsets_free == 0)
3456         {
3457                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
3458                 return true;
3459         }
3460
3461         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3462                                                                                 (void *) (vp->offsets),
3463                                                                                 vp->offsets_free,
3464                                                                                 sizeof(OffsetNumber),
3465                                                                                 vac_cmp_offno);
3466
3467         if (voff == NULL)
3468                 return false;
3469
3470         /* tid is reaped */
3471         return true;
3472 }
3473
3474 /*
3475  * Update the Free Space Map with the info we now have about free space in
3476  * the relation.
3477  */
3478 static void
3479 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3480                            BlockNumber rel_pages)
3481 {
3482         int                     nPages = fraged_pages->num_pages;
3483         VacPage    *pagedesc = fraged_pages->pagedesc;
3484         int                     i;
3485
3486         for (i = 0; i < nPages; i++)
3487         {
3488                 /*
3489                  * fraged_pages may contain entries for pages that we later decided to
3490                  * truncate from the relation; don't enter them into the free space
3491                  * map!
3492                  */
3493                 if (pagedesc[i]->blkno >= rel_pages)
3494                         break;
3495
3496                 RecordPageWithFreeSpace(onerel, pagedesc[i]->blkno, pagedesc[i]->free);
3497         }
3498
3499 }
3500
3501 /* Copy a VacPage structure */
3502 static VacPage
3503 copy_vac_page(VacPage vacpage)
3504 {
3505         VacPage         newvacpage;
3506
3507         /* allocate a VacPageData entry */
3508         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3509                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3510
3511         /* fill it in */
3512         if (vacpage->offsets_free > 0)
3513                 memcpy(newvacpage->offsets, vacpage->offsets,
3514                            vacpage->offsets_free * sizeof(OffsetNumber));
3515         newvacpage->blkno = vacpage->blkno;
3516         newvacpage->free = vacpage->free;
3517         newvacpage->offsets_used = vacpage->offsets_used;
3518         newvacpage->offsets_free = vacpage->offsets_free;
3519
3520         return newvacpage;
3521 }
3522
3523 /*
3524  * Add a VacPage pointer to a VacPageList.
3525  *
3526  *              As a side effect of the way that scan_heap works,
3527  *              higher pages come after lower pages in the array
3528  *              (and highest tid on a page is last).
3529  */
3530 static void
3531 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3532 {
3533 #define PG_NPAGEDESC 1024
3534
3535         /* allocate a VacPage entry if needed */
3536         if (vacpagelist->num_pages == 0)
3537         {
3538                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3539                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3540         }
3541         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3542         {
3543                 vacpagelist->num_allocated_pages *= 2;
3544                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3545         }
3546         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3547         (vacpagelist->num_pages)++;
3548 }
3549
3550 /*
3551  * vac_bsearch: just like standard C library routine bsearch(),
3552  * except that we first test to see whether the target key is outside
3553  * the range of the table entries.      This case is handled relatively slowly
3554  * by the normal binary search algorithm (ie, no faster than any other key)
3555  * but it occurs often enough in VACUUM to be worth optimizing.
3556  */
3557 static void *
3558 vac_bsearch(const void *key, const void *base,
3559                         size_t nelem, size_t size,
3560                         int (*compar) (const void *, const void *))
3561 {
3562         int                     res;
3563         const void *last;
3564
3565         if (nelem == 0)
3566                 return NULL;
3567         res = compar(key, base);
3568         if (res < 0)
3569                 return NULL;
3570         if (res == 0)
3571                 return (void *) base;
3572         if (nelem > 1)
3573         {
3574                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3575                 res = compar(key, last);
3576                 if (res > 0)
3577                         return NULL;
3578                 if (res == 0)
3579                         return (void *) last;
3580         }
3581         if (nelem <= 2)
3582                 return NULL;                    /* already checked 'em all */
3583         return bsearch(key, base, nelem, size, compar);
3584 }
3585
3586 /*
3587  * Comparator routines for use with qsort() and bsearch().
3588  */
3589 static int
3590 vac_cmp_blk(const void *left, const void *right)
3591 {
3592         BlockNumber lblk,
3593                                 rblk;
3594
3595         lblk = (*((VacPage *) left))->blkno;
3596         rblk = (*((VacPage *) right))->blkno;
3597
3598         if (lblk < rblk)
3599                 return -1;
3600         if (lblk == rblk)
3601                 return 0;
3602         return 1;
3603 }
3604
3605 static int
3606 vac_cmp_offno(const void *left, const void *right)
3607 {
3608         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3609                 return -1;
3610         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3611                 return 0;
3612         return 1;
3613 }
3614
3615 static int
3616 vac_cmp_vtlinks(const void *left, const void *right)
3617 {
3618         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3619                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3620                 return -1;
3621         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3622                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3623                 return 1;
3624         /* bi_hi-es are equal */
3625         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3626                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3627                 return -1;
3628         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3629                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3630                 return 1;
3631         /* bi_lo-es are equal */
3632         if (((VTupleLink) left)->new_tid.ip_posid <
3633                 ((VTupleLink) right)->new_tid.ip_posid)
3634                 return -1;
3635         if (((VTupleLink) left)->new_tid.ip_posid >
3636                 ((VTupleLink) right)->new_tid.ip_posid)
3637                 return 1;
3638         return 0;
3639 }
3640
3641
3642 /*
3643  * Open all the indexes of the given relation, obtaining the specified kind
3644  * of lock on each.  Return an array of Relation pointers for the indexes
3645  * into *Irel, and the number of indexes into *nindexes.
3646  */
3647 void
3648 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3649                                  int *nindexes, Relation **Irel)
3650 {
3651         List       *indexoidlist;
3652         ListCell   *indexoidscan;
3653         int                     i;
3654
3655         Assert(lockmode != NoLock);
3656
3657         indexoidlist = RelationGetIndexList(relation);
3658
3659         *nindexes = list_length(indexoidlist);
3660
3661         if (*nindexes > 0)
3662                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3663         else
3664                 *Irel = NULL;
3665
3666         i = 0;
3667         foreach(indexoidscan, indexoidlist)
3668         {
3669                 Oid                     indexoid = lfirst_oid(indexoidscan);
3670
3671                 (*Irel)[i++] = index_open(indexoid, lockmode);
3672         }
3673
3674         list_free(indexoidlist);
3675 }
3676
3677 /*
3678  * Release the resources acquired by vac_open_indexes.  Optionally release
3679  * the locks (say NoLock to keep 'em).
3680  */
3681 void
3682 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3683 {
3684         if (Irel == NULL)
3685                 return;
3686
3687         while (nindexes--)
3688         {
3689                 Relation        ind = Irel[nindexes];
3690
3691                 index_close(ind, lockmode);
3692         }
3693         pfree(Irel);
3694 }
3695
3696
3697 /*
3698  * Is an index partial (ie, could it contain fewer tuples than the heap?)
3699  */
3700 bool
3701 vac_is_partial_index(Relation indrel)
3702 {
3703         /*
3704          * If the index's AM doesn't support nulls, it's partial for our purposes
3705          */
3706         if (!indrel->rd_am->amindexnulls)
3707                 return true;
3708
3709         /* Otherwise, look to see if there's a partial-index predicate */
3710         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3711                 return true;
3712
3713         return false;
3714 }
3715
3716
3717 static bool
3718 enough_space(VacPage vacpage, Size len)
3719 {
3720         len = MAXALIGN(len);
3721
3722         if (len > vacpage->free)
3723                 return false;
3724
3725         /* if there are free itemid(s) and len <= free_space... */
3726         if (vacpage->offsets_used < vacpage->offsets_free)
3727                 return true;
3728
3729         /* noff_used >= noff_free and so we'll have to allocate new itemid */
3730         if (len + sizeof(ItemIdData) <= vacpage->free)
3731                 return true;
3732
3733         return false;
3734 }
3735
3736 static Size
3737 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3738 {
3739         /*
3740          * It is correct to use PageGetExactFreeSpace() here, *not*
3741          * PageGetHeapFreeSpace().  This is because (a) we do our own, exact
3742          * accounting for whether line pointers must be added, and (b) we will
3743          * recycle any LP_DEAD line pointers before starting to add rows to a
3744          * page, but that may not have happened yet at the time this function is
3745          * applied to a page, which means PageGetHeapFreeSpace()'s protection
3746          * against too many line pointers on a page could fire incorrectly.  We do
3747          * not need that protection here: since VACUUM FULL always recycles all
3748          * dead line pointers first, it'd be physically impossible to insert more
3749          * than MaxHeapTuplesPerPage tuples anyway.
3750          */
3751         Size            freespace = PageGetExactFreeSpace(page);
3752         Size            targetfree;
3753
3754         targetfree = RelationGetTargetPageFreeSpace(relation,
3755                                                                                                 HEAP_DEFAULT_FILLFACTOR);
3756         if (freespace > targetfree)
3757                 return freespace - targetfree;
3758         else
3759                 return 0;
3760 }
3761
3762 /*
3763  * vacuum_delay_point --- check for interrupts and cost-based delay.
3764  *
3765  * This should be called in each major loop of VACUUM processing,
3766  * typically once per page processed.
3767  */
3768 void
3769 vacuum_delay_point(void)
3770 {
3771         /* Always check for interrupts */
3772         CHECK_FOR_INTERRUPTS();
3773
3774         /* Nap if appropriate */
3775         if (VacuumCostActive && !InterruptPending &&
3776                 VacuumCostBalance >= VacuumCostLimit)
3777         {
3778                 int                     msec;
3779
3780                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3781                 if (msec > VacuumCostDelay * 4)
3782                         msec = VacuumCostDelay * 4;
3783
3784                 pg_usleep(msec * 1000L);
3785
3786                 VacuumCostBalance = 0;
3787
3788                 /* update balance values for workers */
3789                 AutoVacuumUpdateDelay();
3790
3791                 /* Might have gotten an interrupt while sleeping */
3792                 CHECK_FOR_INTERRUPTS();
3793         }
3794 }