1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.388 2009/06/06 22:13:51 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/transam.h"
29 #include "access/visibilitymap.h"
30 #include "access/xact.h"
31 #include "access/xlog.h"
32 #include "catalog/namespace.h"
33 #include "catalog/pg_database.h"
34 #include "catalog/pg_namespace.h"
35 #include "catalog/storage.h"
36 #include "commands/dbcommands.h"
37 #include "commands/vacuum.h"
38 #include "executor/executor.h"
39 #include "miscadmin.h"
40 #include "pgstat.h"
41 #include "postmaster/autovacuum.h"
42 #include "storage/bufmgr.h"
43 #include "storage/freespace.h"
44 #include "storage/lmgr.h"
45 #include "storage/proc.h"
46 #include "storage/procarray.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/flatfiles.h"
50 #include "utils/fmgroids.h"
51 #include "utils/inval.h"
52 #include "utils/lsyscache.h"
53 #include "utils/memutils.h"
54 #include "utils/pg_rusage.h"
55 #include "utils/relcache.h"
56 #include "utils/snapmgr.h"
57 #include "utils/syscache.h"
58 #include "utils/tqual.h"
59
60
61 /*
62  * GUC parameters
63  */
64 int                     vacuum_freeze_min_age;
65 int                     vacuum_freeze_table_age;
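/*
 * For reference, these variables back the vacuum_freeze_min_age and
 * vacuum_freeze_table_age GUCs.  A hedged, illustrative postgresql.conf
 * sketch (the values shown are typical defaults, not read from this file):
 *
 *     vacuum_freeze_min_age   = 50000000    # XID age before a tuple is frozen
 *     vacuum_freeze_table_age = 150000000   # XID age forcing a whole-table scan
 *
 * vacuum_set_xid_limits() below clamps both against autovacuum_freeze_max_age
 * so that anti-wraparound autovacuums stay rare.
 */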
66
67 /*
68  * VacPage structures keep track of each page on which we find useful
69  * amounts of free space.
70  */
71 typedef struct VacPageData
72 {
73         BlockNumber blkno;                      /* BlockNumber of this Page */
74         Size            free;                   /* FreeSpace on this Page */
75         uint16          offsets_used;   /* Number of OffNums used by vacuum */
76         uint16          offsets_free;   /* Number of OffNums free or to be free */
77         OffsetNumber offsets[1];        /* Array of free OffNums */
78 } VacPageData;
79
80 typedef VacPageData *VacPage;
81
82 typedef struct VacPageListData
83 {
84         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
85         int                     num_pages;              /* Number of pages in pagedesc */
86         int                     num_allocated_pages;    /* Number of allocated pages in
87                                                                                  * pagedesc */
88         VacPage    *pagedesc;           /* Descriptions of pages */
89 } VacPageListData;
90
91 typedef VacPageListData *VacPageList;
92
93 /*
94  * The "vtlinks" array keeps information about each recently-updated tuple
95  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
96  * We store the tuple's own TID as well as its t_ctid (its link to the next
97  * newer tuple version).  Searching in this array allows us to follow update
98  * chains backwards from newer to older tuples.  When we move a member of an
99  * update chain, we must move *all* the live members of the chain, so that we
100  * can maintain their t_ctid link relationships (we must not just overwrite
101  * t_ctid in an existing tuple).
102  *
103  * Note: because t_ctid links can be stale (this would only occur if a prior
104  * VACUUM crashed partway through), it is possible that new_tid points to an
105  * empty slot or unrelated tuple.  We have to check the linkage as we follow
106  * it, just as is done in EvalPlanQual.
107  */
108 typedef struct VTupleLinkData
109 {
110         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
111         ItemPointerData this_tid;       /* t_self of the tuple */
112 } VTupleLinkData;
113
114 typedef VTupleLinkData *VTupleLink;
115
116 /*
117  * We use an array of VTupleMoveData to plan a chain tuple move fully
118  * before we do it.
119  */
120 typedef struct VTupleMoveData
121 {
122         ItemPointerData tid;            /* tuple ID */
123         VacPage         vacpage;                /* where to move it to */
124         bool            cleanVpd;               /* clean vacpage before using? */
125 } VTupleMoveData;
126
127 typedef VTupleMoveData *VTupleMove;
128
129 /*
130  * VRelStats contains the data acquired by scan_heap for use later
131  */
132 typedef struct VRelStats
133 {
134         /* miscellaneous statistics */
135         BlockNumber rel_pages;          /* pages in relation */
136         double          rel_tuples;             /* tuples that remain after vacuuming */
137         double          rel_indexed_tuples;             /* indexed tuples that remain */
138         Size            min_tlen;               /* min surviving tuple size */
139         Size            max_tlen;               /* max surviving tuple size */
140         bool            hasindex;
141         /* vtlinks array for tuple chain following - sorted by new_tid */
142         int                     num_vtlinks;
143         VTupleLink      vtlinks;
144 } VRelStats;
145
146 /*----------------------------------------------------------------------
147  * ExecContext:
148  *
149  * As these variables always appear together, we put them into one struct
150  * and pull initialization and cleanup into separate routines.
151  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
152  * accurately:  It is *used* only in move_xxx_tuple(), but because this
153  * routine is called many times, we initialize the struct just once in
154  * repair_frag() and pass it on to move_xxx_tuple().
155  */
156 typedef struct ExecContextData
157 {
158         ResultRelInfo *resultRelInfo;
159         EState     *estate;
160         TupleTableSlot *slot;
161 } ExecContextData;
162
163 typedef ExecContextData *ExecContext;
164
165 static void
166 ExecContext_Init(ExecContext ec, Relation rel)
167 {
168         TupleDesc       tupdesc = RelationGetDescr(rel);
169
170         /*
171          * We need a ResultRelInfo and an EState so we can use the regular
172          * executor's index-entry-making machinery.
173          */
174         ec->estate = CreateExecutorState();
175
176         ec->resultRelInfo = makeNode(ResultRelInfo);
177         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
178         ec->resultRelInfo->ri_RelationDesc = rel;
179         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
180
181         ExecOpenIndices(ec->resultRelInfo);
182
183         ec->estate->es_result_relations = ec->resultRelInfo;
184         ec->estate->es_num_result_relations = 1;
185         ec->estate->es_result_relation_info = ec->resultRelInfo;
186
187         /* Set up a tuple slot too */
188         ec->slot = MakeSingleTupleTableSlot(tupdesc);
189 }
190
191 static void
192 ExecContext_Finish(ExecContext ec)
193 {
194         ExecDropSingleTupleTableSlot(ec->slot);
195         ExecCloseIndices(ec->resultRelInfo);
196         FreeExecutorState(ec->estate);
197 }
198
199 /*
200  * End of ExecContext Implementation
201  *----------------------------------------------------------------------
202  */
203
204 /* A few variables that don't seem worth passing around as parameters */
205 static MemoryContext vac_context = NULL;
206
207 static int      elevel = -1;
208
209 static TransactionId OldestXmin;
210 static TransactionId FreezeLimit;
211
212 static BufferAccessStrategy vac_strategy;
213
214
215 /* non-export function prototypes */
216 static List *get_rel_oids(Oid relid, const RangeVar *vacrel,
217                          const char *stmttype);
218 static void vac_truncate_clog(TransactionId frozenXID);
219 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast,
220                    bool for_wraparound, bool *scanned_all);
221 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
222 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
223                   VacPageList vacuum_pages, VacPageList fraged_pages);
224 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
225                         VacPageList vacuum_pages, VacPageList fraged_pages,
226                         int nindexes, Relation *Irel);
227 static void move_chain_tuple(Relation rel,
228                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
229                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
230                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
231 static void move_plain_tuple(Relation rel,
232                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
233                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
234                                  ExecContext ec);
235 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
236                                  int num_fraged_pages, BlockNumber last_move_dest_block,
237                                  int num_moved);
238 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
239                         VacPageList vacpagelist);
240 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
241 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
242                          double num_tuples, int keep_tuples);
243 static void scan_index(Relation indrel, double num_tuples);
244 static bool tid_reaped(ItemPointer itemptr, void *state);
245 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
246                            BlockNumber rel_pages);
247 static VacPage copy_vac_page(VacPage vacpage);
248 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
249 static void *vac_bsearch(const void *key, const void *base,
250                         size_t nelem, size_t size,
251                         int (*compar) (const void *, const void *));
252 static int      vac_cmp_blk(const void *left, const void *right);
253 static int      vac_cmp_offno(const void *left, const void *right);
254 static int      vac_cmp_vtlinks(const void *left, const void *right);
255 static bool enough_space(VacPage vacpage, Size len);
256 static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
257
258
259 /****************************************************************************
260  *                                                                                                                                                      *
261  *                      Code common to all flavors of VACUUM and ANALYZE                                *
262  *                                                                                                                                                      *
263  ****************************************************************************
264  */
265
266
267 /*
268  * Primary entry point for VACUUM and ANALYZE commands.
269  *
270  * relid is normally InvalidOid; if it is not, then it provides the relation
271  * OID to be processed, and vacstmt->relation is ignored.  (The non-invalid
272  * case is currently only used by autovacuum.)
273  *
274  * do_toast is passed as FALSE by autovacuum, because it processes TOAST
275  * tables separately.
276  *
277  * for_wraparound is used by autovacuum to let us know when it's forcing
278  * a vacuum for wraparound, which should not be auto-cancelled.
279  *
280  * bstrategy is normally given as NULL, but in autovacuum it can be passed
281  * in to use the same buffer strategy object across multiple vacuum() calls.
282  *
283  * isTopLevel should be passed down from ProcessUtility.
284  *
285  * It is the caller's responsibility that vacstmt and bstrategy
286  * (if given) be allocated in a memory context that won't disappear
287  * at transaction commit.
288  */
289 void
290 vacuum(VacuumStmt *vacstmt, Oid relid, bool do_toast,
291            BufferAccessStrategy bstrategy, bool for_wraparound, bool isTopLevel)
292 {
293         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
294         volatile MemoryContext anl_context = NULL;
295         volatile bool all_rels,
296                                 in_outer_xact,
297                                 use_own_xacts;
298         List       *relations;
299
300         if (vacstmt->verbose)
301                 elevel = INFO;
302         else
303                 elevel = DEBUG2;
304
305         /*
306          * We cannot run VACUUM inside a user transaction block; if we were inside
307          * a transaction, then our commit- and start-transaction-command calls
308          * would not have the intended effect! Furthermore, the forced commit that
309          * occurs before truncating the relation's file would have the effect of
310          * committing the rest of the user's transaction too, which would
311          * certainly not be the desired behavior.  (This only applies to VACUUM
312          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
313          * block, but we choose to disallow that case because we'd rather commit
314          * as soon as possible after finishing the vacuum.      This is mainly so that
315          * we can let go of the AccessExclusiveLock that we may be holding.)
316          *
317          * ANALYZE (without VACUUM) can run either way.
318          */
319         if (vacstmt->vacuum)
320         {
321                 PreventTransactionChain(isTopLevel, stmttype);
322                 in_outer_xact = false;
323         }
324         else
325                 in_outer_xact = IsInTransactionChain(isTopLevel);
326
327         /*
328          * Send info about dead objects to the statistics collector, unless we are
329          * in autovacuum --- autovacuum.c does this for itself.
330          */
331         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
332                 pgstat_vacuum_stat();
333
334         /*
335          * Create special memory context for cross-transaction storage.
336          *
337          * Since it is a child of PortalContext, it will go away eventually even
338          * if we suffer an error; there's no need for special abort cleanup logic.
339          */
340         vac_context = AllocSetContextCreate(PortalContext,
341                                                                                 "Vacuum",
342                                                                                 ALLOCSET_DEFAULT_MINSIZE,
343                                                                                 ALLOCSET_DEFAULT_INITSIZE,
344                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
345
346         /*
347          * If caller didn't give us a buffer strategy object, make one in the
348          * cross-transaction memory context.
349          */
350         if (bstrategy == NULL)
351         {
352                 MemoryContext old_context = MemoryContextSwitchTo(vac_context);
353
354                 bstrategy = GetAccessStrategy(BAS_VACUUM);
355                 MemoryContextSwitchTo(old_context);
356         }
357         vac_strategy = bstrategy;
358
359         /* Remember whether we are processing everything in the DB */
360         all_rels = (!OidIsValid(relid) && vacstmt->relation == NULL);
361
362         /*
363          * Build list of relations to process, unless caller gave us one. (If we
364          * build one, we put it in vac_context for safekeeping.)
365          */
366         relations = get_rel_oids(relid, vacstmt->relation, stmttype);
367
368         /*
369          * Decide whether we need to start/commit our own transactions.
370          *
371          * For VACUUM (with or without ANALYZE): always do so, so that we can
372          * release locks as soon as possible.  (We could possibly use the outer
373          * transaction for a one-table VACUUM, but handling TOAST tables would be
374          * problematic.)
375          *
376          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
377          * start/commit our own transactions.  Also, there's no need to do so if
378          * only processing one relation.  For multiple relations when not within a
379          * transaction block, and also in an autovacuum worker, use own
380          * transactions so we can release locks sooner.
381          */
382         if (vacstmt->vacuum)
383                 use_own_xacts = true;
384         else
385         {
386                 Assert(vacstmt->analyze);
387                 if (IsAutoVacuumWorkerProcess())
388                         use_own_xacts = true;
389                 else if (in_outer_xact)
390                         use_own_xacts = false;
391                 else if (list_length(relations) > 1)
392                         use_own_xacts = true;
393                 else
394                         use_own_xacts = false;
395         }
396
397         /*
398          * If we are running ANALYZE without per-table transactions, we'll need a
399          * memory context with table lifetime.
400          */
401         if (!use_own_xacts)
402                 anl_context = AllocSetContextCreate(PortalContext,
403                                                                                         "Analyze",
404                                                                                         ALLOCSET_DEFAULT_MINSIZE,
405                                                                                         ALLOCSET_DEFAULT_INITSIZE,
406                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
407
408         /*
409          * vacuum_rel expects to be entered with no transaction active; it will
410          * start and commit its own transaction.  But we are called by an SQL
411          * command, and so we are executing inside a transaction already. We
412          * commit the transaction started in PostgresMain() here, and start
413          * another one before exiting to match the commit waiting for us back in
414          * PostgresMain().
415          */
416         if (use_own_xacts)
417         {
418                 /* ActiveSnapshot is not set by autovacuum */
419                 if (ActiveSnapshotSet())
420                         PopActiveSnapshot();
421
422                 /* matches the StartTransaction in PostgresMain() */
423                 CommitTransactionCommand();
424         }
425
426         /* Turn vacuum cost accounting on or off */
427         PG_TRY();
428         {
429                 ListCell   *cur;
430
431                 VacuumCostActive = (VacuumCostDelay > 0);
432                 VacuumCostBalance = 0;
433
434                 /*
435                  * Loop to process each selected relation.
436                  */
437                 foreach(cur, relations)
438                 {
439                         Oid                     relid = lfirst_oid(cur);
440                         bool            scanned_all = false;
441
442                         if (vacstmt->vacuum)
443                                 vacuum_rel(relid, vacstmt, do_toast, for_wraparound,
444                                                    &scanned_all);
445
446                         if (vacstmt->analyze)
447                         {
448                                 MemoryContext old_context = NULL;
449
450                                 /*
451                                  * If using separate xacts, start one for analyze. Otherwise,
452                                  * we can use the outer transaction, but we still need to call
453                                  * analyze_rel in a memory context that will be cleaned up on
454                                  * return (else we leak memory while processing multiple
455                                  * tables).
456                                  */
457                                 if (use_own_xacts)
458                                 {
459                                         StartTransactionCommand();
460                                         /* functions in indexes may want a snapshot set */
461                                         PushActiveSnapshot(GetTransactionSnapshot());
462                                 }
463                                 else
464                                         old_context = MemoryContextSwitchTo(anl_context);
465
466                                 analyze_rel(relid, vacstmt, vac_strategy, !scanned_all);
467
468                                 if (use_own_xacts)
469                                 {
470                                         PopActiveSnapshot();
471                                         CommitTransactionCommand();
472                                 }
473                                 else
474                                 {
475                                         MemoryContextSwitchTo(old_context);
476                                         MemoryContextResetAndDeleteChildren(anl_context);
477                                 }
478                         }
479                 }
480         }
481         PG_CATCH();
482         {
483                 /* Make sure cost accounting is turned off after error */
484                 VacuumCostActive = false;
485                 PG_RE_THROW();
486         }
487         PG_END_TRY();
488
489         /* Turn off vacuum cost accounting */
490         VacuumCostActive = false;
491
492         /*
493          * Finish up processing.
494          */
495         if (use_own_xacts)
496         {
497                 /* here, we are not in a transaction */
498
499                 /*
500                  * This matches the CommitTransaction waiting for us in
501                  * PostgresMain().
502                  */
503                 StartTransactionCommand();
504         }
505
506         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
507         {
508                 /*
509                  * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
510                  * (autovacuum.c does this for itself.)
511                  */
512                 vac_update_datfrozenxid();
513         }
514
515         /*
516          * Clean up working storage --- note we must do this after
517          * StartTransactionCommand, else we might be trying to delete the active
518          * context!
519          */
520         MemoryContextDelete(vac_context);
521         vac_context = NULL;
522
523         if (anl_context)
524                 MemoryContextDelete(anl_context);
525 }
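/*
 * Illustrative only: a top-level caller (ProcessUtility in tcop/utility.c for
 * a user-issued VACUUM or ANALYZE) is expected to reach this entry point
 * roughly as sketched below; the variable names are placeholders.
 *
 *     VacuumStmt *stmt = ...;         -- parsed VACUUM/ANALYZE statement
 *
 *     vacuum(stmt,
 *            InvalidOid,              -- no forced relid; use stmt->relation
 *            true,                    -- do_toast: also process TOAST tables
 *            NULL,                    -- bstrategy: let vacuum() build one
 *            false,                   -- for_wraparound: only autovacuum sets it
 *            isTopLevel);
 *
 * Autovacuum instead passes an explicit relid, do_toast = false, its own
 * BufferAccessStrategy, and for_wraparound as needed (see the header comment
 * above vacuum()).
 */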
526
527 /*
528  * Build a list of Oids for each relation to be processed
529  *
530  * The list is built in vac_context so that it will survive across our
531  * per-relation transactions.
532  */
533 static List *
534 get_rel_oids(Oid relid, const RangeVar *vacrel, const char *stmttype)
535 {
536         List       *oid_list = NIL;
537         MemoryContext oldcontext;
538
539         /* OID supplied by VACUUM's caller? */
540         if (OidIsValid(relid))
541         {
542                 oldcontext = MemoryContextSwitchTo(vac_context);
543                 oid_list = lappend_oid(oid_list, relid);
544                 MemoryContextSwitchTo(oldcontext);
545         }
546         else if (vacrel)
547         {
548                 /* Process a specific relation */
549                 Oid                     relid;
550
551                 relid = RangeVarGetRelid(vacrel, false);
552
553                 /* Make a relation list entry for this guy */
554                 oldcontext = MemoryContextSwitchTo(vac_context);
555                 oid_list = lappend_oid(oid_list, relid);
556                 MemoryContextSwitchTo(oldcontext);
557         }
558         else
559         {
560                 /* Process all plain relations listed in pg_class */
561                 Relation        pgclass;
562                 HeapScanDesc scan;
563                 HeapTuple       tuple;
564                 ScanKeyData key;
565
566                 ScanKeyInit(&key,
567                                         Anum_pg_class_relkind,
568                                         BTEqualStrategyNumber, F_CHAREQ,
569                                         CharGetDatum(RELKIND_RELATION));
570
571                 pgclass = heap_open(RelationRelationId, AccessShareLock);
572
573                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
574
575                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
576                 {
577                         /* Make a relation list entry for this guy */
578                         oldcontext = MemoryContextSwitchTo(vac_context);
579                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
580                         MemoryContextSwitchTo(oldcontext);
581                 }
582
583                 heap_endscan(scan);
584                 heap_close(pgclass, AccessShareLock);
585         }
586
587         return oid_list;
588 }
589
590 /*
591  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
592  */
593 void
594 vacuum_set_xid_limits(int freeze_min_age,
595                                           int freeze_table_age,
596                                           bool sharedRel,
597                                           TransactionId *oldestXmin,
598                                           TransactionId *freezeLimit,
599                                           TransactionId *freezeTableLimit)
600 {
601         int                     freezemin;
602         TransactionId limit;
603         TransactionId safeLimit;
604
605         /*
606          * We can always ignore processes running lazy vacuum.  This is because we
607          * use these values only for deciding which tuples we must keep in the
608          * tables.      Since lazy vacuum doesn't write its XID anywhere, it's safe to
609          * ignore it.  In theory it could be problematic to ignore lazy vacuums on
610          * a full vacuum, but keep in mind that only one vacuum process can be
611          * working on a particular table at any time, and that each vacuum is
612          * always an independent transaction.
613          */
614         *oldestXmin = GetOldestXmin(sharedRel, true);
615
616         Assert(TransactionIdIsNormal(*oldestXmin));
617
618         /*
619          * Determine the minimum freeze age to use: as specified by the caller, or
620          * vacuum_freeze_min_age, but in any case not more than half
621          * autovacuum_freeze_max_age, so that autovacuums to prevent XID
622          * wraparound won't occur too frequently.
623          */
624         freezemin = freeze_min_age;
625         if (freezemin < 0)
626                 freezemin = vacuum_freeze_min_age;
627         freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
628         Assert(freezemin >= 0);
629
630         /*
631          * Compute the cutoff XID, being careful not to generate a "permanent" XID
632          */
633         limit = *oldestXmin - freezemin;
634         if (!TransactionIdIsNormal(limit))
635                 limit = FirstNormalTransactionId;
636
637         /*
638          * If oldestXmin is very far back (in practice, more than
639          * autovacuum_freeze_max_age / 2 XIDs old), complain and force a minimum
640          * freeze age of zero.
641          */
642         safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
643         if (!TransactionIdIsNormal(safeLimit))
644                 safeLimit = FirstNormalTransactionId;
645
646         if (TransactionIdPrecedes(limit, safeLimit))
647         {
648                 ereport(WARNING,
649                                 (errmsg("oldest xmin is far in the past"),
650                                  errhint("Close open transactions soon to avoid wraparound problems.")));
651                 limit = *oldestXmin;
652         }
653
654         *freezeLimit = limit;
655
656         if (freezeTableLimit != NULL)
657         {
658                 int freezetable;
659
660                 /*
661                  * Determine the table freeze age to use: as specified by the caller,
662                  * or vacuum_freeze_table_age, but in any case not more than
663          * autovacuum_freeze_max_age * 0.95, so that if you have e.g. a nightly
664                  * VACUUM schedule, the nightly VACUUM gets a chance to freeze tuples
665                  * before anti-wraparound autovacuum is launched.
666                  */
667                 freezetable = freeze_table_age;
668                 if (freezetable < 0)
669                         freezetable = vacuum_freeze_table_age;
670                 freezetable = Min(freezetable, autovacuum_freeze_max_age * 0.95);
671                 Assert(freezetable >= 0);
672
673                 /*
674                  * Compute the cutoff XID, being careful not to generate a
675                  * "permanent" XID.
676                  */
677                 limit = ReadNewTransactionId() - freezetable;
678                 if (!TransactionIdIsNormal(limit))
679                         limit = FirstNormalTransactionId;
680
681                 *freezeTableLimit = limit;
682         }
683 }
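/*
 * A hedged numeric illustration of the cutoff arithmetic above (all values
 * assumed for the example, not taken from a real installation).  Suppose
 * GetOldestXmin() returns 500,000,000, freeze_min_age is passed as -1 so we
 * fall back to vacuum_freeze_min_age = 50,000,000, and
 * autovacuum_freeze_max_age = 200,000,000:
 *
 *     freezemin    = Min(50,000,000, 200,000,000 / 2) = 50,000,000
 *     *freezeLimit = 500,000,000 - 50,000,000         = 450,000,000
 *
 * so tuples whose xmin precedes XID 450,000,000 become freezing candidates.
 * Had the subtraction wrapped below FirstNormalTransactionId, the code would
 * have clamped the limit there rather than produce a "permanent" XID, and the
 * safeLimit check would warn if oldestXmin itself were dangerously old.
 */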
684
685
686 /*
687  *      vac_update_relstats() -- update statistics for one relation
688  *
689  *              Update the whole-relation statistics that are kept in its pg_class
690  *              row.  There are additional stats that will be updated if we are
691  *              doing ANALYZE, but we always update these stats.  This routine works
692  *              for both index and heap relation entries in pg_class.
693  *
694  *              We violate transaction semantics here by overwriting the rel's
695  *              existing pg_class tuple with the new values.  This is reasonably
696  *              safe since the new values are correct whether or not this transaction
697  *              commits.  The reason for this is that if we updated these tuples in
698  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
699  *              by the time we got done with a vacuum cycle, most of the tuples in
700  *              pg_class would've been obsoleted.  Of course, this only works for
701  *              fixed-size never-null columns, but these are.
702  *
703  *              Note another assumption: that two VACUUMs/ANALYZEs on a table can't
704  *              run in parallel, nor can VACUUM/ANALYZE run in parallel with a
705  *              schema alteration such as adding an index, rule, or trigger.  Otherwise
706  *              our updates of relhasindex etc might overwrite uncommitted updates.
707  *
708  *              Another reason for doing it this way is that when we are in a lazy
709  *              VACUUM and have PROC_IN_VACUUM set, we mustn't do any updates ---
710  *              somebody vacuuming pg_class might think they could delete a tuple
711  *              marked with xmin = our xid.
712  *
713  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
714  *              ANALYZE.
715  */
716 void
717 vac_update_relstats(Relation relation,
718                                         BlockNumber num_pages, double num_tuples,
719                                         bool hasindex, TransactionId frozenxid)
720 {
721         Oid                     relid = RelationGetRelid(relation);
722         Relation        rd;
723         HeapTuple       ctup;
724         Form_pg_class pgcform;
725         bool            dirty;
726
727         rd = heap_open(RelationRelationId, RowExclusiveLock);
728
729         /* Fetch a copy of the tuple to scribble on */
730         ctup = SearchSysCacheCopy(RELOID,
731                                                           ObjectIdGetDatum(relid),
732                                                           0, 0, 0);
733         if (!HeapTupleIsValid(ctup))
734                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
735                          relid);
736         pgcform = (Form_pg_class) GETSTRUCT(ctup);
737
738         /* Apply required updates, if any, to copied tuple */
739
740         dirty = false;
741         if (pgcform->relpages != (int32) num_pages)
742         {
743                 pgcform->relpages = (int32) num_pages;
744                 dirty = true;
745         }
746         if (pgcform->reltuples != (float4) num_tuples)
747         {
748                 pgcform->reltuples = (float4) num_tuples;
749                 dirty = true;
750         }
751         if (pgcform->relhasindex != hasindex)
752         {
753                 pgcform->relhasindex = hasindex;
754                 dirty = true;
755         }
756
757         /*
758          * If we have discovered that there are no indexes, then there's no
759          * primary key either.  This could be done more thoroughly...
760          */
761         if (!hasindex)
762         {
763                 if (pgcform->relhaspkey)
764                 {
765                         pgcform->relhaspkey = false;
766                         dirty = true;
767                 }
768         }
769
770         /* We also clear relhasrules and relhastriggers if needed */
771         if (pgcform->relhasrules && relation->rd_rules == NULL)
772         {
773                 pgcform->relhasrules = false;
774                 dirty = true;
775         }
776         if (pgcform->relhastriggers && relation->trigdesc == NULL)
777         {
778                 pgcform->relhastriggers = false;
779                 dirty = true;
780         }
781
782         /*
783          * relfrozenxid should never go backward.  Caller can pass
784          * InvalidTransactionId if it has no new data.
785          */
786         if (TransactionIdIsNormal(frozenxid) &&
787                 TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
788         {
789                 pgcform->relfrozenxid = frozenxid;
790                 dirty = true;
791         }
792
793         /* If anything changed, write out the tuple. */
794         if (dirty)
795                 heap_inplace_update(rd, ctup);
796
797         heap_close(rd, RowExclusiveLock);
798 }
799
800
801 /*
802  *      vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
803  *
804  *              Update pg_database's datfrozenxid entry for our database to be the
805  *              minimum of the pg_class.relfrozenxid values.  If we are able to
806  *              advance pg_database.datfrozenxid, also try to truncate pg_clog.
807  *
808  *              We violate transaction semantics here by overwriting the database's
809  *              existing pg_database tuple with the new value.  This is reasonably
810  *              safe since the new value is correct whether or not this transaction
811  *              commits.  As with vac_update_relstats, this avoids leaving dead tuples
812  *              behind after a VACUUM.
813  *
814  *              This routine is shared by full and lazy VACUUM.
815  */
816 void
817 vac_update_datfrozenxid(void)
818 {
819         HeapTuple       tuple;
820         Form_pg_database dbform;
821         Relation        relation;
822         SysScanDesc scan;
823         HeapTuple       classTup;
824         TransactionId newFrozenXid;
825         bool            dirty = false;
826
827         /*
828          * Initialize the "min" calculation with GetOldestXmin, which is a
829          * reasonable approximation to the minimum relfrozenxid for not-yet-
830          * committed pg_class entries for new tables; see AddNewRelationTuple().
831          * So we cannot produce a wrong minimum by starting with this.
832          */
833         newFrozenXid = GetOldestXmin(true, true);
834
835         /*
836          * We must seqscan pg_class to find the minimum Xid, because there is no
837          * index that can help us here.
838          */
839         relation = heap_open(RelationRelationId, AccessShareLock);
840
841         scan = systable_beginscan(relation, InvalidOid, false,
842                                                           SnapshotNow, 0, NULL);
843
844         while ((classTup = systable_getnext(scan)) != NULL)
845         {
846                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
847
848                 /*
849                  * Only consider heap and TOAST tables (anything else should have
850                  * InvalidTransactionId in relfrozenxid anyway.)
851                  */
852                 if (classForm->relkind != RELKIND_RELATION &&
853                         classForm->relkind != RELKIND_TOASTVALUE)
854                         continue;
855
856                 Assert(TransactionIdIsNormal(classForm->relfrozenxid));
857
858                 if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
859                         newFrozenXid = classForm->relfrozenxid;
860         }
861
862         /* we're done with pg_class */
863         systable_endscan(scan);
864         heap_close(relation, AccessShareLock);
865
866         Assert(TransactionIdIsNormal(newFrozenXid));
867
868         /* Now fetch the pg_database tuple we need to update. */
869         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
870
871         /* Fetch a copy of the tuple to scribble on */
872         tuple = SearchSysCacheCopy(DATABASEOID,
873                                                            ObjectIdGetDatum(MyDatabaseId),
874                                                            0, 0, 0);
875         if (!HeapTupleIsValid(tuple))
876                 elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
877         dbform = (Form_pg_database) GETSTRUCT(tuple);
878
879         /*
880          * Don't allow datfrozenxid to go backward (probably can't happen anyway);
881          * and detect the common case where it doesn't go forward either.
882          */
883         if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
884         {
885                 dbform->datfrozenxid = newFrozenXid;
886                 dirty = true;
887         }
888
889         if (dirty)
890                 heap_inplace_update(relation, tuple);
891
892         heap_freetuple(tuple);
893         heap_close(relation, RowExclusiveLock);
894
895         /*
896          * If we were able to advance datfrozenxid, mark the flat-file copy of
897          * pg_database for update at commit, and see if we can truncate pg_clog.
898          */
899         if (dirty)
900         {
901                 database_file_update_needed();
902                 vac_truncate_clog(newFrozenXid);
903         }
904 }
905
906
907 /*
908  *      vac_truncate_clog() -- attempt to truncate the commit log
909  *
910  *              Scan pg_database to determine the system-wide oldest datfrozenxid,
911  *              and use it to truncate the transaction commit log (pg_clog).
912  *              Also update the XID wrap limit info maintained by varsup.c.
913  *
914  *              The passed XID is simply the one I just wrote into my pg_database
915  *              entry.  It's used to initialize the "min" calculation.
916  *
917  *              This routine is shared by full and lazy VACUUM.  Note that it's
918  *              only invoked when we've managed to change our DB's datfrozenxid
919  *              entry.
920  */
921 static void
922 vac_truncate_clog(TransactionId frozenXID)
923 {
924         TransactionId myXID = GetCurrentTransactionId();
925         Relation        relation;
926         HeapScanDesc scan;
927         HeapTuple       tuple;
928         NameData        oldest_datname;
929         bool            frozenAlreadyWrapped = false;
930
931         /* init oldest_datname to sync with my frozenXID */
932         namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
933
934         /*
935          * Scan pg_database to compute the minimum datfrozenxid
936          *
937          * Note: we need not worry about a race condition with new entries being
938          * inserted by CREATE DATABASE.  Any such entry will have a copy of some
939          * existing DB's datfrozenxid, and that source DB cannot be ours because
940          * of the interlock against copying a DB containing an active backend.
941          * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
942          * concurrently modify the datfrozenxid's of different databases, the
943          * worst possible outcome is that pg_clog is not truncated as aggressively
944          * as it could be.
945          */
946         relation = heap_open(DatabaseRelationId, AccessShareLock);
947
948         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
949
950         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
951         {
952                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
953
954                 Assert(TransactionIdIsNormal(dbform->datfrozenxid));
955
956                 if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
957                         frozenAlreadyWrapped = true;
958                 else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
959                 {
960                         frozenXID = dbform->datfrozenxid;
961                         namecpy(&oldest_datname, &dbform->datname);
962                 }
963         }
964
965         heap_endscan(scan);
966
967         heap_close(relation, AccessShareLock);
968
969         /*
970          * Do not truncate CLOG if we seem to have suffered wraparound already;
971          * the computed minimum XID might be bogus.  This case should now be
972          * impossible due to the defenses in GetNewTransactionId, but we keep the
973          * test anyway.
974          */
975         if (frozenAlreadyWrapped)
976         {
977                 ereport(WARNING,
978                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
979                                  errdetail("You might have already suffered transaction-wraparound data loss.")));
980                 return;
981         }
982
983         /* Truncate CLOG to the oldest frozenxid */
984         TruncateCLOG(frozenXID);
985
986         /*
987          * Update the wrap limit for GetNewTransactionId.  Note: this function
988          * will also signal the postmaster for an(other) autovac cycle if needed.
989          */
990         SetTransactionIdLimit(frozenXID, &oldest_datname);
991 }
992
993
994 /****************************************************************************
995  *                                                                                                                                                      *
996  *                      Code common to both flavors of VACUUM                                                   *
997  *                                                                                                                                                      *
998  ****************************************************************************
999  */
1000
1001
1002 /*
1003  *      vacuum_rel() -- vacuum one heap relation
1004  *
1005  *              Doing one heap at a time incurs extra overhead, since we need to
1006  *              check that the heap exists again just before we vacuum it.      The
1007  *              reason that we do this is so that vacuuming can be spread across
1008  *              many small transactions.  Otherwise, two-phase locking would require
1009  *              us to lock the entire database during one pass of the vacuum cleaner.
1010  *
1011  *              We'll return true in *scanned_all if the vacuum scanned all heap
1012  *              pages, and updated pg_class.
1013  *
1014  *              At entry and exit, we are not inside a transaction.
1015  */
1016 static void
1017 vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound,
1018                    bool *scanned_all)
1019 {
1020         LOCKMODE        lmode;
1021         Relation        onerel;
1022         LockRelId       onerelid;
1023         Oid                     toast_relid;
1024         Oid                     save_userid;
1025         bool            save_secdefcxt;
1026
1027         if (scanned_all)
1028                 *scanned_all = false;
1029
1030         /* Begin a transaction for vacuuming this relation */
1031         StartTransactionCommand();
1032
1033         /*
1034          * Functions in indexes may want a snapshot set.  Also, setting
1035          * a snapshot ensures that RecentGlobalXmin is kept truly recent.
1036          */
1037         PushActiveSnapshot(GetTransactionSnapshot());
1038
1039         if (!vacstmt->full)
1040         {
1041                 /*
1042                  * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets other
1043                  * concurrent VACUUMs know that they can ignore this one while
1044                  * determining their OldestXmin.  (The reason we don't set it during a
1045                  * full VACUUM is exactly that we may have to run user-defined
1046                  * functions for functional indexes, and we want to make sure that if
1047                  * they use the snapshot set above, any tuples it requires can't get
1048                  * removed from other tables.  An index function that depends on the
1049                  * contents of other tables is arguably broken, but we won't break it
1050                  * here by violating transaction semantics.)
1051                  *
1052                  * We also set the VACUUM_FOR_WRAPAROUND flag, which is passed down
1053                  * by autovacuum; it's used to avoid cancelling a vacuum that was
1054                  * invoked in an emergency.
1055                  *
1056                  * Note: these flags remain set until CommitTransaction or
1057                  * AbortTransaction.  We don't want to clear them until we reset
1058                  * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
1059                  * which is probably Not Good.
1060                  */
1061                 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1062                 MyProc->vacuumFlags |= PROC_IN_VACUUM;
1063                 if (for_wraparound)
1064                         MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
1065                 LWLockRelease(ProcArrayLock);
1066         }
1067
1068         /*
1069          * Check for user-requested abort.      Note we want this to be inside a
1070          * transaction, so xact.c doesn't issue useless WARNING.
1071          */
1072         CHECK_FOR_INTERRUPTS();
1073
1074         /*
1075          * Determine the type of lock we want --- hard exclusive lock for a FULL
1076          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1077          * way, we can be sure that no other backend is vacuuming the same table.
1078          */
1079         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
1080
1081         /*
1082          * Open the relation and get the appropriate lock on it.
1083          *
1084          * There's a race condition here: the rel may have gone away since the
1085          * last time we saw it.  If so, we don't need to vacuum it.
1086          */
1087         onerel = try_relation_open(relid, lmode);
1088
1089         if (!onerel)
1090         {
1091                 PopActiveSnapshot();
1092                 CommitTransactionCommand();
1093                 return;
1094         }
1095
1096         /*
1097          * Check permissions.
1098          *
1099          * We allow the user to vacuum a table if he is superuser, the table
1100          * owner, or the database owner (but in the latter case, only if it's not
1101          * a shared relation).  pg_class_ownercheck includes the superuser case.
1102          *
1103          * Note we choose to treat permissions failure as a WARNING and keep
1104          * trying to vacuum the rest of the DB --- is this appropriate?
1105          */
1106         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1107                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1108         {
1109                 if (onerel->rd_rel->relisshared)
1110                         ereport(WARNING,
1111                                         (errmsg("skipping \"%s\" --- only superuser can vacuum it",
1112                                                         RelationGetRelationName(onerel))));
1113                 else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
1114                         ereport(WARNING,
1115                                         (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
1116                                                         RelationGetRelationName(onerel))));
1117                 else
1118                         ereport(WARNING,
1119                                         (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1120                                                         RelationGetRelationName(onerel))));
1121                 relation_close(onerel, lmode);
1122                 PopActiveSnapshot();
1123                 CommitTransactionCommand();
1124                 return;
1125         }
1126
1127         /*
1128          * Check that it's a vacuumable table; we used to do this in get_rel_oids()
1129          * but it seems safer to check after we've locked the relation.
1130          */
1131         if (onerel->rd_rel->relkind != RELKIND_RELATION &&
1132                 onerel->rd_rel->relkind != RELKIND_TOASTVALUE)
1133         {
1134                 ereport(WARNING,
1135                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1136                                                 RelationGetRelationName(onerel))));
1137                 relation_close(onerel, lmode);
1138                 PopActiveSnapshot();
1139                 CommitTransactionCommand();
1140                 return;
1141         }
1142
1143         /*
1144          * Silently ignore tables that are temp tables of other backends ---
1145          * trying to vacuum these will lead to great unhappiness, since their
1146          * contents are probably not up-to-date on disk.  (We don't throw a
1147          * warning here; it would just lead to chatter during a database-wide
1148          * VACUUM.)
1149          */
1150         if (RELATION_IS_OTHER_TEMP(onerel))
1151         {
1152                 relation_close(onerel, lmode);
1153                 PopActiveSnapshot();
1154                 CommitTransactionCommand();
1155                 return;
1156         }
1157
1158         /*
1159          * Get a session-level lock too. This will protect our access to the
1160          * relation across multiple transactions, so that we can vacuum the
1161          * relation's TOAST table (if any) secure in the knowledge that no one is
1162          * deleting the parent relation.
1163          *
1164          * NOTE: this cannot block, even if someone else is waiting for access,
1165          * because the lock manager knows that both lock requests are from the
1166          * same process.
1167          */
1168         onerelid = onerel->rd_lockInfo.lockRelId;
1169         LockRelationIdForSession(&onerelid, lmode);
1170
1171         /*
1172          * Remember the relation's TOAST relation for later, if the caller asked
1173          * us to process it.
1174          */
1175         if (do_toast)
1176                 toast_relid = onerel->rd_rel->reltoastrelid;
1177         else
1178                 toast_relid = InvalidOid;
1179
1180         /*
1181          * Switch to the table owner's userid, so that any index functions are
1182          * run as that user.  (This is unnecessary, but harmless, for lazy
1183          * VACUUM.)
1184          */
1185         GetUserIdAndContext(&save_userid, &save_secdefcxt);
1186         SetUserIdAndContext(onerel->rd_rel->relowner, true);
1187
1188         /*
1189          * Do the actual work --- either FULL or "lazy" vacuum
1190          */
1191         if (vacstmt->full)
1192                 full_vacuum_rel(onerel, vacstmt);
1193         else
1194                 lazy_vacuum_rel(onerel, vacstmt, vac_strategy, scanned_all);
1195
1196         /* Restore userid */
1197         SetUserIdAndContext(save_userid, save_secdefcxt);
1198
1199         /* all done with this class, but hold lock until commit */
1200         relation_close(onerel, NoLock);
1201
1202         /*
1203          * Complete the transaction and free all temporary memory used.
1204          */
1205         PopActiveSnapshot();
1206         CommitTransactionCommand();
1207
1208         /*
1209          * If the relation has a secondary toast rel, vacuum that too while we
1210          * still hold the session lock on the master table.  Note however that
1211          * "analyze" will not get done on the toast table.      This is good, because
1212          * the toaster always uses hardcoded index access and statistics are
1213          * totally unimportant for toast relations.
1214          */
1215         if (toast_relid != InvalidOid)
1216                 vacuum_rel(toast_relid, vacstmt, false, for_wraparound, NULL);
1217
1218         /*
1219          * Now release the session-level lock on the master table.
1220          */
1221         UnlockRelationIdForSession(&onerelid, lmode);
1222 }
1223
1224
1225 /****************************************************************************
1226  *                                                                                                                                                      *
1227  *                      Code for VACUUM FULL (only)                                                                             *
1228  *                                                                                                                                                      *
1229  ****************************************************************************
1230  */
1231
1232
1233 /*
1234  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1235  *
1236  *              This routine vacuums a single heap, cleans out its indexes, and
1237  *              updates its num_pages and num_tuples statistics.
1238  *
1239  *              At entry, we have already established a transaction and opened
1240  *              and locked the relation.
1241  */
1242 static void
1243 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1244 {
1245         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1246                                                                                  * clean indexes */
1247         VacPageListData fraged_pages;           /* List of pages with space enough for
1248                                                                                  * re-using */
1249         Relation   *Irel;
1250         int                     nindexes,
1251                                 i;
1252         VRelStats  *vacrelstats;
1253
1254         vacuum_set_xid_limits(vacstmt->freeze_min_age, vacstmt->freeze_table_age,
1255                                                   onerel->rd_rel->relisshared,
1256                                                   &OldestXmin, &FreezeLimit, NULL);
1257
1258         /*
1259          * Flush any previous async-commit transactions.  This does not guarantee
1260          * that we will be able to set hint bits for tuples they inserted, but it
1261          * improves the probability, especially in simple sequential-commands
1262          * cases.  See scan_heap() and repair_frag() for more about this.
1263          */
1264         XLogAsyncCommitFlush();
1265
1266         /*
1267          * Set up statistics-gathering machinery.
1268          */
1269         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1270         vacrelstats->rel_pages = 0;
1271         vacrelstats->rel_tuples = 0;
1272         vacrelstats->rel_indexed_tuples = 0;
1273         vacrelstats->hasindex = false;
1274
1275         /* scan the heap */
1276         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1277         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1278
1279         /* Now open all indexes of the relation */
1280         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1281         if (nindexes > 0)
1282                 vacrelstats->hasindex = true;
1283
1284         /* Clean/scan index relation(s) */
1285         if (Irel != NULL)
1286         {
1287                 if (vacuum_pages.num_pages > 0)
1288                 {
1289                         for (i = 0; i < nindexes; i++)
1290                                 vacuum_index(&vacuum_pages, Irel[i],
1291                                                          vacrelstats->rel_indexed_tuples, 0);
1292                 }
1293                 else
1294                 {
1295                         /* just scan indexes to update statistics */
1296                         for (i = 0; i < nindexes; i++)
1297                                 scan_index(Irel[i], vacrelstats->rel_indexed_tuples);
1298                 }
1299         }
1300
1301         if (fraged_pages.num_pages > 0)
1302         {
1303                 /* Try to shrink heap */
1304                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1305                                         nindexes, Irel);
1306                 vac_close_indexes(nindexes, Irel, NoLock);
1307         }
1308         else
1309         {
1310                 vac_close_indexes(nindexes, Irel, NoLock);
1311                 if (vacuum_pages.num_pages > 0)
1312                 {
1313                         /* Clean pages from vacuum_pages list */
1314                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1315                 }
1316         }
1317
1318         /* update the free space map with final free space info, and vacuum it */
1319         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1320         FreeSpaceMapVacuum(onerel);
1321
1322         /* update statistics in pg_class */
1323         vac_update_relstats(onerel,
1324                                                 vacrelstats->rel_pages, vacrelstats->rel_tuples,
1325                                                 vacrelstats->hasindex, FreezeLimit);
1326
1327         /* report results to the stats collector, too */
1328         pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
1329                                                  true, vacstmt->analyze, vacrelstats->rel_tuples);
1330 }
1331
1332
1333 /*
1334  *      scan_heap() -- scan an open heap relation
1335  *
1336  *              This routine sets commit status bits, constructs vacuum_pages (list
1337  *              of pages we need to compact free space on and/or clean indexes of
1338  *              deleted tuples), constructs fraged_pages (list of pages with free
1339  *              space that tuples could be moved into), and calculates statistics
1340  *              on the number of live tuples in the heap.
1341  */
1342 static void
1343 scan_heap(VRelStats *vacrelstats, Relation onerel,
1344                   VacPageList vacuum_pages, VacPageList fraged_pages)
1345 {
1346         BlockNumber nblocks,
1347                                 blkno;
1348         char       *relname;
1349         VacPage         vacpage;
1350         BlockNumber empty_pages,
1351                                 empty_end_pages;
1352         double          num_tuples,
1353                                 num_indexed_tuples,
1354                                 tups_vacuumed,
1355                                 nkeep,
1356                                 nunused;
1357         double          free_space,
1358                                 usable_free_space;
1359         Size            min_tlen = MaxHeapTupleSize;
1360         Size            max_tlen = 0;
1361         bool            do_shrinking = true;
1362         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1363         int                     num_vtlinks = 0;
1364         int                     free_vtlinks = 100;
1365         PGRUsage        ru0;
1366
1367         pg_rusage_init(&ru0);
1368
1369         relname = RelationGetRelationName(onerel);
1370         ereport(elevel,
1371                         (errmsg("vacuuming \"%s.%s\"",
1372                                         get_namespace_name(RelationGetNamespace(onerel)),
1373                                         relname)));
1374
1375         empty_pages = empty_end_pages = 0;
1376         num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
1377         free_space = 0;
1378
1379         nblocks = RelationGetNumberOfBlocks(onerel);
1380
1381         /*
1382          * We initially create each VacPage item in a maximal-sized workspace,
1383          * then copy the workspace into a just-large-enough copy.
1384          */
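        /*
         * (With the default 8K block size, MaxOffsetNumber is 2048 line
         * pointers, so this workspace is a bit over 4KB per VacPage.)
         */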
1385         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1386
1387         for (blkno = 0; blkno < nblocks; blkno++)
1388         {
1389                 Page            page,
1390                                         tempPage = NULL;
1391                 bool            do_reap,
1392                                         do_frag;
1393                 Buffer          buf;
1394                 OffsetNumber offnum,
1395                                         maxoff;
1396                 bool            notup;
1397                 OffsetNumber frozen[MaxOffsetNumber];
1398                 int                     nfrozen;
1399
1400                 vacuum_delay_point();
1401
1402                 buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
1403                                                                  vac_strategy);
1404                 page = BufferGetPage(buf);
1405
1406                 /*
1407                  * Since we are holding exclusive lock on the relation, no other
1408                  * backend can be accessing the page; however it is possible that the
1409                  * background writer will try to write the page if it's already marked
1410                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1411                  * must take exclusive buffer lock wherever we potentially modify
1412                  * pages.  In fact, we insist on cleanup lock so that we can safely
1413                  * call heap_page_prune().  (This might be overkill, since the
1414                  * bgwriter pays no attention to individual tuples, but on the other
1415                  * hand it's unlikely that the bgwriter has this particular page
1416                  * pinned at this instant.  So violating the coding rule would buy us
1417                  * little anyway.)
1418                  */
1419                 LockBufferForCleanup(buf);
1420
1421                 vacpage->blkno = blkno;
1422                 vacpage->offsets_used = 0;
1423                 vacpage->offsets_free = 0;
1424
1425                 if (PageIsNew(page))
1426                 {
1427                         VacPage         vacpagecopy;
1428
1429                         ereport(WARNING,
1430                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1431                                            relname, blkno)));
1432                         PageInit(page, BufferGetPageSize(buf), 0);
1433                         MarkBufferDirty(buf);
1434                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1435                         free_space += vacpage->free;
1436                         empty_pages++;
1437                         empty_end_pages++;
1438                         vacpagecopy = copy_vac_page(vacpage);
1439                         vpage_insert(vacuum_pages, vacpagecopy);
1440                         vpage_insert(fraged_pages, vacpagecopy);
1441                         UnlockReleaseBuffer(buf);
1442                         continue;
1443                 }
1444
1445                 if (PageIsEmpty(page))
1446                 {
1447                         VacPage         vacpagecopy;
1448
1449                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1450                         free_space += vacpage->free;
1451                         empty_pages++;
1452                         empty_end_pages++;
1453                         vacpagecopy = copy_vac_page(vacpage);
1454                         vpage_insert(vacuum_pages, vacpagecopy);
1455                         vpage_insert(fraged_pages, vacpagecopy);
1456                         UnlockReleaseBuffer(buf);
1457                         continue;
1458                 }
1459
1460                 /*
1461                  * Prune all HOT-update chains in this page.
1462                  *
1463                  * We use the redirect_move option so that redirecting line pointers
1464                  * get collapsed out; this allows us to not worry about them below.
1465                  *
1466                  * We count tuples removed by the pruning step as removed by VACUUM.
1467                  */
1468                 tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
1469                                                                                  true, false);
1470
1471                 /*
1472                  * Now scan the page to collect vacuumable items and check for tuples
1473                  * requiring freezing.
1474                  */
1475                 nfrozen = 0;
1476                 notup = true;
1477                 maxoff = PageGetMaxOffsetNumber(page);
1478                 for (offnum = FirstOffsetNumber;
1479                          offnum <= maxoff;
1480                          offnum = OffsetNumberNext(offnum))
1481                 {
1482                         ItemId          itemid = PageGetItemId(page, offnum);
1483                         bool            tupgone = false;
1484                         HeapTupleData tuple;
1485
1486                         /*
1487                          * Collect un-used items too - it's possible to have indexes
1488                          * pointing here after crash.  (That's an ancient comment and is
1489                          * likely obsolete with WAL, but we might as well continue to
1490                          * check for such problems.)
1491                          */
1492                         if (!ItemIdIsUsed(itemid))
1493                         {
1494                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1495                                 nunused += 1;
1496                                 continue;
1497                         }
1498
1499                         /*
1500                          * DEAD item pointers are to be vacuumed normally; but we don't
1501                          * count them in tups_vacuumed, else we'd be double-counting (at
1502                          * least in the common case where heap_page_prune() just freed up
1503                          * a non-HOT tuple).
1504                          */
1505                         if (ItemIdIsDead(itemid))
1506                         {
1507                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1508                                 continue;
1509                         }
1510
1511                         /* Shouldn't have any redirected items anymore */
1512                         if (!ItemIdIsNormal(itemid))
1513                                 elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
1514                                          relname, blkno, offnum);
1515
1516                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1517                         tuple.t_len = ItemIdGetLength(itemid);
1518                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1519
1520                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1521                         {
1522                                 case HEAPTUPLE_LIVE:
1523                                         /* Tuple is good --- but let's do some validity checks */
1524                                         if (onerel->rd_rel->relhasoids &&
1525                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1526                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1527                                                          relname, blkno, offnum);
1528
1529                                         /*
1530                                          * The shrinkage phase of VACUUM FULL requires that all
1531                                          * live tuples have XMIN_COMMITTED set --- see comments in
1532                                          * repair_frag()'s walk-along-page loop.  Use of async
1533                                          * commit may prevent HeapTupleSatisfiesVacuum from
1534                                          * setting the bit for a recently committed tuple.  Rather
1535                                          * than trying to handle this corner case, we just give up
1536                                          * and don't shrink.
1537                                          */
1538                                         if (do_shrinking &&
1539                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1540                                         {
1541                                                 ereport(LOG,
1542                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1543                                                                                 relname, blkno, offnum,
1544                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1545                                                 do_shrinking = false;
1546                                         }
1547                                         break;
1548                                 case HEAPTUPLE_DEAD:
1549
1550                                         /*
1551                                          * Ordinarily, DEAD tuples would have been removed by
1552                                          * heap_page_prune(), but it's possible that the tuple
1553                                          * state changed since heap_page_prune() looked.  In
1554                                          * particular an INSERT_IN_PROGRESS tuple could have
1555                                          * changed to DEAD if the inserter aborted.  So this
1556                                          * cannot be considered an error condition, though it does
1557                                          * suggest that someone released a lock early.
1558                                          *
1559                                          * If the tuple is HOT-updated then it must only be
1560                                          * removed by a prune operation; so we keep it as if it
1561                                          * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
1562                                          * worth trying to make the shrinking code smart enough to
1563                                          * handle this?  It's an unusual corner case.)
1564                                          *
1565                                          * DEAD heap-only tuples can safely be removed if they
1566                                          * aren't themselves HOT-updated, although this is a bit
1567                                          * inefficient since we'll uselessly try to remove index
1568                                          * entries for them.
1569                                          */
1570                                         if (HeapTupleIsHotUpdated(&tuple))
1571                                         {
1572                                                 nkeep += 1;
1573                                                 if (do_shrinking)
1574                                                         ereport(LOG,
1575                                                                         (errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
1576                                                                                         relname, blkno, offnum)));
1577                                                 do_shrinking = false;
1578                                         }
1579                                         else
1580                                         {
1581                                                 tupgone = true; /* we can delete the tuple */
1582
1583                                                 /*
1584                                                  * We need not require XMIN_COMMITTED or
1585                                                  * XMAX_COMMITTED to be set, since we will remove the
1586                                                  * tuple without any further examination of its hint
1587                                                  * bits.
1588                                                  */
1589                                         }
1590                                         break;
1591                                 case HEAPTUPLE_RECENTLY_DEAD:
1592
1593                                         /*
1594                                          * If tuple is recently deleted then we must not remove it
1595                                          * from relation.
1596                                          */
1597                                         nkeep += 1;
1598
1599                                         /*
1600                                          * As with the LIVE case, shrinkage requires
1601                                          * XMIN_COMMITTED to be set.
1602                                          */
1603                                         if (do_shrinking &&
1604                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1605                                         {
1606                                                 ereport(LOG,
1607                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1608                                                                                 relname, blkno, offnum,
1609                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1610                                                 do_shrinking = false;
1611                                         }
1612
1613                                         /*
1614                                          * If we are shrinking and this tuple is an updated one, then
1615                                          * remember it so we can construct updated-tuple dependencies.
1616                                          */
1617                                         if (do_shrinking &&
1618                                                 !(ItemPointerEquals(&(tuple.t_self),
1619                                                                                         &(tuple.t_data->t_ctid))))
1620                                         {
1621                                                 if (free_vtlinks == 0)
1622                                                 {
1623                                                         free_vtlinks = 1000;
1624                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1625                                                                                            (free_vtlinks + num_vtlinks) *
1626                                                                                                          sizeof(VTupleLinkData));
1627                                                 }
1628                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1629                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1630                                                 free_vtlinks--;
1631                                                 num_vtlinks++;
1632                                         }
1633                                         break;
1634                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1635
1636                                         /*
1637                                          * This should not happen, since we hold exclusive lock on
1638                                          * the relation; shouldn't we raise an error?  (Actually,
1639                                          * it can happen in system catalogs, since we tend to
1640                                          * release write lock before commit there.)  As above, we
1641                                          * can't apply repair_frag() if the tuple state is
1642                                          * uncertain.
1643                                          */
1644                                         if (do_shrinking)
1645                                                 ereport(LOG,
1646                                                                 (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
1647                                                                                 relname, blkno, offnum,
1648                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1649                                         do_shrinking = false;
1650                                         break;
1651                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1652
1653                                         /*
1654                                          * This should not happen, since we hold exclusive lock on
1655                                          * the relation; shouldn't we raise an error?  (Actually,
1656                                          * it can happen in system catalogs, since we tend to
1657                                          * release write lock before commit there.)  As above, we
1658                                          * can't apply repair_frag() if the tuple state is
1659                                          * uncertain.
1660                                          */
1661                                         if (do_shrinking)
1662                                                 ereport(LOG,
1663                                                                 (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
1664                                                                                 relname, blkno, offnum,
1665                                                                          HeapTupleHeaderGetXmax(tuple.t_data))));
1666                                         do_shrinking = false;
1667                                         break;
1668                                 default:
1669                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1670                                         break;
1671                         }
1672
1673                         if (tupgone)
1674                         {
1675                                 ItemId          lpp;
1676
1677                                 /*
1678                                  * Here we are building a temporary copy of the page with dead
1679                                  * tuples removed.  Below we will apply
1680                                  * PageRepairFragmentation to the copy, so that we can
1681                                  * determine how much space will be available after removal of
1682                                  * dead tuples.  But note we are NOT changing the real page
1683                                  * yet...
1684                                  */
1685                                 if (tempPage == NULL)
1686                                 {
1687                                         Size            pageSize;
1688
1689                                         pageSize = PageGetPageSize(page);
1690                                         tempPage = (Page) palloc(pageSize);
1691                                         memcpy(tempPage, page, pageSize);
1692                                 }
1693
1694                                 /* mark it unused on the temp page */
1695                                 lpp = PageGetItemId(tempPage, offnum);
1696                                 ItemIdSetUnused(lpp);
1697
1698                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1699                                 tups_vacuumed += 1;
1700                         }
1701                         else
1702                         {
1703                                 num_tuples += 1;
1704                                 if (!HeapTupleIsHeapOnly(&tuple))
1705                                         num_indexed_tuples += 1;
1706                                 notup = false;
1707                                 if (tuple.t_len < min_tlen)
1708                                         min_tlen = tuple.t_len;
1709                                 if (tuple.t_len > max_tlen)
1710                                         max_tlen = tuple.t_len;
1711
1712                                 /*
1713                                  * Each non-removable tuple must be checked to see if it needs
1714                                  * freezing.
1715                                  */
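                                /*
                                 * (Passing InvalidBuffer tells heap_freeze_tuple not to mark
                                 * the buffer dirty or write WAL itself; we do both for all
                                 * frozen tuples at once at the bottom of the page loop.)
                                 */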
1716                                 if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
1717                                                                           InvalidBuffer))
1718                                         frozen[nfrozen++] = offnum;
1719                         }
1720                 }                                               /* scan along page */
1721
1722                 if (tempPage != NULL)
1723                 {
1724                         /* Some tuples are removable; figure free space after removal */
1725                         PageRepairFragmentation(tempPage);
1726                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1727                         pfree(tempPage);
1728                         do_reap = true;
1729                 }
1730                 else
1731                 {
1732                         /* Just use current available space */
1733                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1734                         /* Need to reap the page if it has UNUSED or DEAD line pointers */
1735                         do_reap = (vacpage->offsets_free > 0);
1736                 }
1737
1738                 free_space += vacpage->free;
1739
1740                 /*
1741                  * Add the page to vacuum_pages if it requires reaping, and add it to
1742                  * fraged_pages if it has a useful amount of free space.  "Useful"
1743                  * means enough for a minimal-sized tuple.  But we don't know that
1744                  * accurately near the start of the relation, so add pages
1745                  * unconditionally if they have >= BLCKSZ/10 free space.  Also
1746                  * forcibly add pages with no live tuples, to avoid confusing the
1747                  * empty_end_pages logic.  (In the presence of unreasonably small
1748                  * fillfactor, it seems possible that such pages might not pass
1749                  * the free-space test, but they had better be in the list anyway.)
1750                  */
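                /*
                 * (With the default BLCKSZ of 8192, the unconditional threshold
                 * above works out to 819 bytes of free space.)
                 */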
1751                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10 ||
1752                                    notup);
1753
1754                 if (do_reap || do_frag)
1755                 {
1756                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1757
1758                         if (do_reap)
1759                                 vpage_insert(vacuum_pages, vacpagecopy);
1760                         if (do_frag)
1761                                 vpage_insert(fraged_pages, vacpagecopy);
1762                 }
1763
1764                 /*
1765                  * Include the page in empty_end_pages if it will be empty after
1766                  * vacuuming; this is to keep us from using it as a move destination.
1767                  * Note that such pages are guaranteed to be in fraged_pages.
1768                  */
1769                 if (notup)
1770                 {
1771                         empty_pages++;
1772                         empty_end_pages++;
1773                 }
1774                 else
1775                         empty_end_pages = 0;
1776
1777                 /*
1778                  * If we froze any tuples, mark the buffer dirty, and write a WAL
1779                  * record recording the changes.  We must log the changes to be
1780                  * crash-safe against future truncation of CLOG.
1781                  */
1782                 if (nfrozen > 0)
1783                 {
1784                         MarkBufferDirty(buf);
1785                         /* no XLOG for temp tables, though */
1786                         if (!onerel->rd_istemp)
1787                         {
1788                                 XLogRecPtr      recptr;
1789
1790                                 recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1791                                                                                  frozen, nfrozen);
1792                                 PageSetLSN(page, recptr);
1793                                 PageSetTLI(page, ThisTimeLineID);
1794                         }
1795                 }
1796
1797                 UnlockReleaseBuffer(buf);
1798         }
1799
1800         pfree(vacpage);
1801
1802         /* save stats in the rel list for use later */
1803         vacrelstats->rel_tuples = num_tuples;
1804         vacrelstats->rel_indexed_tuples = num_indexed_tuples;
1805         vacrelstats->rel_pages = nblocks;
1806         if (num_tuples == 0)
1807                 min_tlen = max_tlen = 0;
1808         vacrelstats->min_tlen = min_tlen;
1809         vacrelstats->max_tlen = max_tlen;
1810
1811         vacuum_pages->empty_end_pages = empty_end_pages;
1812         fraged_pages->empty_end_pages = empty_end_pages;
1813
1814         /*
1815          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1816          * remove any "empty" end-pages from the list, and compute usable free
1817          * space = free space in remaining pages.
1818          */
1819         if (do_shrinking)
1820         {
1821                 int                     i;
1822
1823                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1824                 fraged_pages->num_pages -= empty_end_pages;
1825                 usable_free_space = 0;
1826                 for (i = 0; i < fraged_pages->num_pages; i++)
1827                         usable_free_space += fraged_pages->pagedesc[i]->free;
1828         }
1829         else
1830         {
1831                 fraged_pages->num_pages = 0;
1832                 usable_free_space = 0;
1833         }
1834
1835         /* don't bother to save vtlinks if we will not call repair_frag */
1836         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1837         {
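                /*
                 * Sort the links by new_tid (vac_cmp_vtlinks orders on that
                 * field) so that repair_frag() can locate a tuple's parent link
                 * by binary search.
                 */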
1838                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1839                           vac_cmp_vtlinks);
1840                 vacrelstats->vtlinks = vtlinks;
1841                 vacrelstats->num_vtlinks = num_vtlinks;
1842         }
1843         else
1844         {
1845                 vacrelstats->vtlinks = NULL;
1846                 vacrelstats->num_vtlinks = 0;
1847                 pfree(vtlinks);
1848         }
1849
1850         ereport(elevel,
1851                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1852                                         RelationGetRelationName(onerel),
1853                                         tups_vacuumed, num_tuples, nblocks),
1854                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1855                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1856                                            "There were %.0f unused item pointers.\n"
1857            "Total free space (including removable row versions) is %.0f bytes.\n"
1858                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1859          "%u pages containing %.0f free bytes are potential move destinations.\n"
1860                                            "%s.",
1861                                            nkeep,
1862                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1863                                            nunused,
1864                                            free_space,
1865                                            empty_pages, empty_end_pages,
1866                                            fraged_pages->num_pages, usable_free_space,
1867                                            pg_rusage_show(&ru0))));
1868 }
1869
1870
1871 /*
1872  *      repair_frag() -- try to repair relation's fragmentation
1873  *
1874  *              This routine marks dead tuples as unused and tries to re-use dead
1875  *              space by moving tuples (and inserting index entries if needed). It
1876  *              constructs Nvacpagelist, a list of freed pages (whose tuples were
1877  *              moved), and cleans their indexes after committing the current
1878  *              transaction (in hack manner - without losing locks or freeing
1879  *              memory!).  It truncates the relation if end blocks have become empty.
1880  */
1881 static void
1882 repair_frag(VRelStats *vacrelstats, Relation onerel,
1883                         VacPageList vacuum_pages, VacPageList fraged_pages,
1884                         int nindexes, Relation *Irel)
1885 {
1886         TransactionId myXID = GetCurrentTransactionId();
1887         Buffer          dst_buffer = InvalidBuffer;
1888         BlockNumber nblocks,
1889                                 blkno;
1890         BlockNumber last_move_dest_block = 0,
1891                                 last_vacuum_block;
1892         Page            dst_page = NULL;
1893         ExecContextData ec;
1894         VacPageListData Nvacpagelist;
1895         VacPage         dst_vacpage = NULL,
1896                                 last_vacuum_page,
1897                                 vacpage,
1898                            *curpage;
1899         int                     i;
1900         int                     num_moved = 0,
1901                                 num_fraged_pages,
1902                                 vacuumed_pages;
1903         int                     keep_tuples = 0;
1904         int                     keep_indexed_tuples = 0;
1905         PGRUsage        ru0;
1906
1907         pg_rusage_init(&ru0);
1908
1909         ExecContext_Init(&ec, onerel);
1910
1911         Nvacpagelist.num_pages = 0;
1912         num_fraged_pages = fraged_pages->num_pages;
1913         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1914         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1915         if (vacuumed_pages > 0)
1916         {
1917                 /* get last reaped page from vacuum_pages */
1918                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1919                 last_vacuum_block = last_vacuum_page->blkno;
1920         }
1921         else
1922         {
1923                 last_vacuum_page = NULL;
1924                 last_vacuum_block = InvalidBlockNumber;
1925         }
1926
1927         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1928         vacpage->offsets_used = vacpage->offsets_free = 0;
1929
1930         /*
1931          * Scan pages backwards from the last nonempty page, trying to move tuples
1932          * down to lower pages.  Quit when we reach a page that we have moved any
1933          * tuples onto, or the first page if we haven't moved anything, or when we
1934          * find a page we cannot completely empty (this last condition is handled
1935          * by "break" statements within the loop).
1936          *
1937          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1938          * in order by blkno.
1939          */
1940         nblocks = vacrelstats->rel_pages;
1941         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1942                  blkno > last_move_dest_block;
1943                  blkno--)
1944         {
1945                 Buffer          buf;
1946                 Page            page;
1947                 OffsetNumber offnum,
1948                                         maxoff;
1949                 bool            isempty,
1950                                         chain_tuple_moved;
1951
1952                 vacuum_delay_point();
1953
1954                 /*
1955                  * Forget fraged_pages pages at or after this one; they're no longer
1956                  * useful as move targets, since we only want to move down. Note that
1957                  * since we stop the outer loop at last_move_dest_block, pages removed
1958                  * here cannot have had anything moved onto them already.
1959                  *
1960                  * Also note that we don't change the stored fraged_pages list, only
1961                  * our local variable num_fraged_pages; so the forgotten pages are
1962                  * still available to be loaded into the free space map later.
1963                  */
1964                 while (num_fraged_pages > 0 &&
1965                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1966                 {
1967                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1968                         --num_fraged_pages;
1969                 }
1970
1971                 /*
1972                  * Process this page of relation.
1973                  */
1974                 buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
1975                                                                  vac_strategy);
1976                 page = BufferGetPage(buf);
1977
1978                 vacpage->offsets_free = 0;
1979
1980                 isempty = PageIsEmpty(page);
1981
1982                 /* Is the page in the vacuum_pages list? */
1983                 if (blkno == last_vacuum_block)
1984                 {
1985                         if (last_vacuum_page->offsets_free > 0)
1986                         {
1987                                 /* there are dead tuples on this page - clean them */
1988                                 Assert(!isempty);
1989                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1990                                 vacuum_page(onerel, buf, last_vacuum_page);
1991                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1992                         }
1993                         else
1994                                 Assert(isempty);
1995                         --vacuumed_pages;
1996                         if (vacuumed_pages > 0)
1997                         {
1998                                 /* get prev reaped page from vacuum_pages */
1999                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
2000                                 last_vacuum_block = last_vacuum_page->blkno;
2001                         }
2002                         else
2003                         {
2004                                 last_vacuum_page = NULL;
2005                                 last_vacuum_block = InvalidBlockNumber;
2006                         }
2007                         if (isempty)
2008                         {
2009                                 ReleaseBuffer(buf);
2010                                 continue;
2011                         }
2012                 }
2013                 else
2014                         Assert(!isempty);
2015
2016                 chain_tuple_moved = false;              /* no one chain-tuple was moved off
2017                                                                                  * this page, yet */
2018                 vacpage->blkno = blkno;
2019                 maxoff = PageGetMaxOffsetNumber(page);
2020                 for (offnum = FirstOffsetNumber;
2021                          offnum <= maxoff;
2022                          offnum = OffsetNumberNext(offnum))
2023                 {
2024                         Size            tuple_len;
2025                         HeapTupleData tuple;
2026                         ItemId          itemid = PageGetItemId(page, offnum);
2027
2028                         if (!ItemIdIsUsed(itemid))
2029                                 continue;
2030
2031                         if (ItemIdIsDead(itemid))
2032                         {
2033                                 /* just remember it for vacuum_page() */
2034                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2035                                 continue;
2036                         }
2037
2038                         /* Shouldn't have any redirected items now */
2039                         Assert(ItemIdIsNormal(itemid));
2040
2041                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2042                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
2043                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
2044
2045                         /* ---
2046                          * VACUUM FULL has an exclusive lock on the relation.  So
2047                          * normally no other transaction can have pending INSERTs or
2048                          * DELETEs in this relation.  A tuple is either:
2049                          *              (a) live (XMIN_COMMITTED)
2050                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
2051                          *                      is visible to all active transactions)
2052                          *              (c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
2053                          *                      but at least one active transaction does not see the
2054                          *                      deleting transaction (ie, it's RECENTLY_DEAD)
2055                          *              (d) moved by the currently running VACUUM
2056                          *              (e) inserted or deleted by a not yet committed transaction,
2057                          *                      or by a transaction we couldn't set XMIN_COMMITTED for.
2058                          * In case (e) we wouldn't be in repair_frag() at all, because
2059                          * scan_heap() detects those cases and shuts off shrinking.
2060                          * We can't see case (b) here either, because such tuples were
2061                          * already removed by vacuum_page().  Cases (a) and (c) are
2062                          * normal and will have XMIN_COMMITTED set.  Case (d) is only
2063                          * possible if a whole tuple chain has been moved while
2064                          * processing this or a higher numbered block.
2065                          * ---
2066                          */
2067                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2068                         {
2069                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2070                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2071                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
2072                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2073
2074                                 /*
2075                                  * MOVED_OFF by another VACUUM would have caused the
2076                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
2077                                  */
2078                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2079                                         elog(ERROR, "invalid XVAC in tuple header");
2080
2081                                 /*
2082                                  * If this (chain) tuple has already been moved by me, then I have
2083                                  * to check whether it is in vacpage or not - i.e. whether it was
2084                                  * moved while cleaning this page or some previous one.
2085                                  */
2086
2087                                 /* Can't we Assert(keep_tuples > 0) here? */
2088                                 if (keep_tuples == 0)
2089                                         continue;
2090                                 if (chain_tuple_moved)
2091                                 {
2092                                         /* some chains were moved while cleaning this page */
2093                                         Assert(vacpage->offsets_free > 0);
2094                                         for (i = 0; i < vacpage->offsets_free; i++)
2095                                         {
2096                                                 if (vacpage->offsets[i] == offnum)
2097                                                         break;
2098                                         }
2099                                         if (i >= vacpage->offsets_free)         /* not found */
2100                                         {
2101                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2102
2103                                                 /*
2104                                                  * If this is not a heap-only tuple, there must be an
2105                                                  * index entry for this item which will be removed in
2106                                                  * the index cleanup. Decrement the
2107                                                  * keep_indexed_tuples count to remember this.
2108                                                  */
2109                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2110                                                         keep_indexed_tuples--;
2111                                                 keep_tuples--;
2112                                         }
2113                                 }
2114                                 else
2115                                 {
2116                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2117
2118                                         /*
2119                                          * If this is not a heap-only tuple, there must be an
2120                                          * index entry for this item which will be removed in the
2121                                          * index cleanup. Decrement the keep_indexed_tuples count
2122                                          * to remember this.
2123                                          */
2124                                         if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2125                                                 keep_indexed_tuples--;
2126                                         keep_tuples--;
2127                                 }
2128                                 continue;
2129                         }
2130
2131                         /*
2132                          * If this tuple is in a chain of tuples created in updates by
2133                          * "recent" transactions then we have to move the whole chain of
2134                          * tuples to other places, so that we can write new t_ctid links
2135                          * that preserve the chain relationship.
2136                          *
2137                          * This test is complicated.  Read it as "if tuple is a recently
2138                          * created updated version, OR if it is an obsoleted version". (In
2139                          * the second half of the test, we needn't make any check on XMAX
2140                          * --- it must be recently obsoleted, else scan_heap would have
2141                          * deemed it removable.)
2142                          *
2143                          * NOTE: this test is not 100% accurate: it is possible for a
2144                          * tuple to be an updated one with recent xmin, and yet not match
2145                          * any new_tid entry in the vtlinks list.  Presumably there was
2146                          * once a parent tuple with xmax matching the xmin, but it's
2147                          * possible that that tuple has been removed --- for example, if
2148                          * it had xmin = xmax and wasn't itself an updated version, then
2149                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
2150                          * xmin xact completes.
2151                          *
2152                          * To be on the safe side, we abandon the repair_frag process if
2153                          * we cannot find the parent tuple in vtlinks.  This may be overly
2154                          * conservative; AFAICS it would be safe to move the chain.
2155                          *
2156                          * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
2157                          * using OldestXmin, which is a rather coarse test, it is quite
2158                          * possible to have an update chain in which a tuple we think is
2159                          * RECENTLY_DEAD links forward to one that is definitely DEAD. In
2160                          * such a case the RECENTLY_DEAD tuple must actually be dead, but
2161                          * it seems too complicated to try to make VACUUM remove it. We
2162                          * treat each contiguous set of RECENTLY_DEAD tuples as a
2163                          * separately movable chain, ignoring any intervening DEAD ones.
2164                          */
2165                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
2166                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
2167                                                                                 OldestXmin)) ||
2168                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
2169                                                                                            HEAP_IS_LOCKED)) &&
2170                                  !(ItemPointerEquals(&(tuple.t_self),
2171                                                                          &(tuple.t_data->t_ctid)))))
2172                         {
2173                                 Buffer          Cbuf = buf;
2174                                 bool            freeCbuf = false;
2175                                 bool            chain_move_failed = false;
2176                                 bool            moved_target = false;
2177                                 ItemPointerData Ctid;
2178                                 HeapTupleData tp = tuple;
2179                                 Size            tlen = tuple_len;
2180                                 VTupleMove      vtmove;
2181                                 int                     num_vtmove;
2182                                 int                     free_vtmove;
2183                                 VacPage         to_vacpage = NULL;
2184                                 int                     to_item = 0;
2185                                 int                     ti;
2186
2187                                 if (dst_buffer != InvalidBuffer)
2188                                 {
2189                                         ReleaseBuffer(dst_buffer);
2190                                         dst_buffer = InvalidBuffer;
2191                                 }
2192
2193                                 /* Quick exit if we have no vtlinks to search in */
2194                                 if (vacrelstats->vtlinks == NULL)
2195                                 {
2196                                         elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2197                                         break;          /* out of walk-along-page loop */
2198                                 }
2199
2200                                 /*
2201                                  * If this tuple is at the beginning or middle of the chain then
2202                                  * we have to move to the end of the chain.  As with any t_ctid
2203                                  * chase, we have to verify that each new tuple is really the
2204                                  * descendant of the tuple we came from; however, here we need
2205                                  * even more than the normal amount of paranoia. If t_ctid
2206                                  * links forward to a tuple determined to be DEAD, then
2207                                  * depending on where that tuple is, it might already have
2208                                  * been removed, and perhaps even replaced by a MOVED_IN
2209                                  * tuple.  We don't want to include any DEAD tuples in the
2210                                  * chain, so we have to recheck HeapTupleSatisfiesVacuum.
2211                                  */
2212                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
2213                                                                                                   HEAP_IS_LOCKED)) &&
2214                                            !(ItemPointerEquals(&(tp.t_self),
2215                                                                                    &(tp.t_data->t_ctid))))
2216                                 {
2217                                         ItemPointerData nextTid;
2218                                         TransactionId priorXmax;
2219                                         Buffer          nextBuf;
2220                                         Page            nextPage;
2221                                         OffsetNumber nextOffnum;
2222                                         ItemId          nextItemid;
2223                                         HeapTupleHeader nextTdata;
2224                                         HTSV_Result nextTstatus;
2225
2226                                         nextTid = tp.t_data->t_ctid;
2227                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
2228                                         /* assume block# is OK (see heap_fetch comments) */
2229                                         nextBuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2230                                                                                  ItemPointerGetBlockNumber(&nextTid),
2231                                                                                  RBM_NORMAL, vac_strategy);
2232                                         nextPage = BufferGetPage(nextBuf);
2233                                         /* If bogus or unused slot, assume tp is end of chain */
2234                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
2235                                         if (nextOffnum < FirstOffsetNumber ||
2236                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
2237                                         {
2238                                                 ReleaseBuffer(nextBuf);
2239                                                 break;
2240                                         }
2241                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
2242                                         if (!ItemIdIsNormal(nextItemid))
2243                                         {
2244                                                 ReleaseBuffer(nextBuf);
2245                                                 break;
2246                                         }
2247                                         /* if not matching XMIN, assume tp is end of chain */
2248                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
2249                                                                                                                           nextItemid);
2250                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
2251                                                                                          priorXmax))
2252                                         {
2253                                                 ReleaseBuffer(nextBuf);
2254                                                 break;
2255                                         }
2256
2257                                         /*
2258                                          * Must check for DEAD or MOVED_IN tuple, too.  This could
2259                                          * potentially update hint bits, so we'd better hold the
2260                                          * buffer content lock.
2261                                          */
2262                                         LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
2263                                         nextTstatus = HeapTupleSatisfiesVacuum(nextTdata,
2264                                                                                                                    OldestXmin,
2265                                                                                                                    nextBuf);
2266                                         if (nextTstatus == HEAPTUPLE_DEAD ||
2267                                                 nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
2268                                         {
2269                                                 UnlockReleaseBuffer(nextBuf);
2270                                                 break;
2271                                         }
2272                                         LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
2273                                         /* if it's MOVED_OFF we should have moved this one with it */
2274                                         if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
2275                                                 elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
2276                                         /* OK, switch our attention to the next tuple in chain */
2277                                         tp.t_data = nextTdata;
2278                                         tp.t_self = nextTid;
2279                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
2280                                         if (freeCbuf)
2281                                                 ReleaseBuffer(Cbuf);
2282                                         Cbuf = nextBuf;
2283                                         freeCbuf = true;
2284                                 }
2285
2286                                 /* Set up workspace for planning the chain move */
2287                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
2288                                 num_vtmove = 0;
2289                                 free_vtmove = 100;
2290
2291                                 /*
2292                                  * Now, walk backwards up the chain (towards older tuples) and
2293                                  * check if all items in chain can be moved.  We record all
2294                                  * the moves that need to be made in the vtmove array.
2295                                  */
2296                                 for (;;)
2297                                 {
2298                                         Buffer          Pbuf;
2299                                         Page            Ppage;
2300                                         ItemId          Pitemid;
2301                                         HeapTupleHeader PTdata;
2302                                         VTupleLinkData vtld,
2303                                                            *vtlp;
2304
2305                                         /* Identify a target page to move this tuple to */
2306                                         if (to_vacpage == NULL ||
2307                                                 !enough_space(to_vacpage, tlen))
2308                                         {
2309                                                 for (i = 0; i < num_fraged_pages; i++)
2310                                                 {
2311                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
2312                                                                 break;
2313                                                 }
2314
2315                                                 if (i == num_fraged_pages)
2316                                                 {
2317                                                         /* can't move item anywhere */
2318                                                         chain_move_failed = true;
2319                                                         break;          /* out of check-all-items loop */
2320                                                 }
2321                                                 to_item = i;
2322                                                 to_vacpage = fraged_pages->pagedesc[to_item];
2323                                         }
2324                                         to_vacpage->free -= MAXALIGN(tlen);
2325                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
2326                                                 to_vacpage->free -= sizeof(ItemIdData);
2327                                         (to_vacpage->offsets_used)++;
2328
2329                                         /* Add an entry to vtmove list */
2330                                         if (free_vtmove == 0)
2331                                         {
2332                                                 free_vtmove = 1000;
2333                                                 vtmove = (VTupleMove)
2334                                                         repalloc(vtmove,
2335                                                                          (free_vtmove + num_vtmove) *
2336                                                                          sizeof(VTupleMoveData));
2337                                         }
2338                                         vtmove[num_vtmove].tid = tp.t_self;
2339                                         vtmove[num_vtmove].vacpage = to_vacpage;
2340                                         if (to_vacpage->offsets_used == 1)
2341                                                 vtmove[num_vtmove].cleanVpd = true;
2342                                         else
2343                                                 vtmove[num_vtmove].cleanVpd = false;
2344                                         free_vtmove--;
2345                                         num_vtmove++;
2346
2347                                         /* Remember if we reached the original target tuple */
2348                                         if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
2349                                                 ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
2350                                                 moved_target = true;
2351
2352                                         /* Done if at beginning of chain */
2353                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2354                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2355                                                                                    OldestXmin))
2356                                                 break;  /* out of check-all-items loop */
2357
2358                                         /* Move to tuple with prior row version */
2359                                         vtld.new_tid = tp.t_self;
2360                                         vtlp = (VTupleLink)
2361                                                 vac_bsearch((void *) &vtld,
2362                                                                         (void *) (vacrelstats->vtlinks),
2363                                                                         vacrelstats->num_vtlinks,
2364                                                                         sizeof(VTupleLinkData),
2365                                                                         vac_cmp_vtlinks);
2366                                         if (vtlp == NULL)
2367                                         {
2368                                                 /* see discussion above */
2369                                                 elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2370                                                 chain_move_failed = true;
2371                                                 break;  /* out of check-all-items loop */
2372                                         }
2373                                         tp.t_self = vtlp->this_tid;
2374                                         Pbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2375                                                                          ItemPointerGetBlockNumber(&(tp.t_self)),
2376                                                                          RBM_NORMAL, vac_strategy);
2377                                         Ppage = BufferGetPage(Pbuf);
2378                                         Pitemid = PageGetItemId(Ppage,
2379                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2380                                         /* this can't happen since we saw tuple earlier: */
2381                                         if (!ItemIdIsNormal(Pitemid))
2382                                                 elog(ERROR, "parent itemid marked as unused");
2383                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2384
2385                                         /* ctid should not have changed since we saved it */
2386                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2387                                                                                          &(PTdata->t_ctid)));
2388
2389                                         /*
2390                                          * See above for the cases in which !ItemIdIsUsed(nextItemid)
2391                                          * (the child item has been removed).  Because we do not
2392                                          * currently remove the useless part of an update chain, it is
2393                                          * possible to find a non-matching parent row here.  As in the
2394                                          * case that exposed this problem, we simply stop shrinking at
2395                                          * this point.  We could try to locate the real parent row, but
2396                                          * it isn't worth doing, since a proper solution will be
2397                                          * implemented later anyway and we are too close to the 6.5
2398                                          * release. - vadim 06/11/99
2399                                          */
2400                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2401                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2402                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2403                                         {
2404                                                 ReleaseBuffer(Pbuf);
2405                                                 elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
2406                                                 chain_move_failed = true;
2407                                                 break;  /* out of check-all-items loop */
2408                                         }
2409                                         tp.t_data = PTdata;
2410                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2411                                         if (freeCbuf)
2412                                                 ReleaseBuffer(Cbuf);
2413                                         Cbuf = Pbuf;
2414                                         freeCbuf = true;
2415                                 }                               /* end of check-all-items loop */
2416
2417                                 if (freeCbuf)
2418                                         ReleaseBuffer(Cbuf);
2419                                 freeCbuf = false;
2420
2421                                 /* Double-check that we will move the current target tuple */
2422                                 if (!moved_target && !chain_move_failed)
2423                                 {
2424                                         elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
2425                                         chain_move_failed = true;
2426                                 }
2427
2428                                 if (chain_move_failed)
2429                                 {
2430                                         /*
2431                                          * Undo changes to offsets_used state.  We don't bother
2432                                          * cleaning up the amount-free state, since we're not
2433                                          * going to do any further tuple motion.
2434                                          */
2435                                         for (i = 0; i < num_vtmove; i++)
2436                                         {
2437                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2438                                                 (vtmove[i].vacpage->offsets_used)--;
2439                                         }
2440                                         pfree(vtmove);
2441                                         break;          /* out of walk-along-page loop */
2442                                 }
2443
2444                                 /*
2445                                  * Okay, move the whole tuple chain in reverse order.
2446                                  *
2447                                  * Ctid tracks the new location of the previously-moved tuple.
2448                                  */
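                                /*
                                 * Illustration (ignoring the OldestXmin cutoff): for a chain
                                 * A -> B -> C with C newest, vtmove holds C, B, A in that order.
                                 * C is moved first with Ctid invalid, so its new copy's t_ctid
                                 * points at itself; then B's new copy gets t_ctid = C's new
                                 * location, and finally A's new copy gets t_ctid = B's new
                                 * location.  See move_chain_tuple() for the details.
                                 */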
2449                                 ItemPointerSetInvalid(&Ctid);
2450                                 for (ti = 0; ti < num_vtmove; ti++)
2451                                 {
2452                                         VacPage         destvacpage = vtmove[ti].vacpage;
2453                                         Page            Cpage;
2454                                         ItemId          Citemid;
2455
2456                                         /* Get page to move from */
2457                                         tuple.t_self = vtmove[ti].tid;
2458                                         Cbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2459                                                                   ItemPointerGetBlockNumber(&(tuple.t_self)),
2460                                                                   RBM_NORMAL, vac_strategy);
2461
2462                                         /* Get page to move to */
2463                                         dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
2464                                                                                                         destvacpage->blkno,
2465                                                                                                         RBM_NORMAL, vac_strategy);
2466
2467                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2468                                         if (dst_buffer != Cbuf)
2469                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2470
2471                                         dst_page = BufferGetPage(dst_buffer);
2472                                         Cpage = BufferGetPage(Cbuf);
2473
2474                                         Citemid = PageGetItemId(Cpage,
2475                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2476                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2477                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2478
2479                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2480                                                                          dst_buffer, dst_page, destvacpage,
2481                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2482
2483                                         /*
2484                                          * If the tuple we are moving is a heap-only tuple, this
2485                                          * move will generate an additional index entry, so
2486                                          * increment the rel_indexed_tuples count.
2487                                          */
2488                                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2489                                                 vacrelstats->rel_indexed_tuples++;
2490
2491                                         num_moved++;
2492                                         if (destvacpage->blkno > last_move_dest_block)
2493                                                 last_move_dest_block = destvacpage->blkno;
2494
2495                                         /*
2496                                          * Remember that we moved tuple from the current page
2497                                          * (corresponding index tuple will be cleaned).
2498                                          */
2499                                         if (Cbuf == buf)
2500                                                 vacpage->offsets[vacpage->offsets_free++] =
2501                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2502                                         else
2503                                         {
2504                                                 /*
2505                                                  * When we move tuple chains, we may need to move
2506                                                  * tuples from a block that we haven't yet scanned in
2507                                                  * the outer walk-along-the-relation loop. Note that
2508                                                  * we can't be moving a tuple from a block that we
2509                                                  * have already scanned because if such a tuple
2510                                                  * exists, then we must have moved the chain along
2511                                                  * with that tuple when we scanned that block. IOW the
2512                                                  * test of (Cbuf != buf) guarantees that the tuple we
2513                                                  * are looking at right now is in a block which is yet
2514                                                  * to be scanned.
2515                                                  *
2516                                                  * We maintain two counters to correctly count the
2517                                                  * moved-off tuples from blocks that are not yet
2518                                                  * scanned (keep_tuples) and how many of them have
2519                                                  * index pointers (keep_indexed_tuples).  The main
2520                                                  * reason to track the latter is to help verify that
2521                                                  * indexes have the expected number of entries when
2522                                                  * all the dust settles.
2523                                                  */
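                                                /*
                                                 * For example, if a chain move pulls two tuples off a
                                                 * not-yet-scanned block and one of them is heap-only,
                                                 * keep_tuples increases by 2 but keep_indexed_tuples
                                                 * by only 1.
                                                 */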
2524                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2525                                                         keep_indexed_tuples++;
2526                                                 keep_tuples++;
2527                                         }
2528
2529                                         ReleaseBuffer(dst_buffer);
2530                                         ReleaseBuffer(Cbuf);
2531                                 }                               /* end of move-the-tuple-chain loop */
2532
2533                                 dst_buffer = InvalidBuffer;
2534                                 pfree(vtmove);
2535                                 chain_tuple_moved = true;
2536
2537                                 /* advance to next tuple in walk-along-page loop */
2538                                 continue;
2539                         }                                       /* end of is-tuple-in-chain test */
2540
2541                         /* try to find new page for this tuple */
2542                         if (dst_buffer == InvalidBuffer ||
2543                                 !enough_space(dst_vacpage, tuple_len))
2544                         {
2545                                 if (dst_buffer != InvalidBuffer)
2546                                 {
2547                                         ReleaseBuffer(dst_buffer);
2548                                         dst_buffer = InvalidBuffer;
2549                                 }
2550                                 for (i = 0; i < num_fraged_pages; i++)
2551                                 {
2552                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2553                                                 break;
2554                                 }
2555                                 if (i == num_fraged_pages)
2556                                         break;          /* can't move item anywhere */
2557                                 dst_vacpage = fraged_pages->pagedesc[i];
2558                                 dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
2559                                                                                                 dst_vacpage->blkno,
2560                                                                                                 RBM_NORMAL, vac_strategy);
2561                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2562                                 dst_page = BufferGetPage(dst_buffer);
2563                                 /* if this page was not used before - clean it */
2564                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2565                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2566                         }
2567                         else
2568                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2569
2570                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2571
2572                         move_plain_tuple(onerel, buf, page, &tuple,
2573                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2574
2575                         /*
2576                          * If the tuple we are moving is a heap-only tuple, this move will
2577                          * generate an additional index entry, so increment the
2578                          * rel_indexed_tuples count.
2579                          */
2580                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2581                                 vacrelstats->rel_indexed_tuples++;
2582
2583                         num_moved++;
2584                         if (dst_vacpage->blkno > last_move_dest_block)
2585                                 last_move_dest_block = dst_vacpage->blkno;
2586
2587                         /*
2588                          * Remember that we moved tuple from the current page
2589                          * (corresponding index tuple will be cleaned).
2590                          */
2591                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2592                 }                                               /* walk along page */
2593
2594                 /*
2595                  * If we broke out of the walk-along-page loop early (ie, still have
2596                  * offnum <= maxoff), then we failed to move some tuple off this page.
2597                  * No point in shrinking any more, so clean up and exit the per-page
2598                  * loop.
2599                  */
2600                 if (offnum < maxoff && keep_tuples > 0)
2601                 {
2602                         OffsetNumber off;
2603
2604                         /*
2605                          * Fix vacpage state for any unvisited tuples remaining on page
2606                          */
2607                         for (off = OffsetNumberNext(offnum);
2608                                  off <= maxoff;
2609                                  off = OffsetNumberNext(off))
2610                         {
2611                                 ItemId          itemid = PageGetItemId(page, off);
2612                                 HeapTupleHeader htup;
2613
2614                                 if (!ItemIdIsUsed(itemid))
2615                                         continue;
2616                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2617                                 Assert(ItemIdIsNormal(itemid));
2618
2619                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2620                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2621                                         continue;
2622
2623                                 /*
2624                                  * See comments in the walk-along-page loop above about why
2625                                  * only MOVED_OFF tuples should be found here.
2626                                  */
2627                                 if (htup->t_infomask & HEAP_MOVED_IN)
2628                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2629                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2630                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2631                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2632                                         elog(ERROR, "invalid XVAC in tuple header");
2633
2634                                 if (chain_tuple_moved)
2635                                 {
2636                                         /* some chains were moved while cleaning this page */
2637                                         Assert(vacpage->offsets_free > 0);
2638                                         for (i = 0; i < vacpage->offsets_free; i++)
2639                                         {
2640                                                 if (vacpage->offsets[i] == off)
2641                                                         break;
2642                                         }
2643                                         if (i >= vacpage->offsets_free)         /* not found */
2644                                         {
2645                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2646                                                 Assert(keep_tuples > 0);
2647
2648                                                 /*
2649                                                  * If this is not a heap-only tuple, there must be an
2650                                                  * index entry for this item which will be removed in
2651                                                  * the index cleanup. Decrement the
2652                                                  * keep_indexed_tuples count to remember this.
2653                                                  */
2654                                                 if (!HeapTupleHeaderIsHeapOnly(htup))
2655                                                         keep_indexed_tuples--;
2656                                                 keep_tuples--;
2657                                         }
2658                                 }
2659                                 else
2660                                 {
2661                                         vacpage->offsets[vacpage->offsets_free++] = off;
2662                                         Assert(keep_tuples > 0);
2663                                         if (!HeapTupleHeaderIsHeapOnly(htup))
2664                                                 keep_indexed_tuples--;
2665                                         keep_tuples--;
2666                                 }
2667                         }
2668                 }
2669
2670                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2671                 {
2672                         if (chain_tuple_moved)          /* else - they are ordered */
2673                         {
2674                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2675                                           sizeof(OffsetNumber), vac_cmp_offno);
2676                         }
2677                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2678                 }
2679
2680                 ReleaseBuffer(buf);
2681
2682                 if (offnum <= maxoff)
2683                         break;                          /* had to quit early, see above note */
2684
2685         }                                                       /* walk along relation */
2686
2687         blkno++;                                        /* new number of blocks */
2688
2689         if (dst_buffer != InvalidBuffer)
2690         {
2691                 Assert(num_moved > 0);
2692                 ReleaseBuffer(dst_buffer);
2693         }
2694
2695         if (num_moved > 0)
2696         {
2697                 /*
2698                  * We have to commit our tuple moves before we truncate the
2699                  * relation.  Ideally we should do Commit/StartTransactionCommand
2700                  * here, relying on the session-level table lock to protect our
2701                  * exclusive access to the relation.  However, that would require a
2702                  * lot of extra code to close and re-open the relation, indexes, etc.
2703                  * For now, a quick hack: record status of current transaction as
2704                  * committed, and continue.  We force the commit to be synchronous so
2705                  * that it's down to disk before we truncate.  (Note: tqual.c knows
2706                  * that VACUUM FULL always uses sync commit, too.)      The transaction
2707                  * continues to be shown as running in the ProcArray.
2708                  *
2709                  * XXX This desperately needs to be revisited.  Any failure after this
2710                  * point will result in a PANIC "cannot abort transaction nnn, it was
2711                  * already committed"!
2712                  */
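                /*
                 * For reference, a rough sketch of the "ideal" sequence mentioned
                 * above (tableOid here is a stand-in for the table's OID, saved
                 * before committing; the session-level table lock keeps other
                 * backends out while the relation is closed and re-opened):
                 *
                 *              CommitTransactionCommand();
                 *              StartTransactionCommand();
                 *              onerel = relation_open(tableOid, NoLock);
                 *
                 * Re-opening the indexes and re-establishing all the cached state
                 * is the "lot of extra code" that the quick hack below avoids.
                 */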
2713                 ForceSyncCommit();
2714                 (void) RecordTransactionCommit();
2715         }
2716
2717         /*
2718          * We are not going to move any more tuples across pages, but we still
2719          * need to apply vacuum_page to compact free space in the remaining pages
2720          * in vacuum_pages list.  Note that some of these pages may also be in the
2721          * fraged_pages list, and may have had tuples moved onto them; if so, we
2722          * already did vacuum_page and needn't do it again.
2723          */
2724         for (i = 0, curpage = vacuum_pages->pagedesc;
2725                  i < vacuumed_pages;
2726                  i++, curpage++)
2727         {
2728                 vacuum_delay_point();
2729
2730                 Assert((*curpage)->blkno < blkno);
2731                 if ((*curpage)->offsets_used == 0)
2732                 {
2733                         Buffer          buf;
2734                         Page            page;
2735
2736                         /* this page was not used as a move target, so must clean it */
2737                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*curpage)->blkno,
2738                                                                          RBM_NORMAL, vac_strategy);
2739                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2740                         page = BufferGetPage(buf);
2741                         if (!PageIsEmpty(page))
2742                                 vacuum_page(onerel, buf, *curpage);
2743                         UnlockReleaseBuffer(buf);
2744                 }
2745         }
2746
2747         /*
2748          * Now scan all the pages that we moved tuples onto and update tuple
2749          * status bits.  This is not really necessary, but will save time for
2750          * future transactions examining these tuples.
2751          */
2752         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2753                                          last_move_dest_block, num_moved);
2754
2755         /*
2756          * It'd be cleaner to make this report at the bottom of this routine, but
2757          * then the rusage would double-count the second pass of index vacuuming.
2758          * So do it here and ignore the relatively small amount of processing that
2759          * occurs below.
2760          */
2761         ereport(elevel,
2762                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2763                                         RelationGetRelationName(onerel),
2764                                         num_moved, nblocks, blkno),
2765                          errdetail("%s.",
2766                                            pg_rusage_show(&ru0))));
2767
2768         /*
2769          * Reflect the motion of system tuples in the catalog cache here.
2770          */
2771         CommandCounterIncrement();
2772
2773         if (Nvacpagelist.num_pages > 0)
2774         {
2775                 /* vacuum indexes again if needed */
2776                 if (Irel != NULL)
2777                 {
2778                         VacPage    *vpleft,
2779                                            *vpright,
2780                                                 vpsave;
2781
2782                         /* re-sort Nvacpagelist.pagedesc */
2783                         for (vpleft = Nvacpagelist.pagedesc,
2784                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2785                                  vpleft < vpright; vpleft++, vpright--)
2786                         {
2787                                 vpsave = *vpleft;
2788                                 *vpleft = *vpright;
2789                                 *vpright = vpsave;
2790                         }
2791
2792                         /*
2793                          * keep_tuples is the number of tuples that have been moved off a
2794                          * page during chain moves but have not been scanned over since.
2795                          * The tuple ids of these tuples are not recorded as free offsets
2796                          * for any VacPage, so they will not be cleared from the indexes.
2797                          * keep_indexed_tuples is the portion of these that are expected
2798                          * to have index entries.
2799                          */
2800                         Assert(keep_tuples >= 0);
2801                         for (i = 0; i < nindexes; i++)
2802                                 vacuum_index(&Nvacpagelist, Irel[i],
2803                                                          vacrelstats->rel_indexed_tuples,
2804                                                          keep_indexed_tuples);
2805                 }
2806
2807                 /*
2808                  * Clean moved-off tuples from last page in Nvacpagelist list.
2809                  *
2810                  * We need only do this in this one page, because higher-numbered
2811                  * pages are going to be truncated from the relation entirely. But see
2812                  * comments for update_hint_bits().
2813                  */
2814                 if (vacpage->blkno == (blkno - 1) &&
2815                         vacpage->offsets_free > 0)
2816                 {
2817                         Buffer          buf;
2818                         Page            page;
2819                         OffsetNumber unused[MaxOffsetNumber];
2820                         OffsetNumber offnum,
2821                                                 maxoff;
2822                         int                     uncnt = 0;
2823                         int                     num_tuples = 0;
2824
2825                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, vacpage->blkno,
2826                                                                          RBM_NORMAL, vac_strategy);
2827                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2828                         page = BufferGetPage(buf);
2829                         maxoff = PageGetMaxOffsetNumber(page);
2830                         for (offnum = FirstOffsetNumber;
2831                                  offnum <= maxoff;
2832                                  offnum = OffsetNumberNext(offnum))
2833                         {
2834                                 ItemId          itemid = PageGetItemId(page, offnum);
2835                                 HeapTupleHeader htup;
2836
2837                                 if (!ItemIdIsUsed(itemid))
2838                                         continue;
2839                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2840                                 Assert(ItemIdIsNormal(itemid));
2841
2842                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2843                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2844                                         continue;
2845
2846                                 /*
2847                                  * See comments in the walk-along-page loop above about why
2848                                  * only MOVED_OFF tuples should be found here.
2849                                  */
2850                                 if (htup->t_infomask & HEAP_MOVED_IN)
2851                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2852                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2853                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2854                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2855                                         elog(ERROR, "invalid XVAC in tuple header");
2856
2857                                 ItemIdSetUnused(itemid);
2858                                 num_tuples++;
2859
2860                                 unused[uncnt++] = offnum;
2861                         }
2862                         Assert(vacpage->offsets_free == num_tuples);
2863
2864                         START_CRIT_SECTION();
2865
2866                         PageRepairFragmentation(page);
2867
2868                         MarkBufferDirty(buf);
2869
2870                         /* XLOG stuff */
2871                         if (!onerel->rd_istemp)
2872                         {
2873                                 XLogRecPtr      recptr;
2874
2875                                 recptr = log_heap_clean(onerel, buf,
2876                                                                                 NULL, 0, NULL, 0,
2877                                                                                 unused, uncnt,
2878                                                                                 false);
2879                                 PageSetLSN(page, recptr);
2880                                 PageSetTLI(page, ThisTimeLineID);
2881                         }
2882
2883                         END_CRIT_SECTION();
2884
2885                         UnlockReleaseBuffer(buf);
2886                 }
2887
2888                 /* now free the new list of reaped pages */
2889                 curpage = Nvacpagelist.pagedesc;
2890                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2891                         pfree(*curpage);
2892                 pfree(Nvacpagelist.pagedesc);
2893         }
2894
2895         /* Truncate relation, if needed */
2896         if (blkno < nblocks)
2897         {
2898                 RelationTruncate(onerel, blkno);
2899
2900                 /* force relcache inval so all backends reset their rd_targblock */
2901                 CacheInvalidateRelcache(onerel);
2902
2903                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2904         }
2905
2906         /* clean up */
2907         pfree(vacpage);
2908         if (vacrelstats->vtlinks != NULL)
2909                 pfree(vacrelstats->vtlinks);
2910
2911         ExecContext_Finish(&ec);
2912 }
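
/*
 * A minimal sketch of the vtlinks lookup used by repair_frag() above: the
 * vtlinks array is kept sorted by new_tid, so the parent of a tuple is found
 * by binary search keyed on the child's TID.  vac_bsearch() and
 * vac_cmp_vtlinks() are defined elsewhere in this file; the comparator
 * sketched here (with a made-up name) assumes ordering by block number and
 * then offset number of new_tid:
 *
 *              static int
 *              example_cmp_vtlinks(const void *lptr, const void *rptr)
 *              {
 *                      ItemPointer l = &((VTupleLink) lptr)->new_tid;
 *                      ItemPointer r = &((VTupleLink) rptr)->new_tid;
 *                      BlockNumber lblk = ItemPointerGetBlockNumber(l);
 *                      BlockNumber rblk = ItemPointerGetBlockNumber(r);
 *
 *                      if (lblk != rblk)
 *                              return (lblk < rblk) ? -1 : 1;
 *                      if (ItemPointerGetOffsetNumber(l) != ItemPointerGetOffsetNumber(r))
 *                              return (ItemPointerGetOffsetNumber(l) <
 *                                              ItemPointerGetOffsetNumber(r)) ? -1 : 1;
 *                      return 0;
 *              }
 */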
2913
2914 /*
2915  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2916  *
2917  *              This routine moves old_tup from old_page to dst_page.
2918  *              old_page and dst_page might be the same page.
2919  *              On entry old_buf and dst_buf are locked exclusively; both locks (or
2920  *              the single lock, if this is an intra-page move) are released before
2921  *              exit.
2922  *
2923  *              Yes, a routine with ten parameters is ugly, but it's still better
2924  *              than having these 120 lines of code in repair_frag() which is
2925  *              already too long and almost unreadable.
2926  */
2927 static void
2928 move_chain_tuple(Relation rel,
2929                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2930                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2931                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2932 {
2933         TransactionId myXID = GetCurrentTransactionId();
2934         HeapTupleData newtup;
2935         OffsetNumber newoff;
2936         ItemId          newitemid;
2937         Size            tuple_len = old_tup->t_len;
2938
2939         /*
2940          * make a modifiable copy of the source tuple.
2941          */
2942         heap_copytuple_with_tuple(old_tup, &newtup);
2943
2944         /*
2945          * register invalidation of source tuple in catcaches.
2946          */
2947         CacheInvalidateHeapTuple(rel, old_tup);
2948
2949         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2950         START_CRIT_SECTION();
2951
2952         /*
2953          * mark the source tuple MOVED_OFF.
2954          */
2955         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2956                                                                          HEAP_XMIN_INVALID |
2957                                                                          HEAP_MOVED_IN);
2958         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2959         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2960
2961         /*
2962          * If this page was not used before - clean it.
2963          *
2964          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2965          * destination pages to be the same (since this tuple-chain member can be
2966          * on a page lower than the one we're currently processing in the outer
2967          * loop).  If that's true, then after vacuum_page() the source tuple will
2968          * have been moved, and tuple.t_data will be pointing at garbage.
2969          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2970          * step!!
2971          *
2972          * This path is different from the other callers of vacuum_page, because
2973          * we have already incremented the vacpage's offsets_used field to account
2974          * for the tuple(s) we expect to move onto the page. Therefore
2975          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2976          * good debugging check for all other callers, we work around it here
2977          * rather than remove it.
2978          */
2979         if (!PageIsEmpty(dst_page) && cleanVpd)
2980         {
2981                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2982
2983                 dst_vacpage->offsets_used = 0;
2984                 vacuum_page(rel, dst_buf, dst_vacpage);
2985                 dst_vacpage->offsets_used = sv_offsets_used;
2986         }
2987
2988         /*
2989          * Update the state of the copied tuple, and store it on the destination
2990          * page.  The copied tuple is never part of a HOT chain.
2991          */
2992         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2993                                                                    HEAP_XMIN_INVALID |
2994                                                                    HEAP_MOVED_OFF);
2995         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2996         HeapTupleHeaderClearHotUpdated(newtup.t_data);
2997         HeapTupleHeaderClearHeapOnly(newtup.t_data);
2998         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2999         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3000                                                  InvalidOffsetNumber, false, true);
3001         if (newoff == InvalidOffsetNumber)
3002                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
3003                          (unsigned long) tuple_len, dst_vacpage->blkno);
3004         newitemid = PageGetItemId(dst_page, newoff);
3005         /* drop temporary copy, and point to the version on the dest page */
3006         pfree(newtup.t_data);
3007         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
3008
3009         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
3010
3011         /*
3012          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
3013          * to next tuple in chain otherwise.  (Since we move the chain in reverse
3014          * order, this is actually the previously processed tuple.)
3015          */
3016         if (!ItemPointerIsValid(ctid))
3017                 newtup.t_data->t_ctid = newtup.t_self;
3018         else
3019                 newtup.t_data->t_ctid = *ctid;
3020         *ctid = newtup.t_self;
3021
3022         MarkBufferDirty(dst_buf);
3023         if (dst_buf != old_buf)
3024                 MarkBufferDirty(old_buf);
3025
3026         /* XLOG stuff */
3027         if (!rel->rd_istemp)
3028         {
3029                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3030                                                                                    dst_buf, &newtup);
3031
3032                 if (old_buf != dst_buf)
3033                 {
3034                         PageSetLSN(old_page, recptr);
3035                         PageSetTLI(old_page, ThisTimeLineID);
3036                 }
3037                 PageSetLSN(dst_page, recptr);
3038                 PageSetTLI(dst_page, ThisTimeLineID);
3039         }
3040
3041         END_CRIT_SECTION();
3042
3043         PageClearAllVisible(BufferGetPage(old_buf));
3044         if (dst_buf != old_buf)
3045                 PageClearAllVisible(BufferGetPage(dst_buf));
3046
3047         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3048         if (dst_buf != old_buf)
3049                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3050
3051         /* Clear the bits in the visibility map. */
3052         visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
3053         if (dst_buf != old_buf)
3054                 visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
3055
3056         /* Create index entries for the moved tuple */
3057         if (ec->resultRelInfo->ri_NumIndices > 0)
3058         {
3059                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3060                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3061                 ResetPerTupleExprContext(ec->estate);
3062         }
3063 }
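
/*
 * Both tuple movers follow the usual pattern for WAL-logging changes that
 * touch more than one buffer: the page modifications are made inside a
 * critical section, the buffers are marked dirty, one WAL record is emitted,
 * and its LSN/TLI are stamped on every page changed.  A condensed sketch of
 * that shape, using the variable names from the routines here:
 *
 *              START_CRIT_SECTION();
 *              ... modify old_page and dst_page ...
 *              MarkBufferDirty(dst_buf);
 *              MarkBufferDirty(old_buf);
 *              if (!rel->rd_istemp)
 *              {
 *                      XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
 *                                                                                         dst_buf, &newtup);
 *
 *                      PageSetLSN(old_page, recptr);
 *                      PageSetTLI(old_page, ThisTimeLineID);
 *                      PageSetLSN(dst_page, recptr);
 *                      PageSetTLI(dst_page, ThisTimeLineID);
 *              }
 *              END_CRIT_SECTION();
 */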
3064
3065 /*
3066  *      move_plain_tuple() -- move one tuple that is not part of a chain
3067  *
3068  *              This routine moves old_tup from old_page to dst_page.
3069  *              On entry old_buf and dst_buf are locked exclusively, both locks are
3070  *              On entry old_buf and dst_buf are locked exclusively; both locks are
3071  *
3072  *              Yes, a routine with eight parameters is ugly, but it's still better
3073  *              than having these 90 lines of code in repair_frag() which is already
3074  *              too long and almost unreadable.
3075  */
3076 static void
3077 move_plain_tuple(Relation rel,
3078                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
3079                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
3080                                  ExecContext ec)
3081 {
3082         TransactionId myXID = GetCurrentTransactionId();
3083         HeapTupleData newtup;
3084         OffsetNumber newoff;
3085         ItemId          newitemid;
3086         Size            tuple_len = old_tup->t_len;
3087
3088         /* copy tuple */
3089         heap_copytuple_with_tuple(old_tup, &newtup);
3090
3091         /*
3092          * register invalidation of source tuple in catcaches.
3093          *
3094          * (Note: we do not need to register the copied tuple, because we are not
3095          * changing the tuple contents and so there cannot be any need to flush
3096          * negative catcache entries.)
3097          */
3098         CacheInvalidateHeapTuple(rel, old_tup);
3099
3100         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
3101         START_CRIT_SECTION();
3102
3103         /*
3104          * Mark new tuple as MOVED_IN by me; also mark it not HOT.
3105          */
3106         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3107                                                                    HEAP_XMIN_INVALID |
3108                                                                    HEAP_MOVED_OFF);
3109         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
3110         HeapTupleHeaderClearHotUpdated(newtup.t_data);
3111         HeapTupleHeaderClearHeapOnly(newtup.t_data);
3112         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
3113
3114         /* add tuple to the page */
3115         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3116                                                  InvalidOffsetNumber, false, true);
3117         if (newoff == InvalidOffsetNumber)
3118                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
3119                          (unsigned long) tuple_len,
3120                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
3121                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
3122         newitemid = PageGetItemId(dst_page, newoff);
3123         pfree(newtup.t_data);
3124         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
3125         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
3126         newtup.t_self = newtup.t_data->t_ctid;
3127
3128         /*
3129          * Mark old tuple as MOVED_OFF by me.
3130          */
3131         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3132                                                                          HEAP_XMIN_INVALID |
3133                                                                          HEAP_MOVED_IN);
3134         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
3135         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
3136
3137         MarkBufferDirty(dst_buf);
3138         MarkBufferDirty(old_buf);
3139
3140         /* XLOG stuff */
3141         if (!rel->rd_istemp)
3142         {
3143                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3144                                                                                    dst_buf, &newtup);
3145
3146                 PageSetLSN(old_page, recptr);
3147                 PageSetTLI(old_page, ThisTimeLineID);
3148                 PageSetLSN(dst_page, recptr);
3149                 PageSetTLI(dst_page, ThisTimeLineID);
3150         }
3151
3152         END_CRIT_SECTION();
3153
3154         /*
3155          * Clear the visible-to-all hint bits on the page, and bits in the
3156          * visibility map. Normally we'd release the locks on the heap pages
3157          * before updating the visibility map, but it doesn't really matter here
3158          * because we're holding an AccessExclusiveLock on the relation anyway.
3159          */
3160         if (PageIsAllVisible(dst_page))
3161         {
3162                 PageClearAllVisible(dst_page);
3163                 visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
3164         }
3165         if (PageIsAllVisible(old_page))
3166         {
3167                 PageClearAllVisible(old_page);
3168                 visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
3169         }
3170
3171         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
3172         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3173         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3174
3175         dst_vacpage->offsets_used++;
3176
3177         /* insert index tuples if needed */
3178         if (ec->resultRelInfo->ri_NumIndices > 0)
3179         {
3180                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3181                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3182                 ResetPerTupleExprContext(ec->estate);
3183         }
3184 }
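
/*
 * To recap the flag manipulation done by both movers before the hint-bit
 * pass below: the source tuple is stamped HEAP_MOVED_OFF and the copy placed
 * on the destination page is stamped HEAP_MOVED_IN, each with XVAC set to
 * the vacuum's own XID, e.g.
 *
 *              old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
 *              HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
 *              newtup.t_data->t_infomask |= HEAP_MOVED_IN;
 *              HeapTupleHeaderSetXvac(newtup.t_data, myXID);
 *
 * update_hint_bits() can then convert these into ordinary hint bits once the
 * moves are known committed.
 */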
3185
3186 /*
3187  *      update_hint_bits() -- update hint bits in destination pages
3188  *
3189  * Scan all the pages that we moved tuples onto and update tuple status bits.
3190  * This is not really necessary, but it will save time for future transactions
3191  * examining these tuples.
3192  *
3193  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
3194  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
3195  *
3196  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
3197  * pages that were move source pages but not move dest pages.  The bulk
3198  * of the move source pages will be physically truncated from the relation,
3199  * and the last page remaining in the rel will be fixed separately in
3200  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
3201  * hint bits updated are tuples that are moved as part of a chain and were
3202  * on pages that were neither move destinations nor at the end of the rel.
3203  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
3204  * to remember and revisit those pages too.
3205  *
3206  * One wonders whether it wouldn't be better to skip this work entirely,
3207  * and let the tuple status updates happen someplace that's not holding an
3208  * exclusive lock on the relation.
3209  */
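/*
 * Illustrative summary of the hint-bit transitions the loop below performs
 * (this merely restates the code; it is not additional behavior):
 *
 *              HEAP_MOVED_IN tuple   -> set HEAP_XMIN_COMMITTED, clear HEAP_MOVED
 *              HEAP_MOVED_OFF tuple  -> set HEAP_XMIN_INVALID
 *
 * In both cases the tuple's XVAC must be our own transaction ID, since we are
 * the transaction that moved it; anything else is reported as an error.
 */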
3210 static void
3211 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
3212                                  BlockNumber last_move_dest_block, int num_moved)
3213 {
3214         TransactionId myXID = GetCurrentTransactionId();
3215         int                     checked_moved = 0;
3216         int                     i;
3217         VacPage    *curpage;
3218
3219         for (i = 0, curpage = fraged_pages->pagedesc;
3220                  i < num_fraged_pages;
3221                  i++, curpage++)
3222         {
3223                 Buffer          buf;
3224                 Page            page;
3225                 OffsetNumber max_offset;
3226                 OffsetNumber off;
3227                 int                     num_tuples = 0;
3228
3229                 vacuum_delay_point();
3230
3231                 if ((*curpage)->blkno > last_move_dest_block)
3232                         break;                          /* no need to scan any further */
3233                 if ((*curpage)->offsets_used == 0)
3234                         continue;                       /* this page was never used as a move dest */
3235                 buf = ReadBufferExtended(rel, MAIN_FORKNUM, (*curpage)->blkno,
3236                                                                  RBM_NORMAL, vac_strategy);
3237                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3238                 page = BufferGetPage(buf);
3239                 max_offset = PageGetMaxOffsetNumber(page);
3240                 for (off = FirstOffsetNumber;
3241                          off <= max_offset;
3242                          off = OffsetNumberNext(off))
3243                 {
3244                         ItemId          itemid = PageGetItemId(page, off);
3245                         HeapTupleHeader htup;
3246
3247                         if (!ItemIdIsUsed(itemid))
3248                                 continue;
3249                         /* Shouldn't be any DEAD or REDIRECT items anymore */
3250                         Assert(ItemIdIsNormal(itemid));
3251
3252                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
3253                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
3254                                 continue;
3255
3256                         /*
3257                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
3258                          */
3259                         if (!(htup->t_infomask & HEAP_MOVED))
3260                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
3261                         if (HeapTupleHeaderGetXvac(htup) != myXID)
3262                                 elog(ERROR, "invalid XVAC in tuple header");
3263
3264                         if (htup->t_infomask & HEAP_MOVED_IN)
3265                         {
3266                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
3267                                 htup->t_infomask &= ~HEAP_MOVED;
3268                                 num_tuples++;
3269                         }
3270                         else
3271                                 htup->t_infomask |= HEAP_XMIN_INVALID;
3272                 }
3273                 MarkBufferDirty(buf);
3274                 UnlockReleaseBuffer(buf);
3275                 Assert((*curpage)->offsets_used == num_tuples);
3276                 checked_moved += num_tuples;
3277         }
3278         Assert(num_moved == checked_moved);
3279 }
3280
3281 /*
3282  *      vacuum_heap() -- free dead tuples
3283  *
3284  *              This routine marks dead tuples as unused and truncates the relation
3285  *              if there are "empty" end-blocks.
3286  */
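/*
 * Worked example (illustrative numbers only): if the relation currently has
 * rel_pages = 1000 and scan_heap found empty_end_pages = 40 trailing empty
 * pages, the loop below visits only the first num_pages - 40 entries of
 * vacuum_pages->pagedesc, and the relation is then truncated from 1000 to
 * 1000 - 40 = 960 pages.
 */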
3287 static void
3288 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
3289 {
3290         Buffer          buf;
3291         VacPage    *vacpage;
3292         BlockNumber relblocks;
3293         int                     nblocks;
3294         int                     i;
3295
3296         nblocks = vacuum_pages->num_pages;
3297         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do for those */
3298
3299         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
3300         {
3301                 vacuum_delay_point();
3302
3303                 if ((*vacpage)->offsets_free > 0)
3304                 {
3305                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*vacpage)->blkno,
3306                                                                          RBM_NORMAL, vac_strategy);
3307                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3308                         vacuum_page(onerel, buf, *vacpage);
3309                         UnlockReleaseBuffer(buf);
3310                 }
3311         }
3312
3313         /* Truncate relation if there are some empty end-pages */
3314         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
3315         if (vacuum_pages->empty_end_pages > 0)
3316         {
3317                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
3318                 ereport(elevel,
3319                                 (errmsg("\"%s\": truncated %u to %u pages",
3320                                                 RelationGetRelationName(onerel),
3321                                                 vacrelstats->rel_pages, relblocks)));
3322                 RelationTruncate(onerel, relblocks);
3323
3324                 /* force relcache inval so all backends reset their rd_targblock */
3325                 CacheInvalidateRelcache(onerel);
3326
3327                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
3328         }
3329 }
3330
3331 /*
3332  *      vacuum_page() -- free dead tuples on a page
3333  *                                       and repair its fragmentation.
3334  *
3335  * Caller must hold pin and lock on buffer.
3336  */
3337 static void
3338 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
3339 {
3340         Page            page = BufferGetPage(buffer);
3341         int                     i;
3342
3343         /* There shouldn't be any tuples moved onto the page yet! */
3344         Assert(vacpage->offsets_used == 0);
3345
3346         START_CRIT_SECTION();
3347
3348         for (i = 0; i < vacpage->offsets_free; i++)
3349         {
3350                 ItemId          itemid = PageGetItemId(page, vacpage->offsets[i]);
3351
3352                 ItemIdSetUnused(itemid);
3353         }
3354
3355         PageRepairFragmentation(page);
3356
3357         MarkBufferDirty(buffer);
3358
3359         /* XLOG stuff */
3360         if (!onerel->rd_istemp)
3361         {
3362                 XLogRecPtr      recptr;
3363
3364                 recptr = log_heap_clean(onerel, buffer,
3365                                                                 NULL, 0, NULL, 0,
3366                                                                 vacpage->offsets, vacpage->offsets_free,
3367                                                                 false);
3368                 PageSetLSN(page, recptr);
3369                 PageSetTLI(page, ThisTimeLineID);
3370         }
3371
3372         END_CRIT_SECTION();
3373 }
3374
3375 /*
3376  *      scan_index() -- scan one index relation to update pg_class statistics.
3377  *
3378  * We use this when we have no deletions to do.
3379  */
3380 static void
3381 scan_index(Relation indrel, double num_tuples)
3382 {
3383         IndexBulkDeleteResult *stats;
3384         IndexVacuumInfo ivinfo;
3385         PGRUsage        ru0;
3386
3387         pg_rusage_init(&ru0);
3388
3389         ivinfo.index = indrel;
3390         ivinfo.vacuum_full = true;
3391         ivinfo.analyze_only = false;
3392         ivinfo.estimated_count = false;
3393         ivinfo.message_level = elevel;
3394         ivinfo.num_heap_tuples = num_tuples;
3395         ivinfo.strategy = vac_strategy;
3396
3397         stats = index_vacuum_cleanup(&ivinfo, NULL);
3398
3399         if (!stats)
3400                 return;
3401
3402         /*
3403          * Now update statistics in pg_class, but only if the index says the
3404          * count is accurate.
3405          */
3406         if (!stats->estimated_count)
3407                 vac_update_relstats(indrel,
3408                                                         stats->num_pages, stats->num_index_tuples,
3409                                                         false, InvalidTransactionId);
3410
3411         ereport(elevel,
3412                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3413                                         RelationGetRelationName(indrel),
3414                                         stats->num_index_tuples,
3415                                         stats->num_pages),
3416         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
3417                           "%s.",
3418                           stats->pages_deleted, stats->pages_free,
3419                           pg_rusage_show(&ru0))));
3420
3421         /*
3422          * Check for tuple count mismatch.  If the index is partial, then it's OK
3423          * for it to have fewer tuples than the heap; otherwise something is wrong.
3424          */
3425         if (!stats->estimated_count &&
3426                 stats->num_index_tuples != num_tuples)
3427         {
3428                 if (stats->num_index_tuples > num_tuples ||
3429                         !vac_is_partial_index(indrel))
3430                         ereport(WARNING,
3431                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3432                                                         RelationGetRelationName(indrel),
3433                                                         stats->num_index_tuples, num_tuples),
3434                                          errhint("Rebuild the index with REINDEX.")));
3435         }
3436
3437         pfree(stats);
3438 }
3439
3440 /*
3441  *      vacuum_index() -- vacuum one index relation.
3442  *
3443  *              vacpagelist is the VacPageList of the heap we're currently vacuuming;
3444  *              the heap is already exclusive-locked.  indrel is an index on that heap.
3445  *
3446  *              We don't bother to set locks on the index relation here, since
3447  *              the parent table is exclusive-locked already.
3448  *
3449  *              Finally, we arrange to update the index relation's statistics in
3450  *              pg_class.
3451  */
3452 static void
3453 vacuum_index(VacPageList vacpagelist, Relation indrel,
3454                          double num_tuples, int keep_tuples)
3455 {
3456         IndexBulkDeleteResult *stats;
3457         IndexVacuumInfo ivinfo;
3458         PGRUsage        ru0;
3459
3460         pg_rusage_init(&ru0);
3461
3462         ivinfo.index = indrel;
3463         ivinfo.vacuum_full = true;
3464         ivinfo.analyze_only = false;
3465         ivinfo.estimated_count = false;
3466         ivinfo.message_level = elevel;
3467         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
3468         ivinfo.strategy = vac_strategy;
3469
3470         /* Do bulk deletion */
3471         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3472
3473         /* Do post-VACUUM cleanup */
3474         stats = index_vacuum_cleanup(&ivinfo, stats);
3475
3476         if (!stats)
3477                 return;
3478
3479         /*
3480          * Now update statistics in pg_class, but only if the index says the
3481          * count is accurate.
3482          */
3483         if (!stats->estimated_count)
3484                 vac_update_relstats(indrel,
3485                                                         stats->num_pages, stats->num_index_tuples,
3486                                                         false, InvalidTransactionId);
3487
3488         ereport(elevel,
3489                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3490                                         RelationGetRelationName(indrel),
3491                                         stats->num_index_tuples,
3492                                         stats->num_pages),
3493                          errdetail("%.0f index row versions were removed.\n"
3494                          "%u index pages have been deleted, %u are currently reusable.\n"
3495                                            "%s.",
3496                                            stats->tuples_removed,
3497                                            stats->pages_deleted, stats->pages_free,
3498                                            pg_rusage_show(&ru0))));
3499
3500         /*
3501          * Check for tuple count mismatch.  If the index is partial, then it's OK
3502          * for it to have fewer tuples than the heap; otherwise something is wrong.
3503          */
3504         if (!stats->estimated_count &&
3505                 stats->num_index_tuples != num_tuples + keep_tuples)
3506         {
3507                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3508                         !vac_is_partial_index(indrel))
3509                         ereport(WARNING,
3510                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3511                                                         RelationGetRelationName(indrel),
3512                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3513                                          errhint("Rebuild the index with REINDEX.")));
3514         }
3515
3516         pfree(stats);
3517 }
3518
3519 /*
3520  *      tid_reaped() -- is a particular tid reaped?
3521  *
3522  *              This has the right signature to be an IndexBulkDeleteCallback.
3523  *
3524  *              vacpagelist->pagedesc is sorted in ascending block-number order.
3525  */
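/*
 * Example walk-through (restating the code below): for an index entry
 * pointing at TID (42,7) we binary-search pagedesc for a VacPage with
 * blkno 42.  If there is none, nothing was reaped on that heap page and the
 * index entry is kept.  If the page is present with offsets_free == 0, it was
 * emptied entirely, so every TID on it is reaped.  Otherwise we binary-search
 * the page's offsets array for offset 7 to decide.
 */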
3526 static bool
3527 tid_reaped(ItemPointer itemptr, void *state)
3528 {
3529         VacPageList vacpagelist = (VacPageList) state;
3530         OffsetNumber ioffno;
3531         OffsetNumber *voff;
3532         VacPage         vp,
3533                            *vpp;
3534         VacPageData vacpage;
3535
3536         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3537         ioffno = ItemPointerGetOffsetNumber(itemptr);
3538
3539         vp = &vacpage;
3540         vpp = (VacPage *) vac_bsearch((void *) &vp,
3541                                                                   (void *) (vacpagelist->pagedesc),
3542                                                                   vacpagelist->num_pages,
3543                                                                   sizeof(VacPage),
3544                                                                   vac_cmp_blk);
3545
3546         if (vpp == NULL)
3547                 return false;
3548
3549         /* ok - we are on a partially or fully reaped page */
3550         vp = *vpp;
3551
3552         if (vp->offsets_free == 0)
3553         {
3554                 /* this page was emptied entirely, so claim all tuples on it are reaped */
3555                 return true;
3556         }
3557
3558         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3559                                                                                 (void *) (vp->offsets),
3560                                                                                 vp->offsets_free,
3561                                                                                 sizeof(OffsetNumber),
3562                                                                                 vac_cmp_offno);
3563
3564         if (voff == NULL)
3565                 return false;
3566
3567         /* tid is reaped */
3568         return true;
3569 }
3570
3571 /*
3572  * Update the Free Space Map with the info we now have about free space in
3573  * the relation.
3574  */
3575 static void
3576 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3577                            BlockNumber rel_pages)
3578 {
3579         int                     nPages = fraged_pages->num_pages;
3580         VacPage    *pagedesc = fraged_pages->pagedesc;
3581         int                     i;
3582
3583         for (i = 0; i < nPages; i++)
3584         {
3585                 /*
3586                  * fraged_pages may contain entries for pages that we later decided to
3587                  * truncate from the relation; don't enter them into the free space
3588                  * map!
3589                  */
3590                 if (pagedesc[i]->blkno >= rel_pages)
3591                         break;
3592
3593                 RecordPageWithFreeSpace(onerel, pagedesc[i]->blkno, pagedesc[i]->free);
3594         }
3595
3596 }
3597
3598 /* Copy a VacPage structure */
3599 static VacPage
3600 copy_vac_page(VacPage vacpage)
3601 {
3602         VacPage         newvacpage;
3603
3604         /* allocate a VacPageData entry */
3605         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3606                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3607
3608         /* fill it in */
3609         if (vacpage->offsets_free > 0)
3610                 memcpy(newvacpage->offsets, vacpage->offsets,
3611                            vacpage->offsets_free * sizeof(OffsetNumber));
3612         newvacpage->blkno = vacpage->blkno;
3613         newvacpage->free = vacpage->free;
3614         newvacpage->offsets_used = vacpage->offsets_used;
3615         newvacpage->offsets_free = vacpage->offsets_free;
3616
3617         return newvacpage;
3618 }
3619
3620 /*
3621  * Add a VacPage pointer to a VacPageList.
3622  *
3623  *              As a side effect of the way that scan_heap works,
3624  *              higher pages come after lower pages in the array
3625  *              (and highest tid on a page is last).
3626  */
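/*
 * The pagedesc array is grown lazily: it starts with room for PG_NPAGEDESC
 * (1024) entries and is doubled with repalloc each time it fills up, so
 * insertion is amortized O(1).
 */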
3627 static void
3628 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3629 {
3630 #define PG_NPAGEDESC 1024
3631
3632         /* allocate a VacPage entry if needed */
3633         if (vacpagelist->num_pages == 0)
3634         {
3635                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3636                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3637         }
3638         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3639         {
3640                 vacpagelist->num_allocated_pages *= 2;
3641                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3642         }
3643         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3644         (vacpagelist->num_pages)++;
3645 }
3646
3647 /*
3648  * vac_bsearch: just like standard C library routine bsearch(),
3649  * except that we first test to see whether the target key is outside
3650  * the range of the table entries.      This case is handled relatively slowly
3651  * by the normal binary search algorithm (ie, no faster than any other key)
3652  * but it occurs often enough in VACUUM to be worth optimizing.
3653  */
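/*
 * For example (restating the fast path below): if the key compares below the
 * first entry or above the last entry, we return NULL immediately; only keys
 * that fall within the table's range fall through to the standard bsearch().
 */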
3654 static void *
3655 vac_bsearch(const void *key, const void *base,
3656                         size_t nelem, size_t size,
3657                         int (*compar) (const void *, const void *))
3658 {
3659         int                     res;
3660         const void *last;
3661
3662         if (nelem == 0)
3663                 return NULL;
3664         res = compar(key, base);
3665         if (res < 0)
3666                 return NULL;
3667         if (res == 0)
3668                 return (void *) base;
3669         if (nelem > 1)
3670         {
3671                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3672                 res = compar(key, last);
3673                 if (res > 0)
3674                         return NULL;
3675                 if (res == 0)
3676                         return (void *) last;
3677         }
3678         if (nelem <= 2)
3679                 return NULL;                    /* already checked 'em all */
3680         return bsearch(key, base, nelem, size, compar);
3681 }
3682
3683 /*
3684  * Comparator routines for use with qsort() and bsearch().
3685  */
3686 static int
3687 vac_cmp_blk(const void *left, const void *right)
3688 {
3689         BlockNumber lblk,
3690                                 rblk;
3691
3692         lblk = (*((VacPage *) left))->blkno;
3693         rblk = (*((VacPage *) right))->blkno;
3694
3695         if (lblk < rblk)
3696                 return -1;
3697         if (lblk == rblk)
3698                 return 0;
3699         return 1;
3700 }
3701
3702 static int
3703 vac_cmp_offno(const void *left, const void *right)
3704 {
3705         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3706                 return -1;
3707         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3708                 return 0;
3709         return 1;
3710 }
3711
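/*
 * vac_cmp_vtlinks() orders VTupleLink entries by their new_tid: first by
 * block number (bi_hi, then bi_lo, since a BlockIdData stores the block
 * number as two 16-bit halves with bi_hi holding the high-order bits) and
 * then by offset (ip_posid).  The result is plain TID order.
 */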
3712 static int
3713 vac_cmp_vtlinks(const void *left, const void *right)
3714 {
3715         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3716                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3717                 return -1;
3718         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3719                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3720                 return 1;
3721         /* bi_hi-es are equal */
3722         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3723                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3724                 return -1;
3725         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3726                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3727                 return 1;
3728         /* bi_lo-es are equal */
3729         if (((VTupleLink) left)->new_tid.ip_posid <
3730                 ((VTupleLink) right)->new_tid.ip_posid)
3731                 return -1;
3732         if (((VTupleLink) left)->new_tid.ip_posid >
3733                 ((VTupleLink) right)->new_tid.ip_posid)
3734                 return 1;
3735         return 0;
3736 }
3737
3738
3739 /*
3740  * Open all the indexes of the given relation, obtaining the specified kind
3741  * of lock on each.  Return an array of Relation pointers for the indexes
3742  * into *Irel, and the number of indexes into *nindexes.
3743  */
3744 void
3745 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3746                                  int *nindexes, Relation **Irel)
3747 {
3748         List       *indexoidlist;
3749         ListCell   *indexoidscan;
3750         int                     i;
3751
3752         Assert(lockmode != NoLock);
3753
3754         indexoidlist = RelationGetIndexList(relation);
3755
3756         *nindexes = list_length(indexoidlist);
3757
3758         if (*nindexes > 0)
3759                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3760         else
3761                 *Irel = NULL;
3762
3763         i = 0;
3764         foreach(indexoidscan, indexoidlist)
3765         {
3766                 Oid                     indexoid = lfirst_oid(indexoidscan);
3767
3768                 (*Irel)[i++] = index_open(indexoid, lockmode);
3769         }
3770
3771         list_free(indexoidlist);
3772 }
3773
3774 /*
3775  * Release the resources acquired by vac_open_indexes.  Optionally release
3776  * the locks (pass NoLock to keep them).
3777  */
3778 void
3779 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3780 {
3781         if (Irel == NULL)
3782                 return;
3783
3784         while (nindexes--)
3785         {
3786                 Relation        ind = Irel[nindexes];
3787
3788                 index_close(ind, lockmode);
3789         }
3790         pfree(Irel);
3791 }
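/*
 * Typical usage sketch (illustrative only; the lock mode shown is just an
 * example, callers pass whatever mode they need):
 *
 *              Relation   *Irel;
 *              int                     nindexes;
 *
 *              vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
 *              ... process Irel[0 .. nindexes-1] ...
 *              vac_close_indexes(nindexes, Irel, NoLock);
 *
 * Passing NoLock to vac_close_indexes keeps the index locks until the end of
 * the transaction, as noted above.
 */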
3792
3793
3794 /*
3795  * Is an index partial (ie, could it contain fewer tuples than the heap?)
3796  */
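/*
 * For example, an index created with a WHERE predicate indexes only the rows
 * satisfying that predicate, and an index whose access method cannot store
 * NULLs omits rows with NULL key values; in either case the index may
 * legitimately contain fewer entries than the heap has tuples, which is what
 * scan_index() and vacuum_index() need to know before warning about a count
 * mismatch.
 */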
3797 bool
3798 vac_is_partial_index(Relation indrel)
3799 {
3800         /*
3801          * If the index's AM doesn't support nulls, it's partial for our purposes
3802          */
3803         if (!indrel->rd_am->amindexnulls)
3804                 return true;
3805
3806         /* Otherwise, look to see if there's a partial-index predicate */
3807         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3808                 return true;
3809
3810         return false;
3811 }
3812
3813
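/*
 * enough_space() -- will a tuple of size len fit on the page described by
 * vacpage?  It fits if the MAXALIGN'd length fits within the recorded free
 * space and either an already-freed line pointer can be reused or there is
 * additionally room for a new ItemIdData.
 */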
3814 static bool
3815 enough_space(VacPage vacpage, Size len)
3816 {
3817         len = MAXALIGN(len);
3818
3819         if (len > vacpage->free)
3820                 return false;
3821
3822         /* if there are free itemid(s) and len <= free_space... */
3823         if (vacpage->offsets_used < vacpage->offsets_free)
3824                 return true;
3825
3826         /* offsets_used >= offsets_free, so we'd have to allocate a new itemid */
3827         if (len + sizeof(ItemIdData) <= vacpage->free)
3828                 return true;
3829
3830         return false;
3831 }
3832
3833 static Size
3834 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3835 {
3836         /*
3837          * It is correct to use PageGetExactFreeSpace() here, *not*
3838          * PageGetHeapFreeSpace().  This is because (a) we do our own, exact
3839          * accounting for whether line pointers must be added, and (b) we will
3840          * recycle any LP_DEAD line pointers before starting to add rows to a
3841          * page, but that may not have happened yet at the time this function is
3842          * applied to a page, which means PageGetHeapFreeSpace()'s protection
3843          * against too many line pointers on a page could fire incorrectly.  We do
3844          * not need that protection here: since VACUUM FULL always recycles all
3845          * dead line pointers first, it'd be physically impossible to insert more
3846          * than MaxHeapTuplesPerPage tuples anyway.
3847          */
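        /*
         * Worked example (illustrative numbers): with BLCKSZ = 8192 and the
         * relation's fillfactor reloption set to 90, the target free space is
         * 8192 * 10 / 100 = 819 bytes, so a page with exactly 1000 bytes of
         * free space reports 1000 - 819 = 181 bytes usable for moved-in tuples.
         */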
3848         Size            freespace = PageGetExactFreeSpace(page);
3849         Size            targetfree;
3850
3851         targetfree = RelationGetTargetPageFreeSpace(relation,
3852                                                                                                 HEAP_DEFAULT_FILLFACTOR);
3853         if (freespace > targetfree)
3854                 return freespace - targetfree;
3855         else
3856                 return 0;
3857 }
3858
3859 /*
3860  * vacuum_delay_point --- check for interrupts and cost-based delay.
3861  *
3862  * This should be called in each major loop of VACUUM processing,
3863  * typically once per page processed.
3864  */
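/*
 * Worked example (illustrative settings): with vacuum_cost_delay = 20 ms and
 * vacuum_cost_limit = 200, a backend whose VacuumCostBalance has reached 350
 * sleeps for 20 * 350 / 200 = 35 ms (below the 4 * 20 = 80 ms cap), resets
 * its balance to zero, and continues.
 */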
3865 void
3866 vacuum_delay_point(void)
3867 {
3868         /* Always check for interrupts */
3869         CHECK_FOR_INTERRUPTS();
3870
3871         /* Nap if appropriate */
3872         if (VacuumCostActive && !InterruptPending &&
3873                 VacuumCostBalance >= VacuumCostLimit)
3874         {
3875                 int                     msec;
3876
3877                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3878                 if (msec > VacuumCostDelay * 4)
3879                         msec = VacuumCostDelay * 4;
3880
3881                 pg_usleep(msec * 1000L);
3882
3883                 VacuumCostBalance = 0;
3884
3885                 /* update balance values for workers */
3886                 AutoVacuumUpdateDelay();
3887
3888                 /* Might have gotten an interrupt while sleeping */
3889                 CHECK_FOR_INTERRUPTS();
3890         }
3891 }