1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.371 2008/03/26 21:10:38 alvherre Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/transam.h"
29 #include "access/xact.h"
30 #include "access/xlog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_database.h"
33 #include "catalog/pg_namespace.h"
34 #include "commands/dbcommands.h"
35 #include "commands/vacuum.h"
36 #include "executor/executor.h"
37 #include "miscadmin.h"
38 #include "pgstat.h"
39 #include "postmaster/autovacuum.h"
40 #include "storage/freespace.h"
41 #include "storage/proc.h"
42 #include "storage/procarray.h"
43 #include "utils/acl.h"
44 #include "utils/builtins.h"
45 #include "utils/flatfiles.h"
46 #include "utils/fmgroids.h"
47 #include "utils/inval.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/pg_rusage.h"
51 #include "utils/relcache.h"
52 #include "utils/snapmgr.h"
53 #include "utils/syscache.h"
54 #include "utils/tqual.h"
55
56
57 /*
58  * GUC parameters
59  */
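/* Default minimum XID age at which VACUUM freezes tuple xmins; a
 * non-negative per-command freeze_min_age overrides it (see
 * vacuum_set_xid_limits below). */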
60 int                     vacuum_freeze_min_age;
61
62 /*
63  * VacPage structures keep track of each page on which we find useful
64  * amounts of free space.
65  */
66 typedef struct VacPageData
67 {
68         BlockNumber blkno;                      /* BlockNumber of this Page */
69         Size            free;                   /* FreeSpace on this Page */
70         uint16          offsets_used;   /* Number of OffNums used by vacuum */
71         uint16          offsets_free;   /* Number of OffNums free or to be free */
72         OffsetNumber offsets[1];        /* Array of free OffNums */
73 } VacPageData;
74
75 typedef VacPageData *VacPage;
76
77 typedef struct VacPageListData
78 {
79         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
80         int                     num_pages;              /* Number of pages in pagedesc */
81         int                     num_allocated_pages;    /* Number of allocated pages in
82                                                                                  * pagedesc */
83         VacPage    *pagedesc;           /* Descriptions of pages */
84 } VacPageListData;
85
86 typedef VacPageListData *VacPageList;
87
88 /*
89  * The "vtlinks" array keeps information about each recently-updated tuple
90  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
91  * We store the tuple's own TID as well as its t_ctid (its link to the next
92  * newer tuple version).  Searching in this array allows us to follow update
93  * chains backwards from newer to older tuples.  When we move a member of an
94  * update chain, we must move *all* the live members of the chain, so that we
95  * can maintain their t_ctid link relationships (we must not just overwrite
96  * t_ctid in an existing tuple).
97  *
98  * Note: because t_ctid links can be stale (this would only occur if a prior
99  * VACUUM crashed partway through), it is possible that new_tid points to an
100  * empty slot or unrelated tuple.  We have to check the linkage as we follow
101  * it, just as is done in EvalPlanQual.
102  */
103 typedef struct VTupleLinkData
104 {
105         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
106         ItemPointerData this_tid;       /* t_self of the tuple */
107 } VTupleLinkData;
108
109 typedef VTupleLinkData *VTupleLink;
110
111 /*
112  * We use an array of VTupleMoveData to plan a chain tuple move fully
113  * before we do it.
114  */
115 typedef struct VTupleMoveData
116 {
117         ItemPointerData tid;            /* tuple ID */
118         VacPage         vacpage;                /* where to move it to */
119         bool            cleanVpd;               /* clean vacpage before using? */
120 } VTupleMoveData;
121
122 typedef VTupleMoveData *VTupleMove;
123
124 /*
125  * VRelStats contains the data acquired by scan_heap for use later
126  */
127 typedef struct VRelStats
128 {
129         /* miscellaneous statistics */
130         BlockNumber rel_pages;          /* pages in relation */
131         double          rel_tuples;             /* tuples that remain after vacuuming */
132         double          rel_indexed_tuples;             /* indexed tuples that remain */
133         Size            min_tlen;               /* min surviving tuple size */
134         Size            max_tlen;               /* max surviving tuple size */
135         bool            hasindex;
136         /* vtlinks array for tuple chain following - sorted by new_tid */
137         int                     num_vtlinks;
138         VTupleLink      vtlinks;
139 } VRelStats;
140
141 /*----------------------------------------------------------------------
142  * ExecContext:
143  *
144  * As these variables always appear together, we put them into one struct
145  * and pull initialization and cleanup into separate routines.
146  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
147  * accurately:  It is *used* only in move_xxx_tuple(), but because this
148  * routine is called many times, we initialize the struct just once in
149  * repair_frag() and pass it on to move_xxx_tuple().
150  */
151 typedef struct ExecContextData
152 {
153         ResultRelInfo *resultRelInfo;
154         EState     *estate;
155         TupleTableSlot *slot;
156 } ExecContextData;
157
158 typedef ExecContextData *ExecContext;
159
160 static void
161 ExecContext_Init(ExecContext ec, Relation rel)
162 {
163         TupleDesc       tupdesc = RelationGetDescr(rel);
164
165         /*
166          * We need a ResultRelInfo and an EState so we can use the regular
167          * executor's index-entry-making machinery.
168          */
169         ec->estate = CreateExecutorState();
170
171         ec->resultRelInfo = makeNode(ResultRelInfo);
172         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
173         ec->resultRelInfo->ri_RelationDesc = rel;
174         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
175
176         ExecOpenIndices(ec->resultRelInfo);
177
178         ec->estate->es_result_relations = ec->resultRelInfo;
179         ec->estate->es_num_result_relations = 1;
180         ec->estate->es_result_relation_info = ec->resultRelInfo;
181
182         /* Set up a tuple slot too */
183         ec->slot = MakeSingleTupleTableSlot(tupdesc);
184 }
185
186 static void
187 ExecContext_Finish(ExecContext ec)
188 {
189         ExecDropSingleTupleTableSlot(ec->slot);
190         ExecCloseIndices(ec->resultRelInfo);
191         FreeExecutorState(ec->estate);
192 }
193
194 /*
195  * End of ExecContext Implementation
196  *----------------------------------------------------------------------
197  */
198
199 /* A few variables that don't seem worth passing around as parameters */
200 static MemoryContext vac_context = NULL;
201
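/* Message level for progress reports: INFO with VERBOSE, else DEBUG2 */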
202 static int      elevel = -1;
203
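/* Cutoff XIDs for the relation currently being vacuumed,
 * filled in by vacuum_set_xid_limits() */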
204 static TransactionId OldestXmin;
205 static TransactionId FreezeLimit;
206
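/* Buffer access strategy used for VACUUM's reads and writes;
 * defaults to a BAS_VACUUM ring made in vacuum() */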
207 static BufferAccessStrategy vac_strategy;
208
209
210 /* non-export function prototypes */
211 static List *get_rel_oids(List *relids, const RangeVar *vacrel,
212                          const char *stmttype);
213 static void vac_truncate_clog(TransactionId frozenXID);
214 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind,
215                                            bool for_wraparound);
216 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
217 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
218                   VacPageList vacuum_pages, VacPageList fraged_pages);
219 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
220                         VacPageList vacuum_pages, VacPageList fraged_pages,
221                         int nindexes, Relation *Irel);
222 static void move_chain_tuple(Relation rel,
223                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
224                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
225                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
226 static void move_plain_tuple(Relation rel,
227                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
228                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
229                                  ExecContext ec);
230 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
231                                  int num_fraged_pages, BlockNumber last_move_dest_block,
232                                  int num_moved);
233 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
234                         VacPageList vacpagelist);
235 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
236 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
237                          double num_tuples, int keep_tuples);
238 static void scan_index(Relation indrel, double num_tuples);
239 static bool tid_reaped(ItemPointer itemptr, void *state);
240 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
241                            BlockNumber rel_pages);
242 static VacPage copy_vac_page(VacPage vacpage);
243 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
244 static void *vac_bsearch(const void *key, const void *base,
245                         size_t nelem, size_t size,
246                         int (*compar) (const void *, const void *));
247 static int      vac_cmp_blk(const void *left, const void *right);
248 static int      vac_cmp_offno(const void *left, const void *right);
249 static int      vac_cmp_vtlinks(const void *left, const void *right);
250 static bool enough_space(VacPage vacpage, Size len);
251 static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
252
253
254 /****************************************************************************
255  *                                                                                                                                                      *
256  *                      Code common to all flavors of VACUUM and ANALYZE                                *
257  *                                                                                                                                                      *
258  ****************************************************************************
259  */
260
261
262 /*
263  * Primary entry point for VACUUM and ANALYZE commands.
264  *
265  * relids is normally NIL; if it is not, then it provides the list of
266  * relation OIDs to be processed, and vacstmt->relation is ignored.
267  * (The non-NIL case is currently only used by autovacuum.)
268  *
269  * for_wraparound is used by autovacuum to let us know when it's forcing
270  * a vacuum for wraparound, which should not be auto-cancelled.
271  *
272  * bstrategy is normally given as NULL, but in autovacuum it can be passed
273  * in to use the same buffer strategy object across multiple vacuum() calls.
274  *
275  * isTopLevel should be passed down from ProcessUtility.
276  *
277  * It is the caller's responsibility that vacstmt, relids, and bstrategy
278  * (if given) be allocated in a memory context that won't disappear
279  * at transaction commit.
280  */
281 void
282 vacuum(VacuumStmt *vacstmt, List *relids,
283            BufferAccessStrategy bstrategy, bool for_wraparound, bool isTopLevel)
284 {
285         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
286         volatile MemoryContext anl_context = NULL;
287         volatile bool all_rels,
288                                 in_outer_xact,
289                                 use_own_xacts;
290         List       *relations;
291
292         if (vacstmt->verbose)
293                 elevel = INFO;
294         else
295                 elevel = DEBUG2;
296
297         /*
298          * We cannot run VACUUM inside a user transaction block; if we were inside
299          * a transaction, then our commit- and start-transaction-command calls
300          * would not have the intended effect! Furthermore, the forced commit that
301          * occurs before truncating the relation's file would have the effect of
302          * committing the rest of the user's transaction too, which would
303          * certainly not be the desired behavior.  (This only applies to VACUUM
304          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
305          * block, but we choose to disallow that case because we'd rather commit
306          * as soon as possible after finishing the vacuum.      This is mainly so that
307          * we can let go of the AccessExclusiveLock that we may be holding.)
308          *
309          * ANALYZE (without VACUUM) can run either way.
310          */
311         if (vacstmt->vacuum)
312         {
313                 PreventTransactionChain(isTopLevel, stmttype);
314                 in_outer_xact = false;
315         }
316         else
317                 in_outer_xact = IsInTransactionChain(isTopLevel);
318
319         /*
320          * Send info about dead objects to the statistics collector, unless we are
321          * in autovacuum --- autovacuum.c does this for itself.
322          */
323         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
324                 pgstat_vacuum_tabstat();
325
326         /*
327          * Create special memory context for cross-transaction storage.
328          *
329          * Since it is a child of PortalContext, it will go away eventually even
330          * if we suffer an error; there's no need for special abort cleanup logic.
331          */
332         vac_context = AllocSetContextCreate(PortalContext,
333                                                                                 "Vacuum",
334                                                                                 ALLOCSET_DEFAULT_MINSIZE,
335                                                                                 ALLOCSET_DEFAULT_INITSIZE,
336                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
337
338         /*
339          * If caller didn't give us a buffer strategy object, make one in the
340          * cross-transaction memory context.
341          */
342         if (bstrategy == NULL)
343         {
344                 MemoryContext old_context = MemoryContextSwitchTo(vac_context);
345
346                 bstrategy = GetAccessStrategy(BAS_VACUUM);
347                 MemoryContextSwitchTo(old_context);
348         }
349         vac_strategy = bstrategy;
350
351         /* Remember whether we are processing everything in the DB */
352         all_rels = (relids == NIL && vacstmt->relation == NULL);
353
354         /*
355          * Build list of relations to process, unless caller gave us one. (If we
356          * build one, we put it in vac_context for safekeeping.)
357          */
358         relations = get_rel_oids(relids, vacstmt->relation, stmttype);
359
360         /*
361          * Decide whether we need to start/commit our own transactions.
362          *
363          * For VACUUM (with or without ANALYZE): always do so, so that we can
364          * release locks as soon as possible.  (We could possibly use the outer
365          * transaction for a one-table VACUUM, but handling TOAST tables would be
366          * problematic.)
367          *
368          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
369          * start/commit our own transactions.  Also, there's no need to do so if
370          * only processing one relation.  For multiple relations when not within a
371          * transaction block, and also in an autovacuum worker, use own
372          * transactions so we can release locks sooner.
373          */
374         if (vacstmt->vacuum)
375                 use_own_xacts = true;
376         else
377         {
378                 Assert(vacstmt->analyze);
379                 if (IsAutoVacuumWorkerProcess())
380                         use_own_xacts = true;
381                 else if (in_outer_xact)
382                         use_own_xacts = false;
383                 else if (list_length(relations) > 1)
384                         use_own_xacts = true;
385                 else
386                         use_own_xacts = false;
387         }
388
389         /*
390          * If we are running ANALYZE without per-table transactions, we'll need a
391          * memory context with table lifetime.
392          */
393         if (!use_own_xacts)
394                 anl_context = AllocSetContextCreate(PortalContext,
395                                                                                         "Analyze",
396                                                                                         ALLOCSET_DEFAULT_MINSIZE,
397                                                                                         ALLOCSET_DEFAULT_INITSIZE,
398                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
399
400         /*
401          * vacuum_rel expects to be entered with no transaction active; it will
402          * start and commit its own transaction.  But we are called by an SQL
403          * command, and so we are executing inside a transaction already. We
404          * commit the transaction started in PostgresMain() here, and start
405          * another one before exiting to match the commit waiting for us back in
406          * PostgresMain().
407          */
408         if (use_own_xacts)
409         {
410                 /* matches the StartTransaction in PostgresMain() */
411                 CommitTransactionCommand();
412         }
413
414         /* Turn vacuum cost accounting on or off */
415         PG_TRY();
416         {
417                 ListCell   *cur;
418
419                 VacuumCostActive = (VacuumCostDelay > 0);
420                 VacuumCostBalance = 0;
421
422                 /*
423                  * Loop to process each selected relation.
424                  */
425                 foreach(cur, relations)
426                 {
427                         Oid                     relid = lfirst_oid(cur);
428
429                         if (vacstmt->vacuum)
430                                 vacuum_rel(relid, vacstmt, RELKIND_RELATION, for_wraparound);
431
432                         if (vacstmt->analyze)
433                         {
434                                 MemoryContext old_context = NULL;
435
436                                 /*
437                                  * If using separate xacts, start one for analyze. Otherwise,
438                                  * we can use the outer transaction, but we still need to call
439                                  * analyze_rel in a memory context that will be cleaned up on
440                                  * return (else we leak memory while processing multiple
441                                  * tables).
442                                  */
443                                 if (use_own_xacts)
444                                 {
445                                         StartTransactionCommand();
446                                         /* functions in indexes may want a snapshot set */
447                                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
448                                 }
449                                 else
450                                         old_context = MemoryContextSwitchTo(anl_context);
451
452                                 analyze_rel(relid, vacstmt, vac_strategy);
453
454                                 if (use_own_xacts)
455                                         CommitTransactionCommand();
456                                 else
457                                 {
458                                         MemoryContextSwitchTo(old_context);
459                                         MemoryContextResetAndDeleteChildren(anl_context);
460                                 }
461                         }
462                 }
463         }
464         PG_CATCH();
465         {
466                 /* Make sure cost accounting is turned off after error */
467                 VacuumCostActive = false;
468                 PG_RE_THROW();
469         }
470         PG_END_TRY();
471
472         /* Turn off vacuum cost accounting */
473         VacuumCostActive = false;
474
475         /*
476          * Finish up processing.
477          */
478         if (use_own_xacts)
479         {
480                 /* here, we are not in a transaction */
481
482                 /*
483                  * This matches the CommitTransaction waiting for us in
484                  * PostgresMain().
485                  */
486                 StartTransactionCommand();
487         }
488
489         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
490         {
491                 /*
492                  * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
493                  * (autovacuum.c does this for itself.)
494                  */
495                 vac_update_datfrozenxid();
496
497                 /*
498                  * If it was a database-wide VACUUM, print FSM usage statistics (we
499                  * don't make you be superuser to see these).  We suppress this in
500                  * autovacuum, too.
501                  */
502                 if (all_rels)
503                         PrintFreeSpaceMapStatistics(elevel);
504         }
505
506         /*
507          * Clean up working storage --- note we must do this after
508          * StartTransactionCommand, else we might be trying to delete the active
509          * context!
510          */
511         MemoryContextDelete(vac_context);
512         vac_context = NULL;
513
514         if (anl_context)
515                 MemoryContextDelete(anl_context);
516 }
517
518 /*
519  * Build a list of Oids for each relation to be processed
520  *
521  * The list is built in vac_context so that it will survive across our
522  * per-relation transactions.
523  */
524 static List *
525 get_rel_oids(List *relids, const RangeVar *vacrel, const char *stmttype)
526 {
527         List       *oid_list = NIL;
528         MemoryContext oldcontext;
529
530         /* List supplied by VACUUM's caller? */
531         if (relids)
532                 return relids;
533
534         if (vacrel)
535         {
536                 /* Process a specific relation */
537                 Oid                     relid;
538
539                 relid = RangeVarGetRelid(vacrel, false);
540
541                 /* Make a relation list entry for this guy */
542                 oldcontext = MemoryContextSwitchTo(vac_context);
543                 oid_list = lappend_oid(oid_list, relid);
544                 MemoryContextSwitchTo(oldcontext);
545         }
546         else
547         {
548                 /* Process all plain relations listed in pg_class */
549                 Relation        pgclass;
550                 HeapScanDesc scan;
551                 HeapTuple       tuple;
552                 ScanKeyData key;
553
554                 ScanKeyInit(&key,
555                                         Anum_pg_class_relkind,
556                                         BTEqualStrategyNumber, F_CHAREQ,
557                                         CharGetDatum(RELKIND_RELATION));
558
559                 pgclass = heap_open(RelationRelationId, AccessShareLock);
560
561                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
562
563                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
564                 {
565                         /* Make a relation list entry for this guy */
566                         oldcontext = MemoryContextSwitchTo(vac_context);
567                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
568                         MemoryContextSwitchTo(oldcontext);
569                 }
570
571                 heap_endscan(scan);
572                 heap_close(pgclass, AccessShareLock);
573         }
574
575         return oid_list;
576 }
577
578 /*
579  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
580  */
581 void
582 vacuum_set_xid_limits(int freeze_min_age, bool sharedRel,
583                                           TransactionId *oldestXmin,
584                                           TransactionId *freezeLimit)
585 {
586         int                     freezemin;
587         TransactionId limit;
588         TransactionId safeLimit;
589
590         /*
591          * We can always ignore processes running lazy vacuum.  This is because we
592          * use these values only for deciding which tuples we must keep in the
593          * tables.      Since lazy vacuum doesn't write its XID anywhere, it's safe to
594          * ignore it.  In theory it could be problematic to ignore lazy vacuums on
595          * a full vacuum, but keep in mind that only one vacuum process can be
596          * working on a particular table at any time, and that each vacuum is
597          * always an independent transaction.
598          */
599         *oldestXmin = GetOldestXmin(sharedRel, true);
600
601         Assert(TransactionIdIsNormal(*oldestXmin));
602
603         /*
604          * Determine the minimum freeze age to use: as specified by the caller, or
605          * vacuum_freeze_min_age, but in any case not more than half
606          * autovacuum_freeze_max_age, so that autovacuums to prevent XID
607          * wraparound won't occur too frequently.
608          */
609         freezemin = freeze_min_age;
610         if (freezemin < 0)
611                 freezemin = vacuum_freeze_min_age;
612         freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
613         Assert(freezemin >= 0);
614
615         /*
616          * Compute the cutoff XID, being careful not to generate a "permanent" XID
617          */
618         limit = *oldestXmin - freezemin;
619         if (!TransactionIdIsNormal(limit))
620                 limit = FirstNormalTransactionId;
621
622         /*
623          * If oldestXmin is very far back (in practice, more than
624          * autovacuum_freeze_max_age / 2 XIDs old), complain and force a minimum
625          * freeze age of zero.
626          */
627         safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
628         if (!TransactionIdIsNormal(safeLimit))
629                 safeLimit = FirstNormalTransactionId;
630
631         if (TransactionIdPrecedes(limit, safeLimit))
632         {
633                 ereport(WARNING,
634                                 (errmsg("oldest xmin is far in the past"),
635                                  errhint("Close open transactions soon to avoid wraparound problems.")));
636                 limit = *oldestXmin;
637         }
638
639         *freezeLimit = limit;
640 }
641
642
643 /*
644  *      vac_update_relstats() -- update statistics for one relation
645  *
646  *              Update the whole-relation statistics that are kept in its pg_class
647  *              row.  There are additional stats that will be updated if we are
648  *              doing ANALYZE, but we always update these stats.  This routine works
649  *              for both index and heap relation entries in pg_class.
650  *
651  *              We violate transaction semantics here by overwriting the rel's
652  *              existing pg_class tuple with the new values.  This is reasonably
653  *              safe since the new values are correct whether or not this transaction
654  *              commits.  The reason for this is that if we updated these tuples in
655  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
656  *              by the time we got done with a vacuum cycle, most of the tuples in
657  *              pg_class would've been obsoleted.  Of course, this only works for
658  *              fixed-size never-null columns, but these are.
659  *
660  *              Another reason for doing it this way is that when we are in a lazy
661  *              VACUUM and have PROC_IN_VACUUM set, we mustn't do any updates ---
662  *              somebody vacuuming pg_class might think they could delete a tuple
663  *              marked with xmin = our xid.
664  *
665  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
666  *              ANALYZE.
667  */
668 void
669 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
670                                         bool hasindex, TransactionId frozenxid)
671 {
672         Relation        rd;
673         HeapTuple       ctup;
674         Form_pg_class pgcform;
675         bool            dirty;
676
677         rd = heap_open(RelationRelationId, RowExclusiveLock);
678
679         /* Fetch a copy of the tuple to scribble on */
680         ctup = SearchSysCacheCopy(RELOID,
681                                                           ObjectIdGetDatum(relid),
682                                                           0, 0, 0);
683         if (!HeapTupleIsValid(ctup))
684                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
685                          relid);
686         pgcform = (Form_pg_class) GETSTRUCT(ctup);
687
688         /* Apply required updates, if any, to copied tuple */
689
690         dirty = false;
691         if (pgcform->relpages != (int32) num_pages)
692         {
693                 pgcform->relpages = (int32) num_pages;
694                 dirty = true;
695         }
696         if (pgcform->reltuples != (float4) num_tuples)
697         {
698                 pgcform->reltuples = (float4) num_tuples;
699                 dirty = true;
700         }
701         if (pgcform->relhasindex != hasindex)
702         {
703                 pgcform->relhasindex = hasindex;
704                 dirty = true;
705         }
706
707         /*
708          * If we have discovered that there are no indexes, then there's no
709          * primary key either.  This could be done more thoroughly...
710          */
711         if (!hasindex)
712         {
713                 if (pgcform->relhaspkey)
714                 {
715                         pgcform->relhaspkey = false;
716                         dirty = true;
717                 }
718         }
719
720         /*
721          * relfrozenxid should never go backward.  Caller can pass
722          * InvalidTransactionId if it has no new data.
723          */
724         if (TransactionIdIsNormal(frozenxid) &&
725                 TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
726         {
727                 pgcform->relfrozenxid = frozenxid;
728                 dirty = true;
729         }
730
731         /*
732          * If anything changed, write out the tuple.  Even if nothing changed,
733          * force relcache invalidation so all backends reset their rd_targblock
734          * --- otherwise it might point to a page we truncated away.
735          */
736         if (dirty)
737         {
738                 heap_inplace_update(rd, ctup);
739                 /* the above sends a cache inval message */
740         }
741         else
742         {
743                 /* no need to change tuple, but force relcache inval anyway */
744                 CacheInvalidateRelcacheByTuple(ctup);
745         }
746
747         heap_close(rd, RowExclusiveLock);
748 }
749
750
751 /*
752  *      vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
753  *
754  *              Update pg_database's datfrozenxid entry for our database to be the
755  *              minimum of the pg_class.relfrozenxid values.  If we are able to
756  *              advance pg_database.datfrozenxid, also try to truncate pg_clog.
757  *
758  *              We violate transaction semantics here by overwriting the database's
759  *              existing pg_database tuple with the new value.  This is reasonably
760  *              safe since the new value is correct whether or not this transaction
761  *              commits.  As with vac_update_relstats, this avoids leaving dead tuples
762  *              behind after a VACUUM.
763  *
764  *              This routine is shared by full and lazy VACUUM.
765  */
766 void
767 vac_update_datfrozenxid(void)
768 {
769         HeapTuple       tuple;
770         Form_pg_database dbform;
771         Relation        relation;
772         SysScanDesc scan;
773         HeapTuple       classTup;
774         TransactionId newFrozenXid;
775         bool            dirty = false;
776
777         /*
778          * Initialize the "min" calculation with RecentGlobalXmin.      Any
779          * not-yet-committed pg_class entries for new tables must have
780          * relfrozenxid at least this high, because any other open xact must have
781          * RecentXmin >= its PGPROC.xmin >= our RecentGlobalXmin; see
782          * AddNewRelationTuple().  So we cannot produce a wrong minimum by
783          * starting with this.
784          */
785         newFrozenXid = RecentGlobalXmin;
786
787         /*
788          * We must seqscan pg_class to find the minimum Xid, because there is no
789          * index that can help us here.
790          */
791         relation = heap_open(RelationRelationId, AccessShareLock);
792
793         scan = systable_beginscan(relation, InvalidOid, false,
794                                                           SnapshotNow, 0, NULL);
795
796         while ((classTup = systable_getnext(scan)) != NULL)
797         {
798                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
799
800                 /*
801                  * Only consider heap and TOAST tables (anything else should have
802                  * InvalidTransactionId in relfrozenxid anyway.)
803                  */
804                 if (classForm->relkind != RELKIND_RELATION &&
805                         classForm->relkind != RELKIND_TOASTVALUE)
806                         continue;
807
808                 Assert(TransactionIdIsNormal(classForm->relfrozenxid));
809
810                 if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
811                         newFrozenXid = classForm->relfrozenxid;
812         }
813
814         /* we're done with pg_class */
815         systable_endscan(scan);
816         heap_close(relation, AccessShareLock);
817
818         Assert(TransactionIdIsNormal(newFrozenXid));
819
820         /* Now fetch the pg_database tuple we need to update. */
821         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
822
823         /* Fetch a copy of the tuple to scribble on */
824         tuple = SearchSysCacheCopy(DATABASEOID,
825                                                            ObjectIdGetDatum(MyDatabaseId),
826                                                            0, 0, 0);
827         if (!HeapTupleIsValid(tuple))
828                 elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
829         dbform = (Form_pg_database) GETSTRUCT(tuple);
830
831         /*
832          * Don't allow datfrozenxid to go backward (probably can't happen anyway);
833          * and detect the common case where it doesn't go forward either.
834          */
835         if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
836         {
837                 dbform->datfrozenxid = newFrozenXid;
838                 dirty = true;
839         }
840
841         if (dirty)
842                 heap_inplace_update(relation, tuple);
843
844         heap_freetuple(tuple);
845         heap_close(relation, RowExclusiveLock);
846
847         /*
848          * If we were able to advance datfrozenxid, mark the flat-file copy of
849          * pg_database for update at commit, and see if we can truncate pg_clog.
850          */
851         if (dirty)
852         {
853                 database_file_update_needed();
854                 vac_truncate_clog(newFrozenXid);
855         }
856 }
857
858
859 /*
860  *      vac_truncate_clog() -- attempt to truncate the commit log
861  *
862  *              Scan pg_database to determine the system-wide oldest datfrozenxid,
863  *              and use it to truncate the transaction commit log (pg_clog).
864  *              Also update the XID wrap limit info maintained by varsup.c.
865  *
866  *              The passed XID is simply the one I just wrote into my pg_database
867  *              entry.  It's used to initialize the "min" calculation.
868  *
869  *              This routine is shared by full and lazy VACUUM.  Note that it's
870  *              only invoked when we've managed to change our DB's datfrozenxid
871  *              entry.
872  */
873 static void
874 vac_truncate_clog(TransactionId frozenXID)
875 {
876         TransactionId myXID = GetCurrentTransactionId();
877         Relation        relation;
878         HeapScanDesc scan;
879         HeapTuple       tuple;
880         NameData        oldest_datname;
881         bool            frozenAlreadyWrapped = false;
882
883         /* init oldest_datname to sync with my frozenXID */
884         namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
885
886         /*
887          * Scan pg_database to compute the minimum datfrozenxid
888          *
889          * Note: we need not worry about a race condition with new entries being
890          * inserted by CREATE DATABASE.  Any such entry will have a copy of some
891          * existing DB's datfrozenxid, and that source DB cannot be ours because
892          * of the interlock against copying a DB containing an active backend.
893          * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
894          * concurrently modify the datfrozenxid's of different databases, the
895          * worst possible outcome is that pg_clog is not truncated as aggressively
896          * as it could be.
897          */
898         relation = heap_open(DatabaseRelationId, AccessShareLock);
899
900         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
901
902         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
903         {
904                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
905
906                 Assert(TransactionIdIsNormal(dbform->datfrozenxid));
907
908                 if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
909                         frozenAlreadyWrapped = true;
910                 else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
911                 {
912                         frozenXID = dbform->datfrozenxid;
913                         namecpy(&oldest_datname, &dbform->datname);
914                 }
915         }
916
917         heap_endscan(scan);
918
919         heap_close(relation, AccessShareLock);
920
921         /*
922          * Do not truncate CLOG if we seem to have suffered wraparound already;
923          * the computed minimum XID might be bogus.  This case should now be
924          * impossible due to the defenses in GetNewTransactionId, but we keep the
925          * test anyway.
926          */
927         if (frozenAlreadyWrapped)
928         {
929                 ereport(WARNING,
930                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
931                                  errdetail("You might have already suffered transaction-wraparound data loss.")));
932                 return;
933         }
934
935         /* Truncate CLOG to the oldest frozenxid */
936         TruncateCLOG(frozenXID);
937
938         /*
939          * Update the wrap limit for GetNewTransactionId.  Note: this function
940          * will also signal the postmaster for an(other) autovac cycle if needed.
941          */
942         SetTransactionIdLimit(frozenXID, &oldest_datname);
943 }
944
945
946 /****************************************************************************
947  *                                                                                                                                                      *
948  *                      Code common to both flavors of VACUUM                                                   *
949  *                                                                                                                                                      *
950  ****************************************************************************
951  */
952
953
954 /*
955  *      vacuum_rel() -- vacuum one heap relation
956  *
957  *              Doing one heap at a time incurs extra overhead, since we need to
958  *              check that the heap exists again just before we vacuum it.      The
959  *              reason that we do this is so that vacuuming can be spread across
960  *              many small transactions.  Otherwise, two-phase locking would require
961  *              us to lock the entire database during one pass of the vacuum cleaner.
962  *
963  *              At entry and exit, we are not inside a transaction.
964  */
965 static void
966 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind,
967                    bool for_wraparound)
968 {
969         LOCKMODE        lmode;
970         Relation        onerel;
971         LockRelId       onerelid;
972         Oid                     toast_relid;
973         Oid                     save_userid;
974         bool            save_secdefcxt;
975
976         /* Begin a transaction for vacuuming this relation */
977         StartTransactionCommand();
978
979         if (vacstmt->full)
980         {
981                 /* functions in indexes may want a snapshot set */
982                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
983         }
984         else
985         {
986                 /*
987                  * During a lazy VACUUM we do not run any user-supplied functions, and
988                  * so it should be safe to not create a transaction snapshot.
989                  *
990                  * We can furthermore set the PROC_IN_VACUUM flag, which lets other
991                  * concurrent VACUUMs know that they can ignore this one while
992                  * determining their OldestXmin.  (The reason we don't set it during a
993                  * full VACUUM is exactly that we may have to run user-defined
994                  * functions for functional indexes, and we want to make sure that if
995                  * they use the snapshot set above, any tuples it requires can't get
996                  * removed from other tables.  An index function that depends on the
997                  * contents of other tables is arguably broken, but we won't break it
998                  * here by violating transaction semantics.)
999                  *
1000                  * We also set the PROC_VACUUM_FOR_WRAPAROUND flag, which is passed down
1001                  * by autovacuum; it's used to avoid cancelling a vacuum that was
1002                  * invoked in an emergency.
1003                  *
1004                  * Note: this flag remains set until CommitTransaction or
1005                  * AbortTransaction.  We don't want to clear it until we reset
1006                  * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
1007                  * which is probably Not Good.
1008                  */
1009                 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1010                 MyProc->vacuumFlags |= PROC_IN_VACUUM;
1011                 if (for_wraparound)
1012                         MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
1013                 LWLockRelease(ProcArrayLock);
1014         }
1015
1016         /*
1017          * Check for user-requested abort.      Note we want this to be inside a
1018          * transaction, so xact.c doesn't issue useless WARNING.
1019          * transaction, so xact.c doesn't issue a useless WARNING.
1020         CHECK_FOR_INTERRUPTS();
1021
1022         /*
1023          * Determine the type of lock we want --- hard exclusive lock for a FULL
1024          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1025          * way, we can be sure that no other backend is vacuuming the same table.
1026          */
1027         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
1028
1029         /*
1030          * Open the relation and get the appropriate lock on it.
1031          *
1032          * There's a race condition here: the rel may have gone away since the
1033          * last time we saw it.  If so, we don't need to vacuum it.
1034          */
1035         onerel = try_relation_open(relid, lmode);
1036
1037         if (!onerel)
1038         {
1039                 CommitTransactionCommand();
1040                 return;
1041         }
1042
1043         /*
1044          * Check permissions.
1045          *
1046          * We allow the user to vacuum a table if he is superuser, the table
1047          * owner, or the database owner (but in the latter case, only if it's not
1048          * a shared relation).  pg_class_ownercheck includes the superuser case.
1049          *
1050          * Note we choose to treat permissions failure as a WARNING and keep
1051          * trying to vacuum the rest of the DB --- is this appropriate?
1052          */
1053         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1054                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1055         {
1056                 if (onerel->rd_rel->relisshared)
1057                         ereport(WARNING,
1058                                         (errmsg("skipping \"%s\" --- only superuser can vacuum it",
1059                                                         RelationGetRelationName(onerel))));
1060                 else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
1061                         ereport(WARNING,
1062                                         (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
1063                                                         RelationGetRelationName(onerel))));
1064                 else
1065                         ereport(WARNING,
1066                                         (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1067                                                         RelationGetRelationName(onerel))));
1068                 relation_close(onerel, lmode);
1069                 CommitTransactionCommand();
1070                 return;
1071         }
1072
1073         /*
1074          * Check that it's a plain table; we used to do this in get_rel_oids() but
1075          * seems safer to check after we've locked the relation.
1076          */
1077         if (onerel->rd_rel->relkind != expected_relkind)
1078         {
1079                 ereport(WARNING,
1080                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1081                                                 RelationGetRelationName(onerel))));
1082                 relation_close(onerel, lmode);
1083                 CommitTransactionCommand();
1084                 return;
1085         }
1086
1087         /*
1088          * Silently ignore tables that are temp tables of other backends ---
1089          * trying to vacuum these will lead to great unhappiness, since their
1090          * contents are probably not up-to-date on disk.  (We don't throw a
1091          * warning here; it would just lead to chatter during a database-wide
1092          * VACUUM.)
1093          */
1094         if (isOtherTempNamespace(RelationGetNamespace(onerel)))
1095         {
1096                 relation_close(onerel, lmode);
1097                 CommitTransactionCommand();
1098                 return;
1099         }
1100
1101         /*
1102          * Get a session-level lock too. This will protect our access to the
1103          * relation across multiple transactions, so that we can vacuum the
1104          * relation's TOAST table (if any) secure in the knowledge that no one is
1105          * deleting the parent relation.
1106          *
1107          * NOTE: this cannot block, even if someone else is waiting for access,
1108          * because the lock manager knows that both lock requests are from the
1109          * same process.
1110          */
1111         onerelid = onerel->rd_lockInfo.lockRelId;
1112         LockRelationIdForSession(&onerelid, lmode);
1113
1114         /*
1115          * Remember the relation's TOAST relation for later
1116          */
1117         toast_relid = onerel->rd_rel->reltoastrelid;
1118
1119         /*
1120          * Switch to the table owner's userid, so that any index functions are
1121          * run as that user.  (This is unnecessary, but harmless, for lazy
1122          * VACUUM.)
1123          */
1124         GetUserIdAndContext(&save_userid, &save_secdefcxt);
1125         SetUserIdAndContext(onerel->rd_rel->relowner, true);
1126
1127         /*
1128          * Do the actual work --- either FULL or "lazy" vacuum
1129          */
1130         if (vacstmt->full)
1131                 full_vacuum_rel(onerel, vacstmt);
1132         else
1133                 lazy_vacuum_rel(onerel, vacstmt, vac_strategy);
1134
1135         /* Restore userid */
1136         SetUserIdAndContext(save_userid, save_secdefcxt);
1137
1138         /* all done with this class, but hold lock until commit */
1139         relation_close(onerel, NoLock);
1140
1141         /*
1142          * Complete the transaction and free all temporary memory used.
1143          */
1144         CommitTransactionCommand();
1145
1146         /*
1147          * If the relation has a secondary toast rel, vacuum that too while we
1148          * still hold the session lock on the master table.  Note however that
1149          * "analyze" will not get done on the toast table.      This is good, because
1150          * the toaster always uses hardcoded index access and statistics are
1151          * totally unimportant for toast relations.
1152          */
1153         if (toast_relid != InvalidOid)
1154                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE, for_wraparound);
1155
1156         /*
1157          * Now release the session-level lock on the master table.
1158          */
1159         UnlockRelationIdForSession(&onerelid, lmode);
1160 }
1161
1162
1163 /****************************************************************************
1164  *                                                                                                                                                      *
1165  *                      Code for VACUUM FULL (only)                                                                             *
1166  *                                                                                                                                                      *
1167  ****************************************************************************
1168  */
1169
1170
1171 /*
1172  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1173  *
1174  *              This routine vacuums a single heap, cleans out its indexes, and
1175  *              updates its num_pages and num_tuples statistics.
1176  *
1177  *              At entry, we have already established a transaction and opened
1178  *              and locked the relation.
1179  */
1180 static void
1181 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1182 {
1183         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1184                                                                                  * clean indexes */
1185         VacPageListData fraged_pages;           /* List of pages with space enough for
1186                                                                                  * re-using */
1187         Relation   *Irel;
1188         int                     nindexes,
1189                                 i;
1190         VRelStats  *vacrelstats;
1191
1192         vacuum_set_xid_limits(vacstmt->freeze_min_age, onerel->rd_rel->relisshared,
1193                                                   &OldestXmin, &FreezeLimit);
1194
1195         /*
1196          * Flush any previous async-commit transactions.  This does not guarantee
1197          * that we will be able to set hint bits for tuples they inserted, but it
1198          * improves the probability, especially in simple sequential-commands
1199          * cases.  See scan_heap() and repair_frag() for more about this.
1200          */
1201         XLogAsyncCommitFlush();
1202
1203         /*
1204          * Set up statistics-gathering machinery.
1205          */
1206         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1207         vacrelstats->rel_pages = 0;
1208         vacrelstats->rel_tuples = 0;
1209         vacrelstats->rel_indexed_tuples = 0;
1210         vacrelstats->hasindex = false;
1211
1212         /* scan the heap */
1213         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1214         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1215
1216         /* Now open all indexes of the relation */
1217         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1218         if (nindexes > 0)
1219                 vacrelstats->hasindex = true;
1220
1221         /* Clean/scan index relation(s) */
1222         if (Irel != NULL)
1223         {
1224                 if (vacuum_pages.num_pages > 0)
1225                 {
1226                         for (i = 0; i < nindexes; i++)
1227                                 vacuum_index(&vacuum_pages, Irel[i],
1228                                                          vacrelstats->rel_indexed_tuples, 0);
1229                 }
1230                 else
1231                 {
1232                         /* just scan indexes to update statistics */
1233                         for (i = 0; i < nindexes; i++)
1234                                 scan_index(Irel[i], vacrelstats->rel_indexed_tuples);
1235                 }
1236         }
1237
1238         if (fraged_pages.num_pages > 0)
1239         {
1240                 /* Try to shrink heap */
1241                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1242                                         nindexes, Irel);
1243                 vac_close_indexes(nindexes, Irel, NoLock);
1244         }
1245         else
1246         {
1247                 vac_close_indexes(nindexes, Irel, NoLock);
1248                 if (vacuum_pages.num_pages > 0)
1249                 {
1250                         /* Clean pages from vacuum_pages list */
1251                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1252                 }
1253         }
1254
1255         /* update shared free space map with final free space info */
1256         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1257
1258         /* update statistics in pg_class */
1259         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
1260                                                 vacrelstats->rel_tuples, vacrelstats->hasindex,
1261                                                 FreezeLimit);
1262
1263         /* report results to the stats collector, too */
1264         pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
1265                                                  vacstmt->analyze, vacrelstats->rel_tuples);
1266 }
1267
1268
1269 /*
1270  *      scan_heap() -- scan an open heap relation
1271  *
1272  *              This routine sets commit status bits, constructs vacuum_pages (list
1273  *              of pages we need to compact free space on and/or clean indexes of
1274  *              deleted tuples), constructs fraged_pages (list of pages with free
1275  *              space that tuples could be moved into), and calculates statistics
1276  *              on the number of live tuples in the heap.
1277  */
1278 static void
1279 scan_heap(VRelStats *vacrelstats, Relation onerel,
1280                   VacPageList vacuum_pages, VacPageList fraged_pages)
1281 {
1282         BlockNumber nblocks,
1283                                 blkno;
1284         char       *relname;
1285         VacPage         vacpage;
1286         BlockNumber empty_pages,
1287                                 empty_end_pages;
1288         double          num_tuples,
1289                                 num_indexed_tuples,
1290                                 tups_vacuumed,
1291                                 nkeep,
1292                                 nunused;
1293         double          free_space,
1294                                 usable_free_space;
1295         Size            min_tlen = MaxHeapTupleSize;
1296         Size            max_tlen = 0;
1297         bool            do_shrinking = true;
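        /*
         * Work space for recording update-chain links found during the scan;
         * initially sized for 100 entries and enlarged on demand below.
         */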
1298         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1299         int                     num_vtlinks = 0;
1300         int                     free_vtlinks = 100;
1301         PGRUsage        ru0;
1302
1303         pg_rusage_init(&ru0);
1304
1305         relname = RelationGetRelationName(onerel);
1306         ereport(elevel,
1307                         (errmsg("vacuuming \"%s.%s\"",
1308                                         get_namespace_name(RelationGetNamespace(onerel)),
1309                                         relname)));
1310
1311         empty_pages = empty_end_pages = 0;
1312         num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
1313         free_space = 0;
1314
1315         nblocks = RelationGetNumberOfBlocks(onerel);
1316
1317         /*
1318          * We initially create each VacPage item in a maximal-sized workspace,
1319          * then copy the workspace into a just-large-enough copy.
1320          */
1321         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1322
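        /*
         * Scan every page: prune HOT chains, collect vacuumable items and
         * free-space information, and check tuples for freezing.
         */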
1323         for (blkno = 0; blkno < nblocks; blkno++)
1324         {
1325                 Page            page,
1326                                         tempPage = NULL;
1327                 bool            do_reap,
1328                                         do_frag;
1329                 Buffer          buf;
1330                 OffsetNumber offnum,
1331                                         maxoff;
1332                 bool            notup;
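                /* offsets of tuples frozen on this page; used for the freeze WAL record below */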
1333                 OffsetNumber frozen[MaxOffsetNumber];
1334                 int                     nfrozen;
1335
1336                 vacuum_delay_point();
1337
1338                 buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
1339                 page = BufferGetPage(buf);
1340
1341                 /*
1342                  * Since we are holding exclusive lock on the relation, no other
1343                  * backend can be accessing the page; however it is possible that the
1344                  * background writer will try to write the page if it's already marked
1345                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1346                  * must take exclusive buffer lock wherever we potentially modify
1347                  * pages.  In fact, we insist on cleanup lock so that we can safely
1348                  * call heap_page_prune().      (This might be overkill, since the
1349                  * bgwriter pays no attention to individual tuples, but on the other
1350                  * hand it's unlikely that the bgwriter has this particular page
1351                  * pinned at this instant.      So violating the coding rule would buy us
1352                  * little anyway.)
1353                  */
1354                 LockBufferForCleanup(buf);
1355
1356                 vacpage->blkno = blkno;
1357                 vacpage->offsets_used = 0;
1358                 vacpage->offsets_free = 0;
1359
1360                 if (PageIsNew(page))
1361                 {
1362                         VacPage         vacpagecopy;
1363
1364                         ereport(WARNING,
1365                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1366                                            relname, blkno)));
1367                         PageInit(page, BufferGetPageSize(buf), 0);
1368                         MarkBufferDirty(buf);
1369                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1370                         free_space += vacpage->free;
1371                         empty_pages++;
1372                         empty_end_pages++;
1373                         vacpagecopy = copy_vac_page(vacpage);
1374                         vpage_insert(vacuum_pages, vacpagecopy);
1375                         vpage_insert(fraged_pages, vacpagecopy);
1376                         UnlockReleaseBuffer(buf);
1377                         continue;
1378                 }
1379
1380                 if (PageIsEmpty(page))
1381                 {
1382                         VacPage         vacpagecopy;
1383
1384                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1385                         free_space += vacpage->free;
1386                         empty_pages++;
1387                         empty_end_pages++;
1388                         vacpagecopy = copy_vac_page(vacpage);
1389                         vpage_insert(vacuum_pages, vacpagecopy);
1390                         vpage_insert(fraged_pages, vacpagecopy);
1391                         UnlockReleaseBuffer(buf);
1392                         continue;
1393                 }
1394
1395                 /*
1396                  * Prune all HOT-update chains in this page.
1397                  *
1398                  * We use the redirect_move option so that redirecting line pointers
1399                  * get collapsed out; this allows us to not worry about them below.
1400                  *
1401                  * We count tuples removed by the pruning step as removed by VACUUM.
1402                  */
1403                 tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
1404                                                                                  true, false);
1405
1406                 /*
1407                  * Now scan the page to collect vacuumable items and check for tuples
1408                  * requiring freezing.
1409                  */
1410                 nfrozen = 0;
1411                 notup = true;
1412                 maxoff = PageGetMaxOffsetNumber(page);
1413                 for (offnum = FirstOffsetNumber;
1414                          offnum <= maxoff;
1415                          offnum = OffsetNumberNext(offnum))
1416                 {
1417                         ItemId          itemid = PageGetItemId(page, offnum);
1418                         bool            tupgone = false;
1419                         HeapTupleData tuple;
1420
1421                         /*
1422                          * Collect unused items too - it's possible to have index entries
1423                          * pointing here after a crash.  (That's an ancient comment and is
1424                          * likely obsolete with WAL, but we might as well continue to
1425                          * check for such problems.)
1426                          */
1427                         if (!ItemIdIsUsed(itemid))
1428                         {
1429                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1430                                 nunused += 1;
1431                                 continue;
1432                         }
1433
1434                         /*
1435                          * DEAD item pointers are to be vacuumed normally; but we don't
1436                          * count them in tups_vacuumed, else we'd be double-counting (at
1437                          * least in the common case where heap_page_prune() just freed up
1438                          * a non-HOT tuple).
1439                          */
1440                         if (ItemIdIsDead(itemid))
1441                         {
1442                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1443                                 continue;
1444                         }
1445
1446                         /* Shouldn't have any redirected items anymore */
1447                         if (!ItemIdIsNormal(itemid))
1448                                 elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
1449                                          relname, blkno, offnum);
1450
1451                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1452                         tuple.t_len = ItemIdGetLength(itemid);
1453                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1454
1455                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1456                         {
1457                                 case HEAPTUPLE_LIVE:
1458                                         /* Tuple is good --- but let's do some validity checks */
1459                                         if (onerel->rd_rel->relhasoids &&
1460                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1461                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1462                                                          relname, blkno, offnum);
1463
1464                                         /*
1465                                          * The shrinkage phase of VACUUM FULL requires that all
1466                                          * live tuples have XMIN_COMMITTED set --- see comments in
1467                                          * repair_frag()'s walk-along-page loop.  Use of async
1468                                          * commit may prevent HeapTupleSatisfiesVacuum from
1469                                          * setting the bit for a recently committed tuple.      Rather
1470                                          * than trying to handle this corner case, we just give up
1471                                          * and don't shrink.
1472                                          */
1473                                         if (do_shrinking &&
1474                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1475                                         {
1476                                                 ereport(LOG,
1477                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1478                                                                                 relname, blkno, offnum,
1479                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1480                                                 do_shrinking = false;
1481                                         }
1482                                         break;
1483                                 case HEAPTUPLE_DEAD:
1484
1485                                         /*
1486                                          * Ordinarily, DEAD tuples would have been removed by
1487                                          * heap_page_prune(), but it's possible that the tuple
1488                                          * state changed since heap_page_prune() looked.  In
1489                                          * particular an INSERT_IN_PROGRESS tuple could have
1490                                          * changed to DEAD if the inserter aborted.  So this
1491                                          * cannot be considered an error condition, though it does
1492                                          * suggest that someone released a lock early.
1493                                          *
1494                                          * If the tuple is HOT-updated then it must only be
1495                                          * removed by a prune operation; so we keep it as if it
1496                                          * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
1497                                          * worth trying to make the shrinking code smart enough to
1498                                          * handle this?  It's an unusual corner case.)
1499                                          *
1500                                          * DEAD heap-only tuples can safely be removed if they
1501                                          * aren't themselves HOT-updated, although this is a bit
1502                                          * inefficient since we'll uselessly try to remove index
1503                                          * entries for them.
1504                                          */
1505                                         if (HeapTupleIsHotUpdated(&tuple))
1506                                         {
1507                                                 nkeep += 1;
1508                                                 if (do_shrinking)
1509                                                         ereport(LOG,
1510                                                                         (errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
1511                                                                                         relname, blkno, offnum)));
1512                                                 do_shrinking = false;
1513                                         }
1514                                         else
1515                                         {
1516                                                 tupgone = true; /* we can delete the tuple */
1517
1518                                                 /*
1519                                                  * We need not require XMIN_COMMITTED or
1520                                                  * XMAX_COMMITTED to be set, since we will remove the
1521                                                  * tuple without any further examination of its hint
1522                                                  * bits.
1523                                                  */
1524                                         }
1525                                         break;
1526                                 case HEAPTUPLE_RECENTLY_DEAD:
1527
1528                                         /*
1529                                          * If the tuple was deleted only recently, we must not remove
1530                                          * it from the relation.
1531                                          */
1532                                         nkeep += 1;
1533
1534                                         /*
1535                                          * As with the LIVE case, shrinkage requires
1536                                          * XMIN_COMMITTED to be set.
1537                                          */
1538                                         if (do_shrinking &&
1539                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1540                                         {
1541                                                 ereport(LOG,
1542                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1543                                                                                 relname, blkno, offnum,
1544                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1545                                                 do_shrinking = false;
1546                                         }
1547
1548                                         /*
1549                                          * If we are still considering shrinking and this tuple has been
1550                                          * updated, remember it so we can construct update-chain dependencies.
1551                                          */
1552                                         if (do_shrinking &&
1553                                                 !(ItemPointerEquals(&(tuple.t_self),
1554                                                                                         &(tuple.t_data->t_ctid))))
1555                                         {
1556                                                 if (free_vtlinks == 0)
1557                                                 {
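                                                        /* out of space: grow the vtlinks array in 1000-entry chunks */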
1558                                                         free_vtlinks = 1000;
1559                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1560                                                                                            (free_vtlinks + num_vtlinks) *
1561                                                                                                          sizeof(VTupleLinkData));
1562                                                 }
1563                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1564                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1565                                                 free_vtlinks--;
1566                                                 num_vtlinks++;
1567                                         }
1568                                         break;
1569                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1570
1571                                         /*
1572                                          * This should not happen, since we hold exclusive lock on
1573                                          * the relation; shouldn't we raise an error?  (Actually,
1574                                          * it can happen in system catalogs, since we tend to
1575                                          * release write lock before commit there.)  As above, we
1576                                          * can't apply repair_frag() if the tuple state is
1577                                          * uncertain.
1578                                          */
1579                                         if (do_shrinking)
1580                                                 ereport(LOG,
1581                                                                 (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
1582                                                                                 relname, blkno, offnum,
1583                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1584                                         do_shrinking = false;
1585                                         break;
1586                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1587
1588                                         /*
1589                                          * This should not happen, since we hold exclusive lock on
1590                                          * the relation; shouldn't we raise an error?  (Actually,
1591                                          * it can happen in system catalogs, since we tend to
1592                                          * release write lock before commit there.)  As above, we
1593                                          * can't apply repair_frag() if the tuple state is
1594                                          * uncertain.
1595                                          */
1596                                         if (do_shrinking)
1597                                                 ereport(LOG,
1598                                                                 (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
1599                                                                                 relname, blkno, offnum,
1600                                                                          HeapTupleHeaderGetXmax(tuple.t_data))));
1601                                         do_shrinking = false;
1602                                         break;
1603                                 default:
1604                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1605                                         break;
1606                         }
1607
1608                         if (tupgone)
1609                         {
1610                                 ItemId          lpp;
1611
1612                                 /*
1613                                  * Here we are building a temporary copy of the page with dead
1614                                  * tuples removed.      Below we will apply
1615                                  * PageRepairFragmentation to the copy, so that we can
1616                                  * determine how much space will be available after removal of
1617                                  * dead tuples.  But note we are NOT changing the real page
1618                                  * yet...
1619                                  */
1620                                 if (tempPage == NULL)
1621                                 {
1622                                         Size            pageSize;
1623
1624                                         pageSize = PageGetPageSize(page);
1625                                         tempPage = (Page) palloc(pageSize);
1626                                         memcpy(tempPage, page, pageSize);
1627                                 }
1628
1629                                 /* mark it unused on the temp page */
1630                                 lpp = PageGetItemId(tempPage, offnum);
1631                                 ItemIdSetUnused(lpp);
1632
1633                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1634                                 tups_vacuumed += 1;
1635                         }
1636                         else
1637                         {
1638                                 num_tuples += 1;
1639                                 if (!HeapTupleIsHeapOnly(&tuple))
1640                                         num_indexed_tuples += 1;
1641                                 notup = false;
1642                                 if (tuple.t_len < min_tlen)
1643                                         min_tlen = tuple.t_len;
1644                                 if (tuple.t_len > max_tlen)
1645                                         max_tlen = tuple.t_len;
1646
1647                                 /*
1648                                  * Each non-removable tuple must be checked to see if it needs
1649                                  * freezing.
1650                                  */
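                                /*
                                 * Any tuples frozen here are WAL-logged for the whole page in a
                                 * single record below (see the nfrozen > 0 block), not per tuple.
                                 */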
1651                                 if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
1652                                                                           InvalidBuffer))
1653                                         frozen[nfrozen++] = offnum;
1654                         }
1655                 }                                               /* scan along page */
1656
1657                 if (tempPage != NULL)
1658                 {
1659                         /* Some tuples are removable; figure free space after removal */
1660                         PageRepairFragmentation(tempPage);
1661                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1662                         pfree(tempPage);
1663                         do_reap = true;
1664                 }
1665                 else
1666                 {
1667                         /* Just use current available space */
1668                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1669                         /* Need to reap the page if it has UNUSED or DEAD line pointers */
1670                         do_reap = (vacpage->offsets_free > 0);
1671                 }
1672
1673                 free_space += vacpage->free;
1674
1675                 /*
1676                  * Add the page to vacuum_pages if it requires reaping, and add it to
1677                  * fraged_pages if it has a useful amount of free space.  "Useful"
1678                  * means enough for a minimal-sized tuple.  But we don't know that
1679                  * accurately near the start of the relation, so add pages
1680                  * unconditionally if they have >= BLCKSZ/10 free space.  Also
1681                  * forcibly add pages with no live tuples, to avoid confusing the
1682                  * empty_end_pages logic.  (In the presence of unreasonably small
1683                  * fillfactor, it seems possible that such pages might not pass
1684                  * the free-space test, but they had better be in the list anyway.)
1685                  */
1686                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10 ||
1687                                    notup);
1688
1689                 if (do_reap || do_frag)
1690                 {
1691                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1692
1693                         if (do_reap)
1694                                 vpage_insert(vacuum_pages, vacpagecopy);
1695                         if (do_frag)
1696                                 vpage_insert(fraged_pages, vacpagecopy);
1697                 }
1698
1699                 /*
1700                  * Include the page in empty_end_pages if it will be empty after
1701                  * vacuuming; this is to keep us from using it as a move destination.
1702                  * Note that such pages are guaranteed to be in fraged_pages.
1703                  */
1704                 if (notup)
1705                 {
1706                         empty_pages++;
1707                         empty_end_pages++;
1708                 }
1709                 else
1710                         empty_end_pages = 0;
1711
1712                 /*
1713                  * If we froze any tuples, mark the buffer dirty, and write a WAL
1714                  * record recording the changes.  We must log the changes to be
1715                  * crash-safe against future truncation of CLOG.
1716                  */
1717                 if (nfrozen > 0)
1718                 {
1719                         MarkBufferDirty(buf);
1720                         /* no XLOG for temp tables, though */
1721                         if (!onerel->rd_istemp)
1722                         {
1723                                 XLogRecPtr      recptr;
1724
1725                                 recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1726                                                                                  frozen, nfrozen);
1727                                 PageSetLSN(page, recptr);
1728                                 PageSetTLI(page, ThisTimeLineID);
1729                         }
1730                 }
1731
1732                 UnlockReleaseBuffer(buf);
1733         }
1734
1735         pfree(vacpage);
1736
1737         /* save stats in the rel list for use later */
1738         vacrelstats->rel_tuples = num_tuples;
1739         vacrelstats->rel_indexed_tuples = num_indexed_tuples;
1740         vacrelstats->rel_pages = nblocks;
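        /* if we found no nonremovable tuples, the length bounds are meaningless; report zeros */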
1741         if (num_tuples == 0)
1742                 min_tlen = max_tlen = 0;
1743         vacrelstats->min_tlen = min_tlen;
1744         vacrelstats->max_tlen = max_tlen;
1745
1746         vacuum_pages->empty_end_pages = empty_end_pages;
1747         fraged_pages->empty_end_pages = empty_end_pages;
1748
1749         /*
1750          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1751          * remove any "empty" end-pages from the list, and compute usable free
1752          * space = free space in remaining pages.
1753          */
1754         if (do_shrinking)
1755         {
1756                 int                     i;
1757
1758                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1759                 fraged_pages->num_pages -= empty_end_pages;
1760                 usable_free_space = 0;
1761                 for (i = 0; i < fraged_pages->num_pages; i++)
1762                         usable_free_space += fraged_pages->pagedesc[i]->free;
1763         }
1764         else
1765         {
1766                 fraged_pages->num_pages = 0;
1767                 usable_free_space = 0;
1768         }
1769
1770         /* don't bother to save vtlinks if we will not call repair_frag */
1771         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1772         {
1773                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1774                           vac_cmp_vtlinks);
1775                 vacrelstats->vtlinks = vtlinks;
1776                 vacrelstats->num_vtlinks = num_vtlinks;
1777         }
1778         else
1779         {
1780                 vacrelstats->vtlinks = NULL;
1781                 vacrelstats->num_vtlinks = 0;
1782                 pfree(vtlinks);
1783         }
1784
1785         ereport(elevel,
1786                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1787                                         RelationGetRelationName(onerel),
1788                                         tups_vacuumed, num_tuples, nblocks),
1789                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1790                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1791                                            "There were %.0f unused item pointers.\n"
1792            "Total free space (including removable row versions) is %.0f bytes.\n"
1793                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1794          "%u pages containing %.0f free bytes are potential move destinations.\n"
1795                                            "%s.",
1796                                            nkeep,
1797                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1798                                            nunused,
1799                                            free_space,
1800                                            empty_pages, empty_end_pages,
1801                                            fraged_pages->num_pages, usable_free_space,
1802                                            pg_rusage_show(&ru0))));
1803 }
1804
1805
1806 /*
1807  *      repair_frag() -- try to repair relation's fragmentation
1808  *
1809  *              This routine marks dead tuples as unused and tries to re-use dead
1810  *              space by moving tuples (and inserting index entries if needed).  It
1811  *              constructs a list (Nvacpagelist) of pages freed by moving tuples off
1812  *              them, and cleans their index entries after committing the current
1813  *              transaction (in a hackish manner: without releasing locks or freeing
1814  *              memory!).  It truncates the relation if some end-blocks become empty.
1815  */
1816 static void
1817 repair_frag(VRelStats *vacrelstats, Relation onerel,
1818                         VacPageList vacuum_pages, VacPageList fraged_pages,
1819                         int nindexes, Relation *Irel)
1820 {
1821         TransactionId myXID = GetCurrentTransactionId();
1822         Buffer          dst_buffer = InvalidBuffer;
1823         BlockNumber nblocks,
1824                                 blkno;
1825         BlockNumber last_move_dest_block = 0,
1826                                 last_vacuum_block;
1827         Page            dst_page = NULL;
1828         ExecContextData ec;
1829         VacPageListData Nvacpagelist;
1830         VacPage         dst_vacpage = NULL,
1831                                 last_vacuum_page,
1832                                 vacpage,
1833                            *curpage;
1834         int                     i;
1835         int                     num_moved = 0,
1836                                 num_fraged_pages,
1837                                 vacuumed_pages;
1838         int                     keep_tuples = 0;
1839         int                     keep_indexed_tuples = 0;
1840         PGRUsage        ru0;
1841
1842         pg_rusage_init(&ru0);
1843
1844         ExecContext_Init(&ec, onerel);
1845
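        /* Nvacpagelist will accumulate the pages freed by moving tuples off them (see header comment) */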
1846         Nvacpagelist.num_pages = 0;
1847         num_fraged_pages = fraged_pages->num_pages;
1848         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1849         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1850         if (vacuumed_pages > 0)
1851         {
1852                 /* get last reaped page from vacuum_pages */
1853                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1854                 last_vacuum_block = last_vacuum_page->blkno;
1855         }
1856         else
1857         {
1858                 last_vacuum_page = NULL;
1859                 last_vacuum_block = InvalidBlockNumber;
1860         }
1861
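        /* work page descriptor, sized for the worst case of MaxOffsetNumber slots */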
1862         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1863         vacpage->offsets_used = vacpage->offsets_free = 0;
1864
1865         /*
1866          * Scan pages backwards from the last nonempty page, trying to move tuples
1867          * down to lower pages.  Quit when we reach a page that we have moved any
1868          * tuples onto, or the first page if we haven't moved anything, or when we
1869          * find a page we cannot completely empty (this last condition is handled
1870          * by "break" statements within the loop).
1871          *
1872          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1873          * in order by blkno.
1874          */
1875         nblocks = vacrelstats->rel_pages;
1876         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1877                  blkno > last_move_dest_block;
1878                  blkno--)
1879         {
1880                 Buffer          buf;
1881                 Page            page;
1882                 OffsetNumber offnum,
1883                                         maxoff;
1884                 bool            isempty,
1885                                         chain_tuple_moved;
1886
1887                 vacuum_delay_point();
1888
1889                 /*
1890                  * Forget fraged_pages pages at or after this one; they're no longer
1891                  * useful as move targets, since we only want to move down. Note that
1892                  * since we stop the outer loop at last_move_dest_block, pages removed
1893                  * here cannot have had anything moved onto them already.
1894                  *
1895                  * Also note that we don't change the stored fraged_pages list, only
1896                  * our local variable num_fraged_pages; so the forgotten pages are
1897                  * still available to be loaded into the free space map later.
1898                  */
1899                 while (num_fraged_pages > 0 &&
1900                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1901                 {
1902                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1903                         --num_fraged_pages;
1904                 }
1905
1906                 /*
1907                  * Process this page of the relation.
1908                  */
1909                 buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
1910                 page = BufferGetPage(buf);
1911
1912                 vacpage->offsets_free = 0;
1913
1914                 isempty = PageIsEmpty(page);
1915
1916                 /* Is the page in the vacuum_pages list? */
1917                 if (blkno == last_vacuum_block)
1918                 {
1919                         if (last_vacuum_page->offsets_free > 0)
1920                         {
1921                                 /* there are dead tuples on this page - clean them */
1922                                 Assert(!isempty);
1923                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1924                                 vacuum_page(onerel, buf, last_vacuum_page);
1925                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1926                         }
1927                         else
1928                                 Assert(isempty);
1929                         --vacuumed_pages;
1930                         if (vacuumed_pages > 0)
1931                         {
1932                                 /* get prev reaped page from vacuum_pages */
1933                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1934                                 last_vacuum_block = last_vacuum_page->blkno;
1935                         }
1936                         else
1937                         {
1938                                 last_vacuum_page = NULL;
1939                                 last_vacuum_block = InvalidBlockNumber;
1940                         }
1941                         if (isempty)
1942                         {
1943                                 ReleaseBuffer(buf);
1944                                 continue;
1945                         }
1946                 }
1947                 else
1948                         Assert(!isempty);
1949
1950                 chain_tuple_moved = false;              /* no chain tuples have been moved
1951                                                                                  * off this page yet */
1952                 vacpage->blkno = blkno;
1953                 maxoff = PageGetMaxOffsetNumber(page);
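                /* examine each line pointer on this page, moving tuples off it where possible */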
1954                 for (offnum = FirstOffsetNumber;
1955                          offnum <= maxoff;
1956                          offnum = OffsetNumberNext(offnum))
1957                 {
1958                         Size            tuple_len;
1959                         HeapTupleData tuple;
1960                         ItemId          itemid = PageGetItemId(page, offnum);
1961
1962                         if (!ItemIdIsUsed(itemid))
1963                                 continue;
1964
1965                         if (ItemIdIsDead(itemid))
1966                         {
1967                                 /* just remember it for vacuum_page() */
1968                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1969                                 continue;
1970                         }
1971
1972                         /* Shouldn't have any redirected items now */
1973                         Assert(ItemIdIsNormal(itemid));
1974
1975                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1976                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1977                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1978
1979                         /* ---
1980                          * VACUUM FULL has an exclusive lock on the relation.  So
1981                          * normally no other transaction can have pending INSERTs or
1982                          * DELETEs in this relation.  A tuple is either:
1983                          *              (a) live (XMIN_COMMITTED)
1984                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
1985                          *                      is visible to all active transactions)
1986                          *              (c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
1987                          *                      but at least one active transaction does not see the
1988                          *                      deleting transaction (ie, it's RECENTLY_DEAD)
1989                          *              (d) moved by the currently running VACUUM
1990                          *              (e) inserted or deleted by a not yet committed transaction,
1991                          *                      or by a transaction we couldn't set XMIN_COMMITTED for.
1992                          * In case (e) we wouldn't be in repair_frag() at all, because
1993                          * scan_heap() detects those cases and shuts off shrinking.
1994                          * We can't see case (b) here either, because such tuples were
1995                          * already removed by vacuum_page().  Cases (a) and (c) are
1996                          * normal and will have XMIN_COMMITTED set.  Case (d) is only
1997                          * possible if a whole tuple chain has been moved while
1998                          * processing this or a higher numbered block.
1999                          * ---
2000                          */
2001                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2002                         {
2003                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2004                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2005                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
2006                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2007
2008                                 /*
2009                                  * MOVED_OFF by another VACUUM would have caused the
2010                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
2011                                  */
2012                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2013                                         elog(ERROR, "invalid XVAC in tuple header");
2014
2015                                 /*
2016                                  * If this (chain) tuple has already been moved by this VACUUM,
2017                                  * we have to check whether it is recorded in vacpage, i.e.
2018                                  * whether it was moved while cleaning this page or a previous one.
2019                                  */
2020
2021                                 /* Can't we Assert(keep_tuples > 0) here? */
2022                                 if (keep_tuples == 0)
2023                                         continue;
2024                                 if (chain_tuple_moved)
2025                                 {
2026                                         /* some chains were moved while cleaning this page */
2027                                         Assert(vacpage->offsets_free > 0);
2028                                         for (i = 0; i < vacpage->offsets_free; i++)
2029                                         {
2030                                                 if (vacpage->offsets[i] == offnum)
2031                                                         break;
2032                                         }
2033                                         if (i >= vacpage->offsets_free)         /* not found */
2034                                         {
2035                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2036
2037                                                 /*
2038                                                  * If this is not a heap-only tuple, there must be an
2039                                                  * index entry for this item which will be removed in
2040                                                  * the index cleanup. Decrement the
2041                                                  * keep_indexed_tuples count to remember this.
2042                                                  */
2043                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2044                                                         keep_indexed_tuples--;
2045                                                 keep_tuples--;
2046                                         }
2047                                 }
2048                                 else
2049                                 {
2050                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2051
2052                                         /*
2053                                          * If this is not a heap-only tuple, there must be an
2054                                          * index entry for this item which will be removed in the
2055                                          * index cleanup. Decrement the keep_indexed_tuples count
2056                                          * to remember this.
2057                                          */
2058                                         if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2059                                                 keep_indexed_tuples--;
2060                                         keep_tuples--;
2061                                 }
2062                                 continue;
2063                         }
2064
2065                         /*
2066                          * If this tuple is in a chain of tuples created in updates by
2067                          * "recent" transactions then we have to move the whole chain of
2068                          * tuples to other places, so that we can write new t_ctid links
2069                          * that preserve the chain relationship.
2070                          *
2071                          * This test is complicated.  Read it as "if tuple is a recently
2072                          * created updated version, OR if it is an obsoleted version". (In
2073                          * the second half of the test, we needn't make any check on XMAX
2074                          * --- it must be recently obsoleted, else scan_heap would have
2075                          * deemed it removable.)
2076                          *
2077                          * NOTE: this test is not 100% accurate: it is possible for a
2078                          * tuple to be an updated one with recent xmin, and yet not match
2079                          * any new_tid entry in the vtlinks list.  Presumably there was
2080                          * once a parent tuple with xmax matching the xmin, but it's
2081                          * possible that that tuple has been removed --- for example, if
2082                          * it had xmin = xmax and wasn't itself an updated version, then
2083                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
2084                          * xmin xact completes.
2085                          *
2086                          * To be on the safe side, we abandon the repair_frag process if
2087                          * we cannot find the parent tuple in vtlinks.  This may be overly
2088                          * conservative; AFAICS it would be safe to move the chain.
2089                          *
2090                          * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
2091                          * using OldestXmin, which is a rather coarse test, it is quite
2092                          * possible to have an update chain in which a tuple we think is
2093                          * RECENTLY_DEAD links forward to one that is definitely DEAD. In
2094                          * such a case the RECENTLY_DEAD tuple must actually be dead, but
2095                          * it seems too complicated to try to make VACUUM remove it. We
2096                          * treat each contiguous set of RECENTLY_DEAD tuples as a
2097                          * separately movable chain, ignoring any intervening DEAD ones.
2098                          */
2099                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
2100                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
2101                                                                                 OldestXmin)) ||
2102                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
2103                                                                                            HEAP_IS_LOCKED)) &&
2104                                  !(ItemPointerEquals(&(tuple.t_self),
2105                                                                          &(tuple.t_data->t_ctid)))))
2106                         {
2107                                 Buffer          Cbuf = buf;
2108                                 bool            freeCbuf = false;
2109                                 bool            chain_move_failed = false;
2110                                 bool            moved_target = false;
2111                                 ItemPointerData Ctid;
2112                                 HeapTupleData tp = tuple;
2113                                 Size            tlen = tuple_len;
2114                                 VTupleMove      vtmove;
2115                                 int                     num_vtmove;
2116                                 int                     free_vtmove;
2117                                 VacPage         to_vacpage = NULL;
2118                                 int                     to_item = 0;
2119                                 int                     ti;
2120
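                                /* drop any pin we still hold on a previous destination page */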
2121                                 if (dst_buffer != InvalidBuffer)
2122                                 {
2123                                         ReleaseBuffer(dst_buffer);
2124                                         dst_buffer = InvalidBuffer;
2125                                 }
2126
2127                                 /* Quick exit if we have no vtlinks to search in */
2128                                 if (vacrelstats->vtlinks == NULL)
2129                                 {
2130                                         elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2131                                         break;          /* out of walk-along-page loop */
2132                                 }
2133
2134                                 /*
2135                                  * If this tuple is at the beginning or middle of the chain then
2136                                  * we have to walk to the end of the chain.  As with any t_ctid
2137                                  * chase, we have to verify that each new tuple is really the
2138                                  * descendant of the tuple we came from; however, here we need
2139                                  * even more than the normal amount of paranoia. If t_ctid
2140                                  * links forward to a tuple determined to be DEAD, then
2141                                  * depending on where that tuple is, it might already have
2142                                  * been removed, and perhaps even replaced by a MOVED_IN
2143                                  * tuple.  We don't want to include any DEAD tuples in the
2144                                  * chain, so we have to recheck HeapTupleSatisfiesVacuum.
2145                                  */
2146                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
2147                                                                                                   HEAP_IS_LOCKED)) &&
2148                                            !(ItemPointerEquals(&(tp.t_self),
2149                                                                                    &(tp.t_data->t_ctid))))
2150                                 {
2151                                         ItemPointerData nextTid;
2152                                         TransactionId priorXmax;
2153                                         Buffer          nextBuf;
2154                                         Page            nextPage;
2155                                         OffsetNumber nextOffnum;
2156                                         ItemId          nextItemid;
2157                                         HeapTupleHeader nextTdata;
2158                                         HTSV_Result nextTstatus;
2159
2160                                         nextTid = tp.t_data->t_ctid;
2161                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
2162                                         /* assume block# is OK (see heap_fetch comments) */
2163                                         nextBuf = ReadBufferWithStrategy(onerel,
2164                                                                                  ItemPointerGetBlockNumber(&nextTid),
2165                                                                                                          vac_strategy);
2166                                         nextPage = BufferGetPage(nextBuf);
2167                                         /* If bogus or unused slot, assume tp is end of chain */
2168                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
2169                                         if (nextOffnum < FirstOffsetNumber ||
2170                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
2171                                         {
2172                                                 ReleaseBuffer(nextBuf);
2173                                                 break;
2174                                         }
2175                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
2176                                         if (!ItemIdIsNormal(nextItemid))
2177                                         {
2178                                                 ReleaseBuffer(nextBuf);
2179                                                 break;
2180                                         }
2181                                         /* if not matching XMIN, assume tp is end of chain */
2182                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
2183                                                                                                                           nextItemid);
2184                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
2185                                                                                          priorXmax))
2186                                         {
2187                                                 ReleaseBuffer(nextBuf);
2188                                                 break;
2189                                         }
2190
2191                                         /*
2192                                          * Must check for DEAD or MOVED_IN tuple, too.  This could
2193                                          * potentially update hint bits, so we'd better hold the
2194                                          * buffer content lock.
2195                                          */
2196                                         LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
2197                                         nextTstatus = HeapTupleSatisfiesVacuum(nextTdata,
2198                                                                                                                    OldestXmin,
2199                                                                                                                    nextBuf);
2200                                         if (nextTstatus == HEAPTUPLE_DEAD ||
2201                                                 nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
2202                                         {
2203                                                 UnlockReleaseBuffer(nextBuf);
2204                                                 break;
2205                                         }
2206                                         LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
2207                                         /* if it's MOVED_OFF we should have moved this one with it */
2208                                         if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
2209                                                 elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
2210                                         /* OK, switch our attention to the next tuple in chain */
2211                                         tp.t_data = nextTdata;
2212                                         tp.t_self = nextTid;
2213                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
2214                                         if (freeCbuf)
2215                                                 ReleaseBuffer(Cbuf);
2216                                         Cbuf = nextBuf;
2217                                         freeCbuf = true;
2218                                 }
2219
2220                                 /* Set up workspace for planning the chain move */
2221                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
2222                                 num_vtmove = 0;
2223                                 free_vtmove = 100;
2224
2225                                 /*
2226                                  * Now, walk backwards up the chain (towards older tuples) and
2227                                  * check if all items in chain can be moved.  We record all
2228                                  * the moves that need to be made in the vtmove array.
2229                                  */
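                                     /*
                                      * Each vtmove[] entry records the tuple to move (tid), its
                                      * destination page (vacpage), and whether that page must be
                                      * vacuumed before the first tuple is placed on it (cleanVpd).
                                      * Space on the destination VacPage is reserved as the plan is
                                      * built; if the chain turns out not to be fully movable, only
                                      * the offsets_used reservations are rolled back below, since
                                      * no further tuple motion will occur.
                                      */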
2230                                 for (;;)
2231                                 {
2232                                         Buffer          Pbuf;
2233                                         Page            Ppage;
2234                                         ItemId          Pitemid;
2235                                         HeapTupleHeader PTdata;
2236                                         VTupleLinkData vtld,
2237                                                            *vtlp;
2238
2239                                         /* Identify a target page to move this tuple to */
2240                                         if (to_vacpage == NULL ||
2241                                                 !enough_space(to_vacpage, tlen))
2242                                         {
2243                                                 for (i = 0; i < num_fraged_pages; i++)
2244                                                 {
2245                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
2246                                                                 break;
2247                                                 }
2248
2249                                                 if (i == num_fraged_pages)
2250                                                 {
2251                                                         /* can't move item anywhere */
2252                                                         chain_move_failed = true;
2253                                                         break;          /* out of check-all-items loop */
2254                                                 }
2255                                                 to_item = i;
2256                                                 to_vacpage = fraged_pages->pagedesc[to_item];
2257                                         }
2258                                         to_vacpage->free -= MAXALIGN(tlen);
2259                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
2260                                                 to_vacpage->free -= sizeof(ItemIdData);
2261                                         (to_vacpage->offsets_used)++;
2262
2263                                         /* Add an entry to vtmove list */
2264                                         if (free_vtmove == 0)
2265                                         {
2266                                                 free_vtmove = 1000;
2267                                                 vtmove = (VTupleMove)
2268                                                         repalloc(vtmove,
2269                                                                          (free_vtmove + num_vtmove) *
2270                                                                          sizeof(VTupleMoveData));
2271                                         }
2272                                         vtmove[num_vtmove].tid = tp.t_self;
2273                                         vtmove[num_vtmove].vacpage = to_vacpage;
2274                                         if (to_vacpage->offsets_used == 1)
2275                                                 vtmove[num_vtmove].cleanVpd = true;
2276                                         else
2277                                                 vtmove[num_vtmove].cleanVpd = false;
2278                                         free_vtmove--;
2279                                         num_vtmove++;
2280
2281                                         /* Remember if we reached the original target tuple */
2282                                         if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
2283                                                 ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
2284                                                 moved_target = true;
2285
2286                                         /* Done if at beginning of chain */
2287                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2288                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2289                                                                                    OldestXmin))
2290                                                 break;  /* out of check-all-items loop */
2291
2292                                         /* Move to tuple with prior row version */
2293                                         vtld.new_tid = tp.t_self;
2294                                         vtlp = (VTupleLink)
2295                                                 vac_bsearch((void *) &vtld,
2296                                                                         (void *) (vacrelstats->vtlinks),
2297                                                                         vacrelstats->num_vtlinks,
2298                                                                         sizeof(VTupleLinkData),
2299                                                                         vac_cmp_vtlinks);
2300                                         if (vtlp == NULL)
2301                                         {
2302                                                 /* see discussion above */
2303                                                 elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2304                                                 chain_move_failed = true;
2305                                                 break;  /* out of check-all-items loop */
2306                                         }
2307                                         tp.t_self = vtlp->this_tid;
2308                                         Pbuf = ReadBufferWithStrategy(onerel,
2309                                                                          ItemPointerGetBlockNumber(&(tp.t_self)),
2310                                                                                                   vac_strategy);
2311                                         Ppage = BufferGetPage(Pbuf);
2312                                         Pitemid = PageGetItemId(Ppage,
2313                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2314                                         /* this can't happen since we saw tuple earlier: */
2315                                         if (!ItemIdIsNormal(Pitemid))
2316                                                 elog(ERROR, "parent itemid marked as unused");
2317                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2318
2319                                         /* ctid should not have changed since we saved it */
2320                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2321                                                                                          &(PTdata->t_ctid)));
2322
2323                                         /*
2324                                          * Read above about cases when !ItemIdIsUsed(nextItemid)
2325                                          * (child item is removed)...  Because we currently don't
2326                                          * remove the unneeded part of an update chain, it is
2327                                          * possible to find a non-matching parent row here.  As in
2328                                          * the case that originally exposed this problem, we simply
2329                                          * stop shrinking here.  We could try to find the real parent
2330                                          * row, but that's not worth doing, since a real solution
2331                                          * will be implemented later anyway, and we are too close to
2332                                          * the 6.5 release. - vadim 06/11/99
2333                                          */
2334                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2335                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2336                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2337                                         {
2338                                                 ReleaseBuffer(Pbuf);
2339                                                 elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
2340                                                 chain_move_failed = true;
2341                                                 break;  /* out of check-all-items loop */
2342                                         }
2343                                         tp.t_data = PTdata;
2344                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2345                                         if (freeCbuf)
2346                                                 ReleaseBuffer(Cbuf);
2347                                         Cbuf = Pbuf;
2348                                         freeCbuf = true;
2349                                 }                               /* end of check-all-items loop */
2350
2351                                 if (freeCbuf)
2352                                         ReleaseBuffer(Cbuf);
2353                                 freeCbuf = false;
2354
2355                                 /* Double-check that we will move the current target tuple */
2356                                 if (!moved_target && !chain_move_failed)
2357                                 {
2358                                         elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
2359                                         chain_move_failed = true;
2360                                 }
2361
2362                                 if (chain_move_failed)
2363                                 {
2364                                         /*
2365                                          * Undo changes to offsets_used state.  We don't bother
2366                                          * cleaning up the amount-free state, since we're not
2367                                          * going to do any further tuple motion.
2368                                          */
2369                                         for (i = 0; i < num_vtmove; i++)
2370                                         {
2371                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2372                                                 (vtmove[i].vacpage->offsets_used)--;
2373                                         }
2374                                         pfree(vtmove);
2375                                         break;          /* out of walk-along-page loop */
2376                                 }
2377
2378                                 /*
2379                                  * Okay, move the whole tuple chain in reverse order.
2380                                  *
2381                                  * Ctid tracks the new location of the previously-moved tuple.
2382                                  */
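                                     /*
                                      * vtmove[] was built working backwards from the newest member
                                      * of the chain, so this loop moves the newest tuple first.
                                      * Each older tuple's new copy then has its t_ctid set (inside
                                      * move_chain_tuple) to the new location of its successor,
                                      * which is what Ctid carries between iterations; the first
                                      * (newest) copy simply points at itself.
                                      */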
2383                                 ItemPointerSetInvalid(&Ctid);
2384                                 for (ti = 0; ti < num_vtmove; ti++)
2385                                 {
2386                                         VacPage         destvacpage = vtmove[ti].vacpage;
2387                                         Page            Cpage;
2388                                         ItemId          Citemid;
2389
2390                                         /* Get page to move from */
2391                                         tuple.t_self = vtmove[ti].tid;
2392                                         Cbuf = ReadBufferWithStrategy(onerel,
2393                                                                   ItemPointerGetBlockNumber(&(tuple.t_self)),
2394                                                                                                   vac_strategy);
2395
2396                                         /* Get page to move to */
2397                                         dst_buffer = ReadBufferWithStrategy(onerel,
2398                                                                                                                 destvacpage->blkno,
2399                                                                                                                 vac_strategy);
2400
2401                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2402                                         if (dst_buffer != Cbuf)
2403                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2404
2405                                         dst_page = BufferGetPage(dst_buffer);
2406                                         Cpage = BufferGetPage(Cbuf);
2407
2408                                         Citemid = PageGetItemId(Cpage,
2409                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2410                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2411                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2412
2413                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2414                                                                          dst_buffer, dst_page, destvacpage,
2415                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2416
2417                                         /*
2418                                          * If the tuple we are moving is a heap-only tuple, this
2419                                          * move will generate an additional index entry, so
2420                                          * increment the rel_indexed_tuples count.
2421                                          */
2422                                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2423                                                 vacrelstats->rel_indexed_tuples++;
2424
2425                                         num_moved++;
2426                                         if (destvacpage->blkno > last_move_dest_block)
2427                                                 last_move_dest_block = destvacpage->blkno;
2428
2429                                         /*
2430                                          * Remember that we moved tuple from the current page
2431                                          * (corresponding index tuple will be cleaned).
2432                                          */
2433                                         if (Cbuf == buf)
2434                                                 vacpage->offsets[vacpage->offsets_free++] =
2435                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2436                                         else
2437                                         {
2438                                                 /*
2439                                                  * When we move tuple chains, we may need to move
2440                                                  * tuples from a block that we haven't yet scanned in
2441                                                  * the outer walk-along-the-relation loop. Note that
2442                                                  * we can't be moving a tuple from a block that we
2443                                                  * have already scanned because if such a tuple
2444                                                  * exists, then we must have moved the chain along
2445                                                  * with that tuple when we scanned that block. IOW the
2446                                                  * test of (Cbuf != buf) guarantees that the tuple we
2447                                                  * are looking at right now is in a block which is yet
2448                                                  * to be scanned.
2449                                                  *
2450                                                  * We maintain two counters to correctly count the
2451                                                  * moved-off tuples from blocks that are not yet
2452                                                  * scanned (keep_tuples) and how many of them have
2453                                                  * index pointers (keep_indexed_tuples).  The main
2454                                                  * reason to track the latter is to help verify that
2455                                                  * indexes have the expected number of entries when
2456                                                  * all the dust settles.
2457                                                  */
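                                                      /*
                                                       * Both counters are decremented again later, when
                                                       * the moved-off slots are finally recorded in a
                                                       * VacPage: either in the fixup pass further down
                                                       * (if we quit this page early) or as the remaining
                                                       * source pages are scanned in the outer loop.
                                                       * Anything still outstanding is accounted for when
                                                       * the indexes are vacuumed again.
                                                       */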
2458                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2459                                                         keep_indexed_tuples++;
2460                                                 keep_tuples++;
2461                                         }
2462
2463                                         ReleaseBuffer(dst_buffer);
2464                                         ReleaseBuffer(Cbuf);
2465                                 }                               /* end of move-the-tuple-chain loop */
2466
2467                                 dst_buffer = InvalidBuffer;
2468                                 pfree(vtmove);
2469                                 chain_tuple_moved = true;
2470
2471                                 /* advance to next tuple in walk-along-page loop */
2472                                 continue;
2473                         }                                       /* end of is-tuple-in-chain test */
2474
2475                         /* try to find new page for this tuple */
2476                         if (dst_buffer == InvalidBuffer ||
2477                                 !enough_space(dst_vacpage, tuple_len))
2478                         {
2479                                 if (dst_buffer != InvalidBuffer)
2480                                 {
2481                                         ReleaseBuffer(dst_buffer);
2482                                         dst_buffer = InvalidBuffer;
2483                                 }
2484                                 for (i = 0; i < num_fraged_pages; i++)
2485                                 {
2486                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2487                                                 break;
2488                                 }
2489                                 if (i == num_fraged_pages)
2490                                         break;          /* can't move item anywhere */
2491                                 dst_vacpage = fraged_pages->pagedesc[i];
2492                                 dst_buffer = ReadBufferWithStrategy(onerel,
2493                                                                                                         dst_vacpage->blkno,
2494                                                                                                         vac_strategy);
2495                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2496                                 dst_page = BufferGetPage(dst_buffer);
2497                                 /* if this page was not used before - clean it */
2498                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2499                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2500                         }
2501                         else
2502                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2503
2504                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2505
2506                         move_plain_tuple(onerel, buf, page, &tuple,
2507                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2508
2509                         /*
2510                          * If the tuple we are moving is a heap-only tuple, this move will
2511                          * generate an additional index entry, so increment the
2512                          * rel_indexed_tuples count.
2513                          */
2514                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2515                                 vacrelstats->rel_indexed_tuples++;
2516
2517                         num_moved++;
2518                         if (dst_vacpage->blkno > last_move_dest_block)
2519                                 last_move_dest_block = dst_vacpage->blkno;
2520
2521                         /*
2522                          * Remember that we moved tuple from the current page
2523                          * (corresponding index tuple will be cleaned).
2524                          */
2525                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2526                 }                                               /* walk along page */
2527
2528                 /*
2529                  * If we broke out of the walk-along-page loop early (ie, still have
2530                  * offnum <= maxoff), then we failed to move some tuple off this page.
2531                  * No point in shrinking any more, so clean up and exit the per-page
2532                  * loop.
2533                  */
2534                 if (offnum < maxoff && keep_tuples > 0)
2535                 {
2536                         OffsetNumber off;
2537
2538                         /*
2539                          * Fix vacpage state for any unvisited tuples remaining on page
2540                          */
2541                         for (off = OffsetNumberNext(offnum);
2542                                  off <= maxoff;
2543                                  off = OffsetNumberNext(off))
2544                         {
2545                                 ItemId          itemid = PageGetItemId(page, off);
2546                                 HeapTupleHeader htup;
2547
2548                                 if (!ItemIdIsUsed(itemid))
2549                                         continue;
2550                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2551                                 Assert(ItemIdIsNormal(itemid));
2552
2553                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2554                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2555                                         continue;
2556
2557                                 /*
2558                                  * See comments in the walk-along-page loop above about why
2559                                  * only MOVED_OFF tuples should be found here.
2560                                  */
2561                                 if (htup->t_infomask & HEAP_MOVED_IN)
2562                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2563                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2564                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2565                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2566                                         elog(ERROR, "invalid XVAC in tuple header");
2567
2568                                 if (chain_tuple_moved)
2569                                 {
2570                                         /* some chains were moved while cleaning this page */
2571                                         Assert(vacpage->offsets_free > 0);
2572                                         for (i = 0; i < vacpage->offsets_free; i++)
2573                                         {
2574                                                 if (vacpage->offsets[i] == off)
2575                                                         break;
2576                                         }
2577                                         if (i >= vacpage->offsets_free)         /* not found */
2578                                         {
2579                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2580                                                 Assert(keep_tuples > 0);
2581
2582                                                 /*
2583                                                  * If this is not a heap-only tuple, there must be an
2584                                                  * index entry for this item which will be removed in
2585                                                  * the index cleanup. Decrement the
2586                                                  * keep_indexed_tuples count to remember this.
2587                                                  */
2588                                                 if (!HeapTupleHeaderIsHeapOnly(htup))
2589                                                         keep_indexed_tuples--;
2590                                                 keep_tuples--;
2591                                         }
2592                                 }
2593                                 else
2594                                 {
2595                                         vacpage->offsets[vacpage->offsets_free++] = off;
2596                                         Assert(keep_tuples > 0);
2597                                         if (!HeapTupleHeaderIsHeapOnly(htup))
2598                                                 keep_indexed_tuples--;
2599                                         keep_tuples--;
2600                                 }
2601                         }
2602                 }
2603
2604                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2605                 {
2606                         if (chain_tuple_moved)          /* else - they are ordered */
2607                         {
2608                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2609                                           sizeof(OffsetNumber), vac_cmp_offno);
2610                         }
2611                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2612                 }
2613
2614                 ReleaseBuffer(buf);
2615
2616                 if (offnum <= maxoff)
2617                         break;                          /* had to quit early, see above note */
2618
2619         }                                                       /* walk along relation */
2620
2621         blkno++;                                        /* new number of blocks */
2622
2623         if (dst_buffer != InvalidBuffer)
2624         {
2625                 Assert(num_moved > 0);
2626                 ReleaseBuffer(dst_buffer);
2627         }
2628
2629         if (num_moved > 0)
2630         {
2631                 /*
2632                  * We have to commit our tuple movings before we truncate the
2633                  * relation.  Ideally we should do Commit/StartTransactionCommand
2634                  * here, relying on the session-level table lock to protect our
2635                  * exclusive access to the relation.  However, that would require a
2636                  * lot of extra code to close and re-open the relation, indexes, etc.
2637                  * For now, a quick hack: record status of current transaction as
2638                  * committed, and continue.  We force the commit to be synchronous so
2639                  * that it's down to disk before we truncate.  (Note: tqual.c knows
2640                  * that VACUUM FULL always uses sync commit, too.)      The transaction
2641                  * continues to be shown as running in the ProcArray.
2642                  *
2643                  * XXX This desperately needs to be revisited.  Any failure after this
2644                  * point will result in a PANIC "cannot abort transaction nnn, it was
2645                  * already committed"!
2646                  */
2647                 ForceSyncCommit();
2648                 (void) RecordTransactionCommit();
2649         }
2650
2651         /*
2652          * We are not going to move any more tuples across pages, but we still
2653          * need to apply vacuum_page to compact free space in the remaining pages
2654          * in vacuum_pages list.  Note that some of these pages may also be in the
2655          * fraged_pages list, and may have had tuples moved onto them; if so, we
2656          * already did vacuum_page and needn't do it again.
2657          */
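             /*
              * (vacuum_page, defined further down, marks each dead line pointer
              * unused, compacts the page with PageRepairFragmentation, and
              * WAL-logs the cleanup via log_heap_clean.)
              */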
2658         for (i = 0, curpage = vacuum_pages->pagedesc;
2659                  i < vacuumed_pages;
2660                  i++, curpage++)
2661         {
2662                 vacuum_delay_point();
2663
2664                 Assert((*curpage)->blkno < blkno);
2665                 if ((*curpage)->offsets_used == 0)
2666                 {
2667                         Buffer          buf;
2668                         Page            page;
2669
2670                         /* this page was not used as a move target, so must clean it */
2671                         buf = ReadBufferWithStrategy(onerel,
2672                                                                                  (*curpage)->blkno,
2673                                                                                  vac_strategy);
2674                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2675                         page = BufferGetPage(buf);
2676                         if (!PageIsEmpty(page))
2677                                 vacuum_page(onerel, buf, *curpage);
2678                         UnlockReleaseBuffer(buf);
2679                 }
2680         }
2681
2682         /*
2683          * Now scan all the pages that we moved tuples onto and update tuple
2684          * status bits.  This is not really necessary, but will save time for
2685          * future transactions examining these tuples.
2686          */
2687         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2688                                          last_move_dest_block, num_moved);
2689
2690         /*
2691          * It'd be cleaner to make this report at the bottom of this routine, but
2692          * then the rusage would double-count the second pass of index vacuuming.
2693          * So do it here and ignore the relatively small amount of processing that
2694          * occurs below.
2695          */
2696         ereport(elevel,
2697                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2698                                         RelationGetRelationName(onerel),
2699                                         num_moved, nblocks, blkno),
2700                          errdetail("%s.",
2701                                            pg_rusage_show(&ru0))));
2702
2703         /*
2704          * Reflect the motion of system tuples in the catalog caches here.
2705          */
2706         CommandCounterIncrement();
2707
2708         if (Nvacpagelist.num_pages > 0)
2709         {
2710                 /* vacuum indexes again if needed */
2711                 if (Irel != NULL)
2712                 {
2713                         VacPage    *vpleft,
2714                                            *vpright,
2715                                                 vpsave;
2716
2717                         /* re-sort Nvacpagelist.pagedesc */
2718                         for (vpleft = Nvacpagelist.pagedesc,
2719                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2720                                  vpleft < vpright; vpleft++, vpright--)
2721                         {
2722                                 vpsave = *vpleft;
2723                                 *vpleft = *vpright;
2724                                 *vpright = vpsave;
2725                         }
2726
2727                         /*
2728                          * keep_tuples is the number of tuples that have been moved off a
2729                          * page during chain moves but not been scanned over subsequently.
2730                          * The tuple ids of these tuples are not recorded as free offsets
2731                          * for any VacPage, so they will not be cleared from the indexes.
2732                          * keep_indexed_tuples is the portion of these that are expected
2733                          * to have index entries.
2734                          */
2735                         Assert(keep_tuples >= 0);
2736                         for (i = 0; i < nindexes; i++)
2737                                 vacuum_index(&Nvacpagelist, Irel[i],
2738                                                          vacrelstats->rel_indexed_tuples,
2739                                                          keep_indexed_tuples);
2740                 }
2741
2742                 /*
2743                  * Clean moved-off tuples from last page in Nvacpagelist list.
2744                  *
2745                  * We need only do this in this one page, because higher-numbered
2746                  * pages are going to be truncated from the relation entirely. But see
2747                  * comments for update_hint_bits().
2748                  */
2749                 if (vacpage->blkno == (blkno - 1) &&
2750                         vacpage->offsets_free > 0)
2751                 {
2752                         Buffer          buf;
2753                         Page            page;
2754                         OffsetNumber unused[MaxOffsetNumber];
2755                         OffsetNumber offnum,
2756                                                 maxoff;
2757                         int                     uncnt = 0;
2758                         int                     num_tuples = 0;
2759
2760                         buf = ReadBufferWithStrategy(onerel, vacpage->blkno, vac_strategy);
2761                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2762                         page = BufferGetPage(buf);
2763                         maxoff = PageGetMaxOffsetNumber(page);
2764                         for (offnum = FirstOffsetNumber;
2765                                  offnum <= maxoff;
2766                                  offnum = OffsetNumberNext(offnum))
2767                         {
2768                                 ItemId          itemid = PageGetItemId(page, offnum);
2769                                 HeapTupleHeader htup;
2770
2771                                 if (!ItemIdIsUsed(itemid))
2772                                         continue;
2773                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2774                                 Assert(ItemIdIsNormal(itemid));
2775
2776                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2777                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2778                                         continue;
2779
2780                                 /*
2781                                  * See comments in the walk-along-page loop above about why
2782                                  * only MOVED_OFF tuples should be found here.
2783                                  */
2784                                 if (htup->t_infomask & HEAP_MOVED_IN)
2785                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2786                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2787                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2788                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2789                                         elog(ERROR, "invalid XVAC in tuple header");
2790
2791                                 ItemIdSetUnused(itemid);
2792                                 num_tuples++;
2793
2794                                 unused[uncnt++] = offnum;
2795                         }
2796                         Assert(vacpage->offsets_free == num_tuples);
2797
2798                         START_CRIT_SECTION();
2799
2800                         PageRepairFragmentation(page);
2801
2802                         MarkBufferDirty(buf);
2803
2804                         /* XLOG stuff */
2805                         if (!onerel->rd_istemp)
2806                         {
2807                                 XLogRecPtr      recptr;
2808
2809                                 recptr = log_heap_clean(onerel, buf,
2810                                                                                 NULL, 0, NULL, 0,
2811                                                                                 unused, uncnt,
2812                                                                                 false);
2813                                 PageSetLSN(page, recptr);
2814                                 PageSetTLI(page, ThisTimeLineID);
2815                         }
2816
2817                         END_CRIT_SECTION();
2818
2819                         UnlockReleaseBuffer(buf);
2820                 }
2821
2822                 /* now - free new list of reaped pages */
2823                 curpage = Nvacpagelist.pagedesc;
2824                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2825                         pfree(*curpage);
2826                 pfree(Nvacpagelist.pagedesc);
2827         }
2828
2829         /* Truncate relation, if needed */
2830         if (blkno < nblocks)
2831         {
2832                 RelationTruncate(onerel, blkno);
2833                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2834         }
2835
2836         /* clean up */
2837         pfree(vacpage);
2838         if (vacrelstats->vtlinks != NULL)
2839                 pfree(vacrelstats->vtlinks);
2840
2841         ExecContext_Finish(&ec);
2842 }
2843
2844 /*
2845  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2846  *
2847  *              This routine moves old_tup from old_page to dst_page.
2848  *              old_page and dst_page might be the same page.
2849  *              On entry old_buf and dst_buf are locked exclusively, both locks (or
2850  *              the single lock, if this is an intra-page move) are released before
2851  *              exit.
2852  *
2853  *              Yes, a routine with ten parameters is ugly, but it's still better
2854  *              than having these 120 lines of code in repair_frag() which is
2855  *              already too long and almost unreadable.
2856  */
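     /*
      * In outline: copy the tuple, mark the original HEAP_MOVED_OFF with our
      * XID as XVAC, vacuum the destination page first if required, insert the
      * copy marked HEAP_MOVED_IN, link it into the chain through t_ctid,
      * WAL-log the move, and finally build index entries for the new copy.
      */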
2857 static void
2858 move_chain_tuple(Relation rel,
2859                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2860                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2861                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2862 {
2863         TransactionId myXID = GetCurrentTransactionId();
2864         HeapTupleData newtup;
2865         OffsetNumber newoff;
2866         ItemId          newitemid;
2867         Size            tuple_len = old_tup->t_len;
2868
2869         /*
2870          * make a modifiable copy of the source tuple.
2871          */
2872         heap_copytuple_with_tuple(old_tup, &newtup);
2873
2874         /*
2875          * register invalidation of source tuple in catcaches.
2876          */
2877         CacheInvalidateHeapTuple(rel, old_tup);
2878
2879         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2880         START_CRIT_SECTION();
2881
2882         /*
2883          * mark the source tuple MOVED_OFF.
2884          */
2885         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2886                                                                          HEAP_XMIN_INVALID |
2887                                                                          HEAP_MOVED_IN);
2888         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2889         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2890
2891         /*
2892          * If this page was not used before - clean it.
2893          *
2894          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2895          * destination pages to be the same (since this tuple-chain member can be
2896          * on a page lower than the one we're currently processing in the outer
2897          * loop).  If that's true, then after vacuum_page() the source tuple will
2898          * have been moved, and tuple.t_data will be pointing at garbage.
2899          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2900          * step!!
2901          *
2902          * This path is different from the other callers of vacuum_page, because
2903          * we have already incremented the vacpage's offsets_used field to account
2904          * for the tuple(s) we expect to move onto the page. Therefore
2905          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2906          * good debugging check for all other callers, we work around it here
2907          * rather than remove it.
2908          */
2909         if (!PageIsEmpty(dst_page) && cleanVpd)
2910         {
2911                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2912
2913                 dst_vacpage->offsets_used = 0;
2914                 vacuum_page(rel, dst_buf, dst_vacpage);
2915                 dst_vacpage->offsets_used = sv_offsets_used;
2916         }
2917
2918         /*
2919          * Update the state of the copied tuple, and store it on the destination
2920          * page.  The copied tuple is never part of a HOT chain.
2921          */
2922         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2923                                                                    HEAP_XMIN_INVALID |
2924                                                                    HEAP_MOVED_OFF);
2925         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2926         HeapTupleHeaderClearHotUpdated(newtup.t_data);
2927         HeapTupleHeaderClearHeapOnly(newtup.t_data);
2928         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2929         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2930                                                  InvalidOffsetNumber, false, true);
2931         if (newoff == InvalidOffsetNumber)
2932                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
2933                          (unsigned long) tuple_len, dst_vacpage->blkno);
2934         newitemid = PageGetItemId(dst_page, newoff);
2935         /* drop temporary copy, and point to the version on the dest page */
2936         pfree(newtup.t_data);
2937         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2938
2939         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
2940
2941         /*
2942          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
2943          * to next tuple in chain otherwise.  (Since we move the chain in reverse
2944          * order, this is actually the previously processed tuple.)
2945          */
2946         if (!ItemPointerIsValid(ctid))
2947                 newtup.t_data->t_ctid = newtup.t_self;
2948         else
2949                 newtup.t_data->t_ctid = *ctid;
2950         *ctid = newtup.t_self;
2951
2952         MarkBufferDirty(dst_buf);
2953         if (dst_buf != old_buf)
2954                 MarkBufferDirty(old_buf);
2955
2956         /* XLOG stuff */
2957         if (!rel->rd_istemp)
2958         {
2959                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2960                                                                                    dst_buf, &newtup);
2961
2962                 if (old_buf != dst_buf)
2963                 {
2964                         PageSetLSN(old_page, recptr);
2965                         PageSetTLI(old_page, ThisTimeLineID);
2966                 }
2967                 PageSetLSN(dst_page, recptr);
2968                 PageSetTLI(dst_page, ThisTimeLineID);
2969         }
2970
2971         END_CRIT_SECTION();
2972
2973         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2974         if (dst_buf != old_buf)
2975                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2976
2977         /* Create index entries for the moved tuple */
2978         if (ec->resultRelInfo->ri_NumIndices > 0)
2979         {
2980                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2981                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2982                 ResetPerTupleExprContext(ec->estate);
2983         }
2984 }
2985
2986 /*
2987  *      move_plain_tuple() -- move one tuple that is not part of a chain
2988  *
2989  *              This routine moves old_tup from old_page to dst_page.
2990  *              On entry old_buf and dst_buf are locked exclusively, both locks are
2991  *              released before exit.
2992  *
2993  *              Yes, a routine with eight parameters is ugly, but it's still better
2994  *              than having these 90 lines of code in repair_frag() which is already
2995  *              too long and almost unreadable.
2996  */
2997 static void
2998 move_plain_tuple(Relation rel,
2999                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
3000                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
3001                                  ExecContext ec)
3002 {
3003         TransactionId myXID = GetCurrentTransactionId();
3004         HeapTupleData newtup;
3005         OffsetNumber newoff;
3006         ItemId          newitemid;
3007         Size            tuple_len = old_tup->t_len;
3008
3009         /* copy tuple */
3010         heap_copytuple_with_tuple(old_tup, &newtup);
3011
3012         /*
3013          * register invalidation of source tuple in catcaches.
3014          *
3015          * (Note: we do not need to register the copied tuple, because we are not
3016          * changing the tuple contents and so there cannot be any need to flush
3017          * negative catcache entries.)
3018          */
3019         CacheInvalidateHeapTuple(rel, old_tup);
3020
3021         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
3022         START_CRIT_SECTION();
3023
3024         /*
3025          * Mark new tuple as MOVED_IN by me; also mark it not HOT.
3026          */
3027         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3028                                                                    HEAP_XMIN_INVALID |
3029                                                                    HEAP_MOVED_OFF);
3030         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
3031         HeapTupleHeaderClearHotUpdated(newtup.t_data);
3032         HeapTupleHeaderClearHeapOnly(newtup.t_data);
3033         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
3034
3035         /* add tuple to the page */
3036         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3037                                                  InvalidOffsetNumber, false, true);
3038         if (newoff == InvalidOffsetNumber)
3039                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
3040                          (unsigned long) tuple_len,
3041                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
3042                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
3043         newitemid = PageGetItemId(dst_page, newoff);
3044         pfree(newtup.t_data);
3045         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
3046         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
3047         newtup.t_self = newtup.t_data->t_ctid;
3048
3049         /*
3050          * Mark old tuple as MOVED_OFF by me.
3051          */
3052         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3053                                                                          HEAP_XMIN_INVALID |
3054                                                                          HEAP_MOVED_IN);
3055         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
3056         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
3057
3058         MarkBufferDirty(dst_buf);
3059         MarkBufferDirty(old_buf);
3060
3061         /* XLOG stuff */
3062         if (!rel->rd_istemp)
3063         {
3064                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3065                                                                                    dst_buf, &newtup);
3066
3067                 PageSetLSN(old_page, recptr);
3068                 PageSetTLI(old_page, ThisTimeLineID);
3069                 PageSetLSN(dst_page, recptr);
3070                 PageSetTLI(dst_page, ThisTimeLineID);
3071         }
3072
3073         END_CRIT_SECTION();
3074
3075         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
3076         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3077         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3078
3079         dst_vacpage->offsets_used++;
3080
3081         /* insert index tuples if needed */
3082         if (ec->resultRelInfo->ri_NumIndices > 0)
3083         {
3084                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3085                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3086                 ResetPerTupleExprContext(ec->estate);
3087         }
3088 }
3089
3090 /*
3091  *      update_hint_bits() -- update hint bits in destination pages
3092  *
3093  * Scan all the pages that we moved tuples onto and update tuple status bits.
3094  * This is not really necessary, but it will save time for future transactions
3095  * examining these tuples.
3096  *
3097  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
3098  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
3099  *
3100  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
3101  * pages that were move source pages but not move dest pages.  The bulk
3102  * of the move source pages will be physically truncated from the relation,
3103  * and the last page remaining in the rel will be fixed separately in
3104  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
3105  * hint bits updated are tuples that are moved as part of a chain and were
3106  * on pages that were neither move destinations nor at the end of the rel.
3107  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
3108  * to remember and revisit those pages too.
3109  *
3110  * One wonders whether it wouldn't be better to skip this work entirely,
3111  * and let the tuple status updates happen someplace that's not holding an
3112  * exclusive lock on the relation.
3113  */
3114 static void
3115 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
3116                                  BlockNumber last_move_dest_block, int num_moved)
3117 {
3118         TransactionId myXID = GetCurrentTransactionId();
3119         int                     checked_moved = 0;
3120         int                     i;
3121         VacPage    *curpage;
3122
3123         for (i = 0, curpage = fraged_pages->pagedesc;
3124                  i < num_fraged_pages;
3125                  i++, curpage++)
3126         {
3127                 Buffer          buf;
3128                 Page            page;
3129                 OffsetNumber max_offset;
3130                 OffsetNumber off;
3131                 int                     num_tuples = 0;
3132
3133                 vacuum_delay_point();
3134
3135                 if ((*curpage)->blkno > last_move_dest_block)
3136                         break;                          /* no need to scan any further */
3137                 if ((*curpage)->offsets_used == 0)
3138                         continue;                       /* this page was never used as a move dest */
3139                 buf = ReadBufferWithStrategy(rel, (*curpage)->blkno, vac_strategy);
3140                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3141                 page = BufferGetPage(buf);
3142                 max_offset = PageGetMaxOffsetNumber(page);
3143                 for (off = FirstOffsetNumber;
3144                          off <= max_offset;
3145                          off = OffsetNumberNext(off))
3146                 {
3147                         ItemId          itemid = PageGetItemId(page, off);
3148                         HeapTupleHeader htup;
3149
3150                         if (!ItemIdIsUsed(itemid))
3151                                 continue;
3152                         /* Shouldn't be any DEAD or REDIRECT items anymore */
3153                         Assert(ItemIdIsNormal(itemid));
3154
3155                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
3156                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
3157                                 continue;
3158
3159                         /*
3160                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
3161                          */
3162                         if (!(htup->t_infomask & HEAP_MOVED))
3163                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
3164                         if (HeapTupleHeaderGetXvac(htup) != myXID)
3165                                 elog(ERROR, "invalid XVAC in tuple header");
3166
3167                         if (htup->t_infomask & HEAP_MOVED_IN)
3168                         {
3169                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
3170                                 htup->t_infomask &= ~HEAP_MOVED;
3171                                 num_tuples++;
3172                         }
3173                         else
3174                                 htup->t_infomask |= HEAP_XMIN_INVALID;
3175                 }
3176                 MarkBufferDirty(buf);
3177                 UnlockReleaseBuffer(buf);
3178                 Assert((*curpage)->offsets_used == num_tuples);
3179                 checked_moved += num_tuples;
3180         }
3181         Assert(num_moved == checked_moved);
3182 }
3183
3184 /*
3185  *      vacuum_heap() -- free dead tuples
3186  *
3187  *              This routine marks dead tuples as unused and truncates the relation
3188  *              if there are "empty" end-blocks.
3189  */
3190 static void
3191 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
3192 {
3193         Buffer          buf;
3194         VacPage    *vacpage;
3195         BlockNumber relblocks;
3196         int                     nblocks;
3197         int                     i;
3198
3199         nblocks = vacuum_pages->num_pages;
3200         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
3201
3202         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
3203         {
3204                 vacuum_delay_point();
3205
3206                 if ((*vacpage)->offsets_free > 0)
3207                 {
3208                         buf = ReadBufferWithStrategy(onerel,
3209                                                                                  (*vacpage)->blkno,
3210                                                                                  vac_strategy);
3211                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3212                         vacuum_page(onerel, buf, *vacpage);
3213                         UnlockReleaseBuffer(buf);
3214                 }
3215         }
3216
3217         /* Truncate relation if there are some empty end-pages */
3218         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
3219         if (vacuum_pages->empty_end_pages > 0)
3220         {
3221                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
3222                 ereport(elevel,
3223                                 (errmsg("\"%s\": truncated %u to %u pages",
3224                                                 RelationGetRelationName(onerel),
3225                                                 vacrelstats->rel_pages, relblocks)));
3226                 RelationTruncate(onerel, relblocks);
3227                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
3228         }
3229 }
3230
3231 /*
3232  *      vacuum_page() -- free dead tuples on a page
3233  *                                       and repair its fragmentation.
3234  *
3235  * Caller must hold pin and lock on buffer.
3236  */
3237 static void
3238 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
3239 {
3240         Page            page = BufferGetPage(buffer);
3241         int                     i;
3242
3243         /* There shouldn't be any tuples moved onto the page yet! */
3244         Assert(vacpage->offsets_used == 0);
3245
3246         START_CRIT_SECTION();
3247
3248         for (i = 0; i < vacpage->offsets_free; i++)
3249         {
3250                 ItemId          itemid = PageGetItemId(page, vacpage->offsets[i]);
3251
3252                 ItemIdSetUnused(itemid);
3253         }
3254
3255         PageRepairFragmentation(page);
3256
3257         MarkBufferDirty(buffer);
3258
3259         /* XLOG stuff: temp relations need no WAL entry */
3260         if (!onerel->rd_istemp)
3261         {
3262                 XLogRecPtr      recptr;
3263
3264                 recptr = log_heap_clean(onerel, buffer,
3265                                                                 NULL, 0, NULL, 0,
3266                                                                 vacpage->offsets, vacpage->offsets_free,
3267                                                                 false);
3268                 PageSetLSN(page, recptr);
3269                 PageSetTLI(page, ThisTimeLineID);
3270         }
3271
3272         END_CRIT_SECTION();
3273 }
3274
3275 /*
3276  *      scan_index() -- scan one index relation to update pg_class statistics.
3277  *
3278  * We use this when we have no deletions to do.
3279  */
3280 static void
3281 scan_index(Relation indrel, double num_tuples)
3282 {
3283         IndexBulkDeleteResult *stats;
3284         IndexVacuumInfo ivinfo;
3285         PGRUsage        ru0;
3286
3287         pg_rusage_init(&ru0);
3288
3289         ivinfo.index = indrel;
3290         ivinfo.vacuum_full = true;
3291         ivinfo.message_level = elevel;
3292         ivinfo.num_heap_tuples = num_tuples;
3293         ivinfo.strategy = vac_strategy;
3294
3295         stats = index_vacuum_cleanup(&ivinfo, NULL);
3296
3297         if (!stats)
3298                 return;
3299
3300         /* now update statistics in pg_class */
3301         vac_update_relstats(RelationGetRelid(indrel),
3302                                                 stats->num_pages, stats->num_index_tuples,
3303                                                 false, InvalidTransactionId);
3304
3305         ereport(elevel,
3306                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3307                                         RelationGetRelationName(indrel),
3308                                         stats->num_index_tuples,
3309                                         stats->num_pages),
3310         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
3311                           "%s.",
3312                           stats->pages_deleted, stats->pages_free,
3313                           pg_rusage_show(&ru0))));
3314
3315         /*
3316          * Check for tuple count mismatch.  If the index is partial, then it is OK
3317          * for it to have fewer tuples than the heap; otherwise, something is wrong.
3318          */
3319         if (stats->num_index_tuples != num_tuples)
3320         {
3321                 if (stats->num_index_tuples > num_tuples ||
3322                         !vac_is_partial_index(indrel))
3323                         ereport(WARNING,
3324                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3325                                                         RelationGetRelationName(indrel),
3326                                                         stats->num_index_tuples, num_tuples),
3327                                          errhint("Rebuild the index with REINDEX.")));
3328         }
3329
3330         pfree(stats);
3331 }
3332
3333 /*
3334  *      vacuum_index() -- vacuum one index relation.
3335  *
3336  *              vacpagelist is the VacPageList of the heap we're currently vacuuming.
3337  *              It's locked.  indrel is an index relation on the vacuumed heap.
3338  *
3339  *              We don't bother to set locks on the index relation here, since
3340  *              the parent table is exclusive-locked already.
3341  *
3342  *              Finally, we arrange to update the index relation's statistics in
3343  *              pg_class.
3344  */
3345 static void
3346 vacuum_index(VacPageList vacpagelist, Relation indrel,
3347                          double num_tuples, int keep_tuples)
3348 {
3349         IndexBulkDeleteResult *stats;
3350         IndexVacuumInfo ivinfo;
3351         PGRUsage        ru0;
3352
3353         pg_rusage_init(&ru0);
3354
3355         ivinfo.index = indrel;
3356         ivinfo.vacuum_full = true;
3357         ivinfo.message_level = elevel;
3358         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
3359         ivinfo.strategy = vac_strategy;
3360
3361         /* Do bulk deletion */
3362         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3363
3364         /* Do post-VACUUM cleanup */
3365         stats = index_vacuum_cleanup(&ivinfo, stats);
3366
3367         if (!stats)
3368                 return;
3369
3370         /* now update statistics in pg_class */
3371         vac_update_relstats(RelationGetRelid(indrel),
3372                                                 stats->num_pages, stats->num_index_tuples,
3373                                                 false, InvalidTransactionId);
3374
3375         ereport(elevel,
3376                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3377                                         RelationGetRelationName(indrel),
3378                                         stats->num_index_tuples,
3379                                         stats->num_pages),
3380                          errdetail("%.0f index row versions were removed.\n"
3381                          "%u index pages have been deleted, %u are currently reusable.\n"
3382                                            "%s.",
3383                                            stats->tuples_removed,
3384                                            stats->pages_deleted, stats->pages_free,
3385                                            pg_rusage_show(&ru0))));
3386
3387         /*
3388          * Check for tuple count mismatch.  If the index is partial, then it is OK
3389          * for it to have fewer tuples than the heap; otherwise, something is wrong.
3390          */
3391         if (stats->num_index_tuples != num_tuples + keep_tuples)
3392         {
3393                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3394                         !vac_is_partial_index(indrel))
3395                         ereport(WARNING,
3396                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3397                                                         RelationGetRelationName(indrel),
3398                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3399                                          errhint("Rebuild the index with REINDEX.")));
3400         }
3401
3402         pfree(stats);
3403 }
3404
3405 /*
3406  *      tid_reaped() -- is a particular tid reaped?
3407  *
3408  *              This has the right signature to be an IndexBulkDeleteCallback.
3409  *
3410  *              vacpagelist->pagedesc is sorted in ascending block-number order.
3411  */
3412 static bool
3413 tid_reaped(ItemPointer itemptr, void *state)
3414 {
3415         VacPageList vacpagelist = (VacPageList) state;
3416         OffsetNumber ioffno;
3417         OffsetNumber *voff;
3418         VacPage         vp,
3419                            *vpp;
3420         VacPageData vacpage;
3421
3422         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3423         ioffno = ItemPointerGetOffsetNumber(itemptr);
3424
3425         vp = &vacpage;
3426         vpp = (VacPage *) vac_bsearch((void *) &vp,
3427                                                                   (void *) (vacpagelist->pagedesc),
3428                                                                   vacpagelist->num_pages,
3429                                                                   sizeof(VacPage),
3430                                                                   vac_cmp_blk);
3431
3432         if (vpp == NULL)
3433                 return false;
3434
3435         /* ok - we are on a partially or fully reaped page */
3436         vp = *vpp;
3437
3438         if (vp->offsets_free == 0)
3439         {
3440                 /* this page is recorded as empty, so treat every TID on it as reaped */
3441                 return true;
3442         }
3443
3444         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3445                                                                                 (void *) (vp->offsets),
3446                                                                                 vp->offsets_free,
3447                                                                                 sizeof(OffsetNumber),
3448                                                                                 vac_cmp_offno);
3449
3450         if (voff == NULL)
3451                 return false;
3452
3453         /* tid is reaped */
3454         return true;
3455 }
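/*
 * A rough sketch of how this callback is consumed (illustrative only; the
 * actual loop lives in each index AM's bulk-delete code): for every index
 * entry the AM does something like
 *
 *              if (callback(&indexTuple->t_tid, callback_state))
 *                      ... remove that index entry ...
 *
 * so returning true from tid_reaped() is what causes the corresponding
 * index entries to be deleted.
 */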
3456
3457 /*
3458  * Update the shared Free Space Map with the info we now have about
3459  * free space in the relation, discarding any old info the map may have.
3460  */
3461 static void
3462 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3463                            BlockNumber rel_pages)
3464 {
3465         int                     nPages = fraged_pages->num_pages;
3466         VacPage    *pagedesc = fraged_pages->pagedesc;
3467         Size            threshold;
3468         FSMPageData *pageSpaces;
3469         int                     outPages;
3470         int                     i;
3471
3472         /*
3473          * We only report pages with free space at least equal to the average
3474          * request size --- this avoids cluttering FSM with uselessly-small bits
3475          * of space.  Although FSM would discard pages with little free space
3476          * anyway, it's important to do this prefiltering because (a) it reduces
3477          * the time spent holding the FSM lock in RecordRelationFreeSpace, and (b)
3478          * FSM uses the number of pages reported as a statistic for guiding space
3479          * management.  If we didn't threshold our reports the same way
3480          * vacuumlazy.c does, we'd be skewing that statistic.
3481          */
3482         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
3483
3484         pageSpaces = (FSMPageData *) palloc(nPages * sizeof(FSMPageData));
3485         outPages = 0;
3486
3487         for (i = 0; i < nPages; i++)
3488         {
3489                 /*
3490                  * fraged_pages may contain entries for pages that we later decided to
3491                  * truncate from the relation; don't enter them into the free space
3492                  * map!
3493                  */
3494                 if (pagedesc[i]->blkno >= rel_pages)
3495                         break;
3496
3497                 if (pagedesc[i]->free >= threshold)
3498                 {
3499                         FSMPageSetPageNum(&pageSpaces[outPages], pagedesc[i]->blkno);
3500                         FSMPageSetSpace(&pageSpaces[outPages], pagedesc[i]->free);
3501                         outPages++;
3502                 }
3503         }
3504
3505         RecordRelationFreeSpace(&onerel->rd_node, outPages, outPages, pageSpaces);
3506
3507         pfree(pageSpaces);
3508 }
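/*
 * For illustration (numbers are hypothetical): if GetAvgFSMRequestSize()
 * reports an average request of 256 bytes, a fraged page with only 180 free
 * bytes is filtered out above, while one with 900 free bytes is passed on
 * to RecordRelationFreeSpace().
 */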
3509
3510 /* Copy a VacPage structure */
3511 static VacPage
3512 copy_vac_page(VacPage vacpage)
3513 {
3514         VacPage         newvacpage;
3515
3516         /* allocate a VacPageData entry */
3517         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3518                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3519
3520         /* fill it in */
3521         if (vacpage->offsets_free > 0)
3522                 memcpy(newvacpage->offsets, vacpage->offsets,
3523                            vacpage->offsets_free * sizeof(OffsetNumber));
3524         newvacpage->blkno = vacpage->blkno;
3525         newvacpage->free = vacpage->free;
3526         newvacpage->offsets_used = vacpage->offsets_used;
3527         newvacpage->offsets_free = vacpage->offsets_free;
3528
3529         return newvacpage;
3530 }
3531
3532 /*
3533  * Add a VacPage pointer to a VacPageList.
3534  *
3535  *              As a side effect of the way that scan_heap works,
3536  *              higher pages come after lower pages in the array
3537  *              (and highest tid on a page is last).
3538  */
3539 static void
3540 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3541 {
3542 #define PG_NPAGEDESC 1024
3543
3544         /* allocate a VacPage entry if needed */
3545         if (vacpagelist->num_pages == 0)
3546         {
3547                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3548                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3549         }
3550         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3551         {
3552                 vacpagelist->num_allocated_pages *= 2;
3553                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3554         }
3555         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3556         (vacpagelist->num_pages)++;
3557 }
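/*
 * Sizing note: the pagedesc array starts at PG_NPAGEDESC (1024) entries and
 * doubles whenever it fills (1024 -> 2048 -> 4096 -> ...), so appending a
 * VacPage is amortized constant-time.
 */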
3558
3559 /*
3560  * vac_bsearch: just like standard C library routine bsearch(),
3561  * except that we first test to see whether the target key is outside
3562  * the range of the table entries.      This case is handled relatively slowly
3563  * by the normal binary search algorithm (ie, no faster than any other key)
3564  * but it occurs often enough in VACUUM to be worth optimizing.
3565  */
3566 static void *
3567 vac_bsearch(const void *key, const void *base,
3568                         size_t nelem, size_t size,
3569                         int (*compar) (const void *, const void *))
3570 {
3571         int                     res;
3572         const void *last;
3573
3574         if (nelem == 0)
3575                 return NULL;
3576         res = compar(key, base);
3577         if (res < 0)
3578                 return NULL;
3579         if (res == 0)
3580                 return (void *) base;
3581         if (nelem > 1)
3582         {
3583                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3584                 res = compar(key, last);
3585                 if (res > 0)
3586                         return NULL;
3587                 if (res == 0)
3588                         return (void *) last;
3589         }
3590         if (nelem <= 2)
3591                 return NULL;                    /* already checked them all */
3592         return bsearch(key, base, nelem, size, compar);
3593 }
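/*
 * Illustrative cost note: for a key below base[0] or above base[nelem-1],
 * the range checks above answer NULL after at most two compar() calls,
 * instead of the roughly log2(nelem) comparisons a plain bsearch() would
 * spend before failing.  Keys inside the range fall through to bsearch().
 */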
3594
3595 /*
3596  * Comparator routines for use with qsort() and bsearch().
3597  */
3598 static int
3599 vac_cmp_blk(const void *left, const void *right)
3600 {
3601         BlockNumber lblk,
3602                                 rblk;
3603
3604         lblk = (*((VacPage *) left))->blkno;
3605         rblk = (*((VacPage *) right))->blkno;
3606
3607         if (lblk < rblk)
3608                 return -1;
3609         if (lblk == rblk)
3610                 return 0;
3611         return 1;
3612 }
3613
3614 static int
3615 vac_cmp_offno(const void *left, const void *right)
3616 {
3617         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3618                 return -1;
3619         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3620                 return 0;
3621         return 1;
3622 }
3623
3624 static int
3625 vac_cmp_vtlinks(const void *left, const void *right)
3626 {
3627         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3628                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3629                 return -1;
3630         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3631                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3632                 return 1;
3633         /* bi_hi-es are equal */
3634         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3635                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3636                 return -1;
3637         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3638                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3639                 return 1;
3640         /* bi_lo-es are equal */
3641         if (((VTupleLink) left)->new_tid.ip_posid <
3642                 ((VTupleLink) right)->new_tid.ip_posid)
3643                 return -1;
3644         if (((VTupleLink) left)->new_tid.ip_posid >
3645                 ((VTupleLink) right)->new_tid.ip_posid)
3646                 return 1;
3647         return 0;
3648 }
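/*
 * In effect this orders links by the new tuple's TID: block number first
 * (bi_hi, then bi_lo), then offset.  For example, (block 5, offset 2) sorts
 * before (block 5, offset 7), which sorts before (block 6, offset 1).
 */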
3649
3650
3651 /*
3652  * Open all the indexes of the given relation, obtaining the specified kind
3653  * of lock on each.  Return an array of Relation pointers for the indexes
3654  * into *Irel, and the number of indexes into *nindexes.
3655  */
3656 void
3657 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3658                                  int *nindexes, Relation **Irel)
3659 {
3660         List       *indexoidlist;
3661         ListCell   *indexoidscan;
3662         int                     i;
3663
3664         Assert(lockmode != NoLock);
3665
3666         indexoidlist = RelationGetIndexList(relation);
3667
3668         *nindexes = list_length(indexoidlist);
3669
3670         if (*nindexes > 0)
3671                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3672         else
3673                 *Irel = NULL;
3674
3675         i = 0;
3676         foreach(indexoidscan, indexoidlist)
3677         {
3678                 Oid                     indexoid = lfirst_oid(indexoidscan);
3679
3680                 (*Irel)[i++] = index_open(indexoid, lockmode);
3681         }
3682
3683         list_free(indexoidlist);
3684 }
3685
3686 /*
3687  * Release the resources acquired by vac_open_indexes.  Optionally release
3688  * the locks (pass NoLock to keep them).
3689  */
3690 void
3691 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3692 {
3693         if (Irel == NULL)
3694                 return;
3695
3696         while (nindexes--)
3697         {
3698                 Relation        ind = Irel[nindexes];
3699
3700                 index_close(ind, lockmode);
3701         }
3702         pfree(Irel);
3703 }
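/*
 * A minimal usage sketch for the pair above (variable names and the choice
 * of lock mode are illustrative):
 *
 *              Relation   *Irel;
 *              int                     nindexes;
 *
 *              vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
 *              ... vacuum or scan each Irel[i] ...
 *              vac_close_indexes(nindexes, Irel, NoLock);   -- keep the locks until commit
 */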
3704
3705
3706 /*
3707  * Is an index partial (ie, could it contain fewer tuples than the heap)?
3708  */
3709 bool
3710 vac_is_partial_index(Relation indrel)
3711 {
3712         /*
3713          * If the index's AM doesn't support nulls, it's partial for our purposes
3714          */
3715         if (!indrel->rd_am->amindexnulls)
3716                 return true;
3717
3718         /* Otherwise, look to see if there's a partial-index predicate */
3719         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3720                 return true;
3721
3722         return false;
3723 }
3724
3725
3726 static bool
3727 enough_space(VacPage vacpage, Size len)
3728 {
3729         len = MAXALIGN(len);
3730
3731         if (len > vacpage->free)
3732                 return false;
3733
3734         /* if there are free itemid(s) and len <= free_space... */
3735         if (vacpage->offsets_used < vacpage->offsets_free)
3736                 return true;
3737
3738         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
3739         if (len + sizeof(ItemIdData) <= vacpage->free)
3740                 return true;
3741
3742         return false;
3743 }
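/*
 * Worked example (assuming 8-byte MAXALIGN and 4-byte line pointers, as on
 * typical platforms): for len = 100, MAXALIGN(len) = 104.  If the page has
 * 110 bytes free and no spare line pointers (offsets_used >= offsets_free),
 * we also need sizeof(ItemIdData) = 4 bytes, and 104 + 4 = 108 <= 110, so
 * the tuple fits; with only 106 bytes free it would not.
 */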
3744
3745 static Size
3746 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3747 {
3748         /*
3749          * It is correct to use PageGetExactFreeSpace() here, *not*
3750          * PageGetHeapFreeSpace().  This is because (a) we do our own, exact
3751          * accounting for whether line pointers must be added, and (b) we will
3752          * recycle any LP_DEAD line pointers before starting to add rows to a
3753          * page, but that may not have happened yet at the time this function is
3754          * applied to a page, which means PageGetHeapFreeSpace()'s protection
3755          * against too many line pointers on a page could fire incorrectly.  We do
3756          * not need that protection here: since VACUUM FULL always recycles all
3757          * dead line pointers first, it'd be physically impossible to insert more
3758          * than MaxHeapTuplesPerPage tuples anyway.
3759          */
3760         Size            freespace = PageGetExactFreeSpace(page);
3761         Size            targetfree;
3762
3763         targetfree = RelationGetTargetPageFreeSpace(relation,
3764                                                                                                 HEAP_DEFAULT_FILLFACTOR);
3765         if (freespace > targetfree)
3766                 return freespace - targetfree;
3767         else
3768                 return 0;
3769 }
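/*
 * Worked example (hypothetical numbers): with an 8192-byte block and a
 * fillfactor of 90, RelationGetTargetPageFreeSpace() asks that roughly 10%
 * of the page, about 819 bytes, be kept free.  A page with 1000 bytes of
 * exact free space therefore reports 1000 - 819 = 181 usable bytes, while
 * a page with only 500 free bytes reports 0.
 */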
3770
3771 /*
3772  * vacuum_delay_point --- check for interrupts and cost-based delay.
3773  *
3774  * This should be called in each major loop of VACUUM processing,
3775  * typically once per page processed.
3776  */
3777 void
3778 vacuum_delay_point(void)
3779 {
3780         /* Always check for interrupts */
3781         CHECK_FOR_INTERRUPTS();
3782
3783         /* Nap if appropriate */
3784         if (VacuumCostActive && !InterruptPending &&
3785                 VacuumCostBalance >= VacuumCostLimit)
3786         {
3787                 int                     msec;
3788
3789                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3790                 if (msec > VacuumCostDelay * 4)
3791                         msec = VacuumCostDelay * 4;
3792
3793                 pg_usleep(msec * 1000L);
3794
3795                 VacuumCostBalance = 0;
3796
3797                 /* update balance values for workers */
3798                 AutoVacuumUpdateDelay();
3799
3800                 /* Might have gotten an interrupt while sleeping */
3801                 CHECK_FOR_INTERRUPTS();
3802         }
3803 }
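/*
 * Worked example (settings are illustrative): with VacuumCostLimit = 200 and
 * VacuumCostDelay = 20 ms, an accumulated VacuumCostBalance of 450 yields
 * msec = 20 * 450 / 200 = 45, which is below the 4 * 20 = 80 ms cap, so the
 * backend naps for 45 ms and then resets the balance to zero.
 */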