1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.391 2009/08/31 02:23:22 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/transam.h"
29 #include "access/visibilitymap.h"
30 #include "access/xact.h"
31 #include "access/xlog.h"
32 #include "catalog/namespace.h"
33 #include "catalog/pg_database.h"
34 #include "catalog/pg_namespace.h"
35 #include "catalog/storage.h"
36 #include "commands/dbcommands.h"
37 #include "commands/vacuum.h"
38 #include "executor/executor.h"
39 #include "miscadmin.h"
40 #include "pgstat.h"
41 #include "postmaster/autovacuum.h"
42 #include "storage/bufmgr.h"
43 #include "storage/freespace.h"
44 #include "storage/lmgr.h"
45 #include "storage/proc.h"
46 #include "storage/procarray.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/flatfiles.h"
50 #include "utils/fmgroids.h"
51 #include "utils/inval.h"
52 #include "utils/lsyscache.h"
53 #include "utils/memutils.h"
54 #include "utils/pg_rusage.h"
55 #include "utils/relcache.h"
56 #include "utils/snapmgr.h"
57 #include "utils/syscache.h"
58 #include "utils/tqual.h"
59
60
61 /*
62  * GUC parameters
63  */
64 int                     vacuum_freeze_min_age;
65 int                     vacuum_freeze_table_age;
66
67 /*
68  * VacPage structures keep track of each page on which we find useful
69  * amounts of free space.
70  */
71 typedef struct VacPageData
72 {
73         BlockNumber blkno;                      /* BlockNumber of this Page */
74         Size            free;                   /* FreeSpace on this Page */
75         uint16          offsets_used;   /* Number of OffNums used by vacuum */
76         uint16          offsets_free;   /* Number of OffNums free or to be free */
77         OffsetNumber offsets[1];        /* Array of free OffNums */
78 } VacPageData;
79
80 typedef VacPageData *VacPage;
81
82 typedef struct VacPageListData
83 {
84         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
85         int                     num_pages;              /* Number of pages in pagedesc */
86         int                     num_allocated_pages;    /* Number of allocated pages in
87                                                                                  * pagedesc */
88         VacPage    *pagedesc;           /* Descriptions of pages */
89 } VacPageListData;
90
91 typedef VacPageListData *VacPageList;
92
93 /*
94  * The "vtlinks" array keeps information about each recently-updated tuple
95  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
96  * We store the tuple's own TID as well as its t_ctid (its link to the next
97  * newer tuple version).  Searching in this array allows us to follow update
98  * chains backwards from newer to older tuples.  When we move a member of an
99  * update chain, we must move *all* the live members of the chain, so that we
100  * can maintain their t_ctid link relationships (we must not just overwrite
101  * t_ctid in an existing tuple).
102  *
103  * Note: because t_ctid links can be stale (this would only occur if a prior
104  * VACUUM crashed partway through), it is possible that new_tid points to an
105  * empty slot or unrelated tuple.  We have to check the linkage as we follow
106  * it, just as is done in EvalPlanQual.
107  */
108 typedef struct VTupleLinkData
109 {
110         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
111         ItemPointerData this_tid;       /* t_self of the tuple */
112 } VTupleLinkData;
113
114 typedef VTupleLinkData *VTupleLink;
115
116 /*
117  * We use an array of VTupleMoveData to plan a chain tuple move fully
118  * before we do it.
119  */
120 typedef struct VTupleMoveData
121 {
122         ItemPointerData tid;            /* tuple ID */
123         VacPage         vacpage;                /* where to move it to */
124         bool            cleanVpd;               /* clean vacpage before using? */
125 } VTupleMoveData;
126
127 typedef VTupleMoveData *VTupleMove;
128
129 /*
130  * VRelStats contains the data acquired by scan_heap for use later
131  */
132 typedef struct VRelStats
133 {
134         /* miscellaneous statistics */
135         BlockNumber rel_pages;          /* pages in relation */
136         double          rel_tuples;             /* tuples that remain after vacuuming */
137         double          rel_indexed_tuples;             /* indexed tuples that remain */
138         Size            min_tlen;               /* min surviving tuple size */
139         Size            max_tlen;               /* max surviving tuple size */
140         bool            hasindex;               /* does the relation have any indexes? */
141         /* vtlinks array for tuple chain following - sorted by new_tid */
142         int                     num_vtlinks;
143         VTupleLink      vtlinks;
144 } VRelStats;
145
146 /*----------------------------------------------------------------------
147  * ExecContext:
148  *
149  * As these variables always appear together, we put them into one struct
150  * and pull initialization and cleanup into separate routines.
151  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
152  * accurately:  It is *used* only in move_xxx_tuple(), but because this
153  * routine is called many times, we initialize the struct just once in
154  * repair_frag() and pass it on to move_xxx_tuple().
155  */
156 typedef struct ExecContextData
157 {
158         ResultRelInfo *resultRelInfo;
159         EState     *estate;
160         TupleTableSlot *slot;
161 } ExecContextData;
162
163 typedef ExecContextData *ExecContext;
164
165 static void
166 ExecContext_Init(ExecContext ec, Relation rel)
167 {
168         TupleDesc       tupdesc = RelationGetDescr(rel);
169
170         /*
171          * We need a ResultRelInfo and an EState so we can use the regular
172          * executor's index-entry-making machinery.
173          */
174         ec->estate = CreateExecutorState();
175
176         ec->resultRelInfo = makeNode(ResultRelInfo);
177         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
178         ec->resultRelInfo->ri_RelationDesc = rel;
179         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
180
181         ExecOpenIndices(ec->resultRelInfo);
182
183         ec->estate->es_result_relations = ec->resultRelInfo;
184         ec->estate->es_num_result_relations = 1;
185         ec->estate->es_result_relation_info = ec->resultRelInfo;
186
187         /* Set up a tuple slot too */
188         ec->slot = MakeSingleTupleTableSlot(tupdesc);
189 }
190
191 static void
192 ExecContext_Finish(ExecContext ec)
193 {
194         ExecDropSingleTupleTableSlot(ec->slot);
195         ExecCloseIndices(ec->resultRelInfo);
196         FreeExecutorState(ec->estate);
197 }
198
199 /*
200  * End of ExecContext Implementation
201  *----------------------------------------------------------------------
202  */
203
204 /* A few variables that don't seem worth passing around as parameters */
205 static MemoryContext vac_context = NULL;
206
207 static int      elevel = -1;            /* ereport level for progress messages */
208
209 static TransactionId OldestXmin;        /* tuples deleted by XIDs >= this must be kept */
210 static TransactionId FreezeLimit;       /* tuples with xmin older than this are frozen */
211
212 static BufferAccessStrategy vac_strategy;
213
214
215 /* non-export function prototypes */
216 static List *get_rel_oids(Oid relid, const RangeVar *vacrel,
217                          const char *stmttype);
218 static void vac_truncate_clog(TransactionId frozenXID);
219 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast,
220                    bool for_wraparound, bool *scanned_all);
221 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
222 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
223                   VacPageList vacuum_pages, VacPageList fraged_pages);
224 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
225                         VacPageList vacuum_pages, VacPageList fraged_pages,
226                         int nindexes, Relation *Irel);
227 static void move_chain_tuple(Relation rel,
228                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
229                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
230                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
231 static void move_plain_tuple(Relation rel,
232                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
233                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
234                                  ExecContext ec);
235 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
236                                  int num_fraged_pages, BlockNumber last_move_dest_block,
237                                  int num_moved);
238 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
239                         VacPageList vacpagelist);
240 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
241 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
242                          double num_tuples, int keep_tuples);
243 static void scan_index(Relation indrel, double num_tuples);
244 static bool tid_reaped(ItemPointer itemptr, void *state);
245 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
246                            BlockNumber rel_pages);
247 static VacPage copy_vac_page(VacPage vacpage);
248 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
249 static void *vac_bsearch(const void *key, const void *base,
250                         size_t nelem, size_t size,
251                         int (*compar) (const void *, const void *));
252 static int      vac_cmp_blk(const void *left, const void *right);
253 static int      vac_cmp_offno(const void *left, const void *right);
254 static int      vac_cmp_vtlinks(const void *left, const void *right);
255 static bool enough_space(VacPage vacpage, Size len);
256 static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
257
258
259 /****************************************************************************
260  *                                                                                                                                                      *
261  *                      Code common to all flavors of VACUUM and ANALYZE                                *
262  *                                                                                                                                                      *
263  ****************************************************************************
264  */
265
266
267 /*
268  * Primary entry point for VACUUM and ANALYZE commands.
269  *
270  * relid is normally InvalidOid; if it is not, then it provides the relation
271  * OID to be processed, and vacstmt->relation is ignored.  (The non-invalid
272  * case is currently only used by autovacuum.)
273  *
274  * do_toast is passed as FALSE by autovacuum, because it processes TOAST
275  * tables separately.
276  *
277  * for_wraparound is used by autovacuum to let us know when it's forcing
278  * a vacuum for wraparound, which should not be auto-cancelled.
279  *
280  * bstrategy is normally given as NULL, but in autovacuum it can be passed
281  * in to use the same buffer strategy object across multiple vacuum() calls.
282  *
283  * isTopLevel should be passed down from ProcessUtility.
284  *
285  * It is the caller's responsibility that vacstmt and bstrategy
286  * (if given) be allocated in a memory context that won't disappear
287  * at transaction commit.
288  */
289 void
290 vacuum(VacuumStmt *vacstmt, Oid relid, bool do_toast,
291            BufferAccessStrategy bstrategy, bool for_wraparound, bool isTopLevel)
292 {
293         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
294         volatile MemoryContext anl_context = NULL;
295         volatile bool all_rels,
296                                 in_outer_xact,
297                                 use_own_xacts;
298         List       *relations;
299
300         if (vacstmt->verbose)
301                 elevel = INFO;
302         else
303                 elevel = DEBUG2;
304
305         /*
306          * We cannot run VACUUM inside a user transaction block; if we were inside
307          * a transaction, then our commit- and start-transaction-command calls
308          * would not have the intended effect! Furthermore, the forced commit that
309          * occurs before truncating the relation's file would have the effect of
310          * committing the rest of the user's transaction too, which would
311          * certainly not be the desired behavior.  (This only applies to VACUUM
312          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
313          * block, but we choose to disallow that case because we'd rather commit
314          * as soon as possible after finishing the vacuum.      This is mainly so that
315          * we can let go the AccessExclusiveLock that we may be holding.)
316          *
317          * ANALYZE (without VACUUM) can run either way.
318          */
319         if (vacstmt->vacuum)
320         {
321                 PreventTransactionChain(isTopLevel, stmttype);
322                 in_outer_xact = false;
323         }
324         else
325                 in_outer_xact = IsInTransactionChain(isTopLevel);
326
327         /*
328          * Send info about dead objects to the statistics collector, unless we are
329          * in autovacuum --- autovacuum.c does this for itself.
330          */
331         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
332                 pgstat_vacuum_stat();
333
334         /*
335          * Create special memory context for cross-transaction storage.
336          *
337          * Since it is a child of PortalContext, it will go away eventually even
338          * if we suffer an error; there's no need for special abort cleanup logic.
339          */
340         vac_context = AllocSetContextCreate(PortalContext,
341                                                                                 "Vacuum",
342                                                                                 ALLOCSET_DEFAULT_MINSIZE,
343                                                                                 ALLOCSET_DEFAULT_INITSIZE,
344                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
345
346         /*
347          * If caller didn't give us a buffer strategy object, make one in the
348          * cross-transaction memory context.
349          */
350         if (bstrategy == NULL)
351         {
352                 MemoryContext old_context = MemoryContextSwitchTo(vac_context);
353
354                 bstrategy = GetAccessStrategy(BAS_VACUUM);
355                 MemoryContextSwitchTo(old_context);
356         }
357         vac_strategy = bstrategy;
358
359         /* Remember whether we are processing everything in the DB */
360         all_rels = (!OidIsValid(relid) && vacstmt->relation == NULL);
361
362         /*
363          * Build list of relations to process, unless caller gave us one. (If we
364          * build one, we put it in vac_context for safekeeping.)
365          */
366         relations = get_rel_oids(relid, vacstmt->relation, stmttype);
367
368         /*
369          * Decide whether we need to start/commit our own transactions.
370          *
371          * For VACUUM (with or without ANALYZE): always do so, so that we can
372          * release locks as soon as possible.  (We could possibly use the outer
373          * transaction for a one-table VACUUM, but handling TOAST tables would be
374          * problematic.)
375          *
376          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
377          * start/commit our own transactions.  Also, there's no need to do so if
378          * only processing one relation.  For multiple relations when not within a
379          * transaction block, and also in an autovacuum worker, use own
380          * transactions so we can release locks sooner.
381          */
382         if (vacstmt->vacuum)
383                 use_own_xacts = true;
384         else
385         {
386                 Assert(vacstmt->analyze);
387                 if (IsAutoVacuumWorkerProcess())
388                         use_own_xacts = true;
389                 else if (in_outer_xact)
390                         use_own_xacts = false;
391                 else if (list_length(relations) > 1)
392                         use_own_xacts = true;
393                 else
394                         use_own_xacts = false;
395         }
396
397         /*
398          * If we are running ANALYZE without per-table transactions, we'll need a
399          * memory context with table lifetime.
400          */
401         if (!use_own_xacts)
402                 anl_context = AllocSetContextCreate(PortalContext,
403                                                                                         "Analyze",
404                                                                                         ALLOCSET_DEFAULT_MINSIZE,
405                                                                                         ALLOCSET_DEFAULT_INITSIZE,
406                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
407
408         /*
409          * vacuum_rel expects to be entered with no transaction active; it will
410          * start and commit its own transaction.  But we are called by an SQL
411          * command, and so we are executing inside a transaction already. We
412          * commit the transaction started in PostgresMain() here, and start
413          * another one before exiting to match the commit waiting for us back in
414          * PostgresMain().
415          */
416         if (use_own_xacts)
417         {
418                 /* ActiveSnapshot is not set by autovacuum */
419                 if (ActiveSnapshotSet())
420                         PopActiveSnapshot();
421
422                 /* matches the StartTransaction in PostgresMain() */
423                 CommitTransactionCommand();
424         }
425
426         /* Turn vacuum cost accounting on or off */
427         PG_TRY();
428         {
429                 ListCell   *cur;
430
431                 VacuumCostActive = (VacuumCostDelay > 0);
432                 VacuumCostBalance = 0;
433
434                 /*
435                  * Loop to process each selected relation.
436                  */
437                 foreach(cur, relations)
438                 {
439                         Oid                     relid = lfirst_oid(cur);
440                         bool            scanned_all = false;
441
442                         if (vacstmt->vacuum)
443                                 vacuum_rel(relid, vacstmt, do_toast, for_wraparound,
444                                                    &scanned_all);
445
446                         if (vacstmt->analyze)
447                         {
448                                 MemoryContext old_context = NULL;
449
450                                 /*
451                                  * If using separate xacts, start one for analyze. Otherwise,
452                                  * we can use the outer transaction, but we still need to call
453                                  * analyze_rel in a memory context that will be cleaned up on
454                                  * return (else we leak memory while processing multiple
455                                  * tables).
456                                  */
457                                 if (use_own_xacts)
458                                 {
459                                         StartTransactionCommand();
460                                         /* functions in indexes may want a snapshot set */
461                                         PushActiveSnapshot(GetTransactionSnapshot());
462                                 }
463                                 else
464                                         old_context = MemoryContextSwitchTo(anl_context);
465
466                                 analyze_rel(relid, vacstmt, vac_strategy, !scanned_all);
467
468                                 if (use_own_xacts)
469                                 {
470                                         PopActiveSnapshot();
471                                         CommitTransactionCommand();
472                                 }
473                                 else
474                                 {
475                                         MemoryContextSwitchTo(old_context);
476                                         MemoryContextResetAndDeleteChildren(anl_context);
477                                 }
478                         }
479                 }
480         }
481         PG_CATCH();
482         {
483                 /* Make sure cost accounting is turned off after error */
484                 VacuumCostActive = false;
485                 PG_RE_THROW();
486         }
487         PG_END_TRY();
488
489         /* Turn off vacuum cost accounting */
490         VacuumCostActive = false;
491
492         /*
493          * Finish up processing.
494          */
495         if (use_own_xacts)
496         {
497                 /* here, we are not in a transaction */
498
499                 /*
500                  * This matches the CommitTransaction waiting for us in
501                  * PostgresMain().
502                  */
503                 StartTransactionCommand();
504         }
505
506         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
507         {
508                 /*
509                  * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
510                  * (autovacuum.c does this for itself.)
511                  */
512                 vac_update_datfrozenxid();
513         }
514
515         /*
516          * Clean up working storage --- note we must do this after
517          * StartTransactionCommand, else we might be trying to delete the active
518          * context!
519          */
520         MemoryContextDelete(vac_context);
521         vac_context = NULL;
522
523         if (anl_context)
524                 MemoryContextDelete(anl_context);
525 }
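/*
 * Typical call paths (a rough sketch, not verbatim from the callers): a SQL
 * "VACUUM ANALYZE foo" arrives here from ProcessUtility as a VacuumStmt with
 * relid = InvalidOid, do_toast = true, and bstrategy = NULL, while an
 * autovacuum worker calls vacuum() with a specific table OID, do_toast =
 * false (it processes TOAST tables separately), its own buffer strategy, and
 * for_wraparound set when it is forcing an anti-wraparound vacuum.
 */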
526
527 /*
528  * Build a list of Oids for each relation to be processed
529  *
530  * The list is built in vac_context so that it will survive across our
531  * per-relation transactions.
532  */
533 static List *
534 get_rel_oids(Oid relid, const RangeVar *vacrel, const char *stmttype)
535 {
536         List       *oid_list = NIL;
537         MemoryContext oldcontext;
538
539         /* OID supplied by VACUUM's caller? */
540         if (OidIsValid(relid))
541         {
542                 oldcontext = MemoryContextSwitchTo(vac_context);
543                 oid_list = lappend_oid(oid_list, relid);
544                 MemoryContextSwitchTo(oldcontext);
545         }
546         else if (vacrel)
547         {
548                 /* Process a specific relation */
549                 Oid                     relid;
550
551                 relid = RangeVarGetRelid(vacrel, false);
552
553                 /* Make a relation list entry for this guy */
554                 oldcontext = MemoryContextSwitchTo(vac_context);
555                 oid_list = lappend_oid(oid_list, relid);
556                 MemoryContextSwitchTo(oldcontext);
557         }
558         else
559         {
560                 /* Process all plain relations listed in pg_class */
561                 Relation        pgclass;
562                 HeapScanDesc scan;
563                 HeapTuple       tuple;
564                 ScanKeyData key;
565
566                 ScanKeyInit(&key,
567                                         Anum_pg_class_relkind,
568                                         BTEqualStrategyNumber, F_CHAREQ,
569                                         CharGetDatum(RELKIND_RELATION));
570
571                 pgclass = heap_open(RelationRelationId, AccessShareLock);
572
573                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
574
575                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
576                 {
577                         /* Make a relation list entry for this guy */
578                         oldcontext = MemoryContextSwitchTo(vac_context);
579                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
580                         MemoryContextSwitchTo(oldcontext);
581                 }
582
583                 heap_endscan(scan);
584                 heap_close(pgclass, AccessShareLock);
585         }
586
587         return oid_list;
588 }
589
590 /*
591  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
592  */
593 void
594 vacuum_set_xid_limits(int freeze_min_age,
595                                           int freeze_table_age,
596                                           bool sharedRel,
597                                           TransactionId *oldestXmin,
598                                           TransactionId *freezeLimit,
599                                           TransactionId *freezeTableLimit)
600 {
601         int                     freezemin;
602         TransactionId limit;
603         TransactionId safeLimit;
604
605         /*
606          * We can always ignore processes running lazy vacuum.  This is because we
607          * use these values only for deciding which tuples we must keep in the
608          * tables.      Since lazy vacuum doesn't write its XID anywhere, it's safe to
609          * ignore it.  In theory it could be problematic to ignore lazy vacuums on
610          * a full vacuum, but keep in mind that only one vacuum process can be
611          * working on a particular table at any time, and that each vacuum is
612          * always an independent transaction.
613          */
614         *oldestXmin = GetOldestXmin(sharedRel, true);
615
616         Assert(TransactionIdIsNormal(*oldestXmin));
617
618         /*
619          * Determine the minimum freeze age to use: as specified by the caller, or
620          * vacuum_freeze_min_age, but in any case not more than half
621          * autovacuum_freeze_max_age, so that autovacuums to prevent XID
622          * wraparound won't occur too frequently.
623          */
624         freezemin = freeze_min_age;
625         if (freezemin < 0)
626                 freezemin = vacuum_freeze_min_age;
627         freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
628         Assert(freezemin >= 0);
629
630         /*
631          * Compute the cutoff XID, being careful not to generate a "permanent" XID
632          */
633         limit = *oldestXmin - freezemin;
634         if (!TransactionIdIsNormal(limit))
635                 limit = FirstNormalTransactionId;
636
637         /*
638          * If oldestXmin is very far back (in practice, more than
639          * autovacuum_freeze_max_age / 2 XIDs old), complain and force a minimum
640          * freeze age of zero.
641          */
642         safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
643         if (!TransactionIdIsNormal(safeLimit))
644                 safeLimit = FirstNormalTransactionId;
645
646         if (TransactionIdPrecedes(limit, safeLimit))
647         {
648                 ereport(WARNING,
649                                 (errmsg("oldest xmin is far in the past"),
650                                  errhint("Close open transactions soon to avoid wraparound problems.")));
651                 limit = *oldestXmin;
652         }
653
654         *freezeLimit = limit;
655
656         if (freezeTableLimit != NULL)
657         {
658                 int                     freezetable;
659
660                 /*
661                  * Determine the table freeze age to use: as specified by the caller,
662                  * or vacuum_freeze_table_age, but in any case not more than
663                  * autovacuum_freeze_max_age * 0.95, so that if you have e.g. a nightly
664                  * VACUUM schedule, the nightly VACUUM gets a chance to freeze tuples
665                  * before anti-wraparound autovacuum is launched.
666                  */
667                 freezetable = freeze_table_age;
668                 if (freezetable < 0)
669                         freezetable = vacuum_freeze_table_age;
670                 freezetable = Min(freezetable, autovacuum_freeze_max_age * 0.95);
671                 Assert(freezetable >= 0);
672
673                 /*
674                  * Compute the cutoff XID, being careful not to generate a "permanent"
675                  * XID.
676                  */
677                 limit = ReadNewTransactionId() - freezetable;
678                 if (!TransactionIdIsNormal(limit))
679                         limit = FirstNormalTransactionId;
680
681                 *freezeTableLimit = limit;
682         }
683 }
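/*
 * Worked example (illustrative numbers only): with freeze_min_age = -1,
 * vacuum_freeze_min_age = 50 million, autovacuum_freeze_max_age = 200 million,
 * and *oldestXmin = 120 million, freezemin becomes Min(50M, 200M/2) = 50
 * million, so *freezeLimit = 120M - 50M = 70 million: tuples whose xmin
 * precedes XID 70 million are old enough to be frozen by this vacuum.
 * freezeTableLimit is computed the same way from the table freeze age, but
 * relative to ReadNewTransactionId().
 */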
684
685
686 /*
687  *      vac_update_relstats() -- update statistics for one relation
688  *
689  *              Update the whole-relation statistics that are kept in its pg_class
690  *              row.  There are additional stats that will be updated if we are
691  *              doing ANALYZE, but we always update these stats.  This routine works
692  *              for both index and heap relation entries in pg_class.
693  *
694  *              We violate transaction semantics here by overwriting the rel's
695  *              existing pg_class tuple with the new values.  This is reasonably
696  *              safe since the new values are correct whether or not this transaction
697  *              commits.  The reason for this is that if we updated these tuples in
698  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
699  *              by the time we got done with a vacuum cycle, most of the tuples in
700  *              pg_class would've been obsoleted.  Of course, this only works for
701  *              fixed-size never-null columns, but these are.
702  *
703  *              Note another assumption: that two VACUUMs/ANALYZEs on a table can't
704  *              run in parallel, nor can VACUUM/ANALYZE run in parallel with a
705  *              schema alteration such as adding an index, rule, or trigger.  Otherwise
706  *              our updates of relhasindex etc might overwrite uncommitted updates.
707  *
708  *              Another reason for doing it this way is that when we are in a lazy
709  *              VACUUM and have PROC_IN_VACUUM set, we mustn't do any updates ---
710  *              somebody vacuuming pg_class might think they could delete a tuple
711  *              marked with xmin = our xid.
712  *
713  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
714  *              ANALYZE.
715  */
716 void
717 vac_update_relstats(Relation relation,
718                                         BlockNumber num_pages, double num_tuples,
719                                         bool hasindex, TransactionId frozenxid)
720 {
721         Oid                     relid = RelationGetRelid(relation);
722         Relation        rd;
723         HeapTuple       ctup;
724         Form_pg_class pgcform;
725         bool            dirty;
726
727         rd = heap_open(RelationRelationId, RowExclusiveLock);
728
729         /* Fetch a copy of the tuple to scribble on */
730         ctup = SearchSysCacheCopy(RELOID,
731                                                           ObjectIdGetDatum(relid),
732                                                           0, 0, 0);
733         if (!HeapTupleIsValid(ctup))
734                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
735                          relid);
736         pgcform = (Form_pg_class) GETSTRUCT(ctup);
737
738         /* Apply required updates, if any, to copied tuple */
739
740         dirty = false;
741         if (pgcform->relpages != (int32) num_pages)
742         {
743                 pgcform->relpages = (int32) num_pages;
744                 dirty = true;
745         }
746         if (pgcform->reltuples != (float4) num_tuples)
747         {
748                 pgcform->reltuples = (float4) num_tuples;
749                 dirty = true;
750         }
751         if (pgcform->relhasindex != hasindex)
752         {
753                 pgcform->relhasindex = hasindex;
754                 dirty = true;
755         }
756
757         /*
758          * If we have discovered that there are no indexes, then there's no
759          * primary key either.  This could be done more thoroughly...
760          */
761         if (!hasindex)
762         {
763                 if (pgcform->relhaspkey)
764                 {
765                         pgcform->relhaspkey = false;
766                         dirty = true;
767                 }
768         }
769
770         /* We also clear relhasrules and relhastriggers if needed */
771         if (pgcform->relhasrules && relation->rd_rules == NULL)
772         {
773                 pgcform->relhasrules = false;
774                 dirty = true;
775         }
776         if (pgcform->relhastriggers && relation->trigdesc == NULL)
777         {
778                 pgcform->relhastriggers = false;
779                 dirty = true;
780         }
781
782         /*
783          * relfrozenxid should never go backward.  Caller can pass
784          * InvalidTransactionId if it has no new data.
785          */
786         if (TransactionIdIsNormal(frozenxid) &&
787                 TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
788         {
789                 pgcform->relfrozenxid = frozenxid;
790                 dirty = true;
791         }
792
793         /* If anything changed, write out the tuple. */
794         if (dirty)
795                 heap_inplace_update(rd, ctup);
796
797         heap_close(rd, RowExclusiveLock);
798 }
799
800
801 /*
802  *      vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
803  *
804  *              Update pg_database's datfrozenxid entry for our database to be the
805  *              minimum of the pg_class.relfrozenxid values.  If we are able to
806  *              advance pg_database.datfrozenxid, also try to truncate pg_clog.
807  *
808  *              We violate transaction semantics here by overwriting the database's
809  *              existing pg_database tuple with the new value.  This is reasonably
810  *              safe since the new value is correct whether or not this transaction
811  *              commits.  As with vac_update_relstats, this avoids leaving dead tuples
812  *              behind after a VACUUM.
813  *
814  *              This routine is shared by full and lazy VACUUM.
815  */
816 void
817 vac_update_datfrozenxid(void)
818 {
819         HeapTuple       tuple;
820         Form_pg_database dbform;
821         Relation        relation;
822         SysScanDesc scan;
823         HeapTuple       classTup;
824         TransactionId newFrozenXid;
825         bool            dirty = false;
826
827         /*
828          * Initialize the "min" calculation with GetOldestXmin, which is a
829          * reasonable approximation to the minimum relfrozenxid for not-yet-
830          * committed pg_class entries for new tables; see AddNewRelationTuple().
831          * So we cannot produce a wrong minimum by starting with this.
832          */
833         newFrozenXid = GetOldestXmin(true, true);
834
835         /*
836          * We must seqscan pg_class to find the minimum Xid, because there is no
837          * index that can help us here.
838          */
839         relation = heap_open(RelationRelationId, AccessShareLock);
840
841         scan = systable_beginscan(relation, InvalidOid, false,
842                                                           SnapshotNow, 0, NULL);
843
844         while ((classTup = systable_getnext(scan)) != NULL)
845         {
846                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
847
848                 /*
849                  * Only consider heap and TOAST tables (anything else should have
850                  * InvalidTransactionId in relfrozenxid anyway.)
851                  */
852                 if (classForm->relkind != RELKIND_RELATION &&
853                         classForm->relkind != RELKIND_TOASTVALUE)
854                         continue;
855
856                 Assert(TransactionIdIsNormal(classForm->relfrozenxid));
857
858                 if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
859                         newFrozenXid = classForm->relfrozenxid;
860         }
861
862         /* we're done with pg_class */
863         systable_endscan(scan);
864         heap_close(relation, AccessShareLock);
865
866         Assert(TransactionIdIsNormal(newFrozenXid));
867
868         /* Now fetch the pg_database tuple we need to update. */
869         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
870
871         /* Fetch a copy of the tuple to scribble on */
872         tuple = SearchSysCacheCopy(DATABASEOID,
873                                                            ObjectIdGetDatum(MyDatabaseId),
874                                                            0, 0, 0);
875         if (!HeapTupleIsValid(tuple))
876                 elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
877         dbform = (Form_pg_database) GETSTRUCT(tuple);
878
879         /*
880          * Don't allow datfrozenxid to go backward (probably can't happen anyway);
881          * and detect the common case where it doesn't go forward either.
882          */
883         if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
884         {
885                 dbform->datfrozenxid = newFrozenXid;
886                 dirty = true;
887         }
888
889         if (dirty)
890                 heap_inplace_update(relation, tuple);
891
892         heap_freetuple(tuple);
893         heap_close(relation, RowExclusiveLock);
894
895         /*
896          * If we were able to advance datfrozenxid, mark the flat-file copy of
897          * pg_database for update at commit, and see if we can truncate pg_clog.
898          * Also force update if the shared XID-wrap-limit info is stale.
899          */
900         if (dirty || !TransactionIdLimitIsValid())
901         {
902                 database_file_update_needed();
903                 vac_truncate_clog(newFrozenXid);
904         }
905 }
906
907
908 /*
909  *      vac_truncate_clog() -- attempt to truncate the commit log
910  *
911  *              Scan pg_database to determine the system-wide oldest datfrozenxid,
912  *              and use it to truncate the transaction commit log (pg_clog).
913  *              Also update the XID wrap limit info maintained by varsup.c.
914  *
915  *              The passed XID is simply the one I just wrote into my pg_database
916  *              entry.  It's used to initialize the "min" calculation.
917  *
918  *              This routine is shared by full and lazy VACUUM.  Note that it's
919  *              only invoked when we've managed to change our DB's datfrozenxid
920  *              entry, or we found that the shared XID-wrap-limit info is stale.
921  */
922 static void
923 vac_truncate_clog(TransactionId frozenXID)
924 {
925         TransactionId myXID = GetCurrentTransactionId();
926         Relation        relation;
927         HeapScanDesc scan;
928         HeapTuple       tuple;
929         Oid                     oldest_datoid;
930         bool            frozenAlreadyWrapped = false;
931
932         /* init oldest_datoid to sync with my frozenXID */
933         oldest_datoid = MyDatabaseId;
934
935         /*
936          * Scan pg_database to compute the minimum datfrozenxid
937          *
938          * Note: we need not worry about a race condition with new entries being
939          * inserted by CREATE DATABASE.  Any such entry will have a copy of some
940          * existing DB's datfrozenxid, and that source DB cannot be ours because
941          * of the interlock against copying a DB containing an active backend.
942          * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
943          * concurrently modify the datfrozenxid's of different databases, the
944          * worst possible outcome is that pg_clog is not truncated as aggressively
945          * as it could be.
946          */
947         relation = heap_open(DatabaseRelationId, AccessShareLock);
948
949         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
950
951         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
952         {
953                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
954
955                 Assert(TransactionIdIsNormal(dbform->datfrozenxid));
956
957                 if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
958                         frozenAlreadyWrapped = true;
959                 else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
960                 {
961                         frozenXID = dbform->datfrozenxid;
962                         oldest_datoid = HeapTupleGetOid(tuple);
963                 }
964         }
965
966         heap_endscan(scan);
967
968         heap_close(relation, AccessShareLock);
969
970         /*
971          * Do not truncate CLOG if we seem to have suffered wraparound already;
972          * the computed minimum XID might be bogus.  This case should now be
973          * impossible due to the defenses in GetNewTransactionId, but we keep the
974          * test anyway.
975          */
976         if (frozenAlreadyWrapped)
977         {
978                 ereport(WARNING,
979                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
980                                  errdetail("You might have already suffered transaction-wraparound data loss.")));
981                 return;
982         }
983
984         /* Truncate CLOG to the oldest frozenxid */
985         TruncateCLOG(frozenXID);
986
987         /*
988          * Update the wrap limit for GetNewTransactionId.  Note: this function
989          * will also signal the postmaster for an(other) autovac cycle if needed.
990          */
991         SetTransactionIdLimit(frozenXID, oldest_datoid);
992 }
993
994
995 /****************************************************************************
996  *                                                                                                                                                      *
997  *                      Code common to both flavors of VACUUM                                                   *
998  *                                                                                                                                                      *
999  ****************************************************************************
1000  */
1001
1002
1003 /*
1004  *      vacuum_rel() -- vacuum one heap relation
1005  *
1006  *              Doing one heap at a time incurs extra overhead, since we need to
1007  *              check that the heap exists again just before we vacuum it.      The
1008  *              reason that we do this is so that vacuuming can be spread across
1009  *              many small transactions.  Otherwise, two-phase locking would require
1010  *              us to lock the entire database during one pass of the vacuum cleaner.
1011  *
1012  *              We'll return true in *scanned_all if the vacuum scanned all heap
1013  *              pages, and updated pg_class.
1014  *
1015  *              At entry and exit, we are not inside a transaction.
1016  */
1017 static void
1018 vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound,
1019                    bool *scanned_all)
1020 {
1021         LOCKMODE        lmode;
1022         Relation        onerel;
1023         LockRelId       onerelid;
1024         Oid                     toast_relid;
1025         Oid                     save_userid;
1026         bool            save_secdefcxt;
1027
1028         if (scanned_all)
1029                 *scanned_all = false;
1030
1031         /* Begin a transaction for vacuuming this relation */
1032         StartTransactionCommand();
1033
1034         /*
1035          * Functions in indexes may want a snapshot set.  Also, setting a snapshot
1036          * ensures that RecentGlobalXmin is kept truly recent.
1037          */
1038         PushActiveSnapshot(GetTransactionSnapshot());
1039
1040         if (!vacstmt->full)
1041         {
1042                 /*
1043                  * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
1044                  * other concurrent VACUUMs know that they can ignore this one while
1045                  * determining their OldestXmin.  (The reason we don't set it during a
1046          * full VACUUM is exactly that we may have to run user-defined
1047                  * functions for functional indexes, and we want to make sure that if
1048                  * they use the snapshot set above, any tuples it requires can't get
1049                  * removed from other tables.  An index function that depends on the
1050                  * contents of other tables is arguably broken, but we won't break it
1051                  * here by violating transaction semantics.)
1052                  *
1053                  * We also set the VACUUM_FOR_WRAPAROUND flag, which is passed down by
1054                  * autovacuum; it's used to avoid cancelling a vacuum that was invoked
1055                  * in an emergency.
1056                  *
1057                  * Note: these flags remain set until CommitTransaction or
1058                  * AbortTransaction.  We don't want to clear them until we reset
1059                  * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
1060                  * which is probably Not Good.
1061                  */
1062                 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1063                 MyProc->vacuumFlags |= PROC_IN_VACUUM;
1064                 if (for_wraparound)
1065                         MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
1066                 LWLockRelease(ProcArrayLock);
1067         }
1068
1069         /*
1070          * Check for user-requested abort.      Note we want this to be inside a
1071          * transaction, so xact.c doesn't issue useless WARNING.
1072          */
1073         CHECK_FOR_INTERRUPTS();
1074
1075         /*
1076          * Determine the type of lock we want --- hard exclusive lock for a FULL
1077          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1078          * way, we can be sure that no other backend is vacuuming the same table.
1079          */
1080         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
1081
1082         /*
1083          * Open the relation and get the appropriate lock on it.
1084          *
1085          * There's a race condition here: the rel may have gone away since the
1086          * last time we saw it.  If so, we don't need to vacuum it.
1087          */
1088         onerel = try_relation_open(relid, lmode);
1089
1090         if (!onerel)
1091         {
1092                 PopActiveSnapshot();
1093                 CommitTransactionCommand();
1094                 return;
1095         }
1096
1097         /*
1098          * Check permissions.
1099          *
1100          * We allow the user to vacuum a table if he is superuser, the table
1101          * owner, or the database owner (but in the latter case, only if it's not
1102          * a shared relation).  pg_class_ownercheck includes the superuser case.
1103          *
1104          * Note we choose to treat permissions failure as a WARNING and keep
1105          * trying to vacuum the rest of the DB --- is this appropriate?
1106          */
1107         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1108                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1109         {
1110                 if (onerel->rd_rel->relisshared)
1111                         ereport(WARNING,
1112                                   (errmsg("skipping \"%s\" --- only superuser can vacuum it",
1113                                                   RelationGetRelationName(onerel))));
1114                 else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
1115                         ereport(WARNING,
1116                                         (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
1117                                                         RelationGetRelationName(onerel))));
1118                 else
1119                         ereport(WARNING,
1120                                         (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1121                                                         RelationGetRelationName(onerel))));
1122                 relation_close(onerel, lmode);
1123                 PopActiveSnapshot();
1124                 CommitTransactionCommand();
1125                 return;
1126         }
1127
1128         /*
1129          * Check that it's a vacuumable table; we used to do this in
1130          * get_rel_oids() but seems safer to check after we've locked the
1131          * relation.
1132          */
1133         if (onerel->rd_rel->relkind != RELKIND_RELATION &&
1134                 onerel->rd_rel->relkind != RELKIND_TOASTVALUE)
1135         {
1136                 ereport(WARNING,
1137                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1138                                                 RelationGetRelationName(onerel))));
1139                 relation_close(onerel, lmode);
1140                 PopActiveSnapshot();
1141                 CommitTransactionCommand();
1142                 return;
1143         }
1144
1145         /*
1146          * Silently ignore tables that are temp tables of other backends ---
1147          * trying to vacuum these will lead to great unhappiness, since their
1148          * contents are probably not up-to-date on disk.  (We don't throw a
1149          * warning here; it would just lead to chatter during a database-wide
1150          * VACUUM.)
1151          */
1152         if (RELATION_IS_OTHER_TEMP(onerel))
1153         {
1154                 relation_close(onerel, lmode);
1155                 PopActiveSnapshot();
1156                 CommitTransactionCommand();
1157                 return;
1158         }
1159
1160         /*
1161          * Get a session-level lock too. This will protect our access to the
1162          * relation across multiple transactions, so that we can vacuum the
1163          * relation's TOAST table (if any) secure in the knowledge that no one is
1164          * deleting the parent relation.
1165          *
1166          * NOTE: this cannot block, even if someone else is waiting for access,
1167          * because the lock manager knows that both lock requests are from the
1168          * same process.
1169          */
1170         onerelid = onerel->rd_lockInfo.lockRelId;
1171         LockRelationIdForSession(&onerelid, lmode);
1172
1173         /*
1174          * Remember the relation's TOAST relation for later, if the caller asked
1175          * us to process it.
1176          */
1177         if (do_toast)
1178                 toast_relid = onerel->rd_rel->reltoastrelid;
1179         else
1180                 toast_relid = InvalidOid;
1181
1182         /*
1183          * Switch to the table owner's userid, so that any index functions are run
1184          * as that user.  (This is unnecessary, but harmless, for lazy VACUUM.)
1185          */
1186         GetUserIdAndContext(&save_userid, &save_secdefcxt);
1187         SetUserIdAndContext(onerel->rd_rel->relowner, true);
1188
1189         /*
1190          * Do the actual work --- either FULL or "lazy" vacuum
1191          */
1192         if (vacstmt->full)
1193                 full_vacuum_rel(onerel, vacstmt);
1194         else
1195                 lazy_vacuum_rel(onerel, vacstmt, vac_strategy, scanned_all);
1196
1197         /* Restore userid */
1198         SetUserIdAndContext(save_userid, save_secdefcxt);
1199
1200         /* all done with this class, but hold lock until commit */
1201         relation_close(onerel, NoLock);
1202
1203         /*
1204          * Complete the transaction and free all temporary memory used.
1205          */
1206         PopActiveSnapshot();
1207         CommitTransactionCommand();
1208
1209         /*
1210          * If the relation has a secondary toast rel, vacuum that too while we
1211          * still hold the session lock on the master table.  Note however that
1212          * "analyze" will not get done on the toast table.      This is good, because
1213          * the toaster always uses hardcoded index access and statistics are
1214          * totally unimportant for toast relations.
1215          */
1216         if (toast_relid != InvalidOid)
1217                 vacuum_rel(toast_relid, vacstmt, false, for_wraparound, NULL);
1218
1219         /*
1220          * Now release the session-level lock on the master table.
1221          */
1222         UnlockRelationIdForSession(&onerelid, lmode);
1223 }
1224
1225
1226 /****************************************************************************
1227  *                                                                                                                                                      *
1228  *                      Code for VACUUM FULL (only)                                                                             *
1229  *                                                                                                                                                      *
1230  ****************************************************************************
1231  */
1232
1233
1234 /*
1235  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1236  *
1237  *              This routine vacuums a single heap, cleans out its indexes, and
1238  *              updates its num_pages and num_tuples statistics.
1239  *
1240  *              At entry, we have already established a transaction and opened
1241  *              and locked the relation.
1242  */
1243 static void
1244 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1245 {
1246         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1247                                                                                  * clean indexes */
1248         VacPageListData fraged_pages;           /* List of pages with space enough for
1249                                                                                  * re-using */
1250         Relation   *Irel;
1251         int                     nindexes,
1252                                 i;
1253         VRelStats  *vacrelstats;
1254
1255         vacuum_set_xid_limits(vacstmt->freeze_min_age, vacstmt->freeze_table_age,
1256                                                   onerel->rd_rel->relisshared,
1257                                                   &OldestXmin, &FreezeLimit, NULL);
1258
1259         /*
1260          * Flush any previous async-commit transactions.  This does not guarantee
1261          * that we will be able to set hint bits for tuples they inserted, but it
1262          * improves the probability, especially in simple sequential-commands
1263          * cases.  See scan_heap() and repair_frag() for more about this.
1264          */
1265         XLogAsyncCommitFlush();
1266
1267         /*
1268          * Set up statistics-gathering machinery.
1269          */
1270         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1271         vacrelstats->rel_pages = 0;
1272         vacrelstats->rel_tuples = 0;
1273         vacrelstats->rel_indexed_tuples = 0;
1274         vacrelstats->hasindex = false;
1275
1276         /* scan the heap */
1277         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1278         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1279
1280         /* Now open all indexes of the relation */
1281         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1282         if (nindexes > 0)
1283                 vacrelstats->hasindex = true;
1284
1285         /* Clean/scan index relation(s) */
1286         if (Irel != NULL)
1287         {
1288                 if (vacuum_pages.num_pages > 0)
1289                 {
1290                         for (i = 0; i < nindexes; i++)
1291                                 vacuum_index(&vacuum_pages, Irel[i],
1292                                                          vacrelstats->rel_indexed_tuples, 0);
1293                 }
1294                 else
1295                 {
1296                         /* just scan indexes to update statistics */
1297                         for (i = 0; i < nindexes; i++)
1298                                 scan_index(Irel[i], vacrelstats->rel_indexed_tuples);
1299                 }
1300         }
1301
1302         if (fraged_pages.num_pages > 0)
1303         {
1304                 /* Try to shrink heap */
1305                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1306                                         nindexes, Irel);
1307                 vac_close_indexes(nindexes, Irel, NoLock);
1308         }
1309         else
1310         {
1311                 vac_close_indexes(nindexes, Irel, NoLock);
1312                 if (vacuum_pages.num_pages > 0)
1313                 {
1314                         /* Clean pages from vacuum_pages list */
1315                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1316                 }
1317         }
1318
1319         /* update the free space map with final free space info, and vacuum it */
1320         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1321         FreeSpaceMapVacuum(onerel);
1322
1323         /* update statistics in pg_class */
1324         vac_update_relstats(onerel,
1325                                                 vacrelstats->rel_pages, vacrelstats->rel_tuples,
1326                                                 vacrelstats->hasindex, FreezeLimit);
1327
1328         /* report results to the stats collector, too */
1329         pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
1330                                                  true, vacstmt->analyze, vacrelstats->rel_tuples);
1331 }
1332
1333
1334 /*
1335  *      scan_heap() -- scan an open heap relation
1336  *
1337  *              This routine sets commit status bits, constructs vacuum_pages (list
1338  *              of pages we need to compact free space on and/or clean indexes of
1339  *              deleted tuples), constructs fraged_pages (list of pages with free
1340  *              space that tuples could be moved into), and calculates statistics
1341  *              on the number of live tuples in the heap.
1342  */
1343 static void
1344 scan_heap(VRelStats *vacrelstats, Relation onerel,
1345                   VacPageList vacuum_pages, VacPageList fraged_pages)
1346 {
1347         BlockNumber nblocks,
1348                                 blkno;
1349         char       *relname;
1350         VacPage         vacpage;
1351         BlockNumber empty_pages,
1352                                 empty_end_pages;
1353         double          num_tuples,
1354                                 num_indexed_tuples,
1355                                 tups_vacuumed,
1356                                 nkeep,
1357                                 nunused;
1358         double          free_space,
1359                                 usable_free_space;
1360         Size            min_tlen = MaxHeapTupleSize;
1361         Size            max_tlen = 0;
1362         bool            do_shrinking = true;
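             /*
              * vtlinks collects this_tid -> new_tid links for recently updated
              * tuples (see the RECENTLY_DEAD case below).  It starts with room
              * for 100 entries and is enlarged in chunks as needed; if shrinking
              * remains possible, the sorted array is passed to repair_frag()
              * via vacrelstats.
              */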
1363         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1364         int                     num_vtlinks = 0;
1365         int                     free_vtlinks = 100;
1366         PGRUsage        ru0;
1367
1368         pg_rusage_init(&ru0);
1369
1370         relname = RelationGetRelationName(onerel);
1371         ereport(elevel,
1372                         (errmsg("vacuuming \"%s.%s\"",
1373                                         get_namespace_name(RelationGetNamespace(onerel)),
1374                                         relname)));
1375
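             /*
              * empty_end_pages tracks the run of empty pages at the end of the
              * table as seen so far; it is reset whenever a page containing
              * live tuples is found.
              */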
1376         empty_pages = empty_end_pages = 0;
1377         num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
1378         free_space = 0;
1379
1380         nblocks = RelationGetNumberOfBlocks(onerel);
1381
1382         /*
1383          * We initially create each VacPage item in a maximal-sized workspace,
1384          * then copy the workspace into a just-large-enough copy.
1385          */
1386         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1387
1388         for (blkno = 0; blkno < nblocks; blkno++)
1389         {
1390                 Page            page,
1391                                         tempPage = NULL;
1392                 bool            do_reap,
1393                                         do_frag;
1394                 Buffer          buf;
1395                 OffsetNumber offnum,
1396                                         maxoff;
1397                 bool            notup;
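                     /*
                      * frozen[] collects the offsets of tuples frozen on this page,
                      * so that one WAL record can describe all of them once the page
                      * has been scanned.
                      */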
1398                 OffsetNumber frozen[MaxOffsetNumber];
1399                 int                     nfrozen;
1400
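                     /* check for cancel, and nap if cost-based vacuum delay is active */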
1401                 vacuum_delay_point();
1402
1403                 buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
1404                                                                  vac_strategy);
1405                 page = BufferGetPage(buf);
1406
1407                 /*
1408                  * Since we are holding exclusive lock on the relation, no other
1409                  * backend can be accessing the page; however it is possible that the
1410                  * background writer will try to write the page if it's already marked
1411                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1412                  * must take exclusive buffer lock wherever we potentially modify
1413                  * pages.  In fact, we insist on cleanup lock so that we can safely
1414                  * call heap_page_prune().      (This might be overkill, since the
1415                  * bgwriter pays no attention to individual tuples, but on the other
1416                  * hand it's unlikely that the bgwriter has this particular page
1417                  * pinned at this instant.      So violating the coding rule would buy us
1418                  * little anyway.)
1419                  */
1420                 LockBufferForCleanup(buf);
1421
1422                 vacpage->blkno = blkno;
1423                 vacpage->offsets_used = 0;
1424                 vacpage->offsets_free = 0;
1425
1426                 if (PageIsNew(page))
1427                 {
1428                         VacPage         vacpagecopy;
1429
1430                         ereport(WARNING,
1431                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1432                                            relname, blkno)));
1433                         PageInit(page, BufferGetPageSize(buf), 0);
1434                         MarkBufferDirty(buf);
1435                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1436                         free_space += vacpage->free;
1437                         empty_pages++;
1438                         empty_end_pages++;
1439                         vacpagecopy = copy_vac_page(vacpage);
1440                         vpage_insert(vacuum_pages, vacpagecopy);
1441                         vpage_insert(fraged_pages, vacpagecopy);
1442                         UnlockReleaseBuffer(buf);
1443                         continue;
1444                 }
1445
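                     /*
                      * A page that is already empty is handled much like a new page:
                      * it is pure free space, so record it in both page lists and
                      * extend the run of trailing empty pages.
                      */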
1446                 if (PageIsEmpty(page))
1447                 {
1448                         VacPage         vacpagecopy;
1449
1450                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1451                         free_space += vacpage->free;
1452                         empty_pages++;
1453                         empty_end_pages++;
1454                         vacpagecopy = copy_vac_page(vacpage);
1455                         vpage_insert(vacuum_pages, vacpagecopy);
1456                         vpage_insert(fraged_pages, vacpagecopy);
1457                         UnlockReleaseBuffer(buf);
1458                         continue;
1459                 }
1460
1461                 /*
1462                  * Prune all HOT-update chains in this page.
1463                  *
1464                  * We use the redirect_move option so that redirecting line pointers
1465                  * get collapsed out; this allows us to not worry about them below.
1466                  *
1467                  * We count tuples removed by the pruning step as removed by VACUUM.
1468                  */
1469                 tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
1470                                                                                  true, false);
1471
1472                 /*
1473                  * Now scan the page to collect vacuumable items and check for tuples
1474                  * requiring freezing.
1475                  */
1476                 nfrozen = 0;
1477                 notup = true;
1478                 maxoff = PageGetMaxOffsetNumber(page);
1479                 for (offnum = FirstOffsetNumber;
1480                          offnum <= maxoff;
1481                          offnum = OffsetNumberNext(offnum))
1482                 {
1483                         ItemId          itemid = PageGetItemId(page, offnum);
1484                         bool            tupgone = false;
1485                         HeapTupleData tuple;
1486
1487                         /*
1488                          * Collect unused items too - it's possible to have index entries
1489                          * pointing here after a crash.  (That's an ancient comment and is
1490                          * likely obsolete with WAL, but we might as well continue to
1491                          * check for such problems.)
1492                          */
1493                         if (!ItemIdIsUsed(itemid))
1494                         {
1495                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1496                                 nunused += 1;
1497                                 continue;
1498                         }
1499
1500                         /*
1501                          * DEAD item pointers are to be vacuumed normally; but we don't
1502                          * count them in tups_vacuumed, else we'd be double-counting (at
1503                          * least in the common case where heap_page_prune() just freed up
1504                          * a non-HOT tuple).
1505                          */
1506                         if (ItemIdIsDead(itemid))
1507                         {
1508                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1509                                 continue;
1510                         }
1511
1512                         /* Shouldn't have any redirected items anymore */
1513                         if (!ItemIdIsNormal(itemid))
1514                                 elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
1515                                          relname, blkno, offnum);
1516
1517                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1518                         tuple.t_len = ItemIdGetLength(itemid);
1519                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1520
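                             /*
                              * Classify the tuple against OldestXmin.  The result decides
                              * whether it can be removed, must be kept, or forces us to
                              * give up on shrinking the relation.
                              */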
1521                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1522                         {
1523                                 case HEAPTUPLE_LIVE:
1524                                         /* Tuple is good --- but let's do some validity checks */
1525                                         if (onerel->rd_rel->relhasoids &&
1526                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1527                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1528                                                          relname, blkno, offnum);
1529
1530                                         /*
1531                                          * The shrinkage phase of VACUUM FULL requires that all
1532                                          * live tuples have XMIN_COMMITTED set --- see comments in
1533                                          * repair_frag()'s walk-along-page loop.  Use of async
1534                                          * commit may prevent HeapTupleSatisfiesVacuum from
1535                                          * setting the bit for a recently committed tuple.      Rather
1536                                          * than trying to handle this corner case, we just give up
1537                                          * and don't shrink.
1538                                          */
1539                                         if (do_shrinking &&
1540                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1541                                         {
1542                                                 ereport(LOG,
1543                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1544                                                                                 relname, blkno, offnum,
1545                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1546                                                 do_shrinking = false;
1547                                         }
1548                                         break;
1549                                 case HEAPTUPLE_DEAD:
1550
1551                                         /*
1552                                          * Ordinarily, DEAD tuples would have been removed by
1553                                          * heap_page_prune(), but it's possible that the tuple
1554                                          * state changed since heap_page_prune() looked.  In
1555                                          * particular an INSERT_IN_PROGRESS tuple could have
1556                                          * changed to DEAD if the inserter aborted.  So this
1557                                          * cannot be considered an error condition, though it does
1558                                          * suggest that someone released a lock early.
1559                                          *
1560                                          * If the tuple is HOT-updated then it must only be
1561                                          * removed by a prune operation; so we keep it as if it
1562                                          * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
1563                                          * worth trying to make the shrinking code smart enough to
1564                                          * handle this?  It's an unusual corner case.)
1565                                          *
1566                                          * DEAD heap-only tuples can safely be removed if they
1567                                          * aren't themselves HOT-updated, although this is a bit
1568                                          * inefficient since we'll uselessly try to remove index
1569                                          * entries for them.
1570                                          */
1571                                         if (HeapTupleIsHotUpdated(&tuple))
1572                                         {
1573                                                 nkeep += 1;
1574                                                 if (do_shrinking)
1575                                                         ereport(LOG,
1576                                                                         (errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
1577                                                                                         relname, blkno, offnum)));
1578                                                 do_shrinking = false;
1579                                         }
1580                                         else
1581                                         {
1582                                                 tupgone = true; /* we can delete the tuple */
1583
1584                                                 /*
1585                                                  * We need not require XMIN_COMMITTED or
1586                                                  * XMAX_COMMITTED to be set, since we will remove the
1587                                                  * tuple without any further examination of its hint
1588                                                  * bits.
1589                                                  */
1590                                         }
1591                                         break;
1592                                 case HEAPTUPLE_RECENTLY_DEAD:
1593
1594                                         /*
1595                                          * If the tuple was deleted only recently, we must not
1596                                          * remove it from the relation.
1597                                          */
1598                                         nkeep += 1;
1599
1600                                         /*
1601                                          * As with the LIVE case, shrinkage requires
1602                                          * XMIN_COMMITTED to be set.
1603                                          */
1604                                         if (do_shrinking &&
1605                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1606                                         {
1607                                                 ereport(LOG,
1608                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1609                                                                                 relname, blkno, offnum,
1610                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1611                                                 do_shrinking = false;
1612                                         }
1613
1614                                         /*
1615                          * If shrinking is still possible and this tuple has been updated,
1616                          * remember the link so we can reconstruct updated-tuple chains.
1617                                          */
1618                                         if (do_shrinking &&
1619                                                 !(ItemPointerEquals(&(tuple.t_self),
1620                                                                                         &(tuple.t_data->t_ctid))))
1621                                         {
1622                                                 if (free_vtlinks == 0)
1623                                                 {
1624                                                         free_vtlinks = 1000;
1625                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1626                                                                                            (free_vtlinks + num_vtlinks) *
1627                                                                                                          sizeof(VTupleLinkData));
1628                                                 }
1629                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1630                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1631                                                 free_vtlinks--;
1632                                                 num_vtlinks++;
1633                                         }
1634                                         break;
1635                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1636
1637                                         /*
1638                                          * This should not happen, since we hold exclusive lock on
1639                                          * the relation; shouldn't we raise an error?  (Actually,
1640                                          * it can happen in system catalogs, since we tend to
1641                                          * release write lock before commit there.)  As above, we
1642                                          * can't apply repair_frag() if the tuple state is
1643                                          * uncertain.
1644                                          */
1645                                         if (do_shrinking)
1646                                                 ereport(LOG,
1647                                                                 (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
1648                                                                                 relname, blkno, offnum,
1649                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1650                                         do_shrinking = false;
1651                                         break;
1652                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1653
1654                                         /*
1655                                          * This should not happen, since we hold exclusive lock on
1656                                          * the relation; shouldn't we raise an error?  (Actually,
1657                                          * it can happen in system catalogs, since we tend to
1658                                          * release write lock before commit there.)  As above, we
1659                                          * can't apply repair_frag() if the tuple state is
1660                                          * uncertain.
1661                                          */
1662                                         if (do_shrinking)
1663                                                 ereport(LOG,
1664                                                                 (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
1665                                                                                 relname, blkno, offnum,
1666                                                                          HeapTupleHeaderGetXmax(tuple.t_data))));
1667                                         do_shrinking = false;
1668                                         break;
1669                                 default:
1670                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1671                                         break;
1672                         }
1673
1674                         if (tupgone)
1675                         {
1676                                 ItemId          lpp;
1677
1678                                 /*
1679                                  * Here we are building a temporary copy of the page with dead
1680                                  * tuples removed.      Below we will apply
1681                                  * PageRepairFragmentation to the copy, so that we can
1682                                  * determine how much space will be available after removal of
1683                                  * dead tuples.  But note we are NOT changing the real page
1684                                  * yet...
1685                                  */
1686                                 if (tempPage == NULL)
1687                                 {
1688                                         Size            pageSize;
1689
1690                                         pageSize = PageGetPageSize(page);
1691                                         tempPage = (Page) palloc(pageSize);
1692                                         memcpy(tempPage, page, pageSize);
1693                                 }
1694
1695                                 /* mark it unused on the temp page */
1696                                 lpp = PageGetItemId(tempPage, offnum);
1697                                 ItemIdSetUnused(lpp);
1698
1699                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1700                                 tups_vacuumed += 1;
1701                         }
1702                         else
1703                         {
1704                                 num_tuples += 1;
1705                                 if (!HeapTupleIsHeapOnly(&tuple))
1706                                         num_indexed_tuples += 1;
1707                                 notup = false;
1708                                 if (tuple.t_len < min_tlen)
1709                                         min_tlen = tuple.t_len;
1710                                 if (tuple.t_len > max_tlen)
1711                                         max_tlen = tuple.t_len;
1712
1713                                 /*
1714                                  * Each non-removable tuple must be checked to see if it needs
1715                                  * freezing.
1716                                  */
1717                                 if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
1718                                                                           InvalidBuffer))
1719                                         frozen[nfrozen++] = offnum;
1720                         }
1721                 }                                               /* scan along page */
1722
1723                 if (tempPage != NULL)
1724                 {
1725                         /* Some tuples are removable; figure free space after removal */
1726                         PageRepairFragmentation(tempPage);
1727                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1728                         pfree(tempPage);
1729                         do_reap = true;
1730                 }
1731                 else
1732                 {
1733                         /* Just use current available space */
1734                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1735                         /* Need to reap the page if it has UNUSED or DEAD line pointers */
1736                         do_reap = (vacpage->offsets_free > 0);
1737                 }
1738
1739                 free_space += vacpage->free;
1740
1741                 /*
1742                  * Add the page to vacuum_pages if it requires reaping, and add it to
1743                  * fraged_pages if it has a useful amount of free space.  "Useful"
1744                  * means enough for a minimal-sized tuple.      But we don't know that
1745                  * accurately near the start of the relation, so add pages
1746                  * unconditionally if they have >= BLCKSZ/10 free space.  Also
1747                  * forcibly add pages with no live tuples, to avoid confusing the
1748                  * empty_end_pages logic.  (In the presence of unreasonably small
1749                  * fillfactor, it seems possible that such pages might not pass the
1750                  * free-space test, but they had better be in the list anyway.)
1751                  */
1752                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10 ||
1753                                    notup);
1754
1755                 if (do_reap || do_frag)
1756                 {
1757                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1758
1759                         if (do_reap)
1760                                 vpage_insert(vacuum_pages, vacpagecopy);
1761                         if (do_frag)
1762                                 vpage_insert(fraged_pages, vacpagecopy);
1763                 }
1764
1765                 /*
1766                  * Include the page in empty_end_pages if it will be empty after
1767                  * vacuuming; this is to keep us from using it as a move destination.
1768                  * Note that such pages are guaranteed to be in fraged_pages.
1769                  */
1770                 if (notup)
1771                 {
1772                         empty_pages++;
1773                         empty_end_pages++;
1774                 }
1775                 else
1776                         empty_end_pages = 0;
1777
1778                 /*
1779                  * If we froze any tuples, mark the buffer dirty, and write a WAL
1780                  * record recording the changes.  We must log the changes to be
1781                  * crash-safe against future truncation of CLOG.
1782                  */
1783                 if (nfrozen > 0)
1784                 {
1785                         MarkBufferDirty(buf);
1786                         /* no XLOG for temp tables, though */
1787                         if (!onerel->rd_istemp)
1788                         {
1789                                 XLogRecPtr      recptr;
1790
1791                                 recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1792                                                                                  frozen, nfrozen);
1793                                 PageSetLSN(page, recptr);
1794                                 PageSetTLI(page, ThisTimeLineID);
1795                         }
1796                 }
1797
1798                 UnlockReleaseBuffer(buf);
1799         }
1800
1801         pfree(vacpage);
1802
1803         /* save stats in the rel list for use later */
1804         vacrelstats->rel_tuples = num_tuples;
1805         vacrelstats->rel_indexed_tuples = num_indexed_tuples;
1806         vacrelstats->rel_pages = nblocks;
1807         if (num_tuples == 0)
1808                 min_tlen = max_tlen = 0;
1809         vacrelstats->min_tlen = min_tlen;
1810         vacrelstats->max_tlen = max_tlen;
1811
1812         vacuum_pages->empty_end_pages = empty_end_pages;
1813         fraged_pages->empty_end_pages = empty_end_pages;
1814
1815         /*
1816          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1817          * remove any "empty" end-pages from the list, and compute usable free
1818          * space = free space in remaining pages.
1819          */
1820         if (do_shrinking)
1821         {
1822                 int                     i;
1823
1824                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1825                 fraged_pages->num_pages -= empty_end_pages;
1826                 usable_free_space = 0;
1827                 for (i = 0; i < fraged_pages->num_pages; i++)
1828                         usable_free_space += fraged_pages->pagedesc[i]->free;
1829         }
1830         else
1831         {
1832                 fraged_pages->num_pages = 0;
1833                 usable_free_space = 0;
1834         }
1835
1836         /* don't bother to save vtlinks if we will not call repair_frag */
1837         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1838         {
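                     /*
                      * Sort the links so repair_frag() can search them efficiently;
                      * vac_cmp_vtlinks presumably orders them by the TID linked to
                      * (new_tid).
                      */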
1839                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1840                           vac_cmp_vtlinks);
1841                 vacrelstats->vtlinks = vtlinks;
1842                 vacrelstats->num_vtlinks = num_vtlinks;
1843         }
1844         else
1845         {
1846                 vacrelstats->vtlinks = NULL;
1847                 vacrelstats->num_vtlinks = 0;
1848                 pfree(vtlinks);
1849         }
1850
1851         ereport(elevel,
1852                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1853                                         RelationGetRelationName(onerel),
1854                                         tups_vacuumed, num_tuples, nblocks),
1855                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1856                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1857                                            "There were %.0f unused item pointers.\n"
1858            "Total free space (including removable row versions) is %.0f bytes.\n"
1859                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1860          "%u pages containing %.0f free bytes are potential move destinations.\n"
1861                                            "%s.",
1862                                            nkeep,
1863                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1864                                            nunused,
1865                                            free_space,
1866                                            empty_pages, empty_end_pages,
1867                                            fraged_pages->num_pages, usable_free_space,
1868                                            pg_rusage_show(&ru0))));
1869 }
1870
1871
1872 /*
1873  *      repair_frag() -- try to repair relation's fragmentation
1874  *
1875  *              This routine marks dead tuples as unused and tries to re-use dead
1876  *              space by moving tuples (and inserting index entries as needed).  It
1877  *              constructs Nvacpagelist, a list of pages freed by moving tuples away,
1878  *              and cleans their indexes after committing the current transaction
1879  *              (in hackish manner: without losing locks or freeing memory!).  It
1880  *              truncates the relation if some end-blocks have become empty.
1881  */
1882 static void
1883 repair_frag(VRelStats *vacrelstats, Relation onerel,
1884                         VacPageList vacuum_pages, VacPageList fraged_pages,
1885                         int nindexes, Relation *Irel)
1886 {
1887         TransactionId myXID = GetCurrentTransactionId();
1888         Buffer          dst_buffer = InvalidBuffer;
1889         BlockNumber nblocks,
1890                                 blkno;
1891         BlockNumber last_move_dest_block = 0,
1892                                 last_vacuum_block;
1893         Page            dst_page = NULL;
1894         ExecContextData ec;
1895         VacPageListData Nvacpagelist;
1896         VacPage         dst_vacpage = NULL,
1897                                 last_vacuum_page,
1898                                 vacpage,
1899                            *curpage;
1900         int                     i;
1901         int                     num_moved = 0,
1902                                 num_fraged_pages,
1903                                 vacuumed_pages;
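             /*
              * keep_tuples counts tuples already moved off a page by this VACUUM
              * whose old line pointers have not yet been recorded for cleanup;
              * keep_indexed_tuples is, apparently, the subset that also have index
              * entries (i.e. are not heap-only).  See the HEAP_MOVED_OFF handling
              * in the page-scan loop below.
              */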
1904         int                     keep_tuples = 0;
1905         int                     keep_indexed_tuples = 0;
1906         PGRUsage        ru0;
1907
1908         pg_rusage_init(&ru0);
1909
1910         ExecContext_Init(&ec, onerel);
1911
1912         Nvacpagelist.num_pages = 0;
1913         num_fraged_pages = fraged_pages->num_pages;
1914         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1915         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1916         if (vacuumed_pages > 0)
1917         {
1918                 /* get last reaped page from vacuum_pages */
1919                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1920                 last_vacuum_block = last_vacuum_page->blkno;
1921         }
1922         else
1923         {
1924                 last_vacuum_page = NULL;
1925                 last_vacuum_block = InvalidBlockNumber;
1926         }
1927
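             /*
              * As in scan_heap(), allocate a maximal-sized VacPage to use as a
              * per-page workspace.
              */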
1928         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1929         vacpage->offsets_used = vacpage->offsets_free = 0;
1930
1931         /*
1932          * Scan pages backwards from the last nonempty page, trying to move tuples
1933          * down to lower pages.  Quit when we reach a page that we have moved any
1934          * tuples onto, or the first page if we haven't moved anything, or when we
1935          * find a page we cannot completely empty (this last condition is handled
1936          * by "break" statements within the loop).
1937          *
1938          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1939          * in order by blkno.
1940          */
1941         nblocks = vacrelstats->rel_pages;
1942         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1943                  blkno > last_move_dest_block;
1944                  blkno--)
1945         {
1946                 Buffer          buf;
1947                 Page            page;
1948                 OffsetNumber offnum,
1949                                         maxoff;
1950                 bool            isempty,
1951                                         chain_tuple_moved;
1952
1953                 vacuum_delay_point();
1954
1955                 /*
1956                  * Forget fraged_pages pages at or after this one; they're no longer
1957                  * useful as move targets, since we only want to move down. Note that
1958                  * since we stop the outer loop at last_move_dest_block, pages removed
1959                  * here cannot have had anything moved onto them already.
1960                  *
1961                  * Also note that we don't change the stored fraged_pages list, only
1962                  * our local variable num_fraged_pages; so the forgotten pages are
1963                  * still available to be loaded into the free space map later.
1964                  */
1965                 while (num_fraged_pages > 0 &&
1966                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1967                 {
1968                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1969                         --num_fraged_pages;
1970                 }
1971
1972                 /*
1973                  * Process this page of relation.
1974                  */
1975                 buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
1976                                                                  vac_strategy);
1977                 page = BufferGetPage(buf);
1978
1979                 vacpage->offsets_free = 0;
1980
1981                 isempty = PageIsEmpty(page);
1982
1983                 /* Is the page in the vacuum_pages list? */
1984                 if (blkno == last_vacuum_block)
1985                 {
1986                         if (last_vacuum_page->offsets_free > 0)
1987                         {
1988                                 /* there are dead tuples on this page - clean them */
1989                                 Assert(!isempty);
1990                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1991                                 vacuum_page(onerel, buf, last_vacuum_page);
1992                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1993                         }
1994                         else
1995                                 Assert(isempty);
1996                         --vacuumed_pages;
1997                         if (vacuumed_pages > 0)
1998                         {
1999                                 /* get prev reaped page from vacuum_pages */
2000                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
2001                                 last_vacuum_block = last_vacuum_page->blkno;
2002                         }
2003                         else
2004                         {
2005                                 last_vacuum_page = NULL;
2006                                 last_vacuum_block = InvalidBlockNumber;
2007                         }
2008                         if (isempty)
2009                         {
2010                                 ReleaseBuffer(buf);
2011                                 continue;
2012                         }
2013                 }
2014                 else
2015                         Assert(!isempty);
2016
2017                 chain_tuple_moved = false;              /* no one chain-tuple was moved off
2018                                                                                  * this page, yet */
2019                 vacpage->blkno = blkno;
2020                 maxoff = PageGetMaxOffsetNumber(page);
2021                 for (offnum = FirstOffsetNumber;
2022                          offnum <= maxoff;
2023                          offnum = OffsetNumberNext(offnum))
2024                 {
2025                         Size            tuple_len;
2026                         HeapTupleData tuple;
2027                         ItemId          itemid = PageGetItemId(page, offnum);
2028
2029                         if (!ItemIdIsUsed(itemid))
2030                                 continue;
2031
2032                         if (ItemIdIsDead(itemid))
2033                         {
2034                                 /* just remember it for vacuum_page() */
2035                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2036                                 continue;
2037                         }
2038
2039                         /* Shouldn't have any redirected items now */
2040                         Assert(ItemIdIsNormal(itemid));
2041
2042                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2043                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
2044                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
2045
2046                         /* ---
2047                          * VACUUM FULL has an exclusive lock on the relation.  So
2048                          * normally no other transaction can have pending INSERTs or
2049                          * DELETEs in this relation.  A tuple is either:
2050                          *              (a) live (XMIN_COMMITTED)
2051                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
2052                          *                      is visible to all active transactions)
2053                          *              (c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
2054                          *                      but at least one active transaction does not see the
2055                          *                      deleting transaction (ie, it's RECENTLY_DEAD)
2056                          *              (d) moved by the currently running VACUUM
2057                          *              (e) inserted or deleted by a not yet committed transaction,
2058                          *                      or by a transaction we couldn't set XMIN_COMMITTED for.
2059                          * In case (e) we wouldn't be in repair_frag() at all, because
2060                          * scan_heap() detects those cases and shuts off shrinking.
2061                          * We can't see case (b) here either, because such tuples were
2062                          * already removed by vacuum_page().  Cases (a) and (c) are
2063                          * normal and will have XMIN_COMMITTED set.  Case (d) is only
2064                          * possible if a whole tuple chain has been moved while
2065                          * processing this or a higher numbered block.
2066                          * ---
2067                          */
2068                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2069                         {
2070                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2071                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2072                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
2073                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2074
2075                                 /*
2076                                  * MOVED_OFF by another VACUUM would have caused the
2077                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
2078                                  */
2079                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2080                                         elog(ERROR, "invalid XVAC in tuple header");
2081
2082                                 /*
2083                                  * If this (chain) tuple was already moved by this very VACUUM,
2084                                  * we have to check whether it is recorded in vacpage or not,
2085                                  * i.e. whether it was moved while cleaning this page or an earlier one.
2086                                  */
2087
2088                                 /* Can't we Assert(keep_tuples > 0) here? */
2089                                 if (keep_tuples == 0)
2090                                         continue;
2091                                 if (chain_tuple_moved)
2092                                 {
2093                                         /* some chains were moved while cleaning this page */
2094                                         Assert(vacpage->offsets_free > 0);
2095                                         for (i = 0; i < vacpage->offsets_free; i++)
2096                                         {
2097                                                 if (vacpage->offsets[i] == offnum)
2098                                                         break;
2099                                         }
2100                                         if (i >= vacpage->offsets_free)         /* not found */
2101                                         {
2102                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2103
2104                                                 /*
2105                                                  * If this is not a heap-only tuple, there must be an
2106                                                  * index entry for this item which will be removed in
2107                                                  * the index cleanup. Decrement the
2108                                                  * keep_indexed_tuples count to remember this.
2109                                                  */
2110                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2111                                                         keep_indexed_tuples--;
2112                                                 keep_tuples--;
2113                                         }
2114                                 }
2115                                 else
2116                                 {
2117                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2118
2119                                         /*
2120                                          * If this is not a heap-only tuple, there must be an
2121                                          * index entry for this item which will be removed in the
2122                                          * index cleanup. Decrement the keep_indexed_tuples count
2123                                          * to remember this.
2124                                          */
2125                                         if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2126                                                 keep_indexed_tuples--;
2127                                         keep_tuples--;
2128                                 }
2129                                 continue;
2130                         }
2131
2132                         /*
2133                          * If this tuple is in a chain of tuples created in updates by
2134                          * "recent" transactions then we have to move the whole chain of
2135                          * tuples to other places, so that we can write new t_ctid links
2136                          * that preserve the chain relationship.
2137                          *
2138                          * This test is complicated.  Read it as "if tuple is a recently
2139                          * created updated version, OR if it is an obsoleted version". (In
2140                          * the second half of the test, we needn't make any check on XMAX
2141                          * --- it must be recently obsoleted, else scan_heap would have
2142                          * deemed it removable.)
2143                          *
2144                          * NOTE: this test is not 100% accurate: it is possible for a
2145                          * tuple to be an updated one with recent xmin, and yet not match
2146                          * any new_tid entry in the vtlinks list.  Presumably there was
2147                          * once a parent tuple with xmax matching the xmin, but it's
2148                          * possible that that tuple has been removed --- for example, if
2149                          * it had xmin = xmax and wasn't itself an updated version, then
2150                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
2151                          * xmin xact completes.
2152                          *
2153                          * To be on the safe side, we abandon the repair_frag process if
2154                          * we cannot find the parent tuple in vtlinks.  This may be overly
2155                          * conservative; AFAICS it would be safe to move the chain.
2156                          *
2157                          * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
2158                          * using OldestXmin, which is a rather coarse test, it is quite
2159                          * possible to have an update chain in which a tuple we think is
2160                          * RECENTLY_DEAD links forward to one that is definitely DEAD. In
2161                          * such a case the RECENTLY_DEAD tuple must actually be dead, but
2162                          * it seems too complicated to try to make VACUUM remove it. We
2163                          * treat each contiguous set of RECENTLY_DEAD tuples as a
2164                          * separately movable chain, ignoring any intervening DEAD ones.
2165                          */
2166                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
2167                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
2168                                                                                 OldestXmin)) ||
2169                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
2170                                                                                            HEAP_IS_LOCKED)) &&
2171                                  !(ItemPointerEquals(&(tuple.t_self),
2172                                                                          &(tuple.t_data->t_ctid)))))
2173                         {
2174                                 Buffer          Cbuf = buf;
2175                                 bool            freeCbuf = false;
2176                                 bool            chain_move_failed = false;
2177                                 bool            moved_target = false;
2178                                 ItemPointerData Ctid;
2179                                 HeapTupleData tp = tuple;
2180                                 Size            tlen = tuple_len;
2181                                 VTupleMove      vtmove;
2182                                 int                     num_vtmove;
2183                                 int                     free_vtmove;
2184                                 VacPage         to_vacpage = NULL;
2185                                 int                     to_item = 0;
2186                                 int                     ti;
2187
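                                     /*
                                      * We are about to process a whole update chain; drop any pin
                                      * still held on a destination page from an earlier move.
                                      */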
2188                                 if (dst_buffer != InvalidBuffer)
2189                                 {
2190                                         ReleaseBuffer(dst_buffer);
2191                                         dst_buffer = InvalidBuffer;
2192                                 }
2193
2194                                 /* Quick exit if we have no vtlinks to search in */
2195                                 if (vacrelstats->vtlinks == NULL)
2196                                 {
2197                                         elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2198                                         break;          /* out of walk-along-page loop */
2199                                 }
2200
2201                                 /*
2202                                  * If this tuple is at the beginning or middle of the chain,
2203                                  * then we have to walk to the end of the chain.  As with any t_ctid
2204                                  * chase, we have to verify that each new tuple is really the
2205                                  * descendant of the tuple we came from; however, here we need
2206                                  * even more than the normal amount of paranoia. If t_ctid
2207                                  * links forward to a tuple determined to be DEAD, then
2208                                  * depending on where that tuple is, it might already have
2209                                  * been removed, and perhaps even replaced by a MOVED_IN
2210                                  * tuple.  We don't want to include any DEAD tuples in the
2211                                  * chain, so we have to recheck HeapTupleSatisfiesVacuum.
2212                                  */
2213                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
2214                                                                                                   HEAP_IS_LOCKED)) &&
2215                                            !(ItemPointerEquals(&(tp.t_self),
2216                                                                                    &(tp.t_data->t_ctid))))
2217                                 {
2218                                         ItemPointerData nextTid;
2219                                         TransactionId priorXmax;
2220                                         Buffer          nextBuf;
2221                                         Page            nextPage;
2222                                         OffsetNumber nextOffnum;
2223                                         ItemId          nextItemid;
2224                                         HeapTupleHeader nextTdata;
2225                                         HTSV_Result nextTstatus;
2226
2227                                         nextTid = tp.t_data->t_ctid;
2228                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
2229                                         /* assume block# is OK (see heap_fetch comments) */
2230                                         nextBuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2231                                                                                  ItemPointerGetBlockNumber(&nextTid),
2232                                                                                                  RBM_NORMAL, vac_strategy);
2233                                         nextPage = BufferGetPage(nextBuf);
2234                                         /* If bogus or unused slot, assume tp is end of chain */
2235                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
2236                                         if (nextOffnum < FirstOffsetNumber ||
2237                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
2238                                         {
2239                                                 ReleaseBuffer(nextBuf);
2240                                                 break;
2241                                         }
2242                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
2243                                         if (!ItemIdIsNormal(nextItemid))
2244                                         {
2245                                                 ReleaseBuffer(nextBuf);
2246                                                 break;
2247                                         }
2248                                         /* if not matching XMIN, assume tp is end of chain */
2249                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
2250                                                                                                                           nextItemid);
2251                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
2252                                                                                          priorXmax))
2253                                         {
2254                                                 ReleaseBuffer(nextBuf);
2255                                                 break;
2256                                         }
2257
2258                                         /*
2259                                          * Must check for DEAD or MOVED_IN tuple, too.  This could
2260                                          * potentially update hint bits, so we'd better hold the
2261                                          * buffer content lock.
2262                                          */
2263                                         LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
2264                                         nextTstatus = HeapTupleSatisfiesVacuum(nextTdata,
2265                                                                                                                    OldestXmin,
2266                                                                                                                    nextBuf);
2267                                         if (nextTstatus == HEAPTUPLE_DEAD ||
2268                                                 nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
2269                                         {
2270                                                 UnlockReleaseBuffer(nextBuf);
2271                                                 break;
2272                                         }
2273                                         LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
2274                                         /* if it's MOVED_OFF we should have moved this one with it */
2275                                         if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
2276                                                 elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
2277                                         /* OK, switch our attention to the next tuple in chain */
2278                                         tp.t_data = nextTdata;
2279                                         tp.t_self = nextTid;
2280                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
2281                                         if (freeCbuf)
2282                                                 ReleaseBuffer(Cbuf);
2283                                         Cbuf = nextBuf;
2284                                         freeCbuf = true;
2285                                 }
2286
2287                                 /* Set up workspace for planning the chain move */
2288                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
2289                                 num_vtmove = 0;
2290                                 free_vtmove = 100;
2291
2292                                 /*
2293                                  * Now, walk backwards up the chain (towards older tuples) and
2294                                  * check whether all items in the chain can be moved.  We record all
2295                                  * the moves that need to be made in the vtmove array.
2296                                  */
2297                                 for (;;)
2298                                 {
2299                                         Buffer          Pbuf;
2300                                         Page            Ppage;
2301                                         ItemId          Pitemid;
2302                                         HeapTupleHeader PTdata;
2303                                         VTupleLinkData vtld,
2304                                                            *vtlp;
2305
2306                                         /* Identify a target page to move this tuple to */
2307                                         if (to_vacpage == NULL ||
2308                                                 !enough_space(to_vacpage, tlen))
2309                                         {
2310                                                 for (i = 0; i < num_fraged_pages; i++)
2311                                                 {
2312                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
2313                                                                 break;
2314                                                 }
2315
2316                                                 if (i == num_fraged_pages)
2317                                                 {
2318                                                         /* can't move item anywhere */
2319                                                         chain_move_failed = true;
2320                                                         break;          /* out of check-all-items loop */
2321                                                 }
2322                                                 to_item = i;
2323                                                 to_vacpage = fraged_pages->pagedesc[to_item];
2324                                         }
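                                         /*
                                          * Charge the chosen page for the space this tuple will
                                          * occupy: the MAXALIGN'd tuple data, plus a line pointer
                                          * once the page's soon-to-be-freed item slots have all
                                          * been used up.
                                          */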
2325                                         to_vacpage->free -= MAXALIGN(tlen);
2326                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
2327                                                 to_vacpage->free -= sizeof(ItemIdData);
2328                                         (to_vacpage->offsets_used)++;
2329
2330                                         /* Add an entry to vtmove list */
2331                                         if (free_vtmove == 0)
2332                                         {
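                                                 /* vtmove array is full; grow it by another 1000 slots */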
2333                                                 free_vtmove = 1000;
2334                                                 vtmove = (VTupleMove)
2335                                                         repalloc(vtmove,
2336                                                                          (free_vtmove + num_vtmove) *
2337                                                                          sizeof(VTupleMoveData));
2338                                         }
2339                                         vtmove[num_vtmove].tid = tp.t_self;
2340                                         vtmove[num_vtmove].vacpage = to_vacpage;
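                                         /*
                                          * Ask for a one-time cleanup of the destination page if
                                          * this is the first tuple we plan to move onto it.
                                          */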
2341                                         if (to_vacpage->offsets_used == 1)
2342                                                 vtmove[num_vtmove].cleanVpd = true;
2343                                         else
2344                                                 vtmove[num_vtmove].cleanVpd = false;
2345                                         free_vtmove--;
2346                                         num_vtmove++;
2347
2348                                         /* Remember if we reached the original target tuple */
2349                                         if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
2350                                                 ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
2351                                                 moved_target = true;
2352
2353                                         /* Done if at beginning of chain */
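                                         /*
                                          * (Either this tuple was not created by an update, or its
                                          * xmin precedes OldestXmin, in which case any older chain
                                          * members must be dead and need not be moved.)
                                          */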
2354                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2355                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2356                                                                                    OldestXmin))
2357                                                 break;  /* out of check-all-items loop */
2358
2359                                         /* Move to tuple with prior row version */
2360                                         vtld.new_tid = tp.t_self;
2361                                         vtlp = (VTupleLink)
2362                                                 vac_bsearch((void *) &vtld,
2363                                                                         (void *) (vacrelstats->vtlinks),
2364                                                                         vacrelstats->num_vtlinks,
2365                                                                         sizeof(VTupleLinkData),
2366                                                                         vac_cmp_vtlinks);
2367                                         if (vtlp == NULL)
2368                                         {
2369                                                 /* see discussion above */
2370                                                 elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2371                                                 chain_move_failed = true;
2372                                                 break;  /* out of check-all-items loop */
2373                                         }
2374                                         tp.t_self = vtlp->this_tid;
2375                                         Pbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2376                                                                          ItemPointerGetBlockNumber(&(tp.t_self)),
2377                                                                                           RBM_NORMAL, vac_strategy);
2378                                         Ppage = BufferGetPage(Pbuf);
2379                                         Pitemid = PageGetItemId(Ppage,
2380                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2381                                         /* this can't happen since we saw tuple earlier: */
2382                                         if (!ItemIdIsNormal(Pitemid))
2383                                                 elog(ERROR, "parent itemid marked as unused");
2384                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2385
2386                                         /* ctid should not have changed since we saved it */
2387                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2388                                                                                          &(PTdata->t_ctid)));
2389
2390                                         /*
2391                                          * See the notes above about the cases where
2392                                          * !ItemIdIsUsed(nextItemid) (the child item was removed).
2393                                          * Because we don't currently remove the useless part of an
2394                                          * update chain, it is possible to find a non-matching
2395                                          * parent row here.  As in the case that originally exposed
2396                                          * this problem, we just stop shrinking here.  I could try
2397                                          * to find the real parent row, but it isn't worth doing
2398                                          * since a real solution will be implemented later anyway,
2399                                          * and we are too close to the 6.5 release. - vadim 06/11/99
2400                                          */
2401                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2402                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2403                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2404                                         {
2405                                                 ReleaseBuffer(Pbuf);
2406                                                 elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
2407                                                 chain_move_failed = true;
2408                                                 break;  /* out of check-all-items loop */
2409                                         }
2410                                         tp.t_data = PTdata;
2411                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2412                                         if (freeCbuf)
2413                                                 ReleaseBuffer(Cbuf);
2414                                         Cbuf = Pbuf;
2415                                         freeCbuf = true;
2416                                 }                               /* end of check-all-items loop */
2417
2418                                 if (freeCbuf)
2419                                         ReleaseBuffer(Cbuf);
2420                                 freeCbuf = false;
2421
2422                                 /* Double-check that we will move the current target tuple */
2423                                 if (!moved_target && !chain_move_failed)
2424                                 {
2425                                         elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
2426                                         chain_move_failed = true;
2427                                 }
2428
2429                                 if (chain_move_failed)
2430                                 {
2431                                         /*
2432                                          * Undo changes to offsets_used state.  We don't bother
2433                                          * cleaning up the amount-free state, since we're not
2434                                          * going to do any further tuple motion.
2435                                          */
2436                                         for (i = 0; i < num_vtmove; i++)
2437                                         {
2438                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2439                                                 (vtmove[i].vacpage->offsets_used)--;
2440                                         }
2441                                         pfree(vtmove);
2442                                         break;          /* out of walk-along-page loop */
2443                                 }
2444
2445                                 /*
2446                                  * Okay, move the whole tuple chain in reverse order.
2447                                  *
2448                                  * Ctid tracks the new location of the previously-moved tuple.
2449                                  */
2450                                 ItemPointerSetInvalid(&Ctid);
2451                                 for (ti = 0; ti < num_vtmove; ti++)
2452                                 {
2453                                         VacPage         destvacpage = vtmove[ti].vacpage;
2454                                         Page            Cpage;
2455                                         ItemId          Citemid;
2456
2457                                         /* Get page to move from */
2458                                         tuple.t_self = vtmove[ti].tid;
2459                                         Cbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2460                                                                   ItemPointerGetBlockNumber(&(tuple.t_self)),
2461                                                                                           RBM_NORMAL, vac_strategy);
2462
2463                                         /* Get page to move to */
2464                                         dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
2465                                                                                                         destvacpage->blkno,
2466                                                                                                         RBM_NORMAL, vac_strategy);
2467
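                                         /*
                                          * Exclusively lock both buffers (just one lock if the
                                          * source and destination are the same page).
                                          */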
2468                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2469                                         if (dst_buffer != Cbuf)
2470                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2471
2472                                         dst_page = BufferGetPage(dst_buffer);
2473                                         Cpage = BufferGetPage(Cbuf);
2474
2475                                         Citemid = PageGetItemId(Cpage,
2476                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2477                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2478                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2479
2480                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2481                                                                          dst_buffer, dst_page, destvacpage,
2482                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2483
2484                                         /*
2485                                          * If the tuple we are moving is a heap-only tuple, this
2486                                          * move will generate an additional index entry, so
2487                                          * increment the rel_indexed_tuples count.
2488                                          */
2489                                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2490                                                 vacrelstats->rel_indexed_tuples++;
2491
2492                                         num_moved++;
2493                                         if (destvacpage->blkno > last_move_dest_block)
2494                                                 last_move_dest_block = destvacpage->blkno;
2495
2496                                         /*
2497                                          * Remember that we moved a tuple from the current page
2498                                          * (the corresponding index tuple will be cleaned).
2499                                          */
2500                                         if (Cbuf == buf)
2501                                                 vacpage->offsets[vacpage->offsets_free++] =
2502                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2503                                         else
2504                                         {
2505                                                 /*
2506                                                  * When we move tuple chains, we may need to move
2507                                                  * tuples from a block that we haven't yet scanned in
2508                                                  * the outer walk-along-the-relation loop. Note that
2509                                                  * we can't be moving a tuple from a block that we
2510                                                  * have already scanned because if such a tuple
2511                                                  * exists, then we must have moved the chain along
2512                                                  * with that tuple when we scanned that block. IOW the
2513                                                  * test of (Cbuf != buf) guarantees that the tuple we
2514                                                  * are looking at right now is in a block which is yet
2515                                                  * to be scanned.
2516                                                  *
2517                                                  * We maintain two counters to correctly count the
2518                                                  * moved-off tuples from blocks that are not yet
2519                                                  * scanned (keep_tuples) and how many of them have
2520                                                  * index pointers (keep_indexed_tuples).  The main
2521                                                  * reason to track the latter is to help verify that
2522                                                  * indexes have the expected number of entries when
2523                                                  * all the dust settles.
2524                                                  */
2525                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2526                                                         keep_indexed_tuples++;
2527                                                 keep_tuples++;
2528                                         }
2529
2530                                         ReleaseBuffer(dst_buffer);
2531                                         ReleaseBuffer(Cbuf);
2532                                 }                               /* end of move-the-tuple-chain loop */
2533
2534                                 dst_buffer = InvalidBuffer;
2535                                 pfree(vtmove);
2536                                 chain_tuple_moved = true;
2537
2538                                 /* advance to next tuple in walk-along-page loop */
2539                                 continue;
2540                         }                                       /* end of is-tuple-in-chain test */
2541
2542                         /* try to find new page for this tuple */
2543                         if (dst_buffer == InvalidBuffer ||
2544                                 !enough_space(dst_vacpage, tuple_len))
2545                         {
2546                                 if (dst_buffer != InvalidBuffer)
2547                                 {
2548                                         ReleaseBuffer(dst_buffer);
2549                                         dst_buffer = InvalidBuffer;
2550                                 }
2551                                 for (i = 0; i < num_fraged_pages; i++)
2552                                 {
2553                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2554                                                 break;
2555                                 }
2556                                 if (i == num_fraged_pages)
2557                                         break;          /* can't move item anywhere */
2558                                 dst_vacpage = fraged_pages->pagedesc[i];
2559                                 dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
2560                                                                                                 dst_vacpage->blkno,
2561                                                                                                 RBM_NORMAL, vac_strategy);
2562                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2563                                 dst_page = BufferGetPage(dst_buffer);
2564                                 /* if this page was not used before - clean it */
2565                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2566                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2567                         }
2568                         else
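                                 /* previously chosen destination page still has room; just re-lock it */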
2569                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2570
2571                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2572
2573                         move_plain_tuple(onerel, buf, page, &tuple,
2574                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2575
2576                         /*
2577                          * If the tuple we are moving is a heap-only tuple, this move will
2578                          * generate an additional index entry, so increment the
2579                          * rel_indexed_tuples count.
2580                          */
2581                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2582                                 vacrelstats->rel_indexed_tuples++;
2583
2584                         num_moved++;
2585                         if (dst_vacpage->blkno > last_move_dest_block)
2586                                 last_move_dest_block = dst_vacpage->blkno;
2587
2588                         /*
2589                          * Remember that we moved a tuple from the current page
2590                          * (the corresponding index tuple will be cleaned).
2591                          */
2592                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2593                 }                                               /* walk along page */
2594
2595                 /*
2596                  * If we broke out of the walk-along-page loop early (ie, still have
2597                  * offnum <= maxoff), then we failed to move some tuple off this page.
2598                  * No point in shrinking any more, so clean up and exit the per-page
2599                  * loop.
2600                  */
2601                 if (offnum < maxoff && keep_tuples > 0)
2602                 {
2603                         OffsetNumber off;
2604
2605                         /*
2606                          * Fix vacpage state for any unvisited tuples remaining on page
2607                          */
2608                         for (off = OffsetNumberNext(offnum);
2609                                  off <= maxoff;
2610                                  off = OffsetNumberNext(off))
2611                         {
2612                                 ItemId          itemid = PageGetItemId(page, off);
2613                                 HeapTupleHeader htup;
2614
2615                                 if (!ItemIdIsUsed(itemid))
2616                                         continue;
2617                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2618                                 Assert(ItemIdIsNormal(itemid));
2619
2620                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
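                                 /*
                                  * Tuples with the XMIN_COMMITTED hint set cannot be ones we
                                  * marked MOVED_OFF in this pass (that marking clears the hint
                                  * bit), so they need no fixup here.
                                  */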
2621                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2622                                         continue;
2623
2624                                 /*
2625                                  * See comments in the walk-along-page loop above about why
2626                                  * only MOVED_OFF tuples should be found here.
2627                                  */
2628                                 if (htup->t_infomask & HEAP_MOVED_IN)
2629                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2630                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2631                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2632                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2633                                         elog(ERROR, "invalid XVAC in tuple header");
2634
2635                                 if (chain_tuple_moved)
2636                                 {
2637                                         /* some chains were moved while cleaning this page */
2638                                         Assert(vacpage->offsets_free > 0);
2639                                         for (i = 0; i < vacpage->offsets_free; i++)
2640                                         {
2641                                                 if (vacpage->offsets[i] == off)
2642                                                         break;
2643                                         }
2644                                         if (i >= vacpage->offsets_free)         /* not found */
2645                                         {
2646                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2647                                                 Assert(keep_tuples > 0);
2648
2649                                                 /*
2650                                                  * If this is not a heap-only tuple, there must be an
2651                                                  * index entry for this item which will be removed in
2652                                                  * the index cleanup. Decrement the
2653                                                  * keep_indexed_tuples count to remember this.
2654                                                  */
2655                                                 if (!HeapTupleHeaderIsHeapOnly(htup))
2656                                                         keep_indexed_tuples--;
2657                                                 keep_tuples--;
2658                                         }
2659                                 }
2660                                 else
2661                                 {
2662                                         vacpage->offsets[vacpage->offsets_free++] = off;
2663                                         Assert(keep_tuples > 0);
2664                                         if (!HeapTupleHeaderIsHeapOnly(htup))
2665                                                 keep_indexed_tuples--;
2666                                         keep_tuples--;
2667                                 }
2668                         }
2669                 }
2670
2671                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2672                 {
2673                         if (chain_tuple_moved)          /* else - they are ordered */
2674                         {
2675                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2676                                           sizeof(OffsetNumber), vac_cmp_offno);
2677                         }
2678                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2679                 }
2680
2681                 ReleaseBuffer(buf);
2682
2683                 if (offnum <= maxoff)
2684                         break;                          /* had to quit early, see above note */
2685
2686         }                                                       /* walk along relation */
2687
2688         blkno++;                                        /* new number of blocks */
2689
2690         if (dst_buffer != InvalidBuffer)
2691         {
2692                 Assert(num_moved > 0);
2693                 ReleaseBuffer(dst_buffer);
2694         }
2695
2696         if (num_moved > 0)
2697         {
2698                 /*
2699                  * We have to commit our tuple moves before we truncate the
2700                  * relation.  Ideally we should do Commit/StartTransactionCommand
2701                  * here, relying on the session-level table lock to protect our
2702                  * exclusive access to the relation.  However, that would require a
2703                  * lot of extra code to close and re-open the relation, indexes, etc.
2704                  * For now, a quick hack: record status of current transaction as
2705                  * committed, and continue.  We force the commit to be synchronous so
2706                  * that it's down to disk before we truncate.  (Note: tqual.c knows
2707                  * that VACUUM FULL always uses sync commit, too.)      The transaction
2708                  * continues to be shown as running in the ProcArray.
2709                  *
2710                  * XXX This desperately needs to be revisited.  Any failure after this
2711                  * point will result in a PANIC "cannot abort transaction nnn, it was
2712                  * already committed"!
2713                  */
2714                 ForceSyncCommit();
2715                 (void) RecordTransactionCommit();
2716         }
2717
2718         /*
2719          * We are not going to move any more tuples across pages, but we still
2720          * need to apply vacuum_page to compact free space in the remaining pages
2721          * in vacuum_pages list.  Note that some of these pages may also be in the
2722          * fraged_pages list, and may have had tuples moved onto them; if so, we
2723          * already did vacuum_page and needn't do it again.
2724          */
2725         for (i = 0, curpage = vacuum_pages->pagedesc;
2726                  i < vacuumed_pages;
2727                  i++, curpage++)
2728         {
2729                 vacuum_delay_point();
2730
2731                 Assert((*curpage)->blkno < blkno);
2732                 if ((*curpage)->offsets_used == 0)
2733                 {
2734                         Buffer          buf;
2735                         Page            page;
2736
2737                         /* this page was not used as a move target, so must clean it */
2738                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*curpage)->blkno,
2739                                                                          RBM_NORMAL, vac_strategy);
2740                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2741                         page = BufferGetPage(buf);
2742                         if (!PageIsEmpty(page))
2743                                 vacuum_page(onerel, buf, *curpage);
2744                         UnlockReleaseBuffer(buf);
2745                 }
2746         }
2747
2748         /*
2749          * Now scan all the pages that we moved tuples onto and update tuple
2750          * status bits.  This is not really necessary, but will save time for
2751          * future transactions examining these tuples.
2752          */
2753         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2754                                          last_move_dest_block, num_moved);
2755
2756         /*
2757          * It'd be cleaner to make this report at the bottom of this routine, but
2758          * then the rusage would double-count the second pass of index vacuuming.
2759          * So do it here and ignore the relatively small amount of processing that
2760          * occurs below.
2761          */
2762         ereport(elevel,
2763                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2764                                         RelationGetRelationName(onerel),
2765                                         num_moved, nblocks, blkno),
2766                          errdetail("%s.",
2767                                            pg_rusage_show(&ru0))));
2768
2769         /*
2770          * Reflect the motion of system tuples to catalog cache here.
2771          */
2772         CommandCounterIncrement();
2773
2774         if (Nvacpagelist.num_pages > 0)
2775         {
2776                 /* vacuum indexes again if needed */
2777                 if (Irel != NULL)
2778                 {
2779                         VacPage    *vpleft,
2780                                            *vpright,
2781                                                 vpsave;
2782
2783                         /* re-sort Nvacpagelist.pagedesc */
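                         /*
                          * (The page list was filled while scanning the relation back to
                          * front, so reversing it restores ascending block order, which
                          * the index-cleanup binary searches presumably expect.)
                          */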
2784                         for (vpleft = Nvacpagelist.pagedesc,
2785                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2786                                  vpleft < vpright; vpleft++, vpright--)
2787                         {
2788                                 vpsave = *vpleft;
2789                                 *vpleft = *vpright;
2790                                 *vpright = vpsave;
2791                         }
2792
2793                         /*
2794                          * keep_tuples is the number of tuples that have been moved off a
2795                          * page during chain moves but not been scanned over subsequently.
2796                          * The tuple ids of these tuples are not recorded as free offsets
2797                          * for any VacPage, so they will not be cleared from the indexes.
2798                          * keep_indexed_tuples is the portion of these that are expected
2799                          * to have index entries.
2800                          */
2801                         Assert(keep_tuples >= 0);
2802                         for (i = 0; i < nindexes; i++)
2803                                 vacuum_index(&Nvacpagelist, Irel[i],
2804                                                          vacrelstats->rel_indexed_tuples,
2805                                                          keep_indexed_tuples);
2806                 }
2807
2808                 /*
2809                  * Clean moved-off tuples from last page in Nvacpagelist list.
2810                  *
2811                  * We need only do this for this one page, because higher-numbered
2812                  * pages are going to be truncated from the relation entirely. But see
2813                  * comments for update_hint_bits().
2814                  */
2815                 if (vacpage->blkno == (blkno - 1) &&
2816                         vacpage->offsets_free > 0)
2817                 {
2818                         Buffer          buf;
2819                         Page            page;
2820                         OffsetNumber unused[MaxOffsetNumber];
2821                         OffsetNumber offnum,
2822                                                 maxoff;
2823                         int                     uncnt = 0;
2824                         int                     num_tuples = 0;
2825
2826                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, vacpage->blkno,
2827                                                                          RBM_NORMAL, vac_strategy);
2828                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2829                         page = BufferGetPage(buf);
2830                         maxoff = PageGetMaxOffsetNumber(page);
2831                         for (offnum = FirstOffsetNumber;
2832                                  offnum <= maxoff;
2833                                  offnum = OffsetNumberNext(offnum))
2834                         {
2835                                 ItemId          itemid = PageGetItemId(page, offnum);
2836                                 HeapTupleHeader htup;
2837
2838                                 if (!ItemIdIsUsed(itemid))
2839                                         continue;
2840                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2841                                 Assert(ItemIdIsNormal(itemid));
2842
2843                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2844                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2845                                         continue;
2846
2847                                 /*
2848                                  * See comments in the walk-along-page loop above about why
2849                                  * only MOVED_OFF tuples should be found here.
2850                                  */
2851                                 if (htup->t_infomask & HEAP_MOVED_IN)
2852                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2853                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2854                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2855                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2856                                         elog(ERROR, "invalid XVAC in tuple header");
2857
2858                                 ItemIdSetUnused(itemid);
2859                                 num_tuples++;
2860
2861                                 unused[uncnt++] = offnum;
2862                         }
2863                         Assert(vacpage->offsets_free == num_tuples);
2864
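                         /*
                          * Compact the page and, if the relation is not temp, WAL-log the
                          * removal of the moved-off item pointers.
                          */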
2865                         START_CRIT_SECTION();
2866
2867                         PageRepairFragmentation(page);
2868
2869                         MarkBufferDirty(buf);
2870
2871                         /* XLOG stuff */
2872                         if (!onerel->rd_istemp)
2873                         {
2874                                 XLogRecPtr      recptr;
2875
2876                                 recptr = log_heap_clean(onerel, buf,
2877                                                                                 NULL, 0, NULL, 0,
2878                                                                                 unused, uncnt,
2879                                                                                 false);
2880                                 PageSetLSN(page, recptr);
2881                                 PageSetTLI(page, ThisTimeLineID);
2882                         }
2883
2884                         END_CRIT_SECTION();
2885
2886                         UnlockReleaseBuffer(buf);
2887                 }
2888
2889                 /* now free the new list of reaped pages */
2890                 curpage = Nvacpagelist.pagedesc;
2891                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2892                         pfree(*curpage);
2893                 pfree(Nvacpagelist.pagedesc);
2894         }
2895
2896         /* Truncate relation, if needed */
2897         if (blkno < nblocks)
2898         {
2899                 RelationTruncate(onerel, blkno);
2900
2901                 /* force relcache inval so all backends reset their rd_targblock */
2902                 CacheInvalidateRelcache(onerel);
2903
2904                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2905         }
2906
2907         /* clean up */
2908         pfree(vacpage);
2909         if (vacrelstats->vtlinks != NULL)
2910                 pfree(vacrelstats->vtlinks);
2911
2912         ExecContext_Finish(&ec);
2913 }
2914
2915 /*
2916  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2917  *
2918  *              This routine moves old_tup from old_page to dst_page.
2919  *              old_page and dst_page might be the same page.
2920  *              On entry old_buf and dst_buf are locked exclusively; both locks (or
2921  *              the single lock, if this is an intra-page move) are released before
2922  *              exit.
2923  *
2924  *              Yes, a routine with ten parameters is ugly, but it's still better
2925  *              than having these 120 lines of code in repair_frag() which is
2926  *              already too long and almost unreadable.
2927  */
2928 static void
2929 move_chain_tuple(Relation rel,
2930                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2931                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2932                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2933 {
2934         TransactionId myXID = GetCurrentTransactionId();
2935         HeapTupleData newtup;
2936         OffsetNumber newoff;
2937         ItemId          newitemid;
2938         Size            tuple_len = old_tup->t_len;
2939         bool            all_visible_cleared = false;
2940         bool            all_visible_cleared_new = false;
2941
2942         /*
2943          * make a modifiable copy of the source tuple.
2944          */
2945         heap_copytuple_with_tuple(old_tup, &newtup);
2946
2947         /*
2948          * register invalidation of source tuple in catcaches.
2949          */
2950         CacheInvalidateHeapTuple(rel, old_tup);
2951
2952         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2953         START_CRIT_SECTION();
2954
2955         /*
2956          * mark the source tuple MOVED_OFF.
2957          */
2958         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2959                                                                          HEAP_XMIN_INVALID |
2960                                                                          HEAP_MOVED_IN);
2961         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2962         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2963
2964         /*
2965          * If this page was not used before - clean it.
2966          *
2967          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2968          * destination pages to be the same (since this tuple-chain member can be
2969          * on a page lower than the one we're currently processing in the outer
2970          * loop).  If that's true, then after vacuum_page() the source tuple will
2971          * have been moved, and tuple.t_data will be pointing at garbage.
2972          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2973          * step!!
2974          *
2975          * This path is different from the other callers of vacuum_page, because
2976          * we have already incremented the vacpage's offsets_used field to account
2977          * for the tuple(s) we expect to move onto the page. Therefore
2978          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2979          * good debugging check for all other callers, we work around it here
2980          * rather than remove it.
2981          */
2982         if (!PageIsEmpty(dst_page) && cleanVpd)
2983         {
2984                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2985
2986                 dst_vacpage->offsets_used = 0;
2987                 vacuum_page(rel, dst_buf, dst_vacpage);
2988                 dst_vacpage->offsets_used = sv_offsets_used;
2989         }
2990
2991         /*
2992          * Update the state of the copied tuple, and store it on the destination
2993          * page.  The copied tuple is never part of a HOT chain.
2994          */
2995         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2996                                                                    HEAP_XMIN_INVALID |
2997                                                                    HEAP_MOVED_OFF);
2998         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2999         HeapTupleHeaderClearHotUpdated(newtup.t_data);
3000         HeapTupleHeaderClearHeapOnly(newtup.t_data);
3001         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
3002         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3003                                                  InvalidOffsetNumber, false, true);
3004         if (newoff == InvalidOffsetNumber)
3005                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
3006                          (unsigned long) tuple_len, dst_vacpage->blkno);
3007         newitemid = PageGetItemId(dst_page, newoff);
3008         /* drop temporary copy, and point to the version on the dest page */
3009         pfree(newtup.t_data);
3010         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
3011
3012         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
3013
3014         /*
3015          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
3016          * to next tuple in chain otherwise.  (Since we move the chain in reverse
3017          * order, this is actually the previously processed tuple.)
3018          */
3019         if (!ItemPointerIsValid(ctid))
3020                 newtup.t_data->t_ctid = newtup.t_self;
3021         else
3022                 newtup.t_data->t_ctid = *ctid;
3023         *ctid = newtup.t_self;
3024
3025         /* clear PD_ALL_VISIBLE flags */
3026         if (PageIsAllVisible(old_page))
3027         {
3028                 all_visible_cleared = true;
3029                 PageClearAllVisible(old_page);
3030         }
3031         if (dst_buf != old_buf && PageIsAllVisible(dst_page))
3032         {
3033                 all_visible_cleared_new = true;
3034                 PageClearAllVisible(dst_page);
3035         }
3036
3037         MarkBufferDirty(dst_buf);
3038         if (dst_buf != old_buf)
3039                 MarkBufferDirty(old_buf);
3040
3041         /* XLOG stuff */
3042         if (!rel->rd_istemp)
3043         {
3044                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3045                                                                                    dst_buf, &newtup,
3046                                                                                    all_visible_cleared,
3047                                                                                    all_visible_cleared_new);
3048
3049                 if (old_buf != dst_buf)
3050                 {
3051                         PageSetLSN(old_page, recptr);
3052                         PageSetTLI(old_page, ThisTimeLineID);
3053                 }
3054                 PageSetLSN(dst_page, recptr);
3055                 PageSetTLI(dst_page, ThisTimeLineID);
3056         }
3057
3058         END_CRIT_SECTION();
3059
3060         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3061         if (dst_buf != old_buf)
3062                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3063
3064         /* Clear bits in visibility map */
3065         if (all_visible_cleared)
3066                 visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
3067         if (all_visible_cleared_new)
3068                 visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
3069
3070         /* Create index entries for the moved tuple */
3071         if (ec->resultRelInfo->ri_NumIndices > 0)
3072         {
3073                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3074                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3075                 ResetPerTupleExprContext(ec->estate);
3076         }
3077 }
3078
3079 /*
3080  *      move_plain_tuple() -- move one tuple that is not part of a chain
3081  *
3082  *              This routine moves old_tup from old_page to dst_page.
3083  *              On entry old_buf and dst_buf are locked exclusively; both locks are
3084  *              released before exit.
3085  *
3086  *              Yes, a routine with eight parameters is ugly, but it's still better
3087  *              than having these 90 lines of code in repair_frag() which is already
3088  *              too long and almost unreadable.
3089  */
3090 static void
3091 move_plain_tuple(Relation rel,
3092                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
3093                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
3094                                  ExecContext ec)
3095 {
3096         TransactionId myXID = GetCurrentTransactionId();
3097         HeapTupleData newtup;
3098         OffsetNumber newoff;
3099         ItemId          newitemid;
3100         Size            tuple_len = old_tup->t_len;
3101         bool            all_visible_cleared = false;
3102         bool            all_visible_cleared_new = false;
3103
3104         /* copy tuple */
3105         heap_copytuple_with_tuple(old_tup, &newtup);
3106
3107         /*
3108          * register invalidation of source tuple in catcaches.
3109          *
3110          * (Note: we do not need to register the copied tuple, because we are not
3111          * changing the tuple contents and so there cannot be any need to flush
3112          * negative catcache entries.)
3113          */
3114         CacheInvalidateHeapTuple(rel, old_tup);
3115
3116         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
3117         START_CRIT_SECTION();
3118
3119         /*
3120          * Mark new tuple as MOVED_IN by me; also mark it not HOT.
3121          */
3122         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3123                                                                    HEAP_XMIN_INVALID |
3124                                                                    HEAP_MOVED_OFF);
3125         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
3126         HeapTupleHeaderClearHotUpdated(newtup.t_data);
3127         HeapTupleHeaderClearHeapOnly(newtup.t_data);
3128         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
3129
3130         /* add tuple to the page */
3131         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3132                                                  InvalidOffsetNumber, false, true);
3133         if (newoff == InvalidOffsetNumber)
3134                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
3135                          (unsigned long) tuple_len,
3136                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
3137                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
3138         newitemid = PageGetItemId(dst_page, newoff);
3139         pfree(newtup.t_data);
3140         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
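         /* the moved tuple is the newest version, so its t_ctid points at itself */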
3141         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
3142         newtup.t_self = newtup.t_data->t_ctid;
3143
3144         /*
3145          * Mark old tuple as MOVED_OFF by me.
3146          */
3147         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3148                                                                          HEAP_XMIN_INVALID |
3149                                                                          HEAP_MOVED_IN);
3150         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
3151         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
3152
3153         /* clear PD_ALL_VISIBLE flags */
3154         if (PageIsAllVisible(old_page))
3155         {
3156                 all_visible_cleared = true;
3157                 PageClearAllVisible(old_page);
3158         }
3159         if (PageIsAllVisible(dst_page))
3160         {
3161                 all_visible_cleared_new = true;
3162                 PageClearAllVisible(dst_page);
3163         }
3164
3165         MarkBufferDirty(dst_buf);
3166         MarkBufferDirty(old_buf);
3167
3168         /* XLOG stuff */
3169         if (!rel->rd_istemp)
3170         {
3171                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3172                                                                                    dst_buf, &newtup,
3173                                                                                    all_visible_cleared,
3174                                                                                    all_visible_cleared_new);
3175
3176                 PageSetLSN(old_page, recptr);
3177                 PageSetTLI(old_page, ThisTimeLineID);
3178                 PageSetLSN(dst_page, recptr);
3179                 PageSetTLI(dst_page, ThisTimeLineID);
3180         }
3181
3182         END_CRIT_SECTION();
3183
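         /*
          * Update our bookkeeping for the destination page (remaining free space
          * as limited by fillfactor, and the count of items moved onto it), and
          * release both buffer locks.
          */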
3184         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
3185         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3186         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3187
3188         dst_vacpage->offsets_used++;
3189
3190         /* Clear bits in visibility map */
3191         if (all_visible_cleared)
3192                 visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
3193         if (all_visible_cleared_new)
3194                 visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
3195
3196         /* insert index tuples if needed */
3197         if (ec->resultRelInfo->ri_NumIndices > 0)
3198         {
3199                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3200                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3201                 ResetPerTupleExprContext(ec->estate);
3202         }
3203 }
3204
3205 /*
3206  *      update_hint_bits() -- update hint bits in destination pages
3207  *
3208  * Scan all the pages that we moved tuples onto and update tuple status bits.
3209  * This is not really necessary, but it will save time for future transactions
3210  * examining these tuples.
3211  *
3212  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
3213  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
3214  *
3215  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
3216  * pages that were move source pages but not move dest pages.  The bulk
3217  * of the move source pages will be physically truncated from the relation,
3218  * and the last page remaining in the rel will be fixed separately in
3219  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
3220  * hint bits updated are tuples that are moved as part of a chain and were
3221  * on pages that were neither move destinations nor at the end of the rel.
3222  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
3223  * to remember and revisit those pages too.
3224  *
3225  * One wonders whether it wouldn't be better to skip this work entirely,
3226  * and let the tuple status updates happen someplace that's not holding an
3227  * exclusive lock on the relation.
3228  */
3229 static void
3230 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
3231                                  BlockNumber last_move_dest_block, int num_moved)
3232 {
3233         TransactionId myXID = GetCurrentTransactionId();
3234         int                     checked_moved = 0;
3235         int                     i;
3236         VacPage    *curpage;
3237
3238         for (i = 0, curpage = fraged_pages->pagedesc;
3239                  i < num_fraged_pages;
3240                  i++, curpage++)
3241         {
3242                 Buffer          buf;
3243                 Page            page;
3244                 OffsetNumber max_offset;
3245                 OffsetNumber off;
3246                 int                     num_tuples = 0;
3247
3248                 vacuum_delay_point();
3249
3250                 if ((*curpage)->blkno > last_move_dest_block)
3251                         break;                          /* no need to scan any further */
3252                 if ((*curpage)->offsets_used == 0)
3253                         continue;                       /* this page was never used as a move dest */
3254                 buf = ReadBufferExtended(rel, MAIN_FORKNUM, (*curpage)->blkno,
3255                                                                  RBM_NORMAL, vac_strategy);
3256                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3257                 page = BufferGetPage(buf);
3258                 max_offset = PageGetMaxOffsetNumber(page);
3259                 for (off = FirstOffsetNumber;
3260                          off <= max_offset;
3261                          off = OffsetNumberNext(off))
3262                 {
3263                         ItemId          itemid = PageGetItemId(page, off);
3264                         HeapTupleHeader htup;
3265
3266                         if (!ItemIdIsUsed(itemid))
3267                                 continue;
3268                         /* Shouldn't be any DEAD or REDIRECT items anymore */
3269                         Assert(ItemIdIsNormal(itemid));
3270
3271                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
3272                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
3273                                 continue;
3274
3275                         /*
3276                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
3277                          */
3278                         if (!(htup->t_infomask & HEAP_MOVED))
3279                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
3280                         if (HeapTupleHeaderGetXvac(htup) != myXID)
3281                                 elog(ERROR, "invalid XVAC in tuple header");
3282
3283                         if (htup->t_infomask & HEAP_MOVED_IN)
3284                         {
3285                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
3286                                 htup->t_infomask &= ~HEAP_MOVED;
3287                                 num_tuples++;
3288                         }
3289                         else
3290                                 htup->t_infomask |= HEAP_XMIN_INVALID;
3291                 }
3292                 MarkBufferDirty(buf);
3293                 UnlockReleaseBuffer(buf);
3294                 Assert((*curpage)->offsets_used == num_tuples);
3295                 checked_moved += num_tuples;
3296         }
3297         Assert(num_moved == checked_moved);
3298 }
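
/*
 * Illustrative sketch, not part of the original source: the net effect of the
 * loop above on one moved-in tuple.  Before this pass a destination tuple
 * carries HEAP_MOVED_IN, our XID in XVAC, and no XMIN_COMMITTED hint;
 * afterwards it has simply been rewritten as
 *
 *		htup->t_infomask |= HEAP_XMIN_COMMITTED;
 *		htup->t_infomask &= ~HEAP_MOVED;
 *
 * so later visibility checks can trust the hint bit and never need to examine
 * the XVAC transaction.  Source copies (HEAP_MOVED_OFF) instead get
 * HEAP_XMIN_INVALID, hinting that the old version is no longer visible.
 */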
3299
3300 /*
3301  *      vacuum_heap() -- free dead tuples
3302  *
3303  *              This routine marks dead tuples as unused and truncates the relation
3304  *              if there are "empty" end-blocks.
3305  */
3306 static void
3307 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
3308 {
3309         Buffer          buf;
3310         VacPage    *vacpage;
3311         BlockNumber relblocks;
3312         int                     nblocks;
3313         int                     i;
3314
3315         nblocks = vacuum_pages->num_pages;
3316         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
3317
3318         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
3319         {
3320                 vacuum_delay_point();
3321
3322                 if ((*vacpage)->offsets_free > 0)
3323                 {
3324                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*vacpage)->blkno,
3325                                                                          RBM_NORMAL, vac_strategy);
3326                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3327                         vacuum_page(onerel, buf, *vacpage);
3328                         UnlockReleaseBuffer(buf);
3329                 }
3330         }
3331
3332         /* Truncate relation if there are some empty end-pages */
3333         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
3334         if (vacuum_pages->empty_end_pages > 0)
3335         {
3336                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
3337                 ereport(elevel,
3338                                 (errmsg("\"%s\": truncated %u to %u pages",
3339                                                 RelationGetRelationName(onerel),
3340                                                 vacrelstats->rel_pages, relblocks)));
3341                 RelationTruncate(onerel, relblocks);
3342
3343                 /* force relcache inval so all backends reset their rd_targblock */
3344                 CacheInvalidateRelcache(onerel);
3345
3346                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
3347         }
3348 }
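
/*
 * Worked example (illustrative, not from the original source): if
 * vacrelstats->rel_pages is 1000 and scan_heap found 40 "empty" end-pages,
 * the loop above skips the 40 trailing page descriptors (nblocks =
 * num_pages - empty_end_pages) and the relation is then truncated from
 * 1000 pages to relblocks = 1000 - 40 = 960 pages.
 */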
3349
3350 /*
3351  *      vacuum_page() -- free dead tuples on a page
3352  *                                       and repair its fragmentation.
3353  *
3354  * Caller must hold pin and lock on buffer.
3355  */
3356 static void
3357 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
3358 {
3359         Page            page = BufferGetPage(buffer);
3360         int                     i;
3361
3362         /* There shouldn't be any tuples moved onto the page yet! */
3363         Assert(vacpage->offsets_used == 0);
3364
3365         START_CRIT_SECTION();
3366
3367         for (i = 0; i < vacpage->offsets_free; i++)
3368         {
3369                 ItemId          itemid = PageGetItemId(page, vacpage->offsets[i]);
3370
3371                 ItemIdSetUnused(itemid);
3372         }
3373
3374         PageRepairFragmentation(page);
3375
3376         MarkBufferDirty(buffer);
3377
3378         /* XLOG stuff */
3379         if (!onerel->rd_istemp)
3380         {
3381                 XLogRecPtr      recptr;
3382
3383                 recptr = log_heap_clean(onerel, buffer,
3384                                                                 NULL, 0, NULL, 0,
3385                                                                 vacpage->offsets, vacpage->offsets_free,
3386                                                                 false);
3387                 PageSetLSN(page, recptr);
3388                 PageSetTLI(page, ThisTimeLineID);
3389         }
3390
3391         END_CRIT_SECTION();
3392 }
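
/*
 * Illustrative caller sketch (it mirrors the loop in vacuum_heap() above and
 * introduces no new API): the page must already be pinned and
 * exclusive-locked when vacuum_page() is called, e.g.
 *
 *		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*vacpage)->blkno,
 *								 RBM_NORMAL, vac_strategy);
 *		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *		vacuum_page(onerel, buf, *vacpage);
 *		UnlockReleaseBuffer(buf);
 */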
3393
3394 /*
3395  *      scan_index() -- scan one index relation to update pg_class statistics.
3396  *
3397  * We use this when we have no deletions to do.
3398  */
3399 static void
3400 scan_index(Relation indrel, double num_tuples)
3401 {
3402         IndexBulkDeleteResult *stats;
3403         IndexVacuumInfo ivinfo;
3404         PGRUsage        ru0;
3405
3406         pg_rusage_init(&ru0);
3407
3408         ivinfo.index = indrel;
3409         ivinfo.vacuum_full = true;
3410         ivinfo.analyze_only = false;
3411         ivinfo.estimated_count = false;
3412         ivinfo.message_level = elevel;
3413         ivinfo.num_heap_tuples = num_tuples;
3414         ivinfo.strategy = vac_strategy;
3415
3416         stats = index_vacuum_cleanup(&ivinfo, NULL);
3417
3418         if (!stats)
3419                 return;
3420
3421         /*
3422          * Now update statistics in pg_class, but only if the index says the count
3423          * is accurate.
3424          */
3425         if (!stats->estimated_count)
3426                 vac_update_relstats(indrel,
3427                                                         stats->num_pages, stats->num_index_tuples,
3428                                                         false, InvalidTransactionId);
3429
3430         ereport(elevel,
3431                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3432                                         RelationGetRelationName(indrel),
3433                                         stats->num_index_tuples,
3434                                         stats->num_pages),
3435         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
3436                           "%s.",
3437                           stats->pages_deleted, stats->pages_free,
3438                           pg_rusage_show(&ru0))));
3439
3440         /*
3441          * Check for tuple count mismatch.      If the index is partial, then it's OK
3442          * for it to have fewer tuples than the heap; otherwise we have trouble.
3443          */
3444         if (!stats->estimated_count &&
3445                 stats->num_index_tuples != num_tuples)
3446         {
3447                 if (stats->num_index_tuples > num_tuples ||
3448                         !vac_is_partial_index(indrel))
3449                         ereport(WARNING,
3450                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3451                                                         RelationGetRelationName(indrel),
3452                                                         stats->num_index_tuples, num_tuples),
3453                                          errhint("Rebuild the index with REINDEX.")));
3454         }
3455
3456         pfree(stats);
3457 }
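
/*
 * Hypothetical caller sketch (variable names are assumptions, not copied from
 * this file): full VACUUM uses scan_index() only when no removable tuples
 * were collected, and vacuum_index() below otherwise, roughly
 *
 *		if (vacuum_pages.num_pages > 0)
 *			vacuum_index(&vacuum_pages, Irel[i], num_tuples, keep_tuples);
 *		else
 *			scan_index(Irel[i], num_tuples);
 */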
3458
3459 /*
3460  *      vacuum_index() -- vacuum one index relation.
3461  *
3462  *              vacpagelist is the VacPageList of the heap we're currently vacuuming;
3463  *              the heap is exclusive-locked.  indrel is an index relation on the vacuumed heap.
3464  *
3465  *              We don't bother to set locks on the index relation here, since
3466  *              the parent table is exclusive-locked already.
3467  *
3468  *              Finally, we arrange to update the index relation's statistics in
3469  *              pg_class.
3470  */
3471 static void
3472 vacuum_index(VacPageList vacpagelist, Relation indrel,
3473                          double num_tuples, int keep_tuples)
3474 {
3475         IndexBulkDeleteResult *stats;
3476         IndexVacuumInfo ivinfo;
3477         PGRUsage        ru0;
3478
3479         pg_rusage_init(&ru0);
3480
3481         ivinfo.index = indrel;
3482         ivinfo.vacuum_full = true;
3483         ivinfo.analyze_only = false;
3484         ivinfo.estimated_count = false;
3485         ivinfo.message_level = elevel;
3486         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
3487         ivinfo.strategy = vac_strategy;
3488
3489         /* Do bulk deletion */
3490         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3491
3492         /* Do post-VACUUM cleanup */
3493         stats = index_vacuum_cleanup(&ivinfo, stats);
3494
3495         if (!stats)
3496                 return;
3497
3498         /*
3499          * Now update statistics in pg_class, but only if the index says the count
3500          * is accurate.
3501          */
3502         if (!stats->estimated_count)
3503                 vac_update_relstats(indrel,
3504                                                         stats->num_pages, stats->num_index_tuples,
3505                                                         false, InvalidTransactionId);
3506
3507         ereport(elevel,
3508                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3509                                         RelationGetRelationName(indrel),
3510                                         stats->num_index_tuples,
3511                                         stats->num_pages),
3512                          errdetail("%.0f index row versions were removed.\n"
3513                          "%u index pages have been deleted, %u are currently reusable.\n"
3514                                            "%s.",
3515                                            stats->tuples_removed,
3516                                            stats->pages_deleted, stats->pages_free,
3517                                            pg_rusage_show(&ru0))));
3518
3519         /*
3520          * Check for tuple count mismatch.      If the index is partial, then it's OK
3521          * for it to have fewer tuples than the heap; otherwise we have trouble.
3522          */
3523         if (!stats->estimated_count &&
3524                 stats->num_index_tuples != num_tuples + keep_tuples)
3525         {
3526                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3527                         !vac_is_partial_index(indrel))
3528                         ereport(WARNING,
3529                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3530                                                         RelationGetRelationName(indrel),
3531                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3532                                          errhint("Rebuild the index with REINDEX.")));
3533         }
3534
3535         pfree(stats);
3536 }
3537
3538 /*
3539  *      tid_reaped() -- is a particular tid reaped?
3540  *
3541  *              This has the right signature to be an IndexBulkDeleteCallback.
3542  *
3543  *              vacpagelist->pagedesc is sorted in the right order (by blkno, then offset).
3544  */
3545 static bool
3546 tid_reaped(ItemPointer itemptr, void *state)
3547 {
3548         VacPageList vacpagelist = (VacPageList) state;
3549         OffsetNumber ioffno;
3550         OffsetNumber *voff;
3551         VacPage         vp,
3552                            *vpp;
3553         VacPageData vacpage;
3554
3555         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3556         ioffno = ItemPointerGetOffsetNumber(itemptr);
3557
3558         vp = &vacpage;
3559         vpp = (VacPage *) vac_bsearch((void *) &vp,
3560                                                                   (void *) (vacpagelist->pagedesc),
3561                                                                   vacpagelist->num_pages,
3562                                                                   sizeof(VacPage),
3563                                                                   vac_cmp_blk);
3564
3565         if (vpp == NULL)
3566                 return false;
3567
3568         /* ok - we are on a partially or fully reaped page */
3569         vp = *vpp;
3570
3571         if (vp->offsets_free == 0)
3572         {
3573                 /* this is an empty page, so claim all tuples on it are reaped */
3574                 return true;
3575         }
3576
3577         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3578                                                                                 (void *) (vp->offsets),
3579                                                                                 vp->offsets_free,
3580                                                                                 sizeof(OffsetNumber),
3581                                                                                 vac_cmp_offno);
3582
3583         if (voff == NULL)
3584                 return false;
3585
3586         /* tid is reaped */
3587         return true;
3588 }
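
/*
 * Illustrative sketch of how this callback is consumed (assumed AM-side
 * logic, not code from this file): index_bulk_delete(), invoked from
 * vacuum_index() above, hands tid_reaped to the index access method, which
 * calls it once per index entry with that entry's heap TID and removes the
 * entry when the callback returns true, along the lines of
 *
 *		if (callback(&itup->t_tid, callback_state))
 *			... remove this index entry ...
 */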
3589
3590 /*
3591  * Update the Free Space Map with the info we now have about free space in
3592  * the relation.
3593  */
3594 static void
3595 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3596                            BlockNumber rel_pages)
3597 {
3598         int                     nPages = fraged_pages->num_pages;
3599         VacPage    *pagedesc = fraged_pages->pagedesc;
3600         int                     i;
3601
3602         for (i = 0; i < nPages; i++)
3603         {
3604                 /*
3605                  * fraged_pages may contain entries for pages that we later decided to
3606                  * truncate from the relation; don't enter them into the free space
3607                  * map!  Since pagedesc is in block-number order, we can stop at the first such entry.
3608                  */
3609                 if (pagedesc[i]->blkno >= rel_pages)
3610                         break;
3611
3612                 RecordPageWithFreeSpace(onerel, pagedesc[i]->blkno, pagedesc[i]->free);
3613         }
3614
3615 }
3616
3617 /* Copy a VacPage structure */
3618 static VacPage
3619 copy_vac_page(VacPage vacpage)
3620 {
3621         VacPage         newvacpage;
3622
3623         /* allocate a VacPageData entry */
3624         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3625                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3626
3627         /* fill it in */
3628         if (vacpage->offsets_free > 0)
3629                 memcpy(newvacpage->offsets, vacpage->offsets,
3630                            vacpage->offsets_free * sizeof(OffsetNumber));
3631         newvacpage->blkno = vacpage->blkno;
3632         newvacpage->free = vacpage->free;
3633         newvacpage->offsets_used = vacpage->offsets_used;
3634         newvacpage->offsets_free = vacpage->offsets_free;
3635
3636         return newvacpage;
3637 }
3638
3639 /*
3640  * Add a VacPage pointer to a VacPageList.
3641  *
3642  *              As a side effect of the way that scan_heap works,
3643  *              higher pages come after lower pages in the array
3644  *              (and highest tid on a page is last).
3645  */
3646 static void
3647 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3648 {
3649 #define PG_NPAGEDESC 1024
3650
3651         /* allocate a VacPage entry if needed */
3652         if (vacpagelist->num_pages == 0)
3653         {
3654                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3655                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3656         }
3657         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3658         {
3659                 vacpagelist->num_allocated_pages *= 2;
3660                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3661         }
3662         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3663         (vacpagelist->num_pages)++;
3664 }
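
/*
 * Illustrative note (not part of the original source): the array grows by
 * doubling, so for example the 1025th insertion repallocs pagedesc from 1024
 * to 2048 entries, the 2049th to 4096, and so on; N insertions therefore cost
 * only O(N) amortized copying.
 */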
3665
3666 /*
3667  * vac_bsearch: just like standard C library routine bsearch(),
3668  * except that we first test to see whether the target key is outside
3669  * the range of the table entries.      This case is handled relatively slowly
3670  * by the normal binary search algorithm (ie, no faster than any other key)
3671  * but it occurs often enough in VACUUM to be worth optimizing.
3672  */
3673 static void *
3674 vac_bsearch(const void *key, const void *base,
3675                         size_t nelem, size_t size,
3676                         int (*compar) (const void *, const void *))
3677 {
3678         int                     res;
3679         const void *last;
3680
3681         if (nelem == 0)
3682                 return NULL;
3683         res = compar(key, base);
3684         if (res < 0)
3685                 return NULL;
3686         if (res == 0)
3687                 return (void *) base;
3688         if (nelem > 1)
3689         {
3690                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3691                 res = compar(key, last);
3692                 if (res > 0)
3693                         return NULL;
3694                 if (res == 0)
3695                         return (void *) last;
3696         }
3697         if (nelem <= 2)
3698                 return NULL;                    /* already checked 'em all */
3699         return bsearch(key, base, nelem, size, compar);
3700 }
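
/*
 * Worked example (illustrative): with 10000 entries, a plain bsearch() still
 * spends about log2(10000) ~= 14 comparisons on a key that lies below the
 * first entry or above the last.  The endpoint tests above answer such
 * out-of-range probes -- common in VACUUM, since most TIDs are on pages with
 * no freed space -- in one or two comparisons, and fall back to bsearch()
 * only for keys inside the range.
 */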
3701
3702 /*
3703  * Comparator routines for use with qsort() and bsearch().
3704  */
3705 static int
3706 vac_cmp_blk(const void *left, const void *right)
3707 {
3708         BlockNumber lblk,
3709                                 rblk;
3710
3711         lblk = (*((VacPage *) left))->blkno;
3712         rblk = (*((VacPage *) right))->blkno;
3713
3714         if (lblk < rblk)
3715                 return -1;
3716         if (lblk == rblk)
3717                 return 0;
3718         return 1;
3719 }
3720
3721 static int
3722 vac_cmp_offno(const void *left, const void *right)
3723 {
3724         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3725                 return -1;
3726         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3727                 return 0;
3728         return 1;
3729 }
3730
3731 static int
3732 vac_cmp_vtlinks(const void *left, const void *right)
3733 {
3734         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3735                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3736                 return -1;
3737         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3738                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3739                 return 1;
3740         /* bi_hi-es are equal */
3741         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3742                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3743                 return -1;
3744         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3745                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3746                 return 1;
3747         /* bi_lo-es are equal */
3748         if (((VTupleLink) left)->new_tid.ip_posid <
3749                 ((VTupleLink) right)->new_tid.ip_posid)
3750                 return -1;
3751         if (((VTupleLink) left)->new_tid.ip_posid >
3752                 ((VTupleLink) right)->new_tid.ip_posid)
3753                 return 1;
3754         return 0;
3755 }
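
/*
 * Usage sketch (a hypothetical call, shown only to illustrate the contract):
 * each comparator has the signature qsort()/bsearch() expect, e.g.
 *
 *		qsort(vacpagelist->pagedesc, vacpagelist->num_pages,
 *			  sizeof(VacPage), vac_cmp_blk);
 *
 * vac_cmp_blk is also what tid_reaped() above passes to vac_bsearch() to
 * locate a page descriptor by block number.
 */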
3756
3757
3758 /*
3759  * Open all the indexes of the given relation, obtaining the specified kind
3760  * of lock on each.  Return an array of Relation pointers for the indexes
3761  * into *Irel, and the number of indexes into *nindexes.
3762  */
3763 void
3764 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3765                                  int *nindexes, Relation **Irel)
3766 {
3767         List       *indexoidlist;
3768         ListCell   *indexoidscan;
3769         int                     i;
3770
3771         Assert(lockmode != NoLock);
3772
3773         indexoidlist = RelationGetIndexList(relation);
3774
3775         *nindexes = list_length(indexoidlist);
3776
3777         if (*nindexes > 0)
3778                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3779         else
3780                 *Irel = NULL;
3781
3782         i = 0;
3783         foreach(indexoidscan, indexoidlist)
3784         {
3785                 Oid                     indexoid = lfirst_oid(indexoidscan);
3786
3787                 (*Irel)[i++] = index_open(indexoid, lockmode);
3788         }
3789
3790         list_free(indexoidlist);
3791 }
3792
3793 /*
3794  * Release the resources acquired by vac_open_indexes.  Optionally release
3795  * the locks (say NoLock to keep 'em).
3796  */
3797 void
3798 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3799 {
3800         if (Irel == NULL)
3801                 return;
3802
3803         while (nindexes--)
3804         {
3805                 Relation        ind = Irel[nindexes];
3806
3807                 index_close(ind, lockmode);
3808         }
3809         pfree(Irel);
3810 }
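
/*
 * Paired-usage sketch (illustrative; the lock level is an assumption, since
 * the real call sites live elsewhere in this file):
 *
 *		Relation   *Irel;
 *		int			nindexes;
 *
 *		vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
 *		... vacuum or scan each Irel[i] ...
 *		vac_close_indexes(nindexes, Irel, NoLock);
 *
 * Passing NoLock to vac_close_indexes keeps the index locks until the end of
 * the transaction.
 */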
3811
3812
3813 /*
3814  * Is an index partial (ie, could it contain fewer tuples than the heap)?
3815  */
3816 bool
3817 vac_is_partial_index(Relation indrel)
3818 {
3819         /*
3820          * If the index's AM doesn't support nulls, it's partial for our purposes
3821          */
3822         if (!indrel->rd_am->amindexnulls)
3823                 return true;
3824
3825         /* Otherwise, look to see if there's a partial-index predicate */
3826         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3827                 return true;
3828
3829         return false;
3830 }
3831
3832
3833 static bool
3834 enough_space(VacPage vacpage, Size len)
3835 {
3836         len = MAXALIGN(len);
3837
3838         if (len > vacpage->free)
3839                 return false;
3840
3841         /* if there are free itemid(s) and len <= free_space... */
3842         if (vacpage->offsets_used < vacpage->offsets_free)
3843                 return true;
3844
3845         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
3846         if (len + sizeof(ItemIdData) <= vacpage->free)
3847                 return true;
3848
3849         return false;
3850 }
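
/*
 * Worked example (illustrative, assuming 8-byte MAXALIGN): a 100-byte tuple
 * is rounded up to len = 104.  If the page still has an unused line pointer
 * (offsets_used < offsets_free), 104 bytes of free space suffice; otherwise
 * we also need room for a new ItemIdData (4 bytes), i.e. 108 bytes.
 */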
3851
3852 static Size
3853 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3854 {
3855         /*
3856          * It is correct to use PageGetExactFreeSpace() here, *not*
3857          * PageGetHeapFreeSpace().      This is because (a) we do our own, exact
3858          * accounting for whether line pointers must be added, and (b) we will
3859          * recycle any LP_DEAD line pointers before starting to add rows to a
3860          * page, but that may not have happened yet at the time this function is
3861          * applied to a page, which means PageGetHeapFreeSpace()'s protection
3862          * against too many line pointers on a page could fire incorrectly.  We do
3863          * not need that protection here: since VACUUM FULL always recycles all
3864          * dead line pointers first, it'd be physically impossible to insert more
3865          * than MaxHeapTuplesPerPage tuples anyway.
3866          */
3867         Size            freespace = PageGetExactFreeSpace(page);
3868         Size            targetfree;
3869
3870         targetfree = RelationGetTargetPageFreeSpace(relation,
3871                                                                                                 HEAP_DEFAULT_FILLFACTOR);
3872         if (freespace > targetfree)
3873                 return freespace - targetfree;
3874         else
3875                 return 0;
3876 }
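
/*
 * Worked example (illustrative; the formula is an assumption based on the
 * usual definition BLCKSZ * (100 - fillfactor) / 100): with an 8192-byte
 * block and fillfactor = 70 the reserved target is about 2457 bytes, so a
 * page whose exact free space is 3000 bytes offers only ~543 bytes to tuples
 * being moved in.  With the default fillfactor of 100 nothing is reserved and
 * the exact free space is returned unchanged.
 */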
3877
3878 /*
3879  * vacuum_delay_point --- check for interrupts and cost-based delay.
3880  *
3881  * This should be called in each major loop of VACUUM processing,
3882  * typically once per page processed.
3883  */
3884 void
3885 vacuum_delay_point(void)
3886 {
3887         /* Always check for interrupts */
3888         CHECK_FOR_INTERRUPTS();
3889
3890         /* Nap if appropriate */
3891         if (VacuumCostActive && !InterruptPending &&
3892                 VacuumCostBalance >= VacuumCostLimit)
3893         {
3894                 int                     msec;
3895
3896                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3897                 if (msec > VacuumCostDelay * 4)
3898                         msec = VacuumCostDelay * 4;
3899
3900                 pg_usleep(msec * 1000L);
3901
3902                 VacuumCostBalance = 0;
3903
3904                 /* update balance values for workers */
3905                 AutoVacuumUpdateDelay();
3906
3907                 /* Might have gotten an interrupt while sleeping */
3908                 CHECK_FOR_INTERRUPTS();
3909         }
3910 }
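
/*
 * Worked example (illustrative, using hypothetical settings): with
 * vacuum_cost_delay = 20 ms and vacuum_cost_limit = 200, an accumulated
 * VacuumCostBalance of 350 yields msec = 20 * 350 / 200 = 35, well under the
 * 4 * 20 = 80 ms cap, so the backend sleeps 35 ms, zeroes the balance, and
 * rechecks for interrupts before continuing.
 */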