]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Add support for tracking call counts and elapsed runtime for user-defined
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.374 2008/05/15 00:17:39 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/transam.h"
29 #include "access/xact.h"
30 #include "access/xlog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_database.h"
33 #include "catalog/pg_namespace.h"
34 #include "commands/dbcommands.h"
35 #include "commands/vacuum.h"
36 #include "executor/executor.h"
37 #include "miscadmin.h"
38 #include "pgstat.h"
39 #include "postmaster/autovacuum.h"
40 #include "storage/bufmgr.h"
41 #include "storage/freespace.h"
42 #include "storage/lmgr.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "utils/acl.h"
46 #include "utils/builtins.h"
47 #include "utils/flatfiles.h"
48 #include "utils/fmgroids.h"
49 #include "utils/inval.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/pg_rusage.h"
53 #include "utils/relcache.h"
54 #include "utils/snapmgr.h"
55 #include "utils/syscache.h"
56 #include "utils/tqual.h"
57
58
/*
 * GUC parameter: default minimum freeze age.  vacuum_set_xid_limits() uses
 * this value whenever its caller passes a negative freeze_min_age.
 */
int         vacuum_freeze_min_age;
63
64 /*
65  * VacPage structures keep track of each page on which we find useful
66  * amounts of free space.
67  */
68 typedef struct VacPageData
69 {
70         BlockNumber blkno;                      /* BlockNumber of this Page */
71         Size            free;                   /* FreeSpace on this Page */
72         uint16          offsets_used;   /* Number of OffNums used by vacuum */
73         uint16          offsets_free;   /* Number of OffNums free or to be free */
74         OffsetNumber offsets[1];        /* Array of free OffNums */
75 } VacPageData;
76
77 typedef VacPageData *VacPage;
78
79 typedef struct VacPageListData
80 {
81         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
82         int                     num_pages;              /* Number of pages in pagedesc */
83         int                     num_allocated_pages;    /* Number of allocated pages in
84                                                                                  * pagedesc */
85         VacPage    *pagedesc;           /* Descriptions of pages */
86 } VacPageListData;
87
88 typedef VacPageListData *VacPageList;
89
90 /*
91  * The "vtlinks" array keeps information about each recently-updated tuple
92  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
93  * We store the tuple's own TID as well as its t_ctid (its link to the next
94  * newer tuple version).  Searching in this array allows us to follow update
95  * chains backwards from newer to older tuples.  When we move a member of an
96  * update chain, we must move *all* the live members of the chain, so that we
97  * can maintain their t_ctid link relationships (we must not just overwrite
98  * t_ctid in an existing tuple).
99  *
100  * Note: because t_ctid links can be stale (this would only occur if a prior
101  * VACUUM crashed partway through), it is possible that new_tid points to an
102  * empty slot or unrelated tuple.  We have to check the linkage as we follow
103  * it, just as is done in EvalPlanQual.
104  */
105 typedef struct VTupleLinkData
106 {
107         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
108         ItemPointerData this_tid;       /* t_self of the tuple */
109 } VTupleLinkData;
110
111 typedef VTupleLinkData *VTupleLink;
112
113 /*
114  * We use an array of VTupleMoveData to plan a chain tuple move fully
115  * before we do it.
116  */
117 typedef struct VTupleMoveData
118 {
119         ItemPointerData tid;            /* tuple ID */
120         VacPage         vacpage;                /* where to move it to */
121         bool            cleanVpd;               /* clean vacpage before using? */
122 } VTupleMoveData;
123
124 typedef VTupleMoveData *VTupleMove;
125
126 /*
127  * VRelStats contains the data acquired by scan_heap for use later
128  */
129 typedef struct VRelStats
130 {
131         /* miscellaneous statistics */
132         BlockNumber rel_pages;          /* pages in relation */
133         double          rel_tuples;             /* tuples that remain after vacuuming */
134         double          rel_indexed_tuples;             /* indexed tuples that remain */
135         Size            min_tlen;               /* min surviving tuple size */
136         Size            max_tlen;               /* max surviving tuple size */
137         bool            hasindex;
138         /* vtlinks array for tuple chain following - sorted by new_tid */
139         int                     num_vtlinks;
140         VTupleLink      vtlinks;
141 } VRelStats;
142
143 /*----------------------------------------------------------------------
144  * ExecContext:
145  *
146  * As these variables always appear together, we put them into one struct
147  * and pull initialization and cleanup into separate routines.
148  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
149  * accurately:  It is *used* only in move_xxx_tuple(), but because this
150  * routine is called many times, we initialize the struct just once in
151  * repair_frag() and pass it on to move_xxx_tuple().
152  */
153 typedef struct ExecContextData
154 {
155         ResultRelInfo *resultRelInfo;
156         EState     *estate;
157         TupleTableSlot *slot;
158 } ExecContextData;
159
160 typedef ExecContextData *ExecContext;
161
162 static void
163 ExecContext_Init(ExecContext ec, Relation rel)
164 {
165         TupleDesc       tupdesc = RelationGetDescr(rel);
166
167         /*
168          * We need a ResultRelInfo and an EState so we can use the regular
169          * executor's index-entry-making machinery.
170          */
171         ec->estate = CreateExecutorState();
172
173         ec->resultRelInfo = makeNode(ResultRelInfo);
174         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
175         ec->resultRelInfo->ri_RelationDesc = rel;
176         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
177
178         ExecOpenIndices(ec->resultRelInfo);
179
180         ec->estate->es_result_relations = ec->resultRelInfo;
181         ec->estate->es_num_result_relations = 1;
182         ec->estate->es_result_relation_info = ec->resultRelInfo;
183
184         /* Set up a tuple slot too */
185         ec->slot = MakeSingleTupleTableSlot(tupdesc);
186 }
187
188 static void
189 ExecContext_Finish(ExecContext ec)
190 {
191         ExecDropSingleTupleTableSlot(ec->slot);
192         ExecCloseIndices(ec->resultRelInfo);
193         FreeExecutorState(ec->estate);
194 }
195
196 /*
197  * End of ExecContext Implementation
198  *----------------------------------------------------------------------
199  */
200
201 /* A few variables that don't seem worth passing around as parameters */
202 static MemoryContext vac_context = NULL;
203
204 static int      elevel = -1;
205
206 static TransactionId OldestXmin;
207 static TransactionId FreezeLimit;
208
209 static BufferAccessStrategy vac_strategy;
210
211
212 /* non-export function prototypes */
213 static List *get_rel_oids(List *relids, const RangeVar *vacrel,
214                          const char *stmttype);
215 static void vac_truncate_clog(TransactionId frozenXID);
216 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind,
217                                            bool for_wraparound);
218 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
219 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
220                   VacPageList vacuum_pages, VacPageList fraged_pages);
221 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
222                         VacPageList vacuum_pages, VacPageList fraged_pages,
223                         int nindexes, Relation *Irel);
224 static void move_chain_tuple(Relation rel,
225                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
226                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
227                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
228 static void move_plain_tuple(Relation rel,
229                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
230                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
231                                  ExecContext ec);
232 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
233                                  int num_fraged_pages, BlockNumber last_move_dest_block,
234                                  int num_moved);
235 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
236                         VacPageList vacpagelist);
237 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
238 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
239                          double num_tuples, int keep_tuples);
240 static void scan_index(Relation indrel, double num_tuples);
241 static bool tid_reaped(ItemPointer itemptr, void *state);
242 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
243                            BlockNumber rel_pages);
244 static VacPage copy_vac_page(VacPage vacpage);
245 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
246 static void *vac_bsearch(const void *key, const void *base,
247                         size_t nelem, size_t size,
248                         int (*compar) (const void *, const void *));
249 static int      vac_cmp_blk(const void *left, const void *right);
250 static int      vac_cmp_offno(const void *left, const void *right);
251 static int      vac_cmp_vtlinks(const void *left, const void *right);
252 static bool enough_space(VacPage vacpage, Size len);
253 static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
254
255
256 /****************************************************************************
257  *                                                                                                                                                      *
258  *                      Code common to all flavors of VACUUM and ANALYZE                                *
259  *                                                                                                                                                      *
260  ****************************************************************************
261  */
262
263
264 /*
265  * Primary entry point for VACUUM and ANALYZE commands.
266  *
267  * relids is normally NIL; if it is not, then it provides the list of
268  * relation OIDs to be processed, and vacstmt->relation is ignored.
269  * (The non-NIL case is currently only used by autovacuum.)
270  *
271  * for_wraparound is used by autovacuum to let us know when it's forcing
272  * a vacuum for wraparound, which should not be auto-cancelled.
273  *
274  * bstrategy is normally given as NULL, but in autovacuum it can be passed
275  * in to use the same buffer strategy object across multiple vacuum() calls.
276  *
277  * isTopLevel should be passed down from ProcessUtility.
278  *
279  * It is the caller's responsibility that vacstmt, relids, and bstrategy
280  * (if given) be allocated in a memory context that won't disappear
281  * at transaction commit.
282  */
283 void
284 vacuum(VacuumStmt *vacstmt, List *relids,
285            BufferAccessStrategy bstrategy, bool for_wraparound, bool isTopLevel)
286 {
287         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
288         volatile MemoryContext anl_context = NULL;
289         volatile bool all_rels,
290                                 in_outer_xact,
291                                 use_own_xacts;
292         List       *relations;
293
294         if (vacstmt->verbose)
295                 elevel = INFO;
296         else
297                 elevel = DEBUG2;
298
299         /*
300          * We cannot run VACUUM inside a user transaction block; if we were inside
301          * a transaction, then our commit- and start-transaction-command calls
302          * would not have the intended effect! Furthermore, the forced commit that
303          * occurs before truncating the relation's file would have the effect of
304          * committing the rest of the user's transaction too, which would
305          * certainly not be the desired behavior.  (This only applies to VACUUM
306          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
307          * block, but we choose to disallow that case because we'd rather commit
308          * as soon as possible after finishing the vacuum.      This is mainly so that
309          * we can let go the AccessExclusiveLock that we may be holding.)
310          *
311          * ANALYZE (without VACUUM) can run either way.
312          */
313         if (vacstmt->vacuum)
314         {
315                 PreventTransactionChain(isTopLevel, stmttype);
316                 in_outer_xact = false;
317         }
318         else
319                 in_outer_xact = IsInTransactionChain(isTopLevel);
320
321         /*
322          * Send info about dead objects to the statistics collector, unless we are
323          * in autovacuum --- autovacuum.c does this for itself.
324          */
325         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
326                 pgstat_vacuum_stat();
327
328         /*
329          * Create special memory context for cross-transaction storage.
330          *
331          * Since it is a child of PortalContext, it will go away eventually even
332          * if we suffer an error; there's no need for special abort cleanup logic.
333          */
334         vac_context = AllocSetContextCreate(PortalContext,
335                                                                                 "Vacuum",
336                                                                                 ALLOCSET_DEFAULT_MINSIZE,
337                                                                                 ALLOCSET_DEFAULT_INITSIZE,
338                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
339
340         /*
341          * If caller didn't give us a buffer strategy object, make one in the
342          * cross-transaction memory context.
343          */
344         if (bstrategy == NULL)
345         {
346                 MemoryContext old_context = MemoryContextSwitchTo(vac_context);
347
348                 bstrategy = GetAccessStrategy(BAS_VACUUM);
349                 MemoryContextSwitchTo(old_context);
350         }
351         vac_strategy = bstrategy;
352
353         /* Remember whether we are processing everything in the DB */
354         all_rels = (relids == NIL && vacstmt->relation == NULL);
355
356         /*
357          * Build list of relations to process, unless caller gave us one. (If we
358          * build one, we put it in vac_context for safekeeping.)
359          */
360         relations = get_rel_oids(relids, vacstmt->relation, stmttype);
361
362         /*
363          * Decide whether we need to start/commit our own transactions.
364          *
365          * For VACUUM (with or without ANALYZE): always do so, so that we can
366          * release locks as soon as possible.  (We could possibly use the outer
367          * transaction for a one-table VACUUM, but handling TOAST tables would be
368          * problematic.)
369          *
370          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
371          * start/commit our own transactions.  Also, there's no need to do so if
372          * only processing one relation.  For multiple relations when not within a
373          * transaction block, and also in an autovacuum worker, use own
374          * transactions so we can release locks sooner.
375          */
376         if (vacstmt->vacuum)
377                 use_own_xacts = true;
378         else
379         {
380                 Assert(vacstmt->analyze);
381                 if (IsAutoVacuumWorkerProcess())
382                         use_own_xacts = true;
383                 else if (in_outer_xact)
384                         use_own_xacts = false;
385                 else if (list_length(relations) > 1)
386                         use_own_xacts = true;
387                 else
388                         use_own_xacts = false;
389         }
390
391         /*
392          * If we are running ANALYZE without per-table transactions, we'll need a
393          * memory context with table lifetime.
394          */
395         if (!use_own_xacts)
396                 anl_context = AllocSetContextCreate(PortalContext,
397                                                                                         "Analyze",
398                                                                                         ALLOCSET_DEFAULT_MINSIZE,
399                                                                                         ALLOCSET_DEFAULT_INITSIZE,
400                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
401
402         /*
403          * vacuum_rel expects to be entered with no transaction active; it will
404          * start and commit its own transaction.  But we are called by an SQL
405          * command, and so we are executing inside a transaction already. We
406          * commit the transaction started in PostgresMain() here, and start
407          * another one before exiting to match the commit waiting for us back in
408          * PostgresMain().
409          */
410         if (use_own_xacts)
411         {
412                 /* ActiveSnapshot is not set by autovacuum */
413                 if (ActiveSnapshotSet())
414                         PopActiveSnapshot();
415
416                 /* matches the StartTransaction in PostgresMain() */
417                 CommitTransactionCommand();
418         }
419
420         /* Turn vacuum cost accounting on or off */
421         PG_TRY();
422         {
423                 ListCell   *cur;
424
425                 VacuumCostActive = (VacuumCostDelay > 0);
426                 VacuumCostBalance = 0;
427
428                 /*
429                  * Loop to process each selected relation.
430                  */
431                 foreach(cur, relations)
432                 {
433                         Oid                     relid = lfirst_oid(cur);
434
435                         if (vacstmt->vacuum)
436                                 vacuum_rel(relid, vacstmt, RELKIND_RELATION, for_wraparound);
437
438                         if (vacstmt->analyze)
439                         {
440                                 MemoryContext old_context = NULL;
441
442                                 /*
443                                  * If using separate xacts, start one for analyze. Otherwise,
444                                  * we can use the outer transaction, but we still need to call
445                                  * analyze_rel in a memory context that will be cleaned up on
446                                  * return (else we leak memory while processing multiple
447                                  * tables).
448                                  */
449                                 if (use_own_xacts)
450                                 {
451                                         StartTransactionCommand();
452                                         /* functions in indexes may want a snapshot set */
453                                         PushActiveSnapshot(GetTransactionSnapshot());
454                                 }
455                                 else
456                                         old_context = MemoryContextSwitchTo(anl_context);
457
458                                 analyze_rel(relid, vacstmt, vac_strategy);
459
460                                 if (use_own_xacts)
461                                 {
462                                         PopActiveSnapshot();
463                                         CommitTransactionCommand();
464                                 }
465                                 else
466                                 {
467                                         MemoryContextSwitchTo(old_context);
468                                         MemoryContextResetAndDeleteChildren(anl_context);
469                                 }
470                         }
471                 }
472         }
473         PG_CATCH();
474         {
475                 /* Make sure cost accounting is turned off after error */
476                 VacuumCostActive = false;
477                 PG_RE_THROW();
478         }
479         PG_END_TRY();
480
481         /* Turn off vacuum cost accounting */
482         VacuumCostActive = false;
483
484         /*
485          * Finish up processing.
486          */
487         if (use_own_xacts)
488         {
489                 /* here, we are not in a transaction */
490
491                 /*
492                  * This matches the CommitTransaction waiting for us in
493                  * PostgresMain().
494                  */
495                 StartTransactionCommand();
496         }
497
498         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
499         {
500                 /*
501                  * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
502                  * (autovacuum.c does this for itself.)
503                  */
504                 vac_update_datfrozenxid();
505
506                 /*
507                  * If it was a database-wide VACUUM, print FSM usage statistics (we
508                  * don't make you be superuser to see these).  We suppress this in
509                  * autovacuum, too.
510                  */
511                 if (all_rels)
512                         PrintFreeSpaceMapStatistics(elevel);
513         }
514
515         /*
516          * Clean up working storage --- note we must do this after
517          * StartTransactionCommand, else we might be trying to delete the active
518          * context!
519          */
520         MemoryContextDelete(vac_context);
521         vac_context = NULL;
522
523         if (anl_context)
524                 MemoryContextDelete(anl_context);
525 }
526
527 /*
528  * Build a list of Oids for each relation to be processed
529  *
530  * The list is built in vac_context so that it will survive across our
531  * per-relation transactions.
532  */
533 static List *
534 get_rel_oids(List *relids, const RangeVar *vacrel, const char *stmttype)
535 {
536         List       *oid_list = NIL;
537         MemoryContext oldcontext;
538
539         /* List supplied by VACUUM's caller? */
540         if (relids)
541                 return relids;
542
543         if (vacrel)
544         {
545                 /* Process a specific relation */
546                 Oid                     relid;
547
548                 relid = RangeVarGetRelid(vacrel, false);
549
550                 /* Make a relation list entry for this guy */
551                 oldcontext = MemoryContextSwitchTo(vac_context);
552                 oid_list = lappend_oid(oid_list, relid);
553                 MemoryContextSwitchTo(oldcontext);
554         }
555         else
556         {
557                 /* Process all plain relations listed in pg_class */
558                 Relation        pgclass;
559                 HeapScanDesc scan;
560                 HeapTuple       tuple;
561                 ScanKeyData key;
562
563                 ScanKeyInit(&key,
564                                         Anum_pg_class_relkind,
565                                         BTEqualStrategyNumber, F_CHAREQ,
566                                         CharGetDatum(RELKIND_RELATION));
567
568                 pgclass = heap_open(RelationRelationId, AccessShareLock);
569
570                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
571
572                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
573                 {
574                         /* Make a relation list entry for this guy */
575                         oldcontext = MemoryContextSwitchTo(vac_context);
576                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
577                         MemoryContextSwitchTo(oldcontext);
578                 }
579
580                 heap_endscan(scan);
581                 heap_close(pgclass, AccessShareLock);
582         }
583
584         return oid_list;
585 }
586
587 /*
588  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
589  */
590 void
591 vacuum_set_xid_limits(int freeze_min_age, bool sharedRel,
592                                           TransactionId *oldestXmin,
593                                           TransactionId *freezeLimit)
594 {
595         int                     freezemin;
596         TransactionId limit;
597         TransactionId safeLimit;
598
599         /*
600          * We can always ignore processes running lazy vacuum.  This is because we
601          * use these values only for deciding which tuples we must keep in the
602          * tables.      Since lazy vacuum doesn't write its XID anywhere, it's safe to
603          * ignore it.  In theory it could be problematic to ignore lazy vacuums on
604          * a full vacuum, but keep in mind that only one vacuum process can be
605          * working on a particular table at any time, and that each vacuum is
606          * always an independent transaction.
607          */
608         *oldestXmin = GetOldestXmin(sharedRel, true);
609
610         Assert(TransactionIdIsNormal(*oldestXmin));
611
612         /*
613          * Determine the minimum freeze age to use: as specified by the caller, or
614          * vacuum_freeze_min_age, but in any case not more than half
615          * autovacuum_freeze_max_age, so that autovacuums to prevent XID
616          * wraparound won't occur too frequently.
617          */
618         freezemin = freeze_min_age;
619         if (freezemin < 0)
620                 freezemin = vacuum_freeze_min_age;
621         freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
622         Assert(freezemin >= 0);
623
624         /*
625          * Compute the cutoff XID, being careful not to generate a "permanent" XID
626          */
627         limit = *oldestXmin - freezemin;
628         if (!TransactionIdIsNormal(limit))
629                 limit = FirstNormalTransactionId;
630
631         /*
632          * If oldestXmin is very far back (in practice, more than
633          * autovacuum_freeze_max_age / 2 XIDs old), complain and force a minimum
634          * freeze age of zero.
635          */
636         safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
637         if (!TransactionIdIsNormal(safeLimit))
638                 safeLimit = FirstNormalTransactionId;
639
640         if (TransactionIdPrecedes(limit, safeLimit))
641         {
642                 ereport(WARNING,
643                                 (errmsg("oldest xmin is far in the past"),
644                                  errhint("Close open transactions soon to avoid wraparound problems.")));
645                 limit = *oldestXmin;
646         }
647
648         *freezeLimit = limit;
649 }
650
651
652 /*
653  *      vac_update_relstats() -- update statistics for one relation
654  *
655  *              Update the whole-relation statistics that are kept in its pg_class
656  *              row.  There are additional stats that will be updated if we are
657  *              doing ANALYZE, but we always update these stats.  This routine works
658  *              for both index and heap relation entries in pg_class.
659  *
660  *              We violate transaction semantics here by overwriting the rel's
661  *              existing pg_class tuple with the new values.  This is reasonably
662  *              safe since the new values are correct whether or not this transaction
663  *              commits.  The reason for this is that if we updated these tuples in
664  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
665  *              by the time we got done with a vacuum cycle, most of the tuples in
666  *              pg_class would've been obsoleted.  Of course, this only works for
667  *              fixed-size never-null columns, but these are.
668  *
669  *              Another reason for doing it this way is that when we are in a lazy
670  *              VACUUM and have PROC_IN_VACUUM set, we mustn't do any updates ---
671  *              somebody vacuuming pg_class might think they could delete a tuple
672  *              marked with xmin = our xid.
673  *
674  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
675  *              ANALYZE.
676  */
677 void
678 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
679                                         bool hasindex, TransactionId frozenxid)
680 {
681         Relation        rd;
682         HeapTuple       ctup;
683         Form_pg_class pgcform;
684         bool            dirty;
685
686         rd = heap_open(RelationRelationId, RowExclusiveLock);
687
688         /* Fetch a copy of the tuple to scribble on */
689         ctup = SearchSysCacheCopy(RELOID,
690                                                           ObjectIdGetDatum(relid),
691                                                           0, 0, 0);
692         if (!HeapTupleIsValid(ctup))
693                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
694                          relid);
695         pgcform = (Form_pg_class) GETSTRUCT(ctup);
696
697         /* Apply required updates, if any, to copied tuple */
698
699         dirty = false;
700         if (pgcform->relpages != (int32) num_pages)
701         {
702                 pgcform->relpages = (int32) num_pages;
703                 dirty = true;
704         }
705         if (pgcform->reltuples != (float4) num_tuples)
706         {
707                 pgcform->reltuples = (float4) num_tuples;
708                 dirty = true;
709         }
710         if (pgcform->relhasindex != hasindex)
711         {
712                 pgcform->relhasindex = hasindex;
713                 dirty = true;
714         }
715
716         /*
717          * If we have discovered that there are no indexes, then there's no
718          * primary key either.  This could be done more thoroughly...
719          */
720         if (!hasindex)
721         {
722                 if (pgcform->relhaspkey)
723                 {
724                         pgcform->relhaspkey = false;
725                         dirty = true;
726                 }
727         }
728
729         /*
730          * relfrozenxid should never go backward.  Caller can pass
731          * InvalidTransactionId if it has no new data.
732          */
733         if (TransactionIdIsNormal(frozenxid) &&
734                 TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
735         {
736                 pgcform->relfrozenxid = frozenxid;
737                 dirty = true;
738         }
739
740         /*
741          * If anything changed, write out the tuple.  Even if nothing changed,
742          * force relcache invalidation so all backends reset their rd_targblock
743          * --- otherwise it might point to a page we truncated away.
744          */
745         if (dirty)
746         {
747                 heap_inplace_update(rd, ctup);
748                 /* the above sends a cache inval message */
749         }
750         else
751         {
752                 /* no need to change tuple, but force relcache inval anyway */
753                 CacheInvalidateRelcacheByTuple(ctup);
754         }
755
756         heap_close(rd, RowExclusiveLock);
757 }
758
759
760 /*
761  *      vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
762  *
763  *              Update pg_database's datfrozenxid entry for our database to be the
764  *              minimum of the pg_class.relfrozenxid values.  If we are able to
765  *              advance pg_database.datfrozenxid, also try to truncate pg_clog.
766  *
767  *              We violate transaction semantics here by overwriting the database's
768  *              existing pg_database tuple with the new value.  This is reasonably
769  *              safe since the new value is correct whether or not this transaction
770  *              commits.  As with vac_update_relstats, this avoids leaving dead tuples
771  *              behind after a VACUUM.
772  *
773  *              This routine is shared by full and lazy VACUUM.
774  */
775 void
776 vac_update_datfrozenxid(void)
777 {
778         HeapTuple       tuple;
779         Form_pg_database dbform;
780         Relation        relation;
781         SysScanDesc scan;
782         HeapTuple       classTup;
783         TransactionId newFrozenXid;
784         bool            dirty = false;
785
786         /*
787          * Initialize the "min" calculation with RecentGlobalXmin.      Any
788          * not-yet-committed pg_class entries for new tables must have
789          * relfrozenxid at least this high, because any other open xact must have
790          * RecentXmin >= its PGPROC.xmin >= our RecentGlobalXmin; see
791          * AddNewRelationTuple().  So we cannot produce a wrong minimum by
792          * starting with this.
793          */
794         newFrozenXid = RecentGlobalXmin;
795
796         /*
797          * We must seqscan pg_class to find the minimum Xid, because there is no
798          * index that can help us here.
799          */
800         relation = heap_open(RelationRelationId, AccessShareLock);
801
802         scan = systable_beginscan(relation, InvalidOid, false,
803                                                           SnapshotNow, 0, NULL);
804
805         while ((classTup = systable_getnext(scan)) != NULL)
806         {
807                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
808
809                 /*
810                  * Only consider heap and TOAST tables (anything else should have
811                  * InvalidTransactionId in relfrozenxid anyway.)
812                  */
813                 if (classForm->relkind != RELKIND_RELATION &&
814                         classForm->relkind != RELKIND_TOASTVALUE)
815                         continue;
816
817                 Assert(TransactionIdIsNormal(classForm->relfrozenxid));
818
819                 if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
820                         newFrozenXid = classForm->relfrozenxid;
821         }
822
823         /* we're done with pg_class */
824         systable_endscan(scan);
825         heap_close(relation, AccessShareLock);
826
827         Assert(TransactionIdIsNormal(newFrozenXid));
828
829         /* Now fetch the pg_database tuple we need to update. */
830         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
831
832         /* Fetch a copy of the tuple to scribble on */
833         tuple = SearchSysCacheCopy(DATABASEOID,
834                                                            ObjectIdGetDatum(MyDatabaseId),
835                                                            0, 0, 0);
836         if (!HeapTupleIsValid(tuple))
837                 elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
838         dbform = (Form_pg_database) GETSTRUCT(tuple);
839
840         /*
841          * Don't allow datfrozenxid to go backward (probably can't happen anyway);
842          * and detect the common case where it doesn't go forward either.
843          */
844         if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
845         {
846                 dbform->datfrozenxid = newFrozenXid;
847                 dirty = true;
848         }
849
850         if (dirty)
851                 heap_inplace_update(relation, tuple);
852
853         heap_freetuple(tuple);
854         heap_close(relation, RowExclusiveLock);
855
856         /*
857          * If we were able to advance datfrozenxid, mark the flat-file copy of
858          * pg_database for update at commit, and see if we can truncate pg_clog.
859          */
860         if (dirty)
861         {
862                 database_file_update_needed();
863                 vac_truncate_clog(newFrozenXid);
864         }
865 }
866
867
868 /*
869  *      vac_truncate_clog() -- attempt to truncate the commit log
870  *
871  *              Scan pg_database to determine the system-wide oldest datfrozenxid,
872  *              and use it to truncate the transaction commit log (pg_clog).
873  *              Also update the XID wrap limit info maintained by varsup.c.
874  *
875  *              The passed XID is simply the one I just wrote into my pg_database
876  *              entry.  It's used to initialize the "min" calculation.
877  *
878  *              This routine is shared by full and lazy VACUUM.  Note that it's
879  *              only invoked when we've managed to change our DB's datfrozenxid
880  *              entry.
881  */
882 static void
883 vac_truncate_clog(TransactionId frozenXID)
884 {
885         TransactionId myXID = GetCurrentTransactionId();
886         Relation        relation;
887         HeapScanDesc scan;
888         HeapTuple       tuple;
889         NameData        oldest_datname;
890         bool            frozenAlreadyWrapped = false;
891
892         /* init oldest_datname to sync with my frozenXID */
893         namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
894
895         /*
896          * Scan pg_database to compute the minimum datfrozenxid
897          *
898          * Note: we need not worry about a race condition with new entries being
899          * inserted by CREATE DATABASE.  Any such entry will have a copy of some
900          * existing DB's datfrozenxid, and that source DB cannot be ours because
901          * of the interlock against copying a DB containing an active backend.
902          * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
903          * concurrently modify the datfrozenxid's of different databases, the
904          * worst possible outcome is that pg_clog is not truncated as aggressively
905          * as it could be.
906          */
907         relation = heap_open(DatabaseRelationId, AccessShareLock);
908
909         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
910
911         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
912         {
913                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
914
915                 Assert(TransactionIdIsNormal(dbform->datfrozenxid));
916
917                 if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
918                         frozenAlreadyWrapped = true;
919                 else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
920                 {
921                         frozenXID = dbform->datfrozenxid;
922                         namecpy(&oldest_datname, &dbform->datname);
923                 }
924         }
925
926         heap_endscan(scan);
927
928         heap_close(relation, AccessShareLock);
929
930         /*
931          * Do not truncate CLOG if we seem to have suffered wraparound already;
932          * the computed minimum XID might be bogus.  This case should now be
933          * impossible due to the defenses in GetNewTransactionId, but we keep the
934          * test anyway.
935          */
936         if (frozenAlreadyWrapped)
937         {
938                 ereport(WARNING,
939                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
940                                  errdetail("You might have already suffered transaction-wraparound data loss.")));
941                 return;
942         }
943
944         /* Truncate CLOG to the oldest frozenxid */
945         TruncateCLOG(frozenXID);
946
947         /*
948          * Update the wrap limit for GetNewTransactionId.  Note: this function
949          * will also signal the postmaster for an(other) autovac cycle if needed.
950          */
951         SetTransactionIdLimit(frozenXID, &oldest_datname);
952 }
953
954
955 /****************************************************************************
956  *                                                                                                                                                      *
957  *                      Code common to both flavors of VACUUM                                                   *
958  *                                                                                                                                                      *
959  ****************************************************************************
960  */
961
962
963 /*
964  *      vacuum_rel() -- vacuum one heap relation
965  *
966  *              Doing one heap at a time incurs extra overhead, since we need to
967  *              check that the heap exists again just before we vacuum it.      The
968  *              reason that we do this is so that vacuuming can be spread across
969  *              many small transactions.  Otherwise, two-phase locking would require
970  *              us to lock the entire database during one pass of the vacuum cleaner.
971  *
972  *              At entry and exit, we are not inside a transaction.
     *
     *              relid is the OID of the relation to process.  If it can no
     *              longer be opened, or fails the permission, relkind or
     *              other-backend-temp-table checks below, the relation is skipped
     *              and the transaction started here is simply committed.
     *
     *              vacstmt->full selects between full_vacuum_rel() under
     *              AccessExclusiveLock and lazy_vacuum_rel() under
     *              ShareUpdateExclusiveLock.
     *
     *              expected_relkind is the relkind the opened relation must have;
     *              a mismatch produces a WARNING and the relation is skipped.
     *
     *              for_wraparound, in the lazy case only, sets
     *              PROC_VACUUM_FOR_WRAPAROUND in MyProc->vacuumFlags.  It is
     *              passed unchanged to the recursive call for the TOAST table.
973  */
974 static void
975 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind,
976                    bool for_wraparound)
977 {
978         LOCKMODE        lmode;
979         Relation        onerel;
980         LockRelId       onerelid;
981         Oid                     toast_relid;
982         Oid                     save_userid;
983         bool            save_secdefcxt;
984
985         /* Begin a transaction for vacuuming this relation */
986         StartTransactionCommand();
987
988         if (vacstmt->full)
989         {
990                 /* functions in indexes may want a snapshot set */
991                 PushActiveSnapshot(GetTransactionSnapshot());
992         }
993         else
994         {
995                 /*
996                  * During a lazy VACUUM we do not run any user-supplied functions, and
997                  * so it should be safe to not create a transaction snapshot.
998                  *
999                  * We can furthermore set the PROC_IN_VACUUM flag, which lets other
1000                  * concurrent VACUUMs know that they can ignore this one while
1001                  * determining their OldestXmin.  (The reason we don't set it during a
1002                  * full VACUUM is exactly that we may have to run user-defined
1003                  * functions for functional indexes, and we want to make sure that if
1004                  * they use the snapshot set above, any tuples it requires can't get
1005                  * removed from other tables.  An index function that depends on the
1006                  * contents of other tables is arguably broken, but we won't break it
1007                  * here by violating transaction semantics.)
1008                  *
1009                  * We also set the VACUUM_FOR_WRAPAROUND flag, which is passed down
1010                  * by autovacuum; it's used to avoid cancelling a vacuum that was
1011                  * invoked in an emergency.
1012                  *
1013                  * Note: this flag remains set until CommitTransaction or
1014                  * AbortTransaction.  We don't want to clear it until we reset
1015                  * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
1016                  * which is probably Not Good.
1017                  */
1018                 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1019                 MyProc->vacuumFlags |= PROC_IN_VACUUM;
1020                 if (for_wraparound)
1021                         MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
1022                 LWLockRelease(ProcArrayLock);
1023         }
1024
1025         /*
1026          * Check for user-requested abort.      Note we want this to be inside a
1027          * transaction, so xact.c doesn't issue useless WARNING.
1028          */
1029         CHECK_FOR_INTERRUPTS();
1030
1031         /*
1032          * Determine the type of lock we want --- hard exclusive lock for a FULL
1033          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1034          * way, we can be sure that no other backend is vacuuming the same table.
1035          */
1036         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
1037
1038         /*
1039          * Open the relation and get the appropriate lock on it.
1040          *
1041          * There's a race condition here: the rel may have gone away since the
1042          * last time we saw it.  If so, we don't need to vacuum it.
1043          */
1044         onerel = try_relation_open(relid, lmode);
1045
1046         if (!onerel)
1047         {
                     /*
                      * Every early exit below undoes only what was set up above: the
                      * active snapshot was pushed only for FULL, so pop it only then.
                      */
1048                 if (vacstmt->full)
1049                         PopActiveSnapshot();
1050                 CommitTransactionCommand();
1051                 return;
1052         }
1053
1054         /*
1055          * Check permissions.
1056          *
1057          * We allow the user to vacuum a table if he is superuser, the table
1058          * owner, or the database owner (but in the latter case, only if it's not
1059          * a shared relation).  pg_class_ownercheck includes the superuser case.
1060          *
1061          * Note we choose to treat permissions failure as a WARNING and keep
1062          * trying to vacuum the rest of the DB --- is this appropriate?
1063          */
1064         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1065                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1066         {
                     /* pick the message that names who could have vacuumed it */
1067                 if (onerel->rd_rel->relisshared)
1068                         ereport(WARNING,
1069                                         (errmsg("skipping \"%s\" --- only superuser can vacuum it",
1070                                                         RelationGetRelationName(onerel))));
1071                 else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
1072                         ereport(WARNING,
1073                                         (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
1074                                                         RelationGetRelationName(onerel))));
1075                 else
1076                         ereport(WARNING,
1077                                         (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1078                                                         RelationGetRelationName(onerel))));
1079                 relation_close(onerel, lmode);
1080                 if (vacstmt->full)
1081                         PopActiveSnapshot();
1082                 CommitTransactionCommand();
1083                 return;
1084         }
1085
1086         /*
1087          * Check that it's a plain table; we used to do this in get_rel_oids() but
1088          * seems safer to check after we've locked the relation.
1089          */
1090         if (onerel->rd_rel->relkind != expected_relkind)
1091         {
1092                 ereport(WARNING,
1093                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1094                                                 RelationGetRelationName(onerel))));
1095                 relation_close(onerel, lmode);
1096                 if (vacstmt->full)
1097                         PopActiveSnapshot();
1098                 CommitTransactionCommand();
1099                 return;
1100         }
1101
1102         /*
1103          * Silently ignore tables that are temp tables of other backends ---
1104          * trying to vacuum these will lead to great unhappiness, since their
1105          * contents are probably not up-to-date on disk.  (We don't throw a
1106          * warning here; it would just lead to chatter during a database-wide
1107          * VACUUM.)
1108          */
1109         if (isOtherTempNamespace(RelationGetNamespace(onerel)))
1110         {
1111                 relation_close(onerel, lmode);
1112                 if (vacstmt->full)
1113                         PopActiveSnapshot();
1114                 CommitTransactionCommand();
1115                 return;
1116         }
1117
1118         /*
1119          * Get a session-level lock too. This will protect our access to the
1120          * relation across multiple transactions, so that we can vacuum the
1121          * relation's TOAST table (if any) secure in the knowledge that no one is
1122          * deleting the parent relation.
1123          *
1124          * NOTE: this cannot block, even if someone else is waiting for access,
1125          * because the lock manager knows that both lock requests are from the
1126          * same process.
1127          */
1128         onerelid = onerel->rd_lockInfo.lockRelId;
1129         LockRelationIdForSession(&onerelid, lmode);
1130
1131         /*
1132          * Remember the relation's TOAST relation for later
              * (it is read now because onerel is closed before the TOAST pass)
1133          */
1134         toast_relid = onerel->rd_rel->reltoastrelid;
1135
1136         /*
1137          * Switch to the table owner's userid, so that any index functions are
1138          * run as that user.  (This is unnecessary, but harmless, for lazy
1139          * VACUUM.)
1140          */
1141         GetUserIdAndContext(&save_userid, &save_secdefcxt);
1142         SetUserIdAndContext(onerel->rd_rel->relowner, true);
1143
1144         /*
1145          * Do the actual work --- either FULL or "lazy" vacuum
1146          */
1147         if (vacstmt->full)
1148                 full_vacuum_rel(onerel, vacstmt);
1149         else
1150                 lazy_vacuum_rel(onerel, vacstmt, vac_strategy);
1151
1152         /* Restore userid */
1153         SetUserIdAndContext(save_userid, save_secdefcxt);
1154
1155         /* all done with this class, but hold lock until commit */
1156         relation_close(onerel, NoLock);
1157
1158         /*
1159          * Complete the transaction and free all temporary memory used.
1160          */
1161         if (vacstmt->full)
1162                 PopActiveSnapshot();
1163         CommitTransactionCommand();
1164
1165         /*
1166          * If the relation has a secondary toast rel, vacuum that too while we
1167          * still hold the session lock on the master table.  Note however that
1168          * "analyze" will not get done on the toast table.      This is good, because
1169          * the toaster always uses hardcoded index access and statistics are
1170          * totally unimportant for toast relations.
              *
              * The recursive call starts and commits its own transaction, which is
              * why our transaction had to be committed first.
1171          */
1172         if (toast_relid != InvalidOid)
1173                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE, for_wraparound);
1174
1175         /*
1176          * Now release the session-level lock on the master table.
1177          */
1178         UnlockRelationIdForSession(&onerelid, lmode);
1179 }
1180
1181
1182 /****************************************************************************
1183  *                                                                                                                                                      *
1184  *                      Code for VACUUM FULL (only)                                                                             *
1185  *                                                                                                                                                      *
1186  ****************************************************************************
1187  */
1188
1189
1190 /*
1191  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1192  *
1193  *              This routine vacuums a single heap, cleans out its indexes, and
1194  *              updates its num_pages and num_tuples statistics.
1195  *
1196  *              At entry, we have already established a transaction and opened
1197  *              and locked the relation.
1198  */
1199 static void
1200 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1201 {
1202         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1203                                                                                  * clean indexes */
1204         VacPageListData fraged_pages;           /* List of pages with space enough for
1205                                                                                  * re-using */
1206         Relation   *Irel;
1207         int                     nindexes,
1208                                 i;
1209         VRelStats  *vacrelstats;
1210
1211         vacuum_set_xid_limits(vacstmt->freeze_min_age, onerel->rd_rel->relisshared,
1212                                                   &OldestXmin, &FreezeLimit);
1213
1214         /*
1215          * Flush any previous async-commit transactions.  This does not guarantee
1216          * that we will be able to set hint bits for tuples they inserted, but it
1217          * improves the probability, especially in simple sequential-commands
1218          * cases.  See scan_heap() and repair_frag() for more about this.
1219          */
1220         XLogAsyncCommitFlush();
1221
1222         /*
1223          * Set up statistics-gathering machinery.
1224          */
1225         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1226         vacrelstats->rel_pages = 0;
1227         vacrelstats->rel_tuples = 0;
1228         vacrelstats->rel_indexed_tuples = 0;
1229         vacrelstats->hasindex = false;
1230
1231         /* scan the heap */
1232         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1233         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1234
1235         /* Now open all indexes of the relation */
1236         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1237         if (nindexes > 0)
1238                 vacrelstats->hasindex = true;
1239
1240         /* Clean/scan index relation(s) */
1241         if (Irel != NULL)
1242         {
1243                 if (vacuum_pages.num_pages > 0)
1244                 {
1245                         for (i = 0; i < nindexes; i++)
1246                                 vacuum_index(&vacuum_pages, Irel[i],
1247                                                          vacrelstats->rel_indexed_tuples, 0);
1248                 }
1249                 else
1250                 {
1251                         /* just scan indexes to update statistic */
1252                         for (i = 0; i < nindexes; i++)
1253                                 scan_index(Irel[i], vacrelstats->rel_indexed_tuples);
1254                 }
1255         }
1256
1257         if (fraged_pages.num_pages > 0)
1258         {
1259                 /* Try to shrink heap */
1260                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1261                                         nindexes, Irel);
1262                 vac_close_indexes(nindexes, Irel, NoLock);
1263         }
1264         else
1265         {
1266                 vac_close_indexes(nindexes, Irel, NoLock);
1267                 if (vacuum_pages.num_pages > 0)
1268                 {
1269                         /* Clean pages from vacuum_pages list */
1270                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1271                 }
1272         }
1273
1274         /* update shared free space map with final free space info */
1275         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1276
1277         /* update statistics in pg_class */
1278         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
1279                                                 vacrelstats->rel_tuples, vacrelstats->hasindex,
1280                                                 FreezeLimit);
1281
1282         /* report results to the stats collector, too */
1283         pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
1284                                                  vacstmt->analyze, vacrelstats->rel_tuples);
1285 }
1286
1287
1288 /*
1289  *      scan_heap() -- scan an open heap relation
1290  *
1291  *              This routine sets commit status bits, constructs vacuum_pages (list
1292  *              of pages we need to compact free space on and/or clean indexes of
1293  *              deleted tuples), constructs fraged_pages (list of pages with free
1294  *              space that tuples could be moved into), and calculates statistics
1295  *              on the number of live tuples in the heap.
1296  */
1297 static void
1298 scan_heap(VRelStats *vacrelstats, Relation onerel,
1299                   VacPageList vacuum_pages, VacPageList fraged_pages)
1300 {
1301         BlockNumber nblocks,
1302                                 blkno;
1303         char       *relname;
1304         VacPage         vacpage;
1305         BlockNumber empty_pages,
1306                                 empty_end_pages;
1307         double          num_tuples,
1308                                 num_indexed_tuples,
1309                                 tups_vacuumed,
1310                                 nkeep,
1311                                 nunused;
1312         double          free_space,
1313                                 usable_free_space;
1314         Size            min_tlen = MaxHeapTupleSize;
1315         Size            max_tlen = 0;
1316         bool            do_shrinking = true;
1317         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1318         int                     num_vtlinks = 0;
1319         int                     free_vtlinks = 100;
1320         PGRUsage        ru0;
1321
1322         pg_rusage_init(&ru0);
1323
1324         relname = RelationGetRelationName(onerel);
1325         ereport(elevel,
1326                         (errmsg("vacuuming \"%s.%s\"",
1327                                         get_namespace_name(RelationGetNamespace(onerel)),
1328                                         relname)));
1329
1330         empty_pages = empty_end_pages = 0;
1331         num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
1332         free_space = 0;
1333
1334         nblocks = RelationGetNumberOfBlocks(onerel);
1335
1336         /*
1337          * We initially create each VacPage item in a maximal-sized workspace,
1338          * then copy the workspace into a just-large-enough copy.
1339          */
1340         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1341
1342         for (blkno = 0; blkno < nblocks; blkno++)
1343         {
1344                 Page            page,
1345                                         tempPage = NULL;
1346                 bool            do_reap,
1347                                         do_frag;
1348                 Buffer          buf;
1349                 OffsetNumber offnum,
1350                                         maxoff;
1351                 bool            notup;
1352                 OffsetNumber frozen[MaxOffsetNumber];
1353                 int                     nfrozen;
1354
1355                 vacuum_delay_point();
1356
1357                 buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
1358                 page = BufferGetPage(buf);
1359
1360                 /*
1361                  * Since we are holding exclusive lock on the relation, no other
1362                  * backend can be accessing the page; however it is possible that the
1363                  * background writer will try to write the page if it's already marked
1364                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1365                  * must take exclusive buffer lock wherever we potentially modify
1366                  * pages.  In fact, we insist on cleanup lock so that we can safely
1367                  * call heap_page_prune().      (This might be overkill, since the
1368                  * bgwriter pays no attention to individual tuples, but on the other
1369                  * hand it's unlikely that the bgwriter has this particular page
1370                  * pinned at this instant.      So violating the coding rule would buy us
1371                  * little anyway.)
1372                  */
1373                 LockBufferForCleanup(buf);
1374
1375                 vacpage->blkno = blkno;
1376                 vacpage->offsets_used = 0;
1377                 vacpage->offsets_free = 0;
1378
1379                 if (PageIsNew(page))
1380                 {
1381                         VacPage         vacpagecopy;
1382
1383                         ereport(WARNING,
1384                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1385                                            relname, blkno)));
1386                         PageInit(page, BufferGetPageSize(buf), 0);
1387                         MarkBufferDirty(buf);
1388                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1389                         free_space += vacpage->free;
1390                         empty_pages++;
1391                         empty_end_pages++;
1392                         vacpagecopy = copy_vac_page(vacpage);
1393                         vpage_insert(vacuum_pages, vacpagecopy);
1394                         vpage_insert(fraged_pages, vacpagecopy);
1395                         UnlockReleaseBuffer(buf);
1396                         continue;
1397                 }
1398
1399                 if (PageIsEmpty(page))
1400                 {
1401                         VacPage         vacpagecopy;
1402
1403                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1404                         free_space += vacpage->free;
1405                         empty_pages++;
1406                         empty_end_pages++;
1407                         vacpagecopy = copy_vac_page(vacpage);
1408                         vpage_insert(vacuum_pages, vacpagecopy);
1409                         vpage_insert(fraged_pages, vacpagecopy);
1410                         UnlockReleaseBuffer(buf);
1411                         continue;
1412                 }
1413
1414                 /*
1415                  * Prune all HOT-update chains in this page.
1416                  *
1417                  * We use the redirect_move option so that redirecting line pointers
1418                  * get collapsed out; this allows us to not worry about them below.
1419                  *
1420                  * We count tuples removed by the pruning step as removed by VACUUM.
1421                  */
1422                 tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
1423                                                                                  true, false);
1424
1425                 /*
1426                  * Now scan the page to collect vacuumable items and check for tuples
1427                  * requiring freezing.
1428                  */
1429                 nfrozen = 0;
1430                 notup = true;
1431                 maxoff = PageGetMaxOffsetNumber(page);
1432                 for (offnum = FirstOffsetNumber;
1433                          offnum <= maxoff;
1434                          offnum = OffsetNumberNext(offnum))
1435                 {
1436                         ItemId          itemid = PageGetItemId(page, offnum);
1437                         bool            tupgone = false;
1438                         HeapTupleData tuple;
1439
1440                         /*
1441                          * Collect un-used items too - it's possible to have indexes
1442                          * pointing here after crash.  (That's an ancient comment and is
1443                          * likely obsolete with WAL, but we might as well continue to
1444                          * check for such problems.)
1445                          */
1446                         if (!ItemIdIsUsed(itemid))
1447                         {
1448                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1449                                 nunused += 1;
1450                                 continue;
1451                         }
1452
1453                         /*
1454                          * DEAD item pointers are to be vacuumed normally; but we don't
1455                          * count them in tups_vacuumed, else we'd be double-counting (at
1456                          * least in the common case where heap_page_prune() just freed up
1457                          * a non-HOT tuple).
1458                          */
1459                         if (ItemIdIsDead(itemid))
1460                         {
1461                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1462                                 continue;
1463                         }
1464
1465                         /* Shouldn't have any redirected items anymore */
1466                         if (!ItemIdIsNormal(itemid))
1467                                 elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
1468                                          relname, blkno, offnum);
1469
1470                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1471                         tuple.t_len = ItemIdGetLength(itemid);
1472                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1473
1474                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1475                         {
1476                                 case HEAPTUPLE_LIVE:
1477                                         /* Tuple is good --- but let's do some validity checks */
1478                                         if (onerel->rd_rel->relhasoids &&
1479                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1480                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1481                                                          relname, blkno, offnum);
1482
1483                                         /*
1484                                          * The shrinkage phase of VACUUM FULL requires that all
1485                                          * live tuples have XMIN_COMMITTED set --- see comments in
1486                                          * repair_frag()'s walk-along-page loop.  Use of async
1487                                          * commit may prevent HeapTupleSatisfiesVacuum from
1488                                          * setting the bit for a recently committed tuple.      Rather
1489                                          * than trying to handle this corner case, we just give up
1490                                          * and don't shrink.
1491                                          */
1492                                         if (do_shrinking &&
1493                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1494                                         {
1495                                                 ereport(LOG,
1496                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1497                                                                                 relname, blkno, offnum,
1498                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1499                                                 do_shrinking = false;
1500                                         }
1501                                         break;
1502                                 case HEAPTUPLE_DEAD:
1503
1504                                         /*
1505                                          * Ordinarily, DEAD tuples would have been removed by
1506                                          * heap_page_prune(), but it's possible that the tuple
1507                                          * state changed since heap_page_prune() looked.  In
1508                                          * particular an INSERT_IN_PROGRESS tuple could have
1509                                          * changed to DEAD if the inserter aborted.  So this
1510                                          * cannot be considered an error condition, though it does
1511                                          * suggest that someone released a lock early.
1512                                          *
1513                                          * If the tuple is HOT-updated then it must only be
1514                                          * removed by a prune operation; so we keep it as if it
1515                                          * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
1516                                          * worth trying to make the shrinking code smart enough to
1517                                          * handle this?  It's an unusual corner case.)
1518                                          *
1519                                          * DEAD heap-only tuples can safely be removed if they
1520                                          * aren't themselves HOT-updated, although this is a bit
1521                                          * inefficient since we'll uselessly try to remove index
1522                                          * entries for them.
1523                                          */
1524                                         if (HeapTupleIsHotUpdated(&tuple))
1525                                         {
1526                                                 nkeep += 1;
1527                                                 if (do_shrinking)
1528                                                         ereport(LOG,
1529                                                                         (errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
1530                                                                                         relname, blkno, offnum)));
1531                                                 do_shrinking = false;
1532                                         }
1533                                         else
1534                                         {
1535                                                 tupgone = true; /* we can delete the tuple */
1536
1537                                                 /*
1538                                                  * We need not require XMIN_COMMITTED or
1539                                                  * XMAX_COMMITTED to be set, since we will remove the
1540                                                  * tuple without any further examination of its hint
1541                                                  * bits.
1542                                                  */
1543                                         }
1544                                         break;
1545                                 case HEAPTUPLE_RECENTLY_DEAD:
1546
1547                                         /*
1548                                          * If tuple is recently deleted then we must not remove it
1549                                          * from relation.
1550                                          */
1551                                         nkeep += 1;
1552
1553                                         /*
1554                                          * As with the LIVE case, shrinkage requires
1555                                          * XMIN_COMMITTED to be set.
1556                                          */
1557                                         if (do_shrinking &&
1558                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1559                                         {
1560                                                 ereport(LOG,
1561                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1562                                                                                 relname, blkno, offnum,
1563                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1564                                                 do_shrinking = false;
1565                                         }
1566
1567                                         /*
1568                                          * If we are shrinking and this tuple has been updated, record
1569                                          * its link so that update-chain dependencies can be built.
1570                                          */
1571                                         if (do_shrinking &&
1572                                                 !(ItemPointerEquals(&(tuple.t_self),
1573                                                                                         &(tuple.t_data->t_ctid))))
1574                                         {
1575                                                 if (free_vtlinks == 0)
1576                                                 {
1577                                                         free_vtlinks = 1000;
1578                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1579                                                                                            (free_vtlinks + num_vtlinks) *
1580                                                                                                          sizeof(VTupleLinkData));
1581                                                 }
1582                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1583                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1584                                                 free_vtlinks--;
1585                                                 num_vtlinks++;
1586                                         }
1587                                         break;
1588                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1589
1590                                         /*
1591                                          * This should not happen, since we hold exclusive lock on
1592                                          * the relation; shouldn't we raise an error?  (Actually,
1593                                          * it can happen in system catalogs, since we tend to
1594                                          * release write lock before commit there.)  As above, we
1595                                          * can't apply repair_frag() if the tuple state is
1596                                          * uncertain.
1597                                          */
1598                                         if (do_shrinking)
1599                                                 ereport(LOG,
1600                                                                 (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
1601                                                                                 relname, blkno, offnum,
1602                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1603                                         do_shrinking = false;
1604                                         break;
1605                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1606
1607                                         /*
1608                                          * This should not happen, since we hold exclusive lock on
1609                                          * the relation; shouldn't we raise an error?  (Actually,
1610                                          * it can happen in system catalogs, since we tend to
1611                                          * release write lock before commit there.)  As above, we
1612                                          * can't apply repair_frag() if the tuple state is
1613                                          * uncertain.
1614                                          */
1615                                         if (do_shrinking)
1616                                                 ereport(LOG,
1617                                                                 (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
1618                                                                                 relname, blkno, offnum,
1619                                                                          HeapTupleHeaderGetXmax(tuple.t_data))));
1620                                         do_shrinking = false;
1621                                         break;
1622                                 default:
1623                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1624                                         break;
1625                         }
1626
1627                         if (tupgone)
1628                         {
1629                                 ItemId          lpp;
1630
1631                                 /*
1632                                  * Here we are building a temporary copy of the page with dead
1633                                  * tuples removed.      Below we will apply
1634                                  * PageRepairFragmentation to the copy, so that we can
1635                                  * determine how much space will be available after removal of
1636                                  * dead tuples.  But note we are NOT changing the real page
1637                                  * yet...
1638                                  */
1639                                 if (tempPage == NULL)
1640                                 {
1641                                         Size            pageSize;
1642
1643                                         pageSize = PageGetPageSize(page);
1644                                         tempPage = (Page) palloc(pageSize);
1645                                         memcpy(tempPage, page, pageSize);
1646                                 }
1647
1648                                 /* mark it unused on the temp page */
1649                                 lpp = PageGetItemId(tempPage, offnum);
1650                                 ItemIdSetUnused(lpp);
1651
1652                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1653                                 tups_vacuumed += 1;
1654                         }
1655                         else
1656                         {
1657                                 num_tuples += 1;
1658                                 if (!HeapTupleIsHeapOnly(&tuple))
1659                                         num_indexed_tuples += 1;
1660                                 notup = false;
1661                                 if (tuple.t_len < min_tlen)
1662                                         min_tlen = tuple.t_len;
1663                                 if (tuple.t_len > max_tlen)
1664                                         max_tlen = tuple.t_len;
1665
1666                                 /*
1667                                  * Each non-removable tuple must be checked to see if it needs
1668                                  * freezing.
1669                                  */
1670                                 if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
1671                                                                           InvalidBuffer))
1672                                         frozen[nfrozen++] = offnum;
1673                         }
1674                 }                                               /* scan along page */
1675
1676                 if (tempPage != NULL)
1677                 {
1678                         /* Some tuples are removable; figure free space after removal */
1679                         PageRepairFragmentation(tempPage);
1680                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1681                         pfree(tempPage);
1682                         do_reap = true;
1683                 }
1684                 else
1685                 {
1686                         /* Just use current available space */
1687                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1688                         /* Need to reap the page if it has UNUSED or DEAD line pointers */
1689                         do_reap = (vacpage->offsets_free > 0);
1690                 }
1691
1692                 free_space += vacpage->free;
1693
1694                 /*
1695                  * Add the page to vacuum_pages if it requires reaping, and add it to
1696                  * fraged_pages if it has a useful amount of free space.  "Useful"
1697                  * means enough for a minimal-sized tuple.  But we don't know that
1698                  * accurately near the start of the relation, so add pages
1699                  * unconditionally if they have >= BLCKSZ/10 free space.  Also
1700                  * forcibly add pages with no live tuples, to avoid confusing the
1701                  * empty_end_pages logic.  (In the presence of unreasonably small
1702                  * fillfactor, it seems possible that such pages might not pass
1703                  * the free-space test, but they had better be in the list anyway.)
1704                  */
1705                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10 ||
1706                                    notup);
1707
1708                 if (do_reap || do_frag)
1709                 {
1710                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1711
1712                         if (do_reap)
1713                                 vpage_insert(vacuum_pages, vacpagecopy);
1714                         if (do_frag)
1715                                 vpage_insert(fraged_pages, vacpagecopy);
1716                 }
1717
1718                 /*
1719                  * Include the page in empty_end_pages if it will be empty after
1720                  * vacuuming; this is to keep us from using it as a move destination.
1721                  * Note that such pages are guaranteed to be in fraged_pages.
1722                  */
1723                 if (notup)
1724                 {
1725                         empty_pages++;
1726                         empty_end_pages++;
1727                 }
1728                 else
1729                         empty_end_pages = 0;
1730
1731                 /*
1732                  * If we froze any tuples, mark the buffer dirty, and write a WAL
1733                  * record recording the changes.  We must log the changes to be
1734                  * crash-safe against future truncation of CLOG.
1735                  */
1736                 if (nfrozen > 0)
1737                 {
1738                         MarkBufferDirty(buf);
1739                         /* no XLOG for temp tables, though */
1740                         if (!onerel->rd_istemp)
1741                         {
1742                                 XLogRecPtr      recptr;
1743
1744                                 recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1745                                                                                  frozen, nfrozen);
1746                                 PageSetLSN(page, recptr);
1747                                 PageSetTLI(page, ThisTimeLineID);
1748                         }
1749                 }
1750
1751                 UnlockReleaseBuffer(buf);
1752         }
1753
1754         pfree(vacpage);
1755
1756         /* save stats in the rel list for use later */
1757         vacrelstats->rel_tuples = num_tuples;
1758         vacrelstats->rel_indexed_tuples = num_indexed_tuples;
1759         vacrelstats->rel_pages = nblocks;
1760         if (num_tuples == 0)
1761                 min_tlen = max_tlen = 0;
1762         vacrelstats->min_tlen = min_tlen;
1763         vacrelstats->max_tlen = max_tlen;
1764
1765         vacuum_pages->empty_end_pages = empty_end_pages;
1766         fraged_pages->empty_end_pages = empty_end_pages;
1767
1768         /*
1769          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1770          * remove any "empty" end-pages from the list, and compute usable free
1771          * space = free space in remaining pages.
1772          */
1773         if (do_shrinking)
1774         {
1775                 int                     i;
1776
1777                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1778                 fraged_pages->num_pages -= empty_end_pages;
1779                 usable_free_space = 0;
1780                 for (i = 0; i < fraged_pages->num_pages; i++)
1781                         usable_free_space += fraged_pages->pagedesc[i]->free;
1782         }
1783         else
1784         {
1785                 fraged_pages->num_pages = 0;
1786                 usable_free_space = 0;
1787         }
1788
1789         /* don't bother to save vtlinks if we will not call repair_frag */
1790         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1791         {
1792                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1793                           vac_cmp_vtlinks);
1794                 vacrelstats->vtlinks = vtlinks;
1795                 vacrelstats->num_vtlinks = num_vtlinks;
1796         }
1797         else
1798         {
1799                 vacrelstats->vtlinks = NULL;
1800                 vacrelstats->num_vtlinks = 0;
1801                 pfree(vtlinks);
1802         }
1803
1804         ereport(elevel,
1805                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1806                                         RelationGetRelationName(onerel),
1807                                         tups_vacuumed, num_tuples, nblocks),
1808                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1809                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1810                                            "There were %.0f unused item pointers.\n"
1811            "Total free space (including removable row versions) is %.0f bytes.\n"
1812                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1813          "%u pages containing %.0f free bytes are potential move destinations.\n"
1814                                            "%s.",
1815                                            nkeep,
1816                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1817                                            nunused,
1818                                            free_space,
1819                                            empty_pages, empty_end_pages,
1820                                            fraged_pages->num_pages, usable_free_space,
1821                                            pg_rusage_show(&ru0))));
1822 }
1823
1824
1825 /*
1826  *      repair_frag() -- try to repair relation's fragmentation
1827  *
1828  *              This routine marks dead tuples as unused and tries to re-use dead space
1829  *              by moving tuples (and inserting index entries if needed). It constructs
1830  *              Nvacpagelist, a list of freed pages (moved tuples), and cleans indexes
1831  *              for them after committing (in a hackish manner - without releasing locks
1832  *              or freeing memory!) the current transaction. It truncates the relation
1833  *              if some end-blocks have gone away.
1834  */
1835 static void
1836 repair_frag(VRelStats *vacrelstats, Relation onerel,
1837                         VacPageList vacuum_pages, VacPageList fraged_pages,
1838                         int nindexes, Relation *Irel)
1839 {
1840         TransactionId myXID = GetCurrentTransactionId();
1841         Buffer          dst_buffer = InvalidBuffer;
1842         BlockNumber nblocks,
1843                                 blkno;
1844         BlockNumber last_move_dest_block = 0,
1845                                 last_vacuum_block;
1846         Page            dst_page = NULL;
1847         ExecContextData ec;
1848         VacPageListData Nvacpagelist;
1849         VacPage         dst_vacpage = NULL,
1850                                 last_vacuum_page,
1851                                 vacpage,
1852                            *curpage;
1853         int                     i;
1854         int                     num_moved = 0,
1855                                 num_fraged_pages,
1856                                 vacuumed_pages;
1857         int                     keep_tuples = 0;
1858         int                     keep_indexed_tuples = 0;
1859         PGRUsage        ru0;
1860
1861         pg_rusage_init(&ru0);
1862
1863         ExecContext_Init(&ec, onerel);
1864
1865         Nvacpagelist.num_pages = 0;
1866         num_fraged_pages = fraged_pages->num_pages;
1867         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1868         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1869         if (vacuumed_pages > 0)
1870         {
1871                 /* get last reaped page from vacuum_pages */
1872                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1873                 last_vacuum_block = last_vacuum_page->blkno;
1874         }
1875         else
1876         {
1877                 last_vacuum_page = NULL;
1878                 last_vacuum_block = InvalidBlockNumber;
1879         }
1880
1881         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1882         vacpage->offsets_used = vacpage->offsets_free = 0;
1883
1884         /*
1885          * Scan pages backwards from the last nonempty page, trying to move tuples
1886          * down to lower pages.  Quit when we reach a page that we have moved any
1887          * tuples onto, or the first page if we haven't moved anything, or when we
1888          * find a page we cannot completely empty (this last condition is handled
1889          * by "break" statements within the loop).
1890          *
1891          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1892          * in order by blkno.
1893          */
1894         nblocks = vacrelstats->rel_pages;
1895         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1896                  blkno > last_move_dest_block;
1897                  blkno--)
1898         {
1899                 Buffer          buf;
1900                 Page            page;
1901                 OffsetNumber offnum,
1902                                         maxoff;
1903                 bool            isempty,
1904                                         chain_tuple_moved;
1905
1906                 vacuum_delay_point();
1907
1908                 /*
1909                  * Forget fraged_pages pages at or after this one; they're no longer
1910                  * useful as move targets, since we only want to move down. Note that
1911                  * since we stop the outer loop at last_move_dest_block, pages removed
1912                  * here cannot have had anything moved onto them already.
1913                  *
1914                  * Also note that we don't change the stored fraged_pages list, only
1915                  * our local variable num_fraged_pages; so the forgotten pages are
1916                  * still available to be loaded into the free space map later.
1917                  */
1918                 while (num_fraged_pages > 0 &&
1919                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1920                 {
1921                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1922                         --num_fraged_pages;
1923                 }
1924
1925                 /*
1926                  * Process this page of relation.
1927                  */
1928                 buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
1929                 page = BufferGetPage(buf);
1930
1931                 vacpage->offsets_free = 0;
1932
1933                 isempty = PageIsEmpty(page);
1934
1935                 /* Is the page in the vacuum_pages list? */
1936                 if (blkno == last_vacuum_block)
1937                 {
1938                         if (last_vacuum_page->offsets_free > 0)
1939                         {
1940                                 /* there are dead tuples on this page - clean them */
1941                                 Assert(!isempty);
1942                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1943                                 vacuum_page(onerel, buf, last_vacuum_page);
1944                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1945                         }
1946                         else
1947                                 Assert(isempty);
1948                         --vacuumed_pages;
1949                         if (vacuumed_pages > 0)
1950                         {
1951                                 /* get prev reaped page from vacuum_pages */
1952                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1953                                 last_vacuum_block = last_vacuum_page->blkno;
1954                         }
1955                         else
1956                         {
1957                                 last_vacuum_page = NULL;
1958                                 last_vacuum_block = InvalidBlockNumber;
1959                         }
1960                         if (isempty)
1961                         {
1962                                 ReleaseBuffer(buf);
1963                                 continue;
1964                         }
1965                 }
1966                 else
1967                         Assert(!isempty);
1968
1969                 chain_tuple_moved = false;              /* no one chain-tuple was moved off
1970                                                                                  * this page, yet */
1971                 vacpage->blkno = blkno;
1972                 maxoff = PageGetMaxOffsetNumber(page);
1973                 for (offnum = FirstOffsetNumber;
1974                          offnum <= maxoff;
1975                          offnum = OffsetNumberNext(offnum))
1976                 {
1977                         Size            tuple_len;
1978                         HeapTupleData tuple;
1979                         ItemId          itemid = PageGetItemId(page, offnum);
1980
1981                         if (!ItemIdIsUsed(itemid))
1982                                 continue;
1983
1984                         if (ItemIdIsDead(itemid))
1985                         {
1986                                 /* just remember it for vacuum_page() */
1987                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1988                                 continue;
1989                         }
1990
1991                         /* Shouldn't have any redirected items now */
1992                         Assert(ItemIdIsNormal(itemid));
1993
1994                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1995                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1996                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1997
1998                         /* ---
1999                          * VACUUM FULL has an exclusive lock on the relation.  So
2000                          * normally no other transaction can have pending INSERTs or
2001                          * DELETEs in this relation.  A tuple is either:
2002                          *              (a) live (XMIN_COMMITTED)
2003                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
2004                          *                      is visible to all active transactions)
2005                          *              (c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
2006                          *                      but at least one active transaction does not see the
2007                          *                      deleting transaction (ie, it's RECENTLY_DEAD)
2008                          *              (d) moved by the currently running VACUUM
2009                          *              (e) inserted or deleted by a not yet committed transaction,
2010                          *                      or by a transaction we couldn't set XMIN_COMMITTED for.
2011                          * In case (e) we wouldn't be in repair_frag() at all, because
2012                          * scan_heap() detects those cases and shuts off shrinking.
2013                          * We can't see case (b) here either, because such tuples were
2014                          * already removed by vacuum_page().  Cases (a) and (c) are
2015                          * normal and will have XMIN_COMMITTED set.  Case (d) is only
2016                          * possible if a whole tuple chain has been moved while
2017                          * processing this or a higher numbered block.
2018                          * ---
2019                          */
2020                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2021                         {
2022                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2023                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2024                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
2025                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2026
2027                                 /*
2028                                  * MOVED_OFF by another VACUUM would have caused the
2029                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
2030                                  */
2031                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2032                                         elog(ERROR, "invalid XVAC in tuple header");
2033
2034                                 /*
2035                                  * If this (chain) tuple has already been moved by me, I have to
2036                                  * check whether it is in vacpage or not --- i.e., whether it was
2037                                  * moved while cleaning this page or some previous one.
2038                                  */
2039
2040                                 /* Can't we Assert(keep_tuples > 0) here? */
2041                                 if (keep_tuples == 0)
2042                                         continue;
2043                                 if (chain_tuple_moved)
2044                                 {
2045                                         /* some chains were moved while cleaning this page */
2046                                         Assert(vacpage->offsets_free > 0);
2047                                         for (i = 0; i < vacpage->offsets_free; i++)
2048                                         {
2049                                                 if (vacpage->offsets[i] == offnum)
2050                                                         break;
2051                                         }
2052                                         if (i >= vacpage->offsets_free)         /* not found */
2053                                         {
2054                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2055
2056                                                 /*
2057                                                  * If this is not a heap-only tuple, there must be an
2058                                                  * index entry for this item which will be removed in
2059                                                  * the index cleanup. Decrement the
2060                                                  * keep_indexed_tuples count to remember this.
2061                                                  */
2062                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2063                                                         keep_indexed_tuples--;
2064                                                 keep_tuples--;
2065                                         }
2066                                 }
2067                                 else
2068                                 {
2069                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2070
2071                                         /*
2072                                          * If this is not a heap-only tuple, there must be an
2073                                          * index entry for this item which will be removed in the
2074                                          * index cleanup. Decrement the keep_indexed_tuples count
2075                                          * to remember this.
2076                                          */
2077                                         if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2078                                                 keep_indexed_tuples--;
2079                                         keep_tuples--;
2080                                 }
2081                                 continue;
2082                         }
2083
2084                         /*
2085                          * If this tuple is in a chain of tuples created in updates by
2086                          * "recent" transactions then we have to move the whole chain of
2087                          * tuples to other places, so that we can write new t_ctid links
2088                          * that preserve the chain relationship.
2089                          *
2090                          * This test is complicated.  Read it as "if tuple is a recently
2091                          * created updated version, OR if it is an obsoleted version". (In
2092                          * the second half of the test, we needn't make any check on XMAX
2093                          * --- it must be recently obsoleted, else scan_heap would have
2094                          * deemed it removable.)
2095                          *
2096                          * NOTE: this test is not 100% accurate: it is possible for a
2097                          * tuple to be an updated one with recent xmin, and yet not match
2098                          * any new_tid entry in the vtlinks list.  Presumably there was
2099                          * once a parent tuple with xmax matching the xmin, but it's
2100                          * possible that that tuple has been removed --- for example, if
2101                          * it had xmin = xmax and wasn't itself an updated version, then
2102                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
2103                          * xmin xact completes.
2104                          *
2105                          * To be on the safe side, we abandon the repair_frag process if
2106                          * we cannot find the parent tuple in vtlinks.  This may be overly
2107                          * conservative; AFAICS it would be safe to move the chain.
2108                          *
2109                          * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
2110                          * using OldestXmin, which is a rather coarse test, it is quite
2111                          * possible to have an update chain in which a tuple we think is
2112                          * RECENTLY_DEAD links forward to one that is definitely DEAD. In
2113                          * such a case the RECENTLY_DEAD tuple must actually be dead, but
2114                          * it seems too complicated to try to make VACUUM remove it. We
2115                          * treat each contiguous set of RECENTLY_DEAD tuples as a
2116                          * separately movable chain, ignoring any intervening DEAD ones.
2117                          */
2118                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
2119                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
2120                                                                                 OldestXmin)) ||
2121                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
2122                                                                                            HEAP_IS_LOCKED)) &&
2123                                  !(ItemPointerEquals(&(tuple.t_self),
2124                                                                          &(tuple.t_data->t_ctid)))))
2125                         {
2126                                 Buffer          Cbuf = buf;
2127                                 bool            freeCbuf = false;
2128                                 bool            chain_move_failed = false;
2129                                 bool            moved_target = false;
2130                                 ItemPointerData Ctid;
2131                                 HeapTupleData tp = tuple;
2132                                 Size            tlen = tuple_len;
2133                                 VTupleMove      vtmove;
2134                                 int                     num_vtmove;
2135                                 int                     free_vtmove;
2136                                 VacPage         to_vacpage = NULL;
2137                                 int                     to_item = 0;
2138                                 int                     ti;
2139
2140                                 if (dst_buffer != InvalidBuffer)
2141                                 {
2142                                         ReleaseBuffer(dst_buffer);
2143                                         dst_buffer = InvalidBuffer;
2144                                 }
2145
2146                                 /* Quick exit if we have no vtlinks to search in */
2147                                 if (vacrelstats->vtlinks == NULL)
2148                                 {
2149                                         elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2150                                         break;          /* out of walk-along-page loop */
2151                                 }
2152
2153                                 /*
2154                                  * If this tuple is at the start or middle of the chain, then we
2155                                  * have to move to the end of the chain.  As with any t_ctid
2156                                  * chase, we have to verify that each new tuple is really the
2157                                  * descendant of the tuple we came from; however, here we need
2158                                  * even more than the normal amount of paranoia. If t_ctid
2159                                  * links forward to a tuple determined to be DEAD, then
2160                                  * depending on where that tuple is, it might already have
2161                                  * been removed, and perhaps even replaced by a MOVED_IN
2162                                  * tuple.  We don't want to include any DEAD tuples in the
2163                                  * chain, so we have to recheck HeapTupleSatisfiesVacuum.
2164                                  */
2165                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
2166                                                                                                   HEAP_IS_LOCKED)) &&
2167                                            !(ItemPointerEquals(&(tp.t_self),
2168                                                                                    &(tp.t_data->t_ctid))))
2169                                 {
2170                                         ItemPointerData nextTid;
2171                                         TransactionId priorXmax;
2172                                         Buffer          nextBuf;
2173                                         Page            nextPage;
2174                                         OffsetNumber nextOffnum;
2175                                         ItemId          nextItemid;
2176                                         HeapTupleHeader nextTdata;
2177                                         HTSV_Result nextTstatus;
2178
2179                                         nextTid = tp.t_data->t_ctid;
2180                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
2181                                         /* assume block# is OK (see heap_fetch comments) */
2182                                         nextBuf = ReadBufferWithStrategy(onerel,
2183                                                                                  ItemPointerGetBlockNumber(&nextTid),
2184                                                                                                          vac_strategy);
2185                                         nextPage = BufferGetPage(nextBuf);
2186                                         /* If bogus or unused slot, assume tp is end of chain */
2187                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
2188                                         if (nextOffnum < FirstOffsetNumber ||
2189                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
2190                                         {
2191                                                 ReleaseBuffer(nextBuf);
2192                                                 break;
2193                                         }
2194                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
2195                                         if (!ItemIdIsNormal(nextItemid))
2196                                         {
2197                                                 ReleaseBuffer(nextBuf);
2198                                                 break;
2199                                         }
2200                                         /* if not matching XMIN, assume tp is end of chain */
2201                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
2202                                                                                                                           nextItemid);
2203                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
2204                                                                                          priorXmax))
2205                                         {
2206                                                 ReleaseBuffer(nextBuf);
2207                                                 break;
2208                                         }
2209
2210                                         /*
2211                                          * Must check for DEAD or MOVED_IN tuple, too.  This could
2212                                          * potentially update hint bits, so we'd better hold the
2213                                          * buffer content lock.
2214                                          */
2215                                         LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
2216                                         nextTstatus = HeapTupleSatisfiesVacuum(nextTdata,
2217                                                                                                                    OldestXmin,
2218                                                                                                                    nextBuf);
2219                                         if (nextTstatus == HEAPTUPLE_DEAD ||
2220                                                 nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
2221                                         {
2222                                                 UnlockReleaseBuffer(nextBuf);
2223                                                 break;
2224                                         }
2225                                         LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
2226                                         /* if it's MOVED_OFF we shoulda moved this one with it */
2227                                         if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
2228                                                 elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
2229                                         /* OK, switch our attention to the next tuple in chain */
2230                                         tp.t_data = nextTdata;
2231                                         tp.t_self = nextTid;
2232                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
2233                                         if (freeCbuf)
2234                                                 ReleaseBuffer(Cbuf);
2235                                         Cbuf = nextBuf;
2236                                         freeCbuf = true;
2237                                 }
2238
2239                                 /* Set up workspace for planning the chain move */
2240                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
2241                                 num_vtmove = 0;
2242                                 free_vtmove = 100;
2243
2244                                 /*
2245                                  * Now, walk backwards up the chain (towards older tuples) and
2246                                  * check if all items in chain can be moved.  We record all
2247                                  * the moves that need to be made in the vtmove array.
2248                                  */
2249                                 for (;;)
2250                                 {
2251                                         Buffer          Pbuf;
2252                                         Page            Ppage;
2253                                         ItemId          Pitemid;
2254                                         HeapTupleHeader PTdata;
2255                                         VTupleLinkData vtld,
2256                                                            *vtlp;
2257
2258                                         /* Identify a target page to move this tuple to */
2259                                         if (to_vacpage == NULL ||
2260                                                 !enough_space(to_vacpage, tlen))
2261                                         {
2262                                                 for (i = 0; i < num_fraged_pages; i++)
2263                                                 {
2264                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
2265                                                                 break;
2266                                                 }
2267
2268                                                 if (i == num_fraged_pages)
2269                                                 {
2270                                                         /* can't move item anywhere */
2271                                                         chain_move_failed = true;
2272                                                         break;          /* out of check-all-items loop */
2273                                                 }
2274                                                 to_item = i;
2275                                                 to_vacpage = fraged_pages->pagedesc[to_item];
2276                                         }
2277                                         to_vacpage->free -= MAXALIGN(tlen);
2278                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
2279                                                 to_vacpage->free -= sizeof(ItemIdData);
2280                                         (to_vacpage->offsets_used)++;
2281
2282                                         /* Add an entry to vtmove list */
2283                                         if (free_vtmove == 0)
2284                                         {
2285                                                 free_vtmove = 1000;
2286                                                 vtmove = (VTupleMove)
2287                                                         repalloc(vtmove,
2288                                                                          (free_vtmove + num_vtmove) *
2289                                                                          sizeof(VTupleMoveData));
2290                                         }
2291                                         vtmove[num_vtmove].tid = tp.t_self;
2292                                         vtmove[num_vtmove].vacpage = to_vacpage;
2293                                         if (to_vacpage->offsets_used == 1)
2294                                                 vtmove[num_vtmove].cleanVpd = true;
2295                                         else
2296                                                 vtmove[num_vtmove].cleanVpd = false;
2297                                         free_vtmove--;
2298                                         num_vtmove++;
2299
2300                                         /* Remember if we reached the original target tuple */
2301                                         if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
2302                                                 ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
2303                                                 moved_target = true;
2304
2305                                         /* Done if at beginning of chain */
2306                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2307                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2308                                                                                    OldestXmin))
2309                                                 break;  /* out of check-all-items loop */
2310
2311                                         /* Move to tuple with prior row version */
2312                                         vtld.new_tid = tp.t_self;
2313                                         vtlp = (VTupleLink)
2314                                                 vac_bsearch((void *) &vtld,
2315                                                                         (void *) (vacrelstats->vtlinks),
2316                                                                         vacrelstats->num_vtlinks,
2317                                                                         sizeof(VTupleLinkData),
2318                                                                         vac_cmp_vtlinks);
2319                                         if (vtlp == NULL)
2320                                         {
2321                                                 /* see discussion above */
2322                                                 elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2323                                                 chain_move_failed = true;
2324                                                 break;  /* out of check-all-items loop */
2325                                         }
2326                                         tp.t_self = vtlp->this_tid;
2327                                         Pbuf = ReadBufferWithStrategy(onerel,
2328                                                                          ItemPointerGetBlockNumber(&(tp.t_self)),
2329                                                                                                   vac_strategy);
2330                                         Ppage = BufferGetPage(Pbuf);
2331                                         Pitemid = PageGetItemId(Ppage,
2332                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2333                                         /* this can't happen since we saw tuple earlier: */
2334                                         if (!ItemIdIsNormal(Pitemid))
2335                                                 elog(ERROR, "parent itemid marked as unused");
2336                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2337
2338                                         /* ctid should not have changed since we saved it */
2339                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2340                                                                                          &(PTdata->t_ctid)));
2341
2342                                         /*
2343                                          * See above about cases when !ItemIdIsUsed(nextItemid)
2344                                          * (child item is removed)...  Because at the moment we
2345                                          * don't remove the useless part of an update chain, it's
2346                                          * possible to get a non-matching parent row here.  As in
2347                                          * the case that caused this problem, we stop shrinking
2348                                          * here.  I could try to find the real parent row, but I
2349                                          * don't want to, because a real solution will be
2350                                          * implemented later anyway and we are too close to the
2351                                          * 6.5 release. - vadim 06/11/99
2352                                          */
2353                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2354                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2355                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2356                                         {
2357                                                 ReleaseBuffer(Pbuf);
2358                                                 elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
2359                                                 chain_move_failed = true;
2360                                                 break;  /* out of check-all-items loop */
2361                                         }
2362                                         tp.t_data = PTdata;
2363                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2364                                         if (freeCbuf)
2365                                                 ReleaseBuffer(Cbuf);
2366                                         Cbuf = Pbuf;
2367                                         freeCbuf = true;
2368                                 }                               /* end of check-all-items loop */
2369
2370                                 if (freeCbuf)
2371                                         ReleaseBuffer(Cbuf);
2372                                 freeCbuf = false;
2373
2374                                 /* Double-check that we will move the current target tuple */
2375                                 if (!moved_target && !chain_move_failed)
2376                                 {
2377                                         elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
2378                                         chain_move_failed = true;
2379                                 }
2380
2381                                 if (chain_move_failed)
2382                                 {
2383                                         /*
2384                                          * Undo changes to offsets_used state.  We don't bother
2385                                          * cleaning up the amount-free state, since we're not
2386                                          * going to do any further tuple motion.
2387                                          */
2388                                         for (i = 0; i < num_vtmove; i++)
2389                                         {
2390                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2391                                                 (vtmove[i].vacpage->offsets_used)--;
2392                                         }
2393                                         pfree(vtmove);
2394                                         break;          /* out of walk-along-page loop */
2395                                 }
2396
2397                                 /*
2398                                  * Okay, move the whole tuple chain in reverse order.
2399                                  *
2400                                  * Ctid tracks the new location of the previously-moved tuple.
2401                                  */
2402                                 ItemPointerSetInvalid(&Ctid);
2403                                 for (ti = 0; ti < num_vtmove; ti++)
2404                                 {
2405                                         VacPage         destvacpage = vtmove[ti].vacpage;
2406                                         Page            Cpage;
2407                                         ItemId          Citemid;
2408
2409                                         /* Get page to move from */
2410                                         tuple.t_self = vtmove[ti].tid;
2411                                         Cbuf = ReadBufferWithStrategy(onerel,
2412                                                                   ItemPointerGetBlockNumber(&(tuple.t_self)),
2413                                                                                                   vac_strategy);
2414
2415                                         /* Get page to move to */
2416                                         dst_buffer = ReadBufferWithStrategy(onerel,
2417                                                                                                                 destvacpage->blkno,
2418                                                                                                                 vac_strategy);
2419
2420                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2421                                         if (dst_buffer != Cbuf)
2422                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2423
2424                                         dst_page = BufferGetPage(dst_buffer);
2425                                         Cpage = BufferGetPage(Cbuf);
2426
2427                                         Citemid = PageGetItemId(Cpage,
2428                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2429                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2430                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2431
2432                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2433                                                                          dst_buffer, dst_page, destvacpage,
2434                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2435
2436                                         /*
2437                                          * If the tuple we are moving is a heap-only tuple, this
2438                                          * move will generate an additional index entry, so
2439                                          * increment the rel_indexed_tuples count.
2440                                          */
2441                                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2442                                                 vacrelstats->rel_indexed_tuples++;
2443
2444                                         num_moved++;
2445                                         if (destvacpage->blkno > last_move_dest_block)
2446                                                 last_move_dest_block = destvacpage->blkno;
2447
2448                                         /*
2449                                          * Remember that we moved tuple from the current page
2450                                          * (corresponding index tuple will be cleaned).
2451                                          */
2452                                         if (Cbuf == buf)
2453                                                 vacpage->offsets[vacpage->offsets_free++] =
2454                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2455                                         else
2456                                         {
2457                                                 /*
2458                                                  * When we move tuple chains, we may need to move
2459                                                  * tuples from a block that we haven't yet scanned in
2460                                                  * the outer walk-along-the-relation loop. Note that
2461                                                  * we can't be moving a tuple from a block that we
2462                                                  * have already scanned because if such a tuple
2463                                                  * exists, then we must have moved the chain along
2464                                                  * with that tuple when we scanned that block. IOW the
2465                                                  * test of (Cbuf != buf) guarantees that the tuple we
2466                                                  * are looking at right now is in a block which is yet
2467                                                  * to be scanned.
2468                                                  *
2469                                                  * We maintain two counters to correctly count the
2470                                                  * moved-off tuples from blocks that are not yet
2471                                                  * scanned (keep_tuples) and how many of them have
2472                                                  * index pointers (keep_indexed_tuples).  The main
2473                                                  * reason to track the latter is to help verify that
2474                                                  * indexes have the expected number of entries when
2475                                                  * all the dust settles.
2476                                                  */
2477                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2478                                                         keep_indexed_tuples++;
2479                                                 keep_tuples++;
2480                                         }
2481
2482                                         ReleaseBuffer(dst_buffer);
2483                                         ReleaseBuffer(Cbuf);
2484                                 }                               /* end of move-the-tuple-chain loop */
2485
2486                                 dst_buffer = InvalidBuffer;
2487                                 pfree(vtmove);
2488                                 chain_tuple_moved = true;
2489
2490                                 /* advance to next tuple in walk-along-page loop */
2491                                 continue;
2492                         }                                       /* end of is-tuple-in-chain test */
2493
2494                         /* try to find new page for this tuple */
2495                         if (dst_buffer == InvalidBuffer ||
2496                                 !enough_space(dst_vacpage, tuple_len))
2497                         {
2498                                 if (dst_buffer != InvalidBuffer)
2499                                 {
2500                                         ReleaseBuffer(dst_buffer);
2501                                         dst_buffer = InvalidBuffer;
2502                                 }
2503                                 for (i = 0; i < num_fraged_pages; i++)
2504                                 {
2505                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2506                                                 break;
2507                                 }
2508                                 if (i == num_fraged_pages)
2509                                         break;          /* can't move item anywhere */
2510                                 dst_vacpage = fraged_pages->pagedesc[i];
2511                                 dst_buffer = ReadBufferWithStrategy(onerel,
2512                                                                                                         dst_vacpage->blkno,
2513                                                                                                         vac_strategy);
2514                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2515                                 dst_page = BufferGetPage(dst_buffer);
2516                                 /* if this page was not used before - clean it */
2517                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2518                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2519                         }
2520                         else
2521                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2522
2523                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2524
2525                         move_plain_tuple(onerel, buf, page, &tuple,
2526                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2527
2528                         /*
2529                          * If the tuple we are moving is a heap-only tuple, this move will
2530                          * generate an additional index entry, so increment the
2531                          * rel_indexed_tuples count.
2532                          */
2533                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2534                                 vacrelstats->rel_indexed_tuples++;
2535
2536                         num_moved++;
2537                         if (dst_vacpage->blkno > last_move_dest_block)
2538                                 last_move_dest_block = dst_vacpage->blkno;
2539
2540                         /*
2541                          * Remember that we moved tuple from the current page
2542                          * (corresponding index tuple will be cleaned).
2543                          */
2544                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2545                 }                                               /* walk along page */
2546
2547                 /*
2548                  * If we broke out of the walk-along-page loop early (ie, still have
2549                  * offnum <= maxoff), then we failed to move some tuple off this page.
2550                  * No point in shrinking any more, so clean up and exit the per-page
2551                  * loop.
2552                  */
2553                 if (offnum < maxoff && keep_tuples > 0)
2554                 {
2555                         OffsetNumber off;
2556
2557                         /*
2558                          * Fix vacpage state for any unvisited tuples remaining on page
2559                          */
2560                         for (off = OffsetNumberNext(offnum);
2561                                  off <= maxoff;
2562                                  off = OffsetNumberNext(off))
2563                         {
2564                                 ItemId          itemid = PageGetItemId(page, off);
2565                                 HeapTupleHeader htup;
2566
2567                                 if (!ItemIdIsUsed(itemid))
2568                                         continue;
2569                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2570                                 Assert(ItemIdIsNormal(itemid));
2571
2572                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2573                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2574                                         continue;
2575
2576                                 /*
2577                                  * See comments in the walk-along-page loop above about why
2578                                  * only MOVED_OFF tuples should be found here.
2579                                  */
2580                                 if (htup->t_infomask & HEAP_MOVED_IN)
2581                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2582                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2583                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2584                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2585                                         elog(ERROR, "invalid XVAC in tuple header");
2586
2587                                 if (chain_tuple_moved)
2588                                 {
2589                                         /* some chains were moved while cleaning this page */
2590                                         Assert(vacpage->offsets_free > 0);
2591                                         for (i = 0; i < vacpage->offsets_free; i++)
2592                                         {
2593                                                 if (vacpage->offsets[i] == off)
2594                                                         break;
2595                                         }
2596                                         if (i >= vacpage->offsets_free)         /* not found */
2597                                         {
2598                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2599                                                 Assert(keep_tuples > 0);
2600
2601                                                 /*
2602                                                  * If this is not a heap-only tuple, there must be an
2603                                                  * index entry for this item which will be removed in
2604                                                  * the index cleanup. Decrement the
2605                                                  * keep_indexed_tuples count to remember this.
2606                                                  */
2607                                                 if (!HeapTupleHeaderIsHeapOnly(htup))
2608                                                         keep_indexed_tuples--;
2609                                                 keep_tuples--;
2610                                         }
2611                                 }
2612                                 else
2613                                 {
2614                                         vacpage->offsets[vacpage->offsets_free++] = off;
2615                                         Assert(keep_tuples > 0);
2616                                         if (!HeapTupleHeaderIsHeapOnly(htup))
2617                                                 keep_indexed_tuples--;
2618                                         keep_tuples--;
2619                                 }
2620                         }
2621                 }
2622
2623                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2624                 {
2625                         if (chain_tuple_moved)          /* else - they are ordered */
2626                         {
2627                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2628                                           sizeof(OffsetNumber), vac_cmp_offno);
2629                         }
2630                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2631                 }
2632
2633                 ReleaseBuffer(buf);
2634
2635                 if (offnum <= maxoff)
2636                         break;                          /* had to quit early, see above note */
2637
2638         }                                                       /* walk along relation */
2639
2640         blkno++;                                        /* new number of blocks */
2641
2642         if (dst_buffer != InvalidBuffer)
2643         {
2644                 Assert(num_moved > 0);
2645                 ReleaseBuffer(dst_buffer);
2646         }
2647
2648         if (num_moved > 0)
2649         {
2650                 /*
2651                  * We have to commit our tuple movings before we truncate the
2652                  * relation.  Ideally we should do Commit/StartTransactionCommand
2653                  * here, relying on the session-level table lock to protect our
2654                  * exclusive access to the relation.  However, that would require a
2655                  * lot of extra code to close and re-open the relation, indexes, etc.
2656                  * For now, a quick hack: record status of current transaction as
2657                  * committed, and continue.  We force the commit to be synchronous so
2658                  * that it's down to disk before we truncate.  (Note: tqual.c knows
2659                  * that VACUUM FULL always uses sync commit, too.)      The transaction
2660                  * continues to be shown as running in the ProcArray.
2661                  *
2662                  * XXX This desperately needs to be revisited.  Any failure after this
2663                  * point will result in a PANIC "cannot abort transaction nnn, it was
2664                  * already committed"!
2665                  */
2666                 ForceSyncCommit();
2667                 (void) RecordTransactionCommit();
2668         }
2669
2670         /*
2671          * We are not going to move any more tuples across pages, but we still
2672          * need to apply vacuum_page to compact free space in the remaining pages
2673          * in vacuum_pages list.  Note that some of these pages may also be in the
2674          * fraged_pages list, and may have had tuples moved onto them; if so, we
2675          * already did vacuum_page and needn't do it again.
2676          */
2677         for (i = 0, curpage = vacuum_pages->pagedesc;
2678                  i < vacuumed_pages;
2679                  i++, curpage++)
2680         {
2681                 vacuum_delay_point();
2682
2683                 Assert((*curpage)->blkno < blkno);
2684                 if ((*curpage)->offsets_used == 0)
2685                 {
2686                         Buffer          buf;
2687                         Page            page;
2688
2689                         /* this page was not used as a move target, so must clean it */
2690                         buf = ReadBufferWithStrategy(onerel,
2691                                                                                  (*curpage)->blkno,
2692                                                                                  vac_strategy);
2693                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2694                         page = BufferGetPage(buf);
2695                         if (!PageIsEmpty(page))
2696                                 vacuum_page(onerel, buf, *curpage);
2697                         UnlockReleaseBuffer(buf);
2698                 }
2699         }
2700
2701         /*
2702          * Now scan all the pages that we moved tuples onto and update tuple
2703          * status bits.  This is not really necessary, but will save time for
2704          * future transactions examining these tuples.
2705          */
2706         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2707                                          last_move_dest_block, num_moved);
2708
2709         /*
2710          * It'd be cleaner to make this report at the bottom of this routine, but
2711          * then the rusage would double-count the second pass of index vacuuming.
2712          * So do it here and ignore the relatively small amount of processing that
2713          * occurs below.
2714          */
2715         ereport(elevel,
2716                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2717                                         RelationGetRelationName(onerel),
2718                                         num_moved, nblocks, blkno),
2719                          errdetail("%s.",
2720                                            pg_rusage_show(&ru0))));
2721
2722         /*
2723          * Reflect the motion of system tuples to catalog cache here.
2724          */
2725         CommandCounterIncrement();
2726
2727         if (Nvacpagelist.num_pages > 0)
2728         {
2729                 /* vacuum indexes again if needed */
2730                 if (Irel != NULL)
2731                 {
2732                         VacPage    *vpleft,
2733                                            *vpright,
2734                                                 vpsave;
2735
2736                         /* re-sort Nvacpagelist.pagedesc */
2737                         for (vpleft = Nvacpagelist.pagedesc,
2738                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2739                                  vpleft < vpright; vpleft++, vpright--)
2740                         {
2741                                 vpsave = *vpleft;
2742                                 *vpleft = *vpright;
2743                                 *vpright = vpsave;
2744                         }
2745
2746                         /*
2747                          * keep_tuples is the number of tuples that have been moved off a
2748                          * page during chain moves but not been scanned over subsequently.
2749                          * The tuple ids of these tuples are not recorded as free offsets
2750                          * for any VacPage, so they will not be cleared from the indexes.
2751                          * keep_indexed_tuples is the portion of these that are expected
2752                          * to have index entries.
2753                          */
2754                         Assert(keep_tuples >= 0);
2755                         for (i = 0; i < nindexes; i++)
2756                                 vacuum_index(&Nvacpagelist, Irel[i],
2757                                                          vacrelstats->rel_indexed_tuples,
2758                                                          keep_indexed_tuples);
2759                 }
2760
2761                 /*
2762                  * Clean moved-off tuples from last page in Nvacpagelist list.
2763                  *
2764                  * We need only do this in this one page, because higher-numbered
2765                  * pages are going to be truncated from the relation entirely. But see
2766                  * comments for update_hint_bits().
2767                  */
2768                 if (vacpage->blkno == (blkno - 1) &&
2769                         vacpage->offsets_free > 0)
2770                 {
2771                         Buffer          buf;
2772                         Page            page;
2773                         OffsetNumber unused[MaxOffsetNumber];
2774                         OffsetNumber offnum,
2775                                                 maxoff;
2776                         int                     uncnt = 0;
2777                         int                     num_tuples = 0;
2778
2779                         buf = ReadBufferWithStrategy(onerel, vacpage->blkno, vac_strategy);
2780                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2781                         page = BufferGetPage(buf);
2782                         maxoff = PageGetMaxOffsetNumber(page);
2783                         for (offnum = FirstOffsetNumber;
2784                                  offnum <= maxoff;
2785                                  offnum = OffsetNumberNext(offnum))
2786                         {
2787                                 ItemId          itemid = PageGetItemId(page, offnum);
2788                                 HeapTupleHeader htup;
2789
2790                                 if (!ItemIdIsUsed(itemid))
2791                                         continue;
2792                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2793                                 Assert(ItemIdIsNormal(itemid));
2794
2795                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2796                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2797                                         continue;
2798
2799                                 /*
2800                                  * See comments in the walk-along-page loop above about why
2801                                  * only MOVED_OFF tuples should be found here.
2802                                  */
2803                                 if (htup->t_infomask & HEAP_MOVED_IN)
2804                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2805                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2806                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2807                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2808                                         elog(ERROR, "invalid XVAC in tuple header");
2809
2810                                 ItemIdSetUnused(itemid);
2811                                 num_tuples++;
2812
2813                                 unused[uncnt++] = offnum;
2814                         }
2815                         Assert(vacpage->offsets_free == num_tuples);
2816
2817                         START_CRIT_SECTION();
2818
2819                         PageRepairFragmentation(page);
2820
2821                         MarkBufferDirty(buf);
2822
2823                         /* XLOG stuff */
2824                         if (!onerel->rd_istemp)
2825                         {
2826                                 XLogRecPtr      recptr;
2827
2828                                 recptr = log_heap_clean(onerel, buf,
2829                                                                                 NULL, 0, NULL, 0,
2830                                                                                 unused, uncnt,
2831                                                                                 false);
2832                                 PageSetLSN(page, recptr);
2833                                 PageSetTLI(page, ThisTimeLineID);
2834                         }
2835
2836                         END_CRIT_SECTION();
2837
2838                         UnlockReleaseBuffer(buf);
2839                 }
2840
2841                 /* now - free new list of reaped pages */
2842                 curpage = Nvacpagelist.pagedesc;
2843                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2844                         pfree(*curpage);
2845                 pfree(Nvacpagelist.pagedesc);
2846         }
2847
2848         /* Truncate relation, if needed */
2849         if (blkno < nblocks)
2850         {
2851                 RelationTruncate(onerel, blkno);
2852                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2853         }
2854
2855         /* clean up */
2856         pfree(vacpage);
2857         if (vacrelstats->vtlinks != NULL)
2858                 pfree(vacrelstats->vtlinks);
2859
2860         ExecContext_Finish(&ec);
2861 }
2862
2863 /*
2864  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2865  *
2866  *              This routine moves old_tup from old_page to dst_page.
2867  *              old_page and dst_page might be the same page.
2868  *              On entry old_buf and dst_buf are locked exclusively, both locks (or
2869  *              the single lock, if this is a intra-page-move) are released before
2870  *              exit.
2871  *
2872  *              Yes, a routine with ten parameters is ugly, but it's still better
2873  *              than having these 120 lines of code in repair_frag() which is
2874  *              already too long and almost unreadable.
2875  */
2876 static void
2877 move_chain_tuple(Relation rel,
2878                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2879                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2880                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2881 {
2882         TransactionId myXID = GetCurrentTransactionId();
2883         HeapTupleData newtup;
2884         OffsetNumber newoff;
2885         ItemId          newitemid;
2886         Size            tuple_len = old_tup->t_len;
2887
2888         /*
2889          * make a modifiable copy of the source tuple.
2890          */
2891         heap_copytuple_with_tuple(old_tup, &newtup);
2892
2893         /*
2894          * register invalidation of source tuple in catcaches.
2895          */
2896         CacheInvalidateHeapTuple(rel, old_tup);
2897
2898         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2899         START_CRIT_SECTION();
2900
2901         /*
2902          * mark the source tuple MOVED_OFF.
2903          */
2904         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2905                                                                          HEAP_XMIN_INVALID |
2906                                                                          HEAP_MOVED_IN);
2907         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2908         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2909
2910         /*
2911          * If this page was not used before - clean it.
2912          *
2913          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2914          * destination pages to be the same (since this tuple-chain member can be
2915          * on a page lower than the one we're currently processing in the outer
2916          * loop).  If that's true, then after vacuum_page() the source tuple will
2917          * have been moved, and tuple.t_data will be pointing at garbage.
2918          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2919          * step!!
2920          *
2921          * This path is different from the other callers of vacuum_page, because
2922          * we have already incremented the vacpage's offsets_used field to account
2923          * for the tuple(s) we expect to move onto the page. Therefore
2924          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2925          * good debugging check for all other callers, we work around it here
2926          * rather than remove it.
2927          */
2928         if (!PageIsEmpty(dst_page) && cleanVpd)
2929         {
2930                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2931
2932                 dst_vacpage->offsets_used = 0;
2933                 vacuum_page(rel, dst_buf, dst_vacpage);
2934                 dst_vacpage->offsets_used = sv_offsets_used;
2935         }
2936
2937         /*
2938          * Update the state of the copied tuple, and store it on the destination
2939          * page.  The copied tuple is never part of a HOT chain.
2940          */
2941         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2942                                                                    HEAP_XMIN_INVALID |
2943                                                                    HEAP_MOVED_OFF);
2944         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2945         HeapTupleHeaderClearHotUpdated(newtup.t_data);
2946         HeapTupleHeaderClearHeapOnly(newtup.t_data);
2947         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2948         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2949                                                  InvalidOffsetNumber, false, true);
2950         if (newoff == InvalidOffsetNumber)
2951                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
2952                          (unsigned long) tuple_len, dst_vacpage->blkno);
2953         newitemid = PageGetItemId(dst_page, newoff);
2954         /* drop temporary copy, and point to the version on the dest page */
2955         pfree(newtup.t_data);
2956         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2957
2958         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
2959
2960         /*
2961          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
2962          * to next tuple in chain otherwise.  (Since we move the chain in reverse
2963          * order, this is actually the previously processed tuple.)
2964          */
2965         if (!ItemPointerIsValid(ctid))
2966                 newtup.t_data->t_ctid = newtup.t_self;
2967         else
2968                 newtup.t_data->t_ctid = *ctid;
2969         *ctid = newtup.t_self;
2970
2971         MarkBufferDirty(dst_buf);
2972         if (dst_buf != old_buf)
2973                 MarkBufferDirty(old_buf);
2974
2975         /* XLOG stuff */
2976         if (!rel->rd_istemp)
2977         {
2978                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2979                                                                                    dst_buf, &newtup);
2980
2981                 if (old_buf != dst_buf)
2982                 {
2983                         PageSetLSN(old_page, recptr);
2984                         PageSetTLI(old_page, ThisTimeLineID);
2985                 }
2986                 PageSetLSN(dst_page, recptr);
2987                 PageSetTLI(dst_page, ThisTimeLineID);
2988         }
2989
2990         END_CRIT_SECTION();
2991
2992         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2993         if (dst_buf != old_buf)
2994                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2995
2996         /* Create index entries for the moved tuple */
2997         if (ec->resultRelInfo->ri_NumIndices > 0)
2998         {
2999                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3000                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3001                 ResetPerTupleExprContext(ec->estate);
3002         }
3003 }
3004
3005 /*
3006  *      move_plain_tuple() -- move one tuple that is not part of a chain
3007  *
3008  *              This routine moves old_tup from old_page to dst_page.
3009  *              On entry old_buf and dst_buf are locked exclusively, both locks are
3010  *              released before exit.
3011  *
3012  *              Yes, a routine with eight parameters is ugly, but it's still better
3013  *              than having these 90 lines of code in repair_frag() which is already
3014  *              too long and almost unreadable.
3015  */
3016 static void
3017 move_plain_tuple(Relation rel,
3018                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
3019                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
3020                                  ExecContext ec)
3021 {
3022         TransactionId myXID = GetCurrentTransactionId();
3023         HeapTupleData newtup;
3024         OffsetNumber newoff;
3025         ItemId          newitemid;
3026         Size            tuple_len = old_tup->t_len;
3027
3028         /* copy tuple */
3029         heap_copytuple_with_tuple(old_tup, &newtup);
3030
3031         /*
3032          * register invalidation of source tuple in catcaches.
3033          *
3034          * (Note: we do not need to register the copied tuple, because we are not
3035          * changing the tuple contents and so there cannot be any need to flush
3036          * negative catcache entries.)
3037          */
3038         CacheInvalidateHeapTuple(rel, old_tup);
3039
3040         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
3041         START_CRIT_SECTION();
3042
3043         /*
3044          * Mark new tuple as MOVED_IN by me; also mark it not HOT.
3045          */
3046         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3047                                                                    HEAP_XMIN_INVALID |
3048                                                                    HEAP_MOVED_OFF);
3049         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
3050         HeapTupleHeaderClearHotUpdated(newtup.t_data);
3051         HeapTupleHeaderClearHeapOnly(newtup.t_data);
3052         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
3053
3054         /* add tuple to the page */
3055         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3056                                                  InvalidOffsetNumber, false, true);
3057         if (newoff == InvalidOffsetNumber)
3058                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
3059                          (unsigned long) tuple_len,
3060                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
3061                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
3062         newitemid = PageGetItemId(dst_page, newoff);
3063         pfree(newtup.t_data);
3064         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
3065         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
3066         newtup.t_self = newtup.t_data->t_ctid;
3067
3068         /*
3069          * Mark old tuple as MOVED_OFF by me.
3070          */
3071         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3072                                                                          HEAP_XMIN_INVALID |
3073                                                                          HEAP_MOVED_IN);
3074         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
3075         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
3076
3077         MarkBufferDirty(dst_buf);
3078         MarkBufferDirty(old_buf);
3079
3080         /* XLOG stuff */
3081         if (!rel->rd_istemp)
3082         {
3083                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3084                                                                                    dst_buf, &newtup);
3085
3086                 PageSetLSN(old_page, recptr);
3087                 PageSetTLI(old_page, ThisTimeLineID);
3088                 PageSetLSN(dst_page, recptr);
3089                 PageSetTLI(dst_page, ThisTimeLineID);
3090         }
3091
3092         END_CRIT_SECTION();
3093
3094         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
3095         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3096         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3097
3098         dst_vacpage->offsets_used++;
3099
3100         /* insert index' tuples if needed */
3101         if (ec->resultRelInfo->ri_NumIndices > 0)
3102         {
3103                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3104                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3105                 ResetPerTupleExprContext(ec->estate);
3106         }
3107 }
3108
3109 /*
3110  *      update_hint_bits() -- update hint bits in destination pages
3111  *
3112  * Scan all the pages that we moved tuples onto and update tuple status bits.
3113  * This is not really necessary, but it will save time for future transactions
3114  * examining these tuples.
3115  *
3116  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
3117  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
3118  *
3119  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
3120  * pages that were move source pages but not move dest pages.  The bulk
3121  * of the move source pages will be physically truncated from the relation,
3122  * and the last page remaining in the rel will be fixed separately in
3123  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
3124  * hint bits updated are tuples that are moved as part of a chain and were
3125  * on pages that were not either move destinations nor at the end of the rel.
3126  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
3127  * to remember and revisit those pages too.
3128  *
3129  * One wonders whether it wouldn't be better to skip this work entirely,
3130  * and let the tuple status updates happen someplace that's not holding an
3131  * exclusive lock on the relation.
3132  */
3133 static void
3134 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
3135                                  BlockNumber last_move_dest_block, int num_moved)
3136 {
3137         TransactionId myXID = GetCurrentTransactionId();
3138         int                     checked_moved = 0;
3139         int                     i;
3140         VacPage    *curpage;
3141
3142         for (i = 0, curpage = fraged_pages->pagedesc;
3143                  i < num_fraged_pages;
3144                  i++, curpage++)
3145         {
3146                 Buffer          buf;
3147                 Page            page;
3148                 OffsetNumber max_offset;
3149                 OffsetNumber off;
3150                 int                     num_tuples = 0;
3151
3152                 vacuum_delay_point();
3153
3154                 if ((*curpage)->blkno > last_move_dest_block)
3155                         break;                          /* no need to scan any further */
3156                 if ((*curpage)->offsets_used == 0)
3157                         continue;                       /* this page was never used as a move dest */
3158                 buf = ReadBufferWithStrategy(rel, (*curpage)->blkno, vac_strategy);
3159                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3160                 page = BufferGetPage(buf);
3161                 max_offset = PageGetMaxOffsetNumber(page);
3162                 for (off = FirstOffsetNumber;
3163                          off <= max_offset;
3164                          off = OffsetNumberNext(off))
3165                 {
3166                         ItemId          itemid = PageGetItemId(page, off);
3167                         HeapTupleHeader htup;
3168
3169                         if (!ItemIdIsUsed(itemid))
3170                                 continue;
3171                         /* Shouldn't be any DEAD or REDIRECT items anymore */
3172                         Assert(ItemIdIsNormal(itemid));
3173
3174                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
3175                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
3176                                 continue;
3177
3178                         /*
3179                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
3180                          */
3181                         if (!(htup->t_infomask & HEAP_MOVED))
3182                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
3183                         if (HeapTupleHeaderGetXvac(htup) != myXID)
3184                                 elog(ERROR, "invalid XVAC in tuple header");
3185
3186                         if (htup->t_infomask & HEAP_MOVED_IN)
3187                         {
3188                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
3189                                 htup->t_infomask &= ~HEAP_MOVED;
3190                                 num_tuples++;
3191                         }
3192                         else
3193                                 htup->t_infomask |= HEAP_XMIN_INVALID;
3194                 }
3195                 MarkBufferDirty(buf);
3196                 UnlockReleaseBuffer(buf);
3197                 Assert((*curpage)->offsets_used == num_tuples);
3198                 checked_moved += num_tuples;
3199         }
3200         Assert(num_moved == checked_moved);
3201 }
3202
3203 /*
3204  *      vacuum_heap() -- free dead tuples
3205  *
3206  *              This routine marks dead tuples as unused and truncates relation
3207  *              if there are "empty" end-blocks.
3208  */
3209 static void
3210 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
3211 {
3212         Buffer          buf;
3213         VacPage    *vacpage;
3214         BlockNumber relblocks;
3215         int                     nblocks;
3216         int                     i;
3217
3218         nblocks = vacuum_pages->num_pages;
3219         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
3220
3221         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
3222         {
3223                 vacuum_delay_point();
3224
3225                 if ((*vacpage)->offsets_free > 0)
3226                 {
3227                         buf = ReadBufferWithStrategy(onerel,
3228                                                                                  (*vacpage)->blkno,
3229                                                                                  vac_strategy);
3230                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3231                         vacuum_page(onerel, buf, *vacpage);
3232                         UnlockReleaseBuffer(buf);
3233                 }
3234         }
3235
3236         /* Truncate relation if there are some empty end-pages */
3237         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
3238         if (vacuum_pages->empty_end_pages > 0)
3239         {
3240                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
3241                 ereport(elevel,
3242                                 (errmsg("\"%s\": truncated %u to %u pages",
3243                                                 RelationGetRelationName(onerel),
3244                                                 vacrelstats->rel_pages, relblocks)));
3245                 RelationTruncate(onerel, relblocks);
3246                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
3247         }
3248 }
3249
3250 /*
3251  *      vacuum_page() -- free dead tuples on a page
3252  *                                       and repair its fragmentation.
3253  *
3254  * Caller must hold pin and lock on buffer.
3255  */
3256 static void
3257 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
3258 {
3259         Page            page = BufferGetPage(buffer);
3260         int                     i;
3261
3262         /* There shouldn't be any tuples moved onto the page yet! */
3263         Assert(vacpage->offsets_used == 0);
3264
3265         START_CRIT_SECTION();
3266
3267         for (i = 0; i < vacpage->offsets_free; i++)
3268         {
3269                 ItemId          itemid = PageGetItemId(page, vacpage->offsets[i]);
3270
3271                 ItemIdSetUnused(itemid);
3272         }
3273
3274         PageRepairFragmentation(page);
3275
3276         MarkBufferDirty(buffer);
3277
3278         /* XLOG stuff */
3279         if (!onerel->rd_istemp)
3280         {
3281                 XLogRecPtr      recptr;
3282
3283                 recptr = log_heap_clean(onerel, buffer,
3284                                                                 NULL, 0, NULL, 0,
3285                                                                 vacpage->offsets, vacpage->offsets_free,
3286                                                                 false);
3287                 PageSetLSN(page, recptr);
3288                 PageSetTLI(page, ThisTimeLineID);
3289         }
3290
3291         END_CRIT_SECTION();
3292 }
3293
3294 /*
3295  *      scan_index() -- scan one index relation to update pg_class statistics.
3296  *
3297  * We use this when we have no deletions to do.
3298  */
3299 static void
3300 scan_index(Relation indrel, double num_tuples)
3301 {
3302         IndexBulkDeleteResult *stats;
3303         IndexVacuumInfo ivinfo;
3304         PGRUsage        ru0;
3305
3306         pg_rusage_init(&ru0);
3307
3308         ivinfo.index = indrel;
3309         ivinfo.vacuum_full = true;
3310         ivinfo.message_level = elevel;
3311         ivinfo.num_heap_tuples = num_tuples;
3312         ivinfo.strategy = vac_strategy;
3313
3314         stats = index_vacuum_cleanup(&ivinfo, NULL);
3315
3316         if (!stats)
3317                 return;
3318
3319         /* now update statistics in pg_class */
3320         vac_update_relstats(RelationGetRelid(indrel),
3321                                                 stats->num_pages, stats->num_index_tuples,
3322                                                 false, InvalidTransactionId);
3323
3324         ereport(elevel,
3325                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3326                                         RelationGetRelationName(indrel),
3327                                         stats->num_index_tuples,
3328                                         stats->num_pages),
3329         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
3330                           "%s.",
3331                           stats->pages_deleted, stats->pages_free,
3332                           pg_rusage_show(&ru0))));
3333
3334         /*
3335          * Check for tuple count mismatch.      If the index is partial, then it's OK
3336          * for it to have fewer tuples than the heap; else we got trouble.
3337          */
3338         if (stats->num_index_tuples != num_tuples)
3339         {
3340                 if (stats->num_index_tuples > num_tuples ||
3341                         !vac_is_partial_index(indrel))
3342                         ereport(WARNING,
3343                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3344                                                         RelationGetRelationName(indrel),
3345                                                         stats->num_index_tuples, num_tuples),
3346                                          errhint("Rebuild the index with REINDEX.")));
3347         }
3348
3349         pfree(stats);
3350 }
3351
3352 /*
3353  *      vacuum_index() -- vacuum one index relation.
3354  *
3355  *              Vpl is the VacPageList of the heap we're currently vacuuming.
3356  *              It's locked. Indrel is an index relation on the vacuumed heap.
3357  *
3358  *              We don't bother to set locks on the index relation here, since
3359  *              the parent table is exclusive-locked already.
3360  *
3361  *              Finally, we arrange to update the index relation's statistics in
3362  *              pg_class.
3363  */
3364 static void
3365 vacuum_index(VacPageList vacpagelist, Relation indrel,
3366                          double num_tuples, int keep_tuples)
3367 {
3368         IndexBulkDeleteResult *stats;
3369         IndexVacuumInfo ivinfo;
3370         PGRUsage        ru0;
3371
3372         pg_rusage_init(&ru0);
3373
3374         ivinfo.index = indrel;
3375         ivinfo.vacuum_full = true;
3376         ivinfo.message_level = elevel;
3377         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
3378         ivinfo.strategy = vac_strategy;
3379
3380         /* Do bulk deletion */
3381         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3382
3383         /* Do post-VACUUM cleanup */
3384         stats = index_vacuum_cleanup(&ivinfo, stats);
3385
3386         if (!stats)
3387                 return;
3388
3389         /* now update statistics in pg_class */
3390         vac_update_relstats(RelationGetRelid(indrel),
3391                                                 stats->num_pages, stats->num_index_tuples,
3392                                                 false, InvalidTransactionId);
3393
3394         ereport(elevel,
3395                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3396                                         RelationGetRelationName(indrel),
3397                                         stats->num_index_tuples,
3398                                         stats->num_pages),
3399                          errdetail("%.0f index row versions were removed.\n"
3400                          "%u index pages have been deleted, %u are currently reusable.\n"
3401                                            "%s.",
3402                                            stats->tuples_removed,
3403                                            stats->pages_deleted, stats->pages_free,
3404                                            pg_rusage_show(&ru0))));
3405
3406         /*
3407          * Check for tuple count mismatch.      If the index is partial, then it's OK
3408          * for it to have fewer tuples than the heap; else we got trouble.
3409          */
3410         if (stats->num_index_tuples != num_tuples + keep_tuples)
3411         {
3412                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3413                         !vac_is_partial_index(indrel))
3414                         ereport(WARNING,
3415                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3416                                                         RelationGetRelationName(indrel),
3417                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3418                                          errhint("Rebuild the index with REINDEX.")));
3419         }
3420
3421         pfree(stats);
3422 }
3423
3424 /*
3425  *      tid_reaped() -- is a particular tid reaped?
3426  *
3427  *              This has the right signature to be an IndexBulkDeleteCallback.
3428  *
3429  *              vacpagelist->VacPage_array is sorted in right order.
3430  */
3431 static bool
3432 tid_reaped(ItemPointer itemptr, void *state)
3433 {
3434         VacPageList vacpagelist = (VacPageList) state;
3435         OffsetNumber ioffno;
3436         OffsetNumber *voff;
3437         VacPage         vp,
3438                            *vpp;
3439         VacPageData vacpage;
3440
3441         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3442         ioffno = ItemPointerGetOffsetNumber(itemptr);
3443
3444         vp = &vacpage;
3445         vpp = (VacPage *) vac_bsearch((void *) &vp,
3446                                                                   (void *) (vacpagelist->pagedesc),
3447                                                                   vacpagelist->num_pages,
3448                                                                   sizeof(VacPage),
3449                                                                   vac_cmp_blk);
3450
3451         if (vpp == NULL)
3452                 return false;
3453
3454         /* ok - we are on a partially or fully reaped page */
3455         vp = *vpp;
3456
3457         if (vp->offsets_free == 0)
3458         {
3459                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
3460                 return true;
3461         }
3462
3463         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3464                                                                                 (void *) (vp->offsets),
3465                                                                                 vp->offsets_free,
3466                                                                                 sizeof(OffsetNumber),
3467                                                                                 vac_cmp_offno);
3468
3469         if (voff == NULL)
3470                 return false;
3471
3472         /* tid is reaped */
3473         return true;
3474 }
3475
3476 /*
3477  * Update the shared Free Space Map with the info we now have about
3478  * free space in the relation, discarding any old info the map may have.
3479  */
3480 static void
3481 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3482                            BlockNumber rel_pages)
3483 {
3484         int                     nPages = fraged_pages->num_pages;
3485         VacPage    *pagedesc = fraged_pages->pagedesc;
3486         Size            threshold;
3487         FSMPageData *pageSpaces;
3488         int                     outPages;
3489         int                     i;
3490
3491         /*
3492          * We only report pages with free space at least equal to the average
3493          * request size --- this avoids cluttering FSM with uselessly-small bits
3494          * of space.  Although FSM would discard pages with little free space
3495          * anyway, it's important to do this prefiltering because (a) it reduces
3496          * the time spent holding the FSM lock in RecordRelationFreeSpace, and (b)
3497          * FSM uses the number of pages reported as a statistic for guiding space
3498          * management.  If we didn't threshold our reports the same way
3499          * vacuumlazy.c does, we'd be skewing that statistic.
3500          */
3501         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
3502
3503         pageSpaces = (FSMPageData *) palloc(nPages * sizeof(FSMPageData));
3504         outPages = 0;
3505
3506         for (i = 0; i < nPages; i++)
3507         {
3508                 /*
3509                  * fraged_pages may contain entries for pages that we later decided to
3510                  * truncate from the relation; don't enter them into the free space
3511                  * map!
3512                  */
3513                 if (pagedesc[i]->blkno >= rel_pages)
3514                         break;
3515
3516                 if (pagedesc[i]->free >= threshold)
3517                 {
3518                         FSMPageSetPageNum(&pageSpaces[outPages], pagedesc[i]->blkno);
3519                         FSMPageSetSpace(&pageSpaces[outPages], pagedesc[i]->free);
3520                         outPages++;
3521                 }
3522         }
3523
3524         RecordRelationFreeSpace(&onerel->rd_node, outPages, outPages, pageSpaces);
3525
3526         pfree(pageSpaces);
3527 }
3528
3529 /* Copy a VacPage structure */
3530 static VacPage
3531 copy_vac_page(VacPage vacpage)
3532 {
3533         VacPage         newvacpage;
3534
3535         /* allocate a VacPageData entry */
3536         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3537                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3538
3539         /* fill it in */
3540         if (vacpage->offsets_free > 0)
3541                 memcpy(newvacpage->offsets, vacpage->offsets,
3542                            vacpage->offsets_free * sizeof(OffsetNumber));
3543         newvacpage->blkno = vacpage->blkno;
3544         newvacpage->free = vacpage->free;
3545         newvacpage->offsets_used = vacpage->offsets_used;
3546         newvacpage->offsets_free = vacpage->offsets_free;
3547
3548         return newvacpage;
3549 }
3550
3551 /*
3552  * Add a VacPage pointer to a VacPageList.
3553  *
3554  *              As a side effect of the way that scan_heap works,
3555  *              higher pages come after lower pages in the array
3556  *              (and highest tid on a page is last).
3557  */
3558 static void
3559 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3560 {
3561 #define PG_NPAGEDESC 1024
3562
3563         /* allocate a VacPage entry if needed */
3564         if (vacpagelist->num_pages == 0)
3565         {
3566                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3567                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3568         }
3569         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3570         {
3571                 vacpagelist->num_allocated_pages *= 2;
3572                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3573         }
3574         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3575         (vacpagelist->num_pages)++;
3576 }
3577
/*
 * vac_bsearch: behaves like the standard C library routine bsearch(), but
 * compares the key against the first and last table entries up front.
 * A key lying outside the table's range is no faster than any other key for
 * the ordinary binary search, yet it turns up often enough in VACUUM that
 * short-circuiting it pays off.
 *
 * Returns a pointer to the matching element, or NULL if there is none.
 */
static void *
vac_bsearch(const void *key, const void *base,
			size_t nelem, size_t size,
			int (*compar) (const void *, const void *))
{
	const char *first = (const char *) base;
	int			cmp;

	/* an empty table matches nothing */
	if (nelem == 0)
		return NULL;

	/* key at or below the first entry? */
	cmp = compar(key, first);
	if (cmp <= 0)
		return (cmp == 0) ? (void *) first : NULL;

	/* key at or above the last entry? */
	if (nelem > 1)
	{
		const char *final = first + (nelem - 1) * size;

		cmp = compar(key, final);
		if (cmp >= 0)
			return (cmp == 0) ? (void *) final : NULL;
	}

	/* with at most two entries, every one has been examined already */
	if (nelem <= 2)
		return NULL;

	/* key lies strictly inside the range: do the real search */
	return bsearch(key, base, nelem, size, compar);
}
3613
3614 /*
3615  * Comparator routines for use with qsort() and bsearch().
3616  */
3617 static int
3618 vac_cmp_blk(const void *left, const void *right)
3619 {
3620         BlockNumber lblk,
3621                                 rblk;
3622
3623         lblk = (*((VacPage *) left))->blkno;
3624         rblk = (*((VacPage *) right))->blkno;
3625
3626         if (lblk < rblk)
3627                 return -1;
3628         if (lblk == rblk)
3629                 return 0;
3630         return 1;
3631 }
3632
3633 static int
3634 vac_cmp_offno(const void *left, const void *right)
3635 {
3636         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3637                 return -1;
3638         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3639                 return 0;
3640         return 1;
3641 }
3642
3643 static int
3644 vac_cmp_vtlinks(const void *left, const void *right)
3645 {
3646         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3647                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3648                 return -1;
3649         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3650                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3651                 return 1;
3652         /* bi_hi-es are equal */
3653         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3654                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3655                 return -1;
3656         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3657                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3658                 return 1;
3659         /* bi_lo-es are equal */
3660         if (((VTupleLink) left)->new_tid.ip_posid <
3661                 ((VTupleLink) right)->new_tid.ip_posid)
3662                 return -1;
3663         if (((VTupleLink) left)->new_tid.ip_posid >
3664                 ((VTupleLink) right)->new_tid.ip_posid)
3665                 return 1;
3666         return 0;
3667 }
3668
3669
3670 /*
3671  * Open all the indexes of the given relation, obtaining the specified kind
3672  * of lock on each.  Return an array of Relation pointers for the indexes
3673  * into *Irel, and the number of indexes into *nindexes.
3674  */
3675 void
3676 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3677                                  int *nindexes, Relation **Irel)
3678 {
3679         List       *indexoidlist;
3680         ListCell   *indexoidscan;
3681         int                     i;
3682
3683         Assert(lockmode != NoLock);
3684
3685         indexoidlist = RelationGetIndexList(relation);
3686
3687         *nindexes = list_length(indexoidlist);
3688
3689         if (*nindexes > 0)
3690                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3691         else
3692                 *Irel = NULL;
3693
3694         i = 0;
3695         foreach(indexoidscan, indexoidlist)
3696         {
3697                 Oid                     indexoid = lfirst_oid(indexoidscan);
3698
3699                 (*Irel)[i++] = index_open(indexoid, lockmode);
3700         }
3701
3702         list_free(indexoidlist);
3703 }
3704
3705 /*
3706  * Release the resources acquired by vac_open_indexes.  Optionally release
3707  * the locks (say NoLock to keep 'em).
3708  */
3709 void
3710 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3711 {
3712         if (Irel == NULL)
3713                 return;
3714
3715         while (nindexes--)
3716         {
3717                 Relation        ind = Irel[nindexes];
3718
3719                 index_close(ind, lockmode);
3720         }
3721         pfree(Irel);
3722 }
3723
3724
3725 /*
3726  * Is an index partial (ie, could it contain fewer tuples than the heap?)
3727  */
3728 bool
3729 vac_is_partial_index(Relation indrel)
3730 {
3731         /*
3732          * If the index's AM doesn't support nulls, it's partial for our purposes
3733          */
3734         if (!indrel->rd_am->amindexnulls)
3735                 return true;
3736
3737         /* Otherwise, look to see if there's a partial-index predicate */
3738         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3739                 return true;
3740
3741         return false;
3742 }
3743
3744
3745 static bool
3746 enough_space(VacPage vacpage, Size len)
3747 {
3748         len = MAXALIGN(len);
3749
3750         if (len > vacpage->free)
3751                 return false;
3752
3753         /* if there are free itemid(s) and len <= free_space... */
3754         if (vacpage->offsets_used < vacpage->offsets_free)
3755                 return true;
3756
3757         /* noff_used >= noff_free and so we'll have to allocate new itemid */
3758         if (len + sizeof(ItemIdData) <= vacpage->free)
3759                 return true;
3760
3761         return false;
3762 }
3763
3764 static Size
3765 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3766 {
3767         /*
3768          * It is correct to use PageGetExactFreeSpace() here, *not*
3769          * PageGetHeapFreeSpace().  This is because (a) we do our own, exact
3770          * accounting for whether line pointers must be added, and (b) we will
3771          * recycle any LP_DEAD line pointers before starting to add rows to a
3772          * page, but that may not have happened yet at the time this function is
3773          * applied to a page, which means PageGetHeapFreeSpace()'s protection
3774          * against too many line pointers on a page could fire incorrectly.  We do
3775          * not need that protection here: since VACUUM FULL always recycles all
3776          * dead line pointers first, it'd be physically impossible to insert more
3777          * than MaxHeapTuplesPerPage tuples anyway.
3778          */
3779         Size            freespace = PageGetExactFreeSpace(page);
3780         Size            targetfree;
3781
3782         targetfree = RelationGetTargetPageFreeSpace(relation,
3783                                                                                                 HEAP_DEFAULT_FILLFACTOR);
3784         if (freespace > targetfree)
3785                 return freespace - targetfree;
3786         else
3787                 return 0;
3788 }
3789
3790 /*
3791  * vacuum_delay_point --- check for interrupts and cost-based delay.
3792  *
3793  * This should be called in each major loop of VACUUM processing,
3794  * typically once per page processed.
3795  */
3796 void
3797 vacuum_delay_point(void)
3798 {
3799         /* Always check for interrupts */
3800         CHECK_FOR_INTERRUPTS();
3801
3802         /* Nap if appropriate */
3803         if (VacuumCostActive && !InterruptPending &&
3804                 VacuumCostBalance >= VacuumCostLimit)
3805         {
3806                 int                     msec;
3807
3808                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3809                 if (msec > VacuumCostDelay * 4)
3810                         msec = VacuumCostDelay * 4;
3811
3812                 pg_usleep(msec * 1000L);
3813
3814                 VacuumCostBalance = 0;
3815
3816                 /* update balance values for workers */
3817                 AutoVacuumUpdateDelay();
3818
3819                 /* Might have gotten an interrupt while sleeping */
3820                 CHECK_FOR_INTERRUPTS();
3821         }
3822 }