1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.401 2009/12/30 20:32:14 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/transam.h"
29 #include "access/visibilitymap.h"
30 #include "access/xact.h"
31 #include "access/xlog.h"
32 #include "catalog/namespace.h"
33 #include "catalog/pg_database.h"
34 #include "catalog/pg_namespace.h"
35 #include "catalog/storage.h"
36 #include "commands/dbcommands.h"
37 #include "commands/vacuum.h"
38 #include "executor/executor.h"
39 #include "miscadmin.h"
40 #include "pgstat.h"
41 #include "postmaster/autovacuum.h"
42 #include "storage/bufmgr.h"
43 #include "storage/freespace.h"
44 #include "storage/lmgr.h"
45 #include "storage/proc.h"
46 #include "storage/procarray.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/fmgroids.h"
50 #include "utils/guc.h"
51 #include "utils/inval.h"
52 #include "utils/lsyscache.h"
53 #include "utils/memutils.h"
54 #include "utils/pg_rusage.h"
55 #include "utils/relcache.h"
56 #include "utils/snapmgr.h"
57 #include "utils/syscache.h"
58 #include "utils/tqual.h"
59
60
61 /*
62  * GUC parameters
63  */
64 int                     vacuum_freeze_min_age;
65 int                     vacuum_freeze_table_age;
66
67 /*
68  * VacPage structures keep track of each page on which we find useful
69  * amounts of free space.
70  */
71 typedef struct VacPageData
72 {
73         BlockNumber blkno;                      /* BlockNumber of this Page */
74         Size            free;                   /* FreeSpace on this Page */
75         uint16          offsets_used;   /* Number of OffNums used by vacuum */
76         uint16          offsets_free;   /* Number of OffNums free or to be free */
77         OffsetNumber offsets[1];        /* Array of free OffNums */
78 } VacPageData;
79
80 typedef VacPageData *VacPage;
81
82 typedef struct VacPageListData
83 {
84         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
85         int                     num_pages;              /* Number of pages in pagedesc */
86         int                     num_allocated_pages;    /* Number of allocated pages in
87                                                                                  * pagedesc */
88         VacPage    *pagedesc;           /* Descriptions of pages */
89 } VacPageListData;
90
91 typedef VacPageListData *VacPageList;
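/*
 * Rough sketch of how these lists are used (see scan_heap() and
 * full_vacuum_rel() below): scan_heap() fills two VacPageListData
 * structures, "vacuum_pages" (pages that need dead tuples removed and/or
 * index cleanup) and "fraged_pages" (pages with enough free space to be
 * worth moving tuples into); repair_frag() then consumes fraged_pages,
 * while vacuum_heap() consumes vacuum_pages.
 */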
92
93 /*
94  * The "vtlinks" array keeps information about each recently-updated tuple
95  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
96  * We store the tuple's own TID as well as its t_ctid (its link to the next
97  * newer tuple version).  Searching in this array allows us to follow update
98  * chains backwards from newer to older tuples.  When we move a member of an
99  * update chain, we must move *all* the live members of the chain, so that we
100  * can maintain their t_ctid link relationships (we must not just overwrite
101  * t_ctid in an existing tuple).
102  *
103  * Note: because t_ctid links can be stale (this would only occur if a prior
104  * VACUUM crashed partway through), it is possible that new_tid points to an
105  * empty slot or unrelated tuple.  We have to check the linkage as we follow
106  * it, just as is done in EvalPlanQualFetch.
107  */
108 typedef struct VTupleLinkData
109 {
110         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
111         ItemPointerData this_tid;       /* t_self of the tuple */
112 } VTupleLinkData;
113
114 typedef VTupleLinkData *VTupleLink;
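/*
 * Usage sketch (illustrative, not a literal code path): the vtlinks
 * entries live in VRelStats below and are kept sorted by new_tid (see
 * vac_cmp_vtlinks); when repair_frag() needs the older member of an
 * update chain, it binary-searches the array with vac_bsearch() keyed on
 * a tuple's t_ctid, so the array acts as a reverse index over t_ctid
 * links.
 */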
115
116 /*
117  * We use an array of VTupleMoveData to plan a chain tuple move fully
118  * before we do it.
119  */
120 typedef struct VTupleMoveData
121 {
122         ItemPointerData tid;            /* tuple ID */
123         VacPage         vacpage;                /* where to move it to */
124         bool            cleanVpd;               /* clean vacpage before using? */
125 } VTupleMoveData;
126
127 typedef VTupleMoveData *VTupleMove;
128
129 /*
130  * VRelStats contains the data acquired by scan_heap for use later
131  */
132 typedef struct VRelStats
133 {
134         /* miscellaneous statistics */
135         BlockNumber rel_pages;          /* pages in relation */
136         double          rel_tuples;             /* tuples that remain after vacuuming */
137         double          rel_indexed_tuples;             /* indexed tuples that remain */
138         Size            min_tlen;               /* min surviving tuple size */
139         Size            max_tlen;               /* max surviving tuple size */
140         bool            hasindex;
141         /* vtlinks array for tuple chain following - sorted by new_tid */
142         int                     num_vtlinks;
143         VTupleLink      vtlinks;
144         TransactionId   latestRemovedXid;
145 } VRelStats;
146
147 /*----------------------------------------------------------------------
148  * ExecContext:
149  *
150  * As these variables always appear together, we put them into one struct
151  * and pull initialization and cleanup into separate routines.
152  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
153  * accurately:  It is *used* only in move_xxx_tuple(), but because this
154  * routine is called many times, we initialize the struct just once in
155  * repair_frag() and pass it on to move_xxx_tuple().
156  */
157 typedef struct ExecContextData
158 {
159         ResultRelInfo *resultRelInfo;
160         EState     *estate;
161         TupleTableSlot *slot;
162 } ExecContextData;
163
164 typedef ExecContextData *ExecContext;
165
166 static void
167 ExecContext_Init(ExecContext ec, Relation rel)
168 {
169         TupleDesc       tupdesc = RelationGetDescr(rel);
170
171         /*
172          * We need a ResultRelInfo and an EState so we can use the regular
173          * executor's index-entry-making machinery.
174          */
175         ec->estate = CreateExecutorState();
176
177         ec->resultRelInfo = makeNode(ResultRelInfo);
178         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
179         ec->resultRelInfo->ri_RelationDesc = rel;
180         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
181
182         ExecOpenIndices(ec->resultRelInfo);
183
184         ec->estate->es_result_relations = ec->resultRelInfo;
185         ec->estate->es_num_result_relations = 1;
186         ec->estate->es_result_relation_info = ec->resultRelInfo;
187
188         /* Set up a tuple slot too */
189         ec->slot = MakeSingleTupleTableSlot(tupdesc);
190 }
191
192 static void
193 ExecContext_Finish(ExecContext ec)
194 {
195         ExecDropSingleTupleTableSlot(ec->slot);
196         ExecCloseIndices(ec->resultRelInfo);
197         FreeExecutorState(ec->estate);
198 }
199
200 /*
201  * End of ExecContext Implementation
202  *----------------------------------------------------------------------
203  */
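/*
 * Sketch of the expected usage pattern (repair_frag() is the real caller;
 * this is illustrative only):
 *
 *		ExecContextData ec;
 *
 *		ExecContext_Init(&ec, onerel);
 *		... move_chain_tuple()/move_plain_tuple() use ec.estate, ec.slot and
 *		    ec.resultRelInfo to make index entries for moved tuples ...
 *		ExecContext_Finish(&ec);
 */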
204
205 /* A few variables that don't seem worth passing around as parameters */
206 static MemoryContext vac_context = NULL;
207
208 static int      elevel = -1;
209
210 static TransactionId OldestXmin;
211 static TransactionId FreezeLimit;
212
213 static BufferAccessStrategy vac_strategy;
214
215
216 /* non-export function prototypes */
217 static List *get_rel_oids(Oid relid, const RangeVar *vacrel,
218                          const char *stmttype);
219 static void vac_truncate_clog(TransactionId frozenXID);
220 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast,
221                    bool for_wraparound, bool *scanned_all);
222 static bool full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
223 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
224                   VacPageList vacuum_pages, VacPageList fraged_pages);
225 static bool repair_frag(VRelStats *vacrelstats, Relation onerel,
226                         VacPageList vacuum_pages, VacPageList fraged_pages,
227                         int nindexes, Relation *Irel);
228 static void move_chain_tuple(VRelStats *vacrelstats, Relation rel,
229                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
230                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
231                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
232 static void move_plain_tuple(Relation rel,
233                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
234                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
235                                  ExecContext ec);
236 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
237                                  int num_fraged_pages, BlockNumber last_move_dest_block,
238                                  int num_moved);
239 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
240                         VacPageList vacpagelist);
241 static void vacuum_page(VRelStats *vacrelstats, Relation onerel, Buffer buffer, VacPage vacpage);
242 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
243                          double num_tuples, int keep_tuples);
244 static void scan_index(Relation indrel, double num_tuples);
245 static bool tid_reaped(ItemPointer itemptr, void *state);
246 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
247                            BlockNumber rel_pages);
248 static VacPage copy_vac_page(VacPage vacpage);
249 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
250 static void *vac_bsearch(const void *key, const void *base,
251                         size_t nelem, size_t size,
252                         int (*compar) (const void *, const void *));
253 static int      vac_cmp_blk(const void *left, const void *right);
254 static int      vac_cmp_offno(const void *left, const void *right);
255 static int      vac_cmp_vtlinks(const void *left, const void *right);
256 static bool enough_space(VacPage vacpage, Size len);
257 static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
258
259
260 /****************************************************************************
261  *                                                                                                                                                      *
262  *                      Code common to all flavors of VACUUM and ANALYZE                                *
263  *                                                                                                                                                      *
264  ****************************************************************************
265  */
266
267
268 /*
269  * Primary entry point for VACUUM and ANALYZE commands.
270  *
271  * relid is normally InvalidOid; if it is not, then it provides the relation
272  * OID to be processed, and vacstmt->relation is ignored.  (The non-invalid
273  * case is currently only used by autovacuum.)
274  *
275  * do_toast is passed as FALSE by autovacuum, because it processes TOAST
276  * tables separately.
277  *
278  * for_wraparound is used by autovacuum to let us know when it's forcing
279  * a vacuum for wraparound, which should not be auto-cancelled.
280  *
281  * bstrategy is normally given as NULL, but in autovacuum it can be passed
282  * in to use the same buffer strategy object across multiple vacuum() calls.
283  *
284  * isTopLevel should be passed down from ProcessUtility.
285  *
286  * It is the caller's responsibility that vacstmt and bstrategy
287  * (if given) be allocated in a memory context that won't disappear
288  * at transaction commit.
289  */
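/*
 * Illustrative call only (the real call site is in tcop/utility.c and may
 * differ in detail): a manual "VACUUM ANALYZE" is expected to arrive here
 * roughly as
 *
 *		vacuum(stmt, InvalidOid, true, NULL, false, isTopLevel);
 *
 * i.e. no pre-resolved relation OID, TOAST tables handled here, no shared
 * buffer strategy, and not a wraparound-forced run.
 */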
290 void
291 vacuum(VacuumStmt *vacstmt, Oid relid, bool do_toast,
292            BufferAccessStrategy bstrategy, bool for_wraparound, bool isTopLevel)
293 {
294         const char *stmttype;
295         volatile bool all_rels,
296                                 in_outer_xact,
297                                 use_own_xacts;
298         List       *relations;
299
300         /* sanity checks on options */
301         Assert(vacstmt->options & (VACOPT_VACUUM | VACOPT_ANALYZE));
302         Assert((vacstmt->options & VACOPT_VACUUM) ||
303                    !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
304         Assert((vacstmt->options & VACOPT_ANALYZE) || vacstmt->va_cols == NIL);
305
306         stmttype = (vacstmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
307
308         if (vacstmt->options & VACOPT_VERBOSE)
309                 elevel = INFO;
310         else
311                 elevel = DEBUG2;
312
313         /*
314          * We cannot run VACUUM inside a user transaction block; if we were inside
315          * a transaction, then our commit- and start-transaction-command calls
316          * would not have the intended effect! Furthermore, the forced commit that
317          * occurs before truncating the relation's file would have the effect of
318          * committing the rest of the user's transaction too, which would
319          * certainly not be the desired behavior.  (This only applies to VACUUM
320          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
321          * block, but we choose to disallow that case because we'd rather commit
322          * as soon as possible after finishing the vacuum.      This is mainly so that
323          * we can release the AccessExclusiveLock that we may be holding.)
324          *
325          * ANALYZE (without VACUUM) can run either way.
326          */
327         if (vacstmt->options & VACOPT_VACUUM)
328         {
329                 PreventTransactionChain(isTopLevel, stmttype);
330                 in_outer_xact = false;
331         }
332         else
333                 in_outer_xact = IsInTransactionChain(isTopLevel);
334
335         /*
336          * Send info about dead objects to the statistics collector, unless we are
337          * in autovacuum --- autovacuum.c does this for itself.
338          */
339         if ((vacstmt->options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
340                 pgstat_vacuum_stat();
341
342         /*
343          * Create special memory context for cross-transaction storage.
344          *
345          * Since it is a child of PortalContext, it will go away eventually even
346          * if we suffer an error; there's no need for special abort cleanup logic.
347          */
348         vac_context = AllocSetContextCreate(PortalContext,
349                                                                                 "Vacuum",
350                                                                                 ALLOCSET_DEFAULT_MINSIZE,
351                                                                                 ALLOCSET_DEFAULT_INITSIZE,
352                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
353
354         /*
355          * If caller didn't give us a buffer strategy object, make one in the
356          * cross-transaction memory context.
357          */
358         if (bstrategy == NULL)
359         {
360                 MemoryContext old_context = MemoryContextSwitchTo(vac_context);
361
362                 bstrategy = GetAccessStrategy(BAS_VACUUM);
363                 MemoryContextSwitchTo(old_context);
364         }
365         vac_strategy = bstrategy;
366
367         /* Remember whether we are processing everything in the DB */
368         all_rels = (!OidIsValid(relid) && vacstmt->relation == NULL);
369
370         /*
371          * Build list of relations to process, unless caller gave us one. (If we
372          * build one, we put it in vac_context for safekeeping.)
373          */
374         relations = get_rel_oids(relid, vacstmt->relation, stmttype);
375
376         /*
377          * Decide whether we need to start/commit our own transactions.
378          *
379          * For VACUUM (with or without ANALYZE): always do so, so that we can
380          * release locks as soon as possible.  (We could possibly use the outer
381          * transaction for a one-table VACUUM, but handling TOAST tables would be
382          * problematic.)
383          *
384          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
385          * start/commit our own transactions.  Also, there's no need to do so if
386          * only processing one relation.  For multiple relations when not within a
387          * transaction block, and also in an autovacuum worker, use own
388          * transactions so we can release locks sooner.
389          */
390         if (vacstmt->options & VACOPT_VACUUM)
391                 use_own_xacts = true;
392         else
393         {
394                 Assert(vacstmt->options & VACOPT_ANALYZE);
395                 if (IsAutoVacuumWorkerProcess())
396                         use_own_xacts = true;
397                 else if (in_outer_xact)
398                         use_own_xacts = false;
399                 else if (list_length(relations) > 1)
400                         use_own_xacts = true;
401                 else
402                         use_own_xacts = false;
403         }
404
405         /*
406          * vacuum_rel expects to be entered with no transaction active; it will
407          * start and commit its own transaction.  But we are called by an SQL
408          * command, and so we are executing inside a transaction already. We
409          * commit the transaction started in PostgresMain() here, and start
410          * another one before exiting to match the commit waiting for us back in
411          * PostgresMain().
412          */
413         if (use_own_xacts)
414         {
415                 /* ActiveSnapshot is not set by autovacuum */
416                 if (ActiveSnapshotSet())
417                         PopActiveSnapshot();
418
419                 /* matches the StartTransaction in PostgresMain() */
420                 CommitTransactionCommand();
421         }
422
423         /* Turn vacuum cost accounting on or off */
424         PG_TRY();
425         {
426                 ListCell   *cur;
427
428                 VacuumCostActive = (VacuumCostDelay > 0);
429                 VacuumCostBalance = 0;
430
431                 /*
432                  * Loop to process each selected relation.
433                  */
434                 foreach(cur, relations)
435                 {
436                         Oid                     relid = lfirst_oid(cur);
437                         bool            scanned_all = false;
438
439                         if (vacstmt->options & VACOPT_VACUUM)
440                                 vacuum_rel(relid, vacstmt, do_toast, for_wraparound,
441                                                    &scanned_all);
442
443                         if (vacstmt->options & VACOPT_ANALYZE)
444                         {
445                                 /*
446                                  * If using separate xacts, start one for analyze. Otherwise,
447                                  * we can use the outer transaction.
448                                  */
449                                 if (use_own_xacts)
450                                 {
451                                         StartTransactionCommand();
452                                         /* functions in indexes may want a snapshot set */
453                                         PushActiveSnapshot(GetTransactionSnapshot());
454                                 }
455
456                                 analyze_rel(relid, vacstmt, vac_strategy, !scanned_all);
457
458                                 if (use_own_xacts)
459                                 {
460                                         PopActiveSnapshot();
461                                         CommitTransactionCommand();
462                                 }
463                         }
464                 }
465         }
466         PG_CATCH();
467         {
468                 /* Make sure cost accounting is turned off after error */
469                 VacuumCostActive = false;
470                 PG_RE_THROW();
471         }
472         PG_END_TRY();
473
474         /* Turn off vacuum cost accounting */
475         VacuumCostActive = false;
476
477         /*
478          * Finish up processing.
479          */
480         if (use_own_xacts)
481         {
482                 /* here, we are not in a transaction */
483
484                 /*
485                  * This matches the CommitTransaction waiting for us in
486                  * PostgresMain().
487                  */
488                 StartTransactionCommand();
489         }
490
491         if ((vacstmt->options & VACOPT_VACUUM) && !IsAutoVacuumWorkerProcess())
492         {
493                 /*
494                  * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
495                  * (autovacuum.c does this for itself.)
496                  */
497                 vac_update_datfrozenxid();
498         }
499
500         /*
501          * Clean up working storage --- note we must do this after
502          * StartTransactionCommand, else we might be trying to delete the active
503          * context!
504          */
505         MemoryContextDelete(vac_context);
506         vac_context = NULL;
507 }
508
509 /*
510  * Build a list of Oids for each relation to be processed
511  *
512  * The list is built in vac_context so that it will survive across our
513  * per-relation transactions.
514  */
515 static List *
516 get_rel_oids(Oid relid, const RangeVar *vacrel, const char *stmttype)
517 {
518         List       *oid_list = NIL;
519         MemoryContext oldcontext;
520
521         /* OID supplied by VACUUM's caller? */
522         if (OidIsValid(relid))
523         {
524                 oldcontext = MemoryContextSwitchTo(vac_context);
525                 oid_list = lappend_oid(oid_list, relid);
526                 MemoryContextSwitchTo(oldcontext);
527         }
528         else if (vacrel)
529         {
530                 /* Process a specific relation */
531                 Oid                     relid;
532
533                 relid = RangeVarGetRelid(vacrel, false);
534
535                 /* Make a relation list entry for this guy */
536                 oldcontext = MemoryContextSwitchTo(vac_context);
537                 oid_list = lappend_oid(oid_list, relid);
538                 MemoryContextSwitchTo(oldcontext);
539         }
540         else
541         {
542                 /* Process all plain relations listed in pg_class */
543                 Relation        pgclass;
544                 HeapScanDesc scan;
545                 HeapTuple       tuple;
546                 ScanKeyData key;
547
548                 ScanKeyInit(&key,
549                                         Anum_pg_class_relkind,
550                                         BTEqualStrategyNumber, F_CHAREQ,
551                                         CharGetDatum(RELKIND_RELATION));
552
553                 pgclass = heap_open(RelationRelationId, AccessShareLock);
554
555                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
556
557                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
558                 {
559                         /* Make a relation list entry for this guy */
560                         oldcontext = MemoryContextSwitchTo(vac_context);
561                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
562                         MemoryContextSwitchTo(oldcontext);
563                 }
564
565                 heap_endscan(scan);
566                 heap_close(pgclass, AccessShareLock);
567         }
568
569         return oid_list;
570 }
571
572 /*
573  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
574  */
575 void
576 vacuum_set_xid_limits(int freeze_min_age,
577                                           int freeze_table_age,
578                                           bool sharedRel,
579                                           TransactionId *oldestXmin,
580                                           TransactionId *freezeLimit,
581                                           TransactionId *freezeTableLimit)
582 {
583         int                     freezemin;
584         TransactionId limit;
585         TransactionId safeLimit;
586
587         /*
588          * We can always ignore processes running lazy vacuum.  This is because we
589          * use these values only for deciding which tuples we must keep in the
590          * tables.      Since lazy vacuum doesn't write its XID anywhere, it's safe to
591          * ignore it.  In theory it could be problematic to ignore lazy vacuums on
592          * a full vacuum, but keep in mind that only one vacuum process can be
593          * working on a particular table at any time, and that each vacuum is
594          * always an independent transaction.
595          */
596         *oldestXmin = GetOldestXmin(sharedRel, true);
597
598         Assert(TransactionIdIsNormal(*oldestXmin));
599
600         /*
601          * Determine the minimum freeze age to use: as specified by the caller, or
602          * vacuum_freeze_min_age, but in any case not more than half
603          * autovacuum_freeze_max_age, so that autovacuums to prevent XID
604          * wraparound won't occur too frequently.
605          */
606         freezemin = freeze_min_age;
607         if (freezemin < 0)
608                 freezemin = vacuum_freeze_min_age;
609         freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
610         Assert(freezemin >= 0);
611
612         /*
613          * Compute the cutoff XID, being careful not to generate a "permanent" XID
614          */
615         limit = *oldestXmin - freezemin;
616         if (!TransactionIdIsNormal(limit))
617                 limit = FirstNormalTransactionId;
618
619         /*
620          * If oldestXmin is very far back (in practice, more than
621          * autovacuum_freeze_max_age / 2 XIDs old), complain and force a minimum
622          * freeze age of zero.
623          */
624         safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
625         if (!TransactionIdIsNormal(safeLimit))
626                 safeLimit = FirstNormalTransactionId;
627
628         if (TransactionIdPrecedes(limit, safeLimit))
629         {
630                 ereport(WARNING,
631                                 (errmsg("oldest xmin is far in the past"),
632                                  errhint("Close open transactions soon to avoid wraparound problems.")));
633                 limit = *oldestXmin;
634         }
635
636         *freezeLimit = limit;
637
638         if (freezeTableLimit != NULL)
639         {
640                 int                     freezetable;
641
642                 /*
643                  * Determine the table freeze age to use: as specified by the caller,
644                  * or vacuum_freeze_table_age, but in any case not more than
645                  *               autovacuum_freeze_max_age * 0.95, so that if you have e.g. a nightly
646                  *               VACUUM schedule, the nightly VACUUM gets a chance to freeze tuples
647                  *               before anti-wraparound autovacuum is launched.
648                  */
649                 freezetable = freeze_table_age;
650                 if (freezetable < 0)
651                         freezetable = vacuum_freeze_table_age;
652                 freezetable = Min(freezetable, autovacuum_freeze_max_age * 0.95);
653                 Assert(freezetable >= 0);
654
655                 /*
656                  * Compute the cutoff XID, being careful not to generate a "permanent"
657                  * XID.
658                  */
659                 limit = ReadNewTransactionId() - freezetable;
660                 if (!TransactionIdIsNormal(limit))
661                         limit = FirstNormalTransactionId;
662
663                 *freezeTableLimit = limit;
664         }
665 }
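/*
 * Worked example (illustrative; assumes the shipped defaults of
 * vacuum_freeze_min_age = 50 million and autovacuum_freeze_max_age =
 * 200 million): with freeze_min_age passed as -1 (meaning "use the GUC"),
 * freezemin becomes Min(50000000, 200000000 / 2) = 50000000, so
 * *freezeLimit ends up 50 million XIDs behind *oldestXmin -- clamped to
 * FirstNormalTransactionId if the subtraction would yield a "permanent"
 * XID, and raised to *oldestXmin itself if that limit falls behind the
 * wraparound safety cutoff computed above.
 */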
666
667
668 /*
669  *      vac_update_relstats() -- update statistics for one relation
670  *
671  *              Update the whole-relation statistics that are kept in its pg_class
672  *              row.  There are additional stats that will be updated if we are
673  *              doing ANALYZE, but we always update these stats.  This routine works
674  *              for both index and heap relation entries in pg_class.
675  *
676  *              We violate transaction semantics here by overwriting the rel's
677  *              existing pg_class tuple with the new values.  This is reasonably
678  *              safe since the new values are correct whether or not this transaction
679  *              commits.  The reason for this is that if we updated these tuples in
680  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
681  *              by the time we got done with a vacuum cycle, most of the tuples in
682  *              pg_class would've been obsoleted.  Of course, this only works for
683  *              fixed-size never-null columns, but these are.
684  *
685  *              Note another assumption: that two VACUUMs/ANALYZEs on a table can't
686  *              run in parallel, nor can VACUUM/ANALYZE run in parallel with a
687  *              schema alteration such as adding an index, rule, or trigger.  Otherwise
688  *              our updates of relhasindex etc might overwrite uncommitted updates.
689  *
690  *              Another reason for doing it this way is that when we are in a lazy
691  *              VACUUM and have PROC_IN_VACUUM set, we mustn't do any updates ---
692  *              somebody vacuuming pg_class might think they could delete a tuple
693  *              marked with xmin = our xid.
694  *
695  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
696  *              ANALYZE.
697  */
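/*
 * Example call, copied from full_vacuum_rel() later in this file:
 *
 *		vac_update_relstats(onerel,
 *							vacrelstats->rel_pages, vacrelstats->rel_tuples,
 *							vacrelstats->hasindex, FreezeLimit);
 */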
698 void
699 vac_update_relstats(Relation relation,
700                                         BlockNumber num_pages, double num_tuples,
701                                         bool hasindex, TransactionId frozenxid)
702 {
703         Oid                     relid = RelationGetRelid(relation);
704         Relation        rd;
705         HeapTuple       ctup;
706         Form_pg_class pgcform;
707         bool            dirty;
708
709         rd = heap_open(RelationRelationId, RowExclusiveLock);
710
711         /* Fetch a copy of the tuple to scribble on */
712         ctup = SearchSysCacheCopy(RELOID,
713                                                           ObjectIdGetDatum(relid),
714                                                           0, 0, 0);
715         if (!HeapTupleIsValid(ctup))
716                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
717                          relid);
718         pgcform = (Form_pg_class) GETSTRUCT(ctup);
719
720         /* Apply required updates, if any, to copied tuple */
721
722         dirty = false;
723         if (pgcform->relpages != (int32) num_pages)
724         {
725                 pgcform->relpages = (int32) num_pages;
726                 dirty = true;
727         }
728         if (pgcform->reltuples != (float4) num_tuples)
729         {
730                 pgcform->reltuples = (float4) num_tuples;
731                 dirty = true;
732         }
733         if (pgcform->relhasindex != hasindex)
734         {
735                 pgcform->relhasindex = hasindex;
736                 dirty = true;
737         }
738
739         /*
740          * If we have discovered that there are no indexes, then there's no
741          * primary key either, nor any exclusion constraints.  This could be done
742          * more thoroughly...
743          */
744         if (!hasindex)
745         {
746                 if (pgcform->relhaspkey)
747                 {
748                         pgcform->relhaspkey = false;
749                         dirty = true;
750                 }
751                 if (pgcform->relhasexclusion && pgcform->relkind != RELKIND_INDEX)
752                 {
753                         pgcform->relhasexclusion = false;
754                         dirty = true;
755                 }
756         }
757
758         /* We also clear relhasrules and relhastriggers if needed */
759         if (pgcform->relhasrules && relation->rd_rules == NULL)
760         {
761                 pgcform->relhasrules = false;
762                 dirty = true;
763         }
764         if (pgcform->relhastriggers && relation->trigdesc == NULL)
765         {
766                 pgcform->relhastriggers = false;
767                 dirty = true;
768         }
769
770         /*
771          * relfrozenxid should never go backward.  Caller can pass
772          * InvalidTransactionId if it has no new data.
773          */
774         if (TransactionIdIsNormal(frozenxid) &&
775                 TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
776         {
777                 pgcform->relfrozenxid = frozenxid;
778                 dirty = true;
779         }
780
781         /* If anything changed, write out the tuple. */
782         if (dirty)
783                 heap_inplace_update(rd, ctup);
784
785         heap_close(rd, RowExclusiveLock);
786 }
787
788
789 /*
790  *      vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
791  *
792  *              Update pg_database's datfrozenxid entry for our database to be the
793  *              minimum of the pg_class.relfrozenxid values.  If we are able to
794  *              advance pg_database.datfrozenxid, also try to truncate pg_clog.
795  *
796  *              We violate transaction semantics here by overwriting the database's
797  *              existing pg_database tuple with the new value.  This is reasonably
798  *              safe since the new value is correct whether or not this transaction
799  *              commits.  As with vac_update_relstats, this avoids leaving dead tuples
800  *              behind after a VACUUM.
801  *
802  *              This routine is shared by full and lazy VACUUM.
803  */
804 void
805 vac_update_datfrozenxid(void)
806 {
807         HeapTuple       tuple;
808         Form_pg_database dbform;
809         Relation        relation;
810         SysScanDesc scan;
811         HeapTuple       classTup;
812         TransactionId newFrozenXid;
813         bool            dirty = false;
814
815         /*
816          * Initialize the "min" calculation with GetOldestXmin, which is a
817          * reasonable approximation to the minimum relfrozenxid for not-yet-
818          * committed pg_class entries for new tables; see AddNewRelationTuple().
819          * So we cannot produce a wrong minimum by starting with this.
820          */
821         newFrozenXid = GetOldestXmin(true, true);
822
823         /*
824          * We must seqscan pg_class to find the minimum Xid, because there is no
825          * index that can help us here.
826          */
827         relation = heap_open(RelationRelationId, AccessShareLock);
828
829         scan = systable_beginscan(relation, InvalidOid, false,
830                                                           SnapshotNow, 0, NULL);
831
832         while ((classTup = systable_getnext(scan)) != NULL)
833         {
834                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
835
836                 /*
837                  * Only consider heap and TOAST tables (anything else should have
838                  * InvalidTransactionId in relfrozenxid anyway.)
839                  */
840                 if (classForm->relkind != RELKIND_RELATION &&
841                         classForm->relkind != RELKIND_TOASTVALUE)
842                         continue;
843
844                 Assert(TransactionIdIsNormal(classForm->relfrozenxid));
845
846                 if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
847                         newFrozenXid = classForm->relfrozenxid;
848         }
849
850         /* we're done with pg_class */
851         systable_endscan(scan);
852         heap_close(relation, AccessShareLock);
853
854         Assert(TransactionIdIsNormal(newFrozenXid));
855
856         /* Now fetch the pg_database tuple we need to update. */
857         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
858
859         /* Fetch a copy of the tuple to scribble on */
860         tuple = SearchSysCacheCopy(DATABASEOID,
861                                                            ObjectIdGetDatum(MyDatabaseId),
862                                                            0, 0, 0);
863         if (!HeapTupleIsValid(tuple))
864                 elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
865         dbform = (Form_pg_database) GETSTRUCT(tuple);
866
867         /*
868          * Don't allow datfrozenxid to go backward (probably can't happen anyway);
869          * and detect the common case where it doesn't go forward either.
870          */
871         if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
872         {
873                 dbform->datfrozenxid = newFrozenXid;
874                 dirty = true;
875         }
876
877         if (dirty)
878                 heap_inplace_update(relation, tuple);
879
880         heap_freetuple(tuple);
881         heap_close(relation, RowExclusiveLock);
882
883         /*
884          * If we were able to advance datfrozenxid, see if we can truncate pg_clog.
885          * Also do it if the shared XID-wrap-limit info is stale, since this
886          * action will update that too.
887          */
888         if (dirty || ForceTransactionIdLimitUpdate())
889                 vac_truncate_clog(newFrozenXid);
890 }
891
892
893 /*
894  *      vac_truncate_clog() -- attempt to truncate the commit log
895  *
896  *              Scan pg_database to determine the system-wide oldest datfrozenxid,
897  *              and use it to truncate the transaction commit log (pg_clog).
898  *              Also update the XID wrap limit info maintained by varsup.c.
899  *
900  *              The passed XID is simply the one I just wrote into my pg_database
901  *              entry.  It's used to initialize the "min" calculation.
902  *
903  *              This routine is shared by full and lazy VACUUM.  Note that it's
904  *              only invoked when we've managed to change our DB's datfrozenxid
905  *              entry, or we found that the shared XID-wrap-limit info is stale.
906  */
907 static void
908 vac_truncate_clog(TransactionId frozenXID)
909 {
910         TransactionId myXID = GetCurrentTransactionId();
911         Relation        relation;
912         HeapScanDesc scan;
913         HeapTuple       tuple;
914         Oid                     oldest_datoid;
915         bool            frozenAlreadyWrapped = false;
916
917         /* init oldest_datoid to sync with my frozenXID */
918         oldest_datoid = MyDatabaseId;
919
920         /*
921          * Scan pg_database to compute the minimum datfrozenxid
922          *
923          * Note: we need not worry about a race condition with new entries being
924          * inserted by CREATE DATABASE.  Any such entry will have a copy of some
925          * existing DB's datfrozenxid, and that source DB cannot be ours because
926          * of the interlock against copying a DB containing an active backend.
927          * Hence the new entry will not reduce the minimum.  Also, if two VACUUMs
928          * concurrently modify the datfrozenxid's of different databases, the
929          * worst possible outcome is that pg_clog is not truncated as aggressively
930          * as it could be.
931          */
932         relation = heap_open(DatabaseRelationId, AccessShareLock);
933
934         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
935
936         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
937         {
938                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
939
940                 Assert(TransactionIdIsNormal(dbform->datfrozenxid));
941
942                 if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
943                         frozenAlreadyWrapped = true;
944                 else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
945                 {
946                         frozenXID = dbform->datfrozenxid;
947                         oldest_datoid = HeapTupleGetOid(tuple);
948                 }
949         }
950
951         heap_endscan(scan);
952
953         heap_close(relation, AccessShareLock);
954
955         /*
956          * Do not truncate CLOG if we seem to have suffered wraparound already;
957          * the computed minimum XID might be bogus.  This case should now be
958          * impossible due to the defenses in GetNewTransactionId, but we keep the
959          * test anyway.
960          */
961         if (frozenAlreadyWrapped)
962         {
963                 ereport(WARNING,
964                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
965                                  errdetail("You might have already suffered transaction-wraparound data loss.")));
966                 return;
967         }
968
969         /* Truncate CLOG to the oldest frozenxid */
970         TruncateCLOG(frozenXID);
971
972         /*
973          * Update the wrap limit for GetNewTransactionId.  Note: this function
974          * will also signal the postmaster for an(other) autovac cycle if needed.
975          */
976         SetTransactionIdLimit(frozenXID, oldest_datoid);
977 }
978
979
980 /****************************************************************************
981  *                                                                                                                                                      *
982  *                      Code common to both flavors of VACUUM                                                   *
983  *                                                                                                                                                      *
984  ****************************************************************************
985  */
986
987
988 /*
989  *      vacuum_rel() -- vacuum one heap relation
990  *
991  *              Doing one heap at a time incurs extra overhead, since we need to
992  *              check that the heap exists again just before we vacuum it.      The
993  *              reason that we do this is so that vacuuming can be spread across
994  *              many small transactions.  Otherwise, two-phase locking would require
995  *              us to lock the entire database during one pass of the vacuum cleaner.
996  *
997  *              We'll return true in *scanned_all if the vacuum scanned all heap
998  *              pages, and updated pg_class.
999  *
1000  *              At entry and exit, we are not inside a transaction.
1001  */
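/*
 * Call pattern (sketch, drawn from this file): vacuum() invokes
 *
 *		vacuum_rel(relid, vacstmt, do_toast, for_wraparound, &scanned_all);
 *
 * for each relation on its list, and near the end of this function we
 * recurse once for the TOAST table, if any, as
 *
 *		vacuum_rel(toast_relid, vacstmt, false, for_wraparound, NULL);
 *
 * so the TOAST pass never looks for a further TOAST table and does not
 * report scanned_all.
 */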
1002 static void
1003 vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound,
1004                    bool *scanned_all)
1005 {
1006         LOCKMODE        lmode;
1007         Relation        onerel;
1008         LockRelId       onerelid;
1009         Oid                     toast_relid;
1010         Oid                     save_userid;
1011         int                     save_sec_context;
1012         int                     save_nestlevel;
1013         bool            heldoff;
1014
1015         if (scanned_all)
1016                 *scanned_all = false;
1017
1018         /* Begin a transaction for vacuuming this relation */
1019         StartTransactionCommand();
1020
1021         /*
1022          * Functions in indexes may want a snapshot set.  Also, setting a snapshot
1023          * ensures that RecentGlobalXmin is kept truly recent.
1024          */
1025         PushActiveSnapshot(GetTransactionSnapshot());
1026
1027         if (!(vacstmt->options & VACOPT_FULL))
1028         {
1029                 /*
1030                  * In lazy vacuum, we can set the PROC_IN_VACUUM flag, which lets
1031                  * other concurrent VACUUMs know that they can ignore this one while
1032                  * determining their OldestXmin.  (The reason we don't set it during a
1033                  * full VACUUM is exactly that we may have to run user-defined
1034                  * functions for functional indexes, and we want to make sure that if
1035                  * they use the snapshot set above, any tuples they require can't get
1036                  * removed from other tables.  An index function that depends on the
1037                  * contents of other tables is arguably broken, but we won't break it
1038                  * here by violating transaction semantics.)
1039                  *
1040                  * We also set the VACUUM_FOR_WRAPAROUND flag, which is passed down by
1041                  * autovacuum; it's used to avoid cancelling a vacuum that was invoked
1042                  * in an emergency.
1043                  *
1044                  * Note: these flags remain set until CommitTransaction or
1045                  * AbortTransaction.  We don't want to clear them until we reset
1046                  * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
1047                  * which is probably Not Good.
1048                  */
1049                 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1050                 MyProc->vacuumFlags |= PROC_IN_VACUUM;
1051                 if (for_wraparound)
1052                         MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
1053                 LWLockRelease(ProcArrayLock);
1054         }
1055
1056         /*
1057          * Check for user-requested abort.      Note we want this to be inside a
1058          * transaction, so xact.c doesn't issue a useless WARNING.
1059          */
1060         CHECK_FOR_INTERRUPTS();
1061
1062         /*
1063          * Determine the type of lock we want --- hard exclusive lock for a FULL
1064          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1065          * way, we can be sure that no other backend is vacuuming the same table.
1066          */
1067         lmode = (vacstmt->options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock;
1068
1069         /*
1070          * Open the relation and get the appropriate lock on it.
1071          *
1072          * There's a race condition here: the rel may have gone away since the
1073          * last time we saw it.  If so, we don't need to vacuum it.
1074          */
1075         onerel = try_relation_open(relid, lmode);
1076
1077         if (!onerel)
1078         {
1079                 PopActiveSnapshot();
1080                 CommitTransactionCommand();
1081                 return;
1082         }
1083
1084         /*
1085          * Check permissions.
1086          *
1087          * We allow the user to vacuum a table if he is superuser, the table
1088          * owner, or the database owner (but in the latter case, only if it's not
1089          * a shared relation).  pg_class_ownercheck includes the superuser case.
1090          *
1091          * Note we choose to treat permissions failure as a WARNING and keep
1092          * trying to vacuum the rest of the DB --- is this appropriate?
1093          */
1094         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1095                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1096         {
1097                 if (onerel->rd_rel->relisshared)
1098                         ereport(WARNING,
1099                                   (errmsg("skipping \"%s\" --- only superuser can vacuum it",
1100                                                   RelationGetRelationName(onerel))));
1101                 else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
1102                         ereport(WARNING,
1103                                         (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
1104                                                         RelationGetRelationName(onerel))));
1105                 else
1106                         ereport(WARNING,
1107                                         (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1108                                                         RelationGetRelationName(onerel))));
1109                 relation_close(onerel, lmode);
1110                 PopActiveSnapshot();
1111                 CommitTransactionCommand();
1112                 return;
1113         }
1114
1115         /*
1116          * Check that it's a vacuumable table; we used to do this in
1117          * get_rel_oids() but it seems safer to check after we've locked the
1118          * relation.
1119          */
1120         if (onerel->rd_rel->relkind != RELKIND_RELATION &&
1121                 onerel->rd_rel->relkind != RELKIND_TOASTVALUE)
1122         {
1123                 ereport(WARNING,
1124                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1125                                                 RelationGetRelationName(onerel))));
1126                 relation_close(onerel, lmode);
1127                 PopActiveSnapshot();
1128                 CommitTransactionCommand();
1129                 return;
1130         }
1131
1132         /*
1133          * Silently ignore tables that are temp tables of other backends ---
1134          * trying to vacuum these will lead to great unhappiness, since their
1135          * contents are probably not up-to-date on disk.  (We don't throw a
1136          * warning here; it would just lead to chatter during a database-wide
1137          * VACUUM.)
1138          */
1139         if (RELATION_IS_OTHER_TEMP(onerel))
1140         {
1141                 relation_close(onerel, lmode);
1142                 PopActiveSnapshot();
1143                 CommitTransactionCommand();
1144                 return;
1145         }
1146
1147         /*
1148          * Get a session-level lock too. This will protect our access to the
1149          * relation across multiple transactions, so that we can vacuum the
1150          * relation's TOAST table (if any) secure in the knowledge that no one is
1151          * deleting the parent relation.
1152          *
1153          * NOTE: this cannot block, even if someone else is waiting for access,
1154          * because the lock manager knows that both lock requests are from the
1155          * same process.
1156          */
1157         onerelid = onerel->rd_lockInfo.lockRelId;
1158         LockRelationIdForSession(&onerelid, lmode);
1159
1160         /*
1161          * Remember the relation's TOAST relation for later, if the caller asked
1162          * us to process it.
1163          */
1164         if (do_toast)
1165                 toast_relid = onerel->rd_rel->reltoastrelid;
1166         else
1167                 toast_relid = InvalidOid;
1168
1169         /*
1170          * Switch to the table owner's userid, so that any index functions are run
1171          * as that user.  Also lock down security-restricted operations and
1172          * arrange to make GUC variable changes local to this command.
1173          * (This is unnecessary, but harmless, for lazy VACUUM.)
1174          */
1175         GetUserIdAndSecContext(&save_userid, &save_sec_context);
1176         SetUserIdAndSecContext(onerel->rd_rel->relowner,
1177                                                    save_sec_context | SECURITY_RESTRICTED_OPERATION);
1178         save_nestlevel = NewGUCNestLevel();
1179
1180         /*
1181          * Do the actual work --- either FULL or "lazy" vacuum
1182          */
1183         if (vacstmt->options & VACOPT_FULL)
1184                 heldoff = full_vacuum_rel(onerel, vacstmt);
1185         else
1186                 heldoff = lazy_vacuum_rel(onerel, vacstmt, vac_strategy, scanned_all);
1187
1188         /* Roll back any GUC changes executed by index functions */
1189         AtEOXact_GUC(false, save_nestlevel);
1190
1191         /* Restore userid and security context */
1192         SetUserIdAndSecContext(save_userid, save_sec_context);
1193
1194         /* all done with this class, but hold lock until commit */
1195         relation_close(onerel, NoLock);
1196
1197         /*
1198          * Complete the transaction and free all temporary memory used.
1199          */
1200         PopActiveSnapshot();
1201         CommitTransactionCommand();
1202
1203         /* now we can allow interrupts again, if disabled */
1204         if (heldoff)
1205                 RESUME_INTERRUPTS();
1206
1207         /*
1208          * If the relation has a secondary toast rel, vacuum that too while we
1209          * still hold the session lock on the master table.  Note however that
1210          * "analyze" will not get done on the toast table.      This is good, because
1211          * the toaster always uses hardcoded index access and statistics are
1212          * totally unimportant for toast relations.
1213          */
1214         if (toast_relid != InvalidOid)
1215                 vacuum_rel(toast_relid, vacstmt, false, for_wraparound, NULL);
1216
1217         /*
1218          * Now release the session-level lock on the master table.
1219          */
1220         UnlockRelationIdForSession(&onerelid, lmode);
1221 }
1222
1223
1224 /****************************************************************************
1225  *                                                                                                                                                      *
1226  *                      Code for VACUUM FULL (only)                                                                             *
1227  *                                                                                                                                                      *
1228  ****************************************************************************
1229  */
1230
1231
1232 /*
1233  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1234  *
1235  *              This routine vacuums a single heap, cleans out its indexes, and
1236  *              updates its num_pages and num_tuples statistics.
1237  *
1238  *              At entry, we have already established a transaction and opened
1239  *              and locked the relation.
1240  *
1241  *              The return value indicates whether this function has held off
1242  *              interrupts -- caller must RESUME_INTERRUPTS() after commit if true.
1243  */
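/*
 * Overview of the steps below (sketch; the function body is authoritative):
 * compute XID cutoffs, flush async commits, scan_heap() to collect
 * vacuum_pages/fraged_pages, clean or just scan each index, then either
 * repair_frag() to shrink the heap or vacuum_heap() to clean pages in
 * place, update the free space map, and finally record results in pg_class
 * and the stats collector.
 */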
1244 static bool
1245 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1246 {
1247         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1248                                                                                  * clean indexes */
1249         VacPageListData fraged_pages;           /* List of pages with space enough for
1250                                                                                  * re-using */
1251         Relation   *Irel;
1252         int                     nindexes,
1253                                 i;
1254         VRelStats  *vacrelstats;
1255         bool            heldoff = false;
1256
1257         vacuum_set_xid_limits(vacstmt->freeze_min_age, vacstmt->freeze_table_age,
1258                                                   onerel->rd_rel->relisshared,
1259                                                   &OldestXmin, &FreezeLimit, NULL);
1260
1261         /*
1262          * Flush any previous async-commit transactions.  This does not guarantee
1263          * that we will be able to set hint bits for tuples they inserted, but it
1264          * improves the probability, especially in simple sequential-commands
1265          * cases.  See scan_heap() and repair_frag() for more about this.
1266          */
1267         XLogAsyncCommitFlush();
1268
1269         /*
1270          * Set up statistics-gathering machinery.
1271          */
1272         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1273         vacrelstats->rel_pages = 0;
1274         vacrelstats->rel_tuples = 0;
1275         vacrelstats->rel_indexed_tuples = 0;
1276         vacrelstats->hasindex = false;
1277         vacrelstats->latestRemovedXid = InvalidTransactionId;
1278
1279         /* scan the heap */
1280         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1281         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1282
1283         /* Now open all indexes of the relation */
1284         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1285         if (nindexes > 0)
1286                 vacrelstats->hasindex = true;
1287
1288         /* Clean/scan index relation(s) */
1289         if (Irel != NULL)
1290         {
1291                 if (vacuum_pages.num_pages > 0)
1292                 {
1293                         for (i = 0; i < nindexes; i++)
1294                                 vacuum_index(&vacuum_pages, Irel[i],
1295                                                          vacrelstats->rel_indexed_tuples, 0);
1296                 }
1297                 else
1298                 {
1299                         /* just scan indexes to update statistics */
1300                         for (i = 0; i < nindexes; i++)
1301                                 scan_index(Irel[i], vacrelstats->rel_indexed_tuples);
1302                 }
1303         }
1304
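        /*
         * If we found pages with enough free space to serve as move targets,
         * try to shrink the relation by moving tuples into them; otherwise
         * just reclaim the dead space on the reaped pages in place.
         */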
1305         if (fraged_pages.num_pages > 0)
1306         {
1307                 /* Try to shrink heap */
1308                 heldoff = repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1309                                         nindexes, Irel);
1310                 vac_close_indexes(nindexes, Irel, NoLock);
1311         }
1312         else
1313         {
1314                 vac_close_indexes(nindexes, Irel, NoLock);
1315                 if (vacuum_pages.num_pages > 0)
1316                 {
1317                         /* Clean pages from vacuum_pages list */
1318                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1319                 }
1320         }
1321
1322         /* update the free space map with final free space info, and vacuum it */
1323         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1324         FreeSpaceMapVacuum(onerel);
1325
1326         /* update statistics in pg_class */
1327         vac_update_relstats(onerel,
1328                                                 vacrelstats->rel_pages, vacrelstats->rel_tuples,
1329                                                 vacrelstats->hasindex, FreezeLimit);
1330
1331         /* report results to the stats collector, too */
1332         pgstat_report_vacuum(RelationGetRelid(onerel),
1333                                                  onerel->rd_rel->relisshared,
1334                                                  true,
1335                                                  vacrelstats->rel_tuples);
1336
1337         return heldoff;
1338 }
1339
1340
1341 /*
1342  *      scan_heap() -- scan an open heap relation
1343  *
1344  *              This routine sets commit status bits, constructs vacuum_pages (list
1345  *              of pages we need to compact free space on and/or clean indexes of
1346  *              deleted tuples), constructs fraged_pages (list of pages with free
1347  *              space that tuples could be moved into), and calculates statistics
1348  *              on the number of live tuples in the heap.
1349  */
1350 static void
1351 scan_heap(VRelStats *vacrelstats, Relation onerel,
1352                   VacPageList vacuum_pages, VacPageList fraged_pages)
1353 {
1354         BlockNumber nblocks,
1355                                 blkno;
1356         char       *relname;
1357         VacPage         vacpage;
1358         BlockNumber empty_pages,
1359                                 empty_end_pages;
1360         double          num_tuples,
1361                                 num_indexed_tuples,
1362                                 tups_vacuumed,
1363                                 nkeep,
1364                                 nunused;
1365         double          free_space,
1366                                 usable_free_space;
1367         Size            min_tlen = MaxHeapTupleSize;
1368         Size            max_tlen = 0;
1369         bool            do_shrinking = true;
1370         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1371         int                     num_vtlinks = 0;
1372         int                     free_vtlinks = 100;
1373         PGRUsage        ru0;
1374
1375         pg_rusage_init(&ru0);
1376
1377         relname = RelationGetRelationName(onerel);
1378         ereport(elevel,
1379                         (errmsg("vacuuming \"%s.%s\"",
1380                                         get_namespace_name(RelationGetNamespace(onerel)),
1381                                         relname)));
1382
1383         empty_pages = empty_end_pages = 0;
1384         num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
1385         free_space = 0;
1386
1387         nblocks = RelationGetNumberOfBlocks(onerel);
1388
1389         /*
1390          * We initially create each VacPage item in a maximal-sized workspace,
1391          * then copy the workspace into a just-large-enough copy.
1392          */
1393         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1394
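        /*
         * Scan every block of the relation.  For each page we prune HOT
         * chains, collect vacuumable items, freeze old tuples, and record the
         * page in vacuum_pages and/or fraged_pages as appropriate.
         */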
1395         for (blkno = 0; blkno < nblocks; blkno++)
1396         {
1397                 Page            page,
1398                                         tempPage = NULL;
1399                 bool            do_reap,
1400                                         do_frag;
1401                 Buffer          buf;
1402                 OffsetNumber offnum,
1403                                         maxoff;
1404                 bool            notup;
1405                 OffsetNumber frozen[MaxOffsetNumber];
1406                 int                     nfrozen;
1407
1408                 vacuum_delay_point();
1409
1410                 buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
1411                                                                  vac_strategy);
1412                 page = BufferGetPage(buf);
1413
1414                 /*
1415                  * Since we are holding exclusive lock on the relation, no other
1416                  * backend can be accessing the page; however it is possible that the
1417                  * background writer will try to write the page if it's already marked
1418                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1419                  * must take exclusive buffer lock wherever we potentially modify
1420                  * pages.  In fact, we insist on cleanup lock so that we can safely
1421                  * call heap_page_prune().      (This might be overkill, since the
1422                  * bgwriter pays no attention to individual tuples, but on the other
1423                  * hand it's unlikely that the bgwriter has this particular page
1424                  * pinned at this instant.      So violating the coding rule would buy us
1425                  * little anyway.)
1426                  */
1427                 LockBufferForCleanup(buf);
1428
1429                 vacpage->blkno = blkno;
1430                 vacpage->offsets_used = 0;
1431                 vacpage->offsets_free = 0;
1432
1433                 if (PageIsNew(page))
1434                 {
1435                         VacPage         vacpagecopy;
1436
1437                         ereport(WARNING,
1438                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1439                                            relname, blkno)));
1440                         PageInit(page, BufferGetPageSize(buf), 0);
1441                         MarkBufferDirty(buf);
1442                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1443                         free_space += vacpage->free;
1444                         empty_pages++;
1445                         empty_end_pages++;
1446                         vacpagecopy = copy_vac_page(vacpage);
1447                         vpage_insert(vacuum_pages, vacpagecopy);
1448                         vpage_insert(fraged_pages, vacpagecopy);
1449                         UnlockReleaseBuffer(buf);
1450                         continue;
1451                 }
1452
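                /*
                 * An already-empty page is simply recorded as free space and
                 * as a candidate move target.
                 */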
1453                 if (PageIsEmpty(page))
1454                 {
1455                         VacPage         vacpagecopy;
1456
1457                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1458                         free_space += vacpage->free;
1459                         empty_pages++;
1460                         empty_end_pages++;
1461                         vacpagecopy = copy_vac_page(vacpage);
1462                         vpage_insert(vacuum_pages, vacpagecopy);
1463                         vpage_insert(fraged_pages, vacpagecopy);
1464                         UnlockReleaseBuffer(buf);
1465                         continue;
1466                 }
1467
1468                 /*
1469                  * Prune all HOT-update chains in this page.
1470                  *
1471                  * We use the redirect_move option so that redirecting line pointers
1472                  * get collapsed out; this allows us to not worry about them below.
1473                  *
1474                  * We count tuples removed by the pruning step as removed by VACUUM.
1475                  */
1476                 tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
1477                                                                                  true, false);
1478
1479                 /*
1480                  * Now scan the page to collect vacuumable items and check for tuples
1481                  * requiring freezing.
1482                  */
1483                 nfrozen = 0;
1484                 notup = true;
1485                 maxoff = PageGetMaxOffsetNumber(page);
1486                 for (offnum = FirstOffsetNumber;
1487                          offnum <= maxoff;
1488                          offnum = OffsetNumberNext(offnum))
1489                 {
1490                         ItemId          itemid = PageGetItemId(page, offnum);
1491                         bool            tupgone = false;
1492                         HeapTupleData tuple;
1493
1494                         /*
1495                          * Collect un-used items too - it's possible to have indexes
1496                          * pointing here after crash.  (That's an ancient comment and is
1497                          * likely obsolete with WAL, but we might as well continue to
1498                          * check for such problems.)
1499                          */
1500                         if (!ItemIdIsUsed(itemid))
1501                         {
1502                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1503                                 nunused += 1;
1504                                 continue;
1505                         }
1506
1507                         /*
1508                          * DEAD item pointers are to be vacuumed normally; but we don't
1509                          * count them in tups_vacuumed, else we'd be double-counting (at
1510                          * least in the common case where heap_page_prune() just freed up
1511                          * a non-HOT tuple).
1512                          */
1513                         if (ItemIdIsDead(itemid))
1514                         {
1515                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1516                                 continue;
1517                         }
1518
1519                         /* Shouldn't have any redirected items anymore */
1520                         if (!ItemIdIsNormal(itemid))
1521                                 elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
1522                                          relname, blkno, offnum);
1523
1524                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1525                         tuple.t_len = ItemIdGetLength(itemid);
1526                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1527
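                        /*
                         * Classify the tuple's visibility with respect to
                         * OldestXmin to decide whether it can be removed, must
                         * be kept, or forces us to give up on shrinking.
                         */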
1528                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1529                         {
1530                                 case HEAPTUPLE_LIVE:
1531                                         /* Tuple is good --- but let's do some validity checks */
1532                                         if (onerel->rd_rel->relhasoids &&
1533                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1534                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1535                                                          relname, blkno, offnum);
1536
1537                                         /*
1538                                          * The shrinkage phase of VACUUM FULL requires that all
1539                                          * live tuples have XMIN_COMMITTED set --- see comments in
1540                                          * repair_frag()'s walk-along-page loop.  Use of async
1541                                          * commit may prevent HeapTupleSatisfiesVacuum from
1542                                          * setting the bit for a recently committed tuple.      Rather
1543                                          * than trying to handle this corner case, we just give up
1544                                          * and don't shrink.
1545                                          */
1546                                         if (do_shrinking &&
1547                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1548                                         {
1549                                                 ereport(LOG,
1550                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1551                                                                                 relname, blkno, offnum,
1552                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1553                                                 do_shrinking = false;
1554                                         }
1555                                         break;
1556                                 case HEAPTUPLE_DEAD:
1557
1558                                         /*
1559                                          * Ordinarily, DEAD tuples would have been removed by
1560                                          * heap_page_prune(), but it's possible that the tuple
1561                                          * state changed since heap_page_prune() looked.  In
1562                                          * particular an INSERT_IN_PROGRESS tuple could have
1563                                          * changed to DEAD if the inserter aborted.  So this
1564                                          * cannot be considered an error condition, though it does
1565                                          * suggest that someone released a lock early.
1566                                          *
1567                                          * If the tuple is HOT-updated then it must only be
1568                                          * removed by a prune operation; so we keep it as if it
1569                                          * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
1570                                          * worth trying to make the shrinking code smart enough to
1571                                          * handle this?  It's an unusual corner case.)
1572                                          *
1573                                          * DEAD heap-only tuples can safely be removed if they
1574                                          * aren't themselves HOT-updated, although this is a bit
1575                                          * inefficient since we'll uselessly try to remove index
1576                                          * entries for them.
1577                                          */
1578                                         if (HeapTupleIsHotUpdated(&tuple))
1579                                         {
1580                                                 nkeep += 1;
1581                                                 if (do_shrinking)
1582                                                         ereport(LOG,
1583                                                                         (errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
1584                                                                                         relname, blkno, offnum)));
1585                                                 do_shrinking = false;
1586                                         }
1587                                         else
1588                                         {
1589                                                 tupgone = true; /* we can delete the tuple */
1590
1591                                                 /*
1592                                                  * We need not require XMIN_COMMITTED or
1593                                                  * XMAX_COMMITTED to be set, since we will remove the
1594                                                  * tuple without any further examination of its hint
1595                                                  * bits.
1596                                                  */
1597                                         }
1598                                         break;
1599                                 case HEAPTUPLE_RECENTLY_DEAD:
1600
1601                                         /*
1602                                          * If tuple is recently deleted then we must not remove it
1603                                          * from relation.
1604                                          */
1605                                         nkeep += 1;
1606
1607                                         /*
1608                                          * As with the LIVE case, shrinkage requires
1609                                          * XMIN_COMMITTED to be set.
1610                                          */
1611                                         if (do_shrinking &&
1612                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1613                                         {
1614                                                 ereport(LOG,
1615                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1616                                                                                 relname, blkno, offnum,
1617                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1618                                                 do_shrinking = false;
1619                                         }
1620
1621                                         /*
1622                                          * If we are shrinking and this tuple has been updated, then
1623                                          * remember it so we can construct the updated-tuple dependencies.
1624                                          */
1625                                         if (do_shrinking &&
1626                                                 !(ItemPointerEquals(&(tuple.t_self),
1627                                                                                         &(tuple.t_data->t_ctid))))
1628                                         {
1629                                                 if (free_vtlinks == 0)
1630                                                 {
1631                                                         free_vtlinks = 1000;
1632                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1633                                                                                            (free_vtlinks + num_vtlinks) *
1634                                                                                                          sizeof(VTupleLinkData));
1635                                                 }
1636                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1637                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1638                                                 free_vtlinks--;
1639                                                 num_vtlinks++;
1640                                         }
1641                                         break;
1642                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1643
1644                                         /*
1645                                          * This should not happen, since we hold exclusive lock on
1646                                          * the relation; shouldn't we raise an error?  (Actually,
1647                                          * it can happen in system catalogs, since we tend to
1648                                          * release write lock before commit there.)  As above, we
1649                                          * can't apply repair_frag() if the tuple state is
1650                                          * uncertain.
1651                                          */
1652                                         if (do_shrinking)
1653                                                 ereport(LOG,
1654                                                                 (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
1655                                                                                 relname, blkno, offnum,
1656                                                                          HeapTupleHeaderGetXmin(tuple.t_data))));
1657                                         do_shrinking = false;
1658                                         break;
1659                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1660
1661                                         /*
1662                                          * This should not happen, since we hold exclusive lock on
1663                                          * the relation; shouldn't we raise an error?  (Actually,
1664                                          * it can happen in system catalogs, since we tend to
1665                                          * release write lock before commit there.)  As above, we
1666                                          * can't apply repair_frag() if the tuple state is
1667                                          * uncertain.
1668                                          */
1669                                         if (do_shrinking)
1670                                                 ereport(LOG,
1671                                                                 (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
1672                                                                                 relname, blkno, offnum,
1673                                                                          HeapTupleHeaderGetXmax(tuple.t_data))));
1674                                         do_shrinking = false;
1675                                         break;
1676                                 default:
1677                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1678                                         break;
1679                         }
1680
1681                         if (tupgone)
1682                         {
1683                                 ItemId          lpp;
1684
1685                                 HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
1686                                                                                         &vacrelstats->latestRemovedXid);
1687
1688                                 /*
1689                                  * Here we are building a temporary copy of the page with dead
1690                                  * tuples removed.      Below we will apply
1691                                  * PageRepairFragmentation to the copy, so that we can
1692                                  * determine how much space will be available after removal of
1693                                  * dead tuples.  But note we are NOT changing the real page
1694                                  * yet...
1695                                  */
1696                                 if (tempPage == NULL)
1697                                 {
1698                                         Size            pageSize;
1699
1700                                         pageSize = PageGetPageSize(page);
1701                                         tempPage = (Page) palloc(pageSize);
1702                                         memcpy(tempPage, page, pageSize);
1703                                 }
1704
1705                                 /* mark it unused on the temp page */
1706                                 lpp = PageGetItemId(tempPage, offnum);
1707                                 ItemIdSetUnused(lpp);
1708
1709                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1710                                 tups_vacuumed += 1;
1711                         }
1712                         else
1713                         {
1714                                 num_tuples += 1;
1715                                 if (!HeapTupleIsHeapOnly(&tuple))
1716                                         num_indexed_tuples += 1;
1717                                 notup = false;
1718                                 if (tuple.t_len < min_tlen)
1719                                         min_tlen = tuple.t_len;
1720                                 if (tuple.t_len > max_tlen)
1721                                         max_tlen = tuple.t_len;
1722
1723                                 /*
1724                                  * Each non-removable tuple must be checked to see if it needs
1725                                  * freezing.
1726                                  */
1727                                 if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
1728                                                                           InvalidBuffer))
1729                                         frozen[nfrozen++] = offnum;
1730                         }
1731                 }                                               /* scan along page */
1732
1733                 if (tempPage != NULL)
1734                 {
1735                         /* Some tuples are removable; figure free space after removal */
1736                         PageRepairFragmentation(tempPage);
1737                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1738                         pfree(tempPage);
1739                         do_reap = true;
1740                 }
1741                 else
1742                 {
1743                         /* Just use current available space */
1744                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1745                         /* Need to reap the page if it has UNUSED or DEAD line pointers */
1746                         do_reap = (vacpage->offsets_free > 0);
1747                 }
1748
1749                 free_space += vacpage->free;
1750
1751                 /*
1752                  * Add the page to vacuum_pages if it requires reaping, and add it to
1753                  * fraged_pages if it has a useful amount of free space.  "Useful"
1754                  * means enough for a minimal-sized tuple.      But we don't know that
1755                  * accurately near the start of the relation, so add pages
1756                  * unconditionally if they have >= BLCKSZ/10 free space.  Also
1757                  * forcibly add pages with no live tuples, to avoid confusing the
1758                  * empty_end_pages logic.  (In the presence of unreasonably small
1759                  * fillfactor, it seems possible that such pages might not pass the
1760                  * free-space test, but they had better be in the list anyway.)
1761                  */
1762                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10 ||
1763                                    notup);
1764
1765                 if (do_reap || do_frag)
1766                 {
1767                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1768
1769                         if (do_reap)
1770                                 vpage_insert(vacuum_pages, vacpagecopy);
1771                         if (do_frag)
1772                                 vpage_insert(fraged_pages, vacpagecopy);
1773                 }
1774
1775                 /*
1776                  * Include the page in empty_end_pages if it will be empty after
1777                  * vacuuming; this is to keep us from using it as a move destination.
1778                  * Note that such pages are guaranteed to be in fraged_pages.
1779                  */
1780                 if (notup)
1781                 {
1782                         empty_pages++;
1783                         empty_end_pages++;
1784                 }
1785                 else
1786                         empty_end_pages = 0;
1787
1788                 /*
1789                  * If we froze any tuples, mark the buffer dirty, and write a WAL
1790                  * record recording the changes.  We must log the changes to be
1791                  * crash-safe against future truncation of CLOG.
1792                  */
1793                 if (nfrozen > 0)
1794                 {
1795                         MarkBufferDirty(buf);
1796                         /* no XLOG for temp tables, though */
1797                         if (!onerel->rd_istemp)
1798                         {
1799                                 XLogRecPtr      recptr;
1800
1801                                 recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1802                                                                                  frozen, nfrozen);
1803                                 PageSetLSN(page, recptr);
1804                                 PageSetTLI(page, ThisTimeLineID);
1805                         }
1806                 }
1807
1808                 UnlockReleaseBuffer(buf);
1809         }
1810
1811         pfree(vacpage);
1812
1813         /* save stats in the rel list for use later */
1814         vacrelstats->rel_tuples = num_tuples;
1815         vacrelstats->rel_indexed_tuples = num_indexed_tuples;
1816         vacrelstats->rel_pages = nblocks;
1817         if (num_tuples == 0)
1818                 min_tlen = max_tlen = 0;
1819         vacrelstats->min_tlen = min_tlen;
1820         vacrelstats->max_tlen = max_tlen;
1821
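        /*
         * Remember how many consecutive end-of-relation pages will be empty
         * after vacuuming; repair_frag uses this when choosing where to start
         * moving tuples and how far the relation can be truncated.
         */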
1822         vacuum_pages->empty_end_pages = empty_end_pages;
1823         fraged_pages->empty_end_pages = empty_end_pages;
1824
1825         /*
1826          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1827          * remove any "empty" end-pages from the list, and compute usable free
1828          * space = free space in remaining pages.
1829          */
1830         if (do_shrinking)
1831         {
1832                 int                     i;
1833
1834                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1835                 fraged_pages->num_pages -= empty_end_pages;
1836                 usable_free_space = 0;
1837                 for (i = 0; i < fraged_pages->num_pages; i++)
1838                         usable_free_space += fraged_pages->pagedesc[i]->free;
1839         }
1840         else
1841         {
1842                 fraged_pages->num_pages = 0;
1843                 usable_free_space = 0;
1844         }
1845
1846         /* don't bother to save vtlinks if we will not call repair_frag */
1847         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1848         {
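                /*
                 * The links are sorted so that repair_frag can look up the
                 * entry that points at a given tuple.
                 */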
1849                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1850                           vac_cmp_vtlinks);
1851                 vacrelstats->vtlinks = vtlinks;
1852                 vacrelstats->num_vtlinks = num_vtlinks;
1853         }
1854         else
1855         {
1856                 vacrelstats->vtlinks = NULL;
1857                 vacrelstats->num_vtlinks = 0;
1858                 pfree(vtlinks);
1859         }
1860
1861         ereport(elevel,
1862                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1863                                         RelationGetRelationName(onerel),
1864                                         tups_vacuumed, num_tuples, nblocks),
1865                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1866                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1867                                            "There were %.0f unused item pointers.\n"
1868            "Total free space (including removable row versions) is %.0f bytes.\n"
1869                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1870          "%u pages containing %.0f free bytes are potential move destinations.\n"
1871                                            "%s.",
1872                                            nkeep,
1873                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1874                                            nunused,
1875                                            free_space,
1876                                            empty_pages, empty_end_pages,
1877                                            fraged_pages->num_pages, usable_free_space,
1878                                            pg_rusage_show(&ru0))));
1879 }
1880
1881
1882 /*
1883  *      repair_frag() -- try to repair relation's fragmentation
1884  *
1885  *              This routine marks dead tuples as unused and tries to reuse dead space
1886  *              by moving tuples (and inserting index entries if needed). It constructs
1887  *              Nvacpagelist, a list of freed pages (whose tuples were moved), and cleans
1888  *              the indexes for them after committing the current transaction (in hack
1889  *              manner: without losing locks or freeing memory!). It truncates the
1890  *              relation if some end blocks have become empty.
1891  *
1892  *              The return value indicates whether this function has held off
1893  *              interrupts -- caller must RESUME_INTERRUPTS() after commit if true.
1894  */
1895 static bool
1896 repair_frag(VRelStats *vacrelstats, Relation onerel,
1897                         VacPageList vacuum_pages, VacPageList fraged_pages,
1898                         int nindexes, Relation *Irel)
1899 {
1900         TransactionId myXID = GetCurrentTransactionId();
1901         Buffer          dst_buffer = InvalidBuffer;
1902         BlockNumber nblocks,
1903                                 blkno;
1904         BlockNumber last_move_dest_block = 0,
1905                                 last_vacuum_block;
1906         Page            dst_page = NULL;
1907         ExecContextData ec;
1908         VacPageListData Nvacpagelist;
1909         VacPage         dst_vacpage = NULL,
1910                                 last_vacuum_page,
1911                                 vacpage,
1912                            *curpage;
1913         int                     i;
1914         int                     num_moved = 0,
1915                                 num_fraged_pages,
1916                                 vacuumed_pages;
1917         int                     keep_tuples = 0;
1918         int                     keep_indexed_tuples = 0;
1919         PGRUsage        ru0;
1920         bool            heldoff = false;
1921
1922         pg_rusage_init(&ru0);
1923
1924         ExecContext_Init(&ec, onerel);
1925
1926         Nvacpagelist.num_pages = 0;
1927         num_fraged_pages = fraged_pages->num_pages;
1928         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1929         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1930         if (vacuumed_pages > 0)
1931         {
1932                 /* get last reaped page from vacuum_pages */
1933                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1934                 last_vacuum_block = last_vacuum_page->blkno;
1935         }
1936         else
1937         {
1938                 last_vacuum_page = NULL;
1939                 last_vacuum_block = InvalidBlockNumber;
1940         }
1941
1942         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1943         vacpage->offsets_used = vacpage->offsets_free = 0;
1944
1945         /*
1946          * Scan pages backwards from the last nonempty page, trying to move tuples
1947          * down to lower pages.  Quit when we reach a page that we have moved any
1948          * tuples onto, or the first page if we haven't moved anything, or when we
1949          * find a page we cannot completely empty (this last condition is handled
1950          * by "break" statements within the loop).
1951          *
1952          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1953          * in order by blkno.
1954          */
1955         nblocks = vacrelstats->rel_pages;
1956         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1957                  blkno > last_move_dest_block;
1958                  blkno--)
1959         {
1960                 Buffer          buf;
1961                 Page            page;
1962                 OffsetNumber offnum,
1963                                         maxoff;
1964                 bool            isempty,
1965                                         chain_tuple_moved;
1966
1967                 vacuum_delay_point();
1968
1969                 /*
1970                  * Forget fraged_pages pages at or after this one; they're no longer
1971                  * useful as move targets, since we only want to move down. Note that
1972                  * since we stop the outer loop at last_move_dest_block, pages removed
1973                  * here cannot have had anything moved onto them already.
1974                  *
1975                  * Also note that we don't change the stored fraged_pages list, only
1976                  * our local variable num_fraged_pages; so the forgotten pages are
1977                  * still available to be loaded into the free space map later.
1978                  */
1979                 while (num_fraged_pages > 0 &&
1980                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1981                 {
1982                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1983                         --num_fraged_pages;
1984                 }
1985
1986                 /*
1987                  * Process this page of relation.
1988                  */
1989                 buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
1990                                                                  vac_strategy);
1991                 page = BufferGetPage(buf);
1992
1993                 vacpage->offsets_free = 0;
1994
1995                 isempty = PageIsEmpty(page);
1996
1997                 /* Is the page in the vacuum_pages list? */
1998                 if (blkno == last_vacuum_block)
1999                 {
2000                         if (last_vacuum_page->offsets_free > 0)
2001                         {
2002                                 /* there are dead tuples on this page - clean them */
2003                                 Assert(!isempty);
2004                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2005                                 vacuum_page(vacrelstats, onerel, buf, last_vacuum_page);
2006                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2007                         }
2008                         else
2009                                 Assert(isempty);
2010                         --vacuumed_pages;
2011                         if (vacuumed_pages > 0)
2012                         {
2013                                 /* get prev reaped page from vacuum_pages */
2014                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
2015                                 last_vacuum_block = last_vacuum_page->blkno;
2016                         }
2017                         else
2018                         {
2019                                 last_vacuum_page = NULL;
2020                                 last_vacuum_block = InvalidBlockNumber;
2021                         }
2022                         if (isempty)
2023                         {
2024                                 ReleaseBuffer(buf);
2025                                 continue;
2026                         }
2027                 }
2028                 else
2029                         Assert(!isempty);
2030
2031                 chain_tuple_moved = false;              /* no chain tuple has been moved
2032                                                                                  * off this page yet */
2033                 vacpage->blkno = blkno;
2034                 maxoff = PageGetMaxOffsetNumber(page);
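                /* walk along page, deciding what to do with each item */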
2035                 for (offnum = FirstOffsetNumber;
2036                          offnum <= maxoff;
2037                          offnum = OffsetNumberNext(offnum))
2038                 {
2039                         Size            tuple_len;
2040                         HeapTupleData tuple;
2041                         ItemId          itemid = PageGetItemId(page, offnum);
2042
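                        /* unused slots need no processing here */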
2043                         if (!ItemIdIsUsed(itemid))
2044                                 continue;
2045
2046                         if (ItemIdIsDead(itemid))
2047                         {
2048                                 /* just remember it for vacuum_page() */
2049                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2050                                 continue;
2051                         }
2052
2053                         /* Shouldn't have any redirected items now */
2054                         Assert(ItemIdIsNormal(itemid));
2055
2056                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2057                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
2058                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
2059
2060                         /* ---
2061                          * VACUUM FULL has an exclusive lock on the relation.  So
2062                          * normally no other transaction can have pending INSERTs or
2063                          * DELETEs in this relation.  A tuple is either:
2064                          *              (a) live (XMIN_COMMITTED)
2065                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
2066                          *                      is visible to all active transactions)
2067                          *              (c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
2068                          *                      but at least one active transaction does not see the
2069                          *                      deleting transaction (ie, it's RECENTLY_DEAD)
2070                          *              (d) moved by the currently running VACUUM
2071                          *              (e) inserted or deleted by a not yet committed transaction,
2072                          *                      or by a transaction we couldn't set XMIN_COMMITTED for.
2073                          * In case (e) we wouldn't be in repair_frag() at all, because
2074                          * scan_heap() detects those cases and shuts off shrinking.
2075                          * We can't see case (b) here either, because such tuples were
2076                          * already removed by vacuum_page().  Cases (a) and (c) are
2077                          * normal and will have XMIN_COMMITTED set.  Case (d) is only
2078                          * possible if a whole tuple chain has been moved while
2079                          * processing this or a higher numbered block.
2080                          * ---
2081                          */
2082                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2083                         {
2084                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2085                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2086                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
2087                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2088
2089                                 /*
2090                                  * MOVED_OFF by another VACUUM would have caused the
2091                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
2092                                  */
2093                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2094                                         elog(ERROR, "invalid XVAC in tuple header");
2095
2096                                 /*
2097                                  * If this (chain) tuple has already been moved by this vacuum,
2098                                  * we have to check whether it is in vacpage or not, i.e.
2099                                  * whether it was moved while cleaning this page or an earlier one.
2100                                  */
2101
2102                                 /* Can't we Assert(keep_tuples > 0) here? */
2103                                 if (keep_tuples == 0)
2104                                         continue;
2105                                 if (chain_tuple_moved)
2106                                 {
2107                                         /* some chains were moved while cleaning this page */
2108                                         Assert(vacpage->offsets_free > 0);
2109                                         for (i = 0; i < vacpage->offsets_free; i++)
2110                                         {
2111                                                 if (vacpage->offsets[i] == offnum)
2112                                                         break;
2113                                         }
2114                                         if (i >= vacpage->offsets_free)         /* not found */
2115                                         {
2116                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
2117
2118                                                 /*
2119                                                  * If this is not a heap-only tuple, there must be an
2120                                                  * index entry for this item which will be removed in
2121                                                  * the index cleanup. Decrement the
2122                                                  * keep_indexed_tuples count to remember this.
2123                                                  */
2124                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2125                                                         keep_indexed_tuples--;
2126                                                 keep_tuples--;
2127                                         }
2128                                 }
2129                                 else
2130                                 {
2131                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2132
2133                                         /*
2134                                          * If this is not a heap-only tuple, there must be an
2135                                          * index entry for this item which will be removed in the
2136                                          * index cleanup. Decrement the keep_indexed_tuples count
2137                                          * to remember this.
2138                                          */
2139                                         if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2140                                                 keep_indexed_tuples--;
2141                                         keep_tuples--;
2142                                 }
2143                                 continue;
2144                         }
2145
2146                         /*
2147                          * If this tuple is part of a chain of tuples created by updates from
2148                          * "recent" transactions, then we have to move the whole chain of
2149                          * tuples to other places, so that we can write new t_ctid links
2150                          * that preserve the chain relationship.
2151                          *
2152                          * This test is complicated.  Read it as "if tuple is a recently
2153                          * created updated version, OR if it is an obsoleted version". (In
2154                          * the second half of the test, we needn't make any check on XMAX
2155                          * --- it must be recently obsoleted, else scan_heap would have
2156                          * deemed it removable.)
2157                          *
2158                          * NOTE: this test is not 100% accurate: it is possible for a
2159                          * tuple to be an updated one with recent xmin, and yet not match
2160                          * any new_tid entry in the vtlinks list.  Presumably there was
2161                          * once a parent tuple with xmax matching the xmin, but it's
2162                          * possible that that tuple has been removed --- for example, if
2163                          * it had xmin = xmax and wasn't itself an updated version, then
2164                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
2165                          * xmin xact completes.
2166                          *
2167                          * To be on the safe side, we abandon the repair_frag process if
2168                          * we cannot find the parent tuple in vtlinks.  This may be overly
2169                          * conservative; AFAICS it would be safe to move the chain.
2170                          *
2171                          * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
2172                          * using OldestXmin, which is a rather coarse test, it is quite
2173                          * possible to have an update chain in which a tuple we think is
2174                          * RECENTLY_DEAD links forward to one that is definitely DEAD. In
2175                          * such a case the RECENTLY_DEAD tuple must actually be dead, but
2176                          * it seems too complicated to try to make VACUUM remove it. We
2177                          * treat each contiguous set of RECENTLY_DEAD tuples as a
2178                          * separately movable chain, ignoring any intervening DEAD ones.
2179                          */
2180                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
2181                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
2182                                                                                 OldestXmin)) ||
2183                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
2184                                                                                            HEAP_IS_LOCKED)) &&
2185                                  !(ItemPointerEquals(&(tuple.t_self),
2186                                                                          &(tuple.t_data->t_ctid)))))
2187                         {
2188                                 Buffer          Cbuf = buf;
2189                                 bool            freeCbuf = false;
2190                                 bool            chain_move_failed = false;
2191                                 bool            moved_target = false;
2192                                 ItemPointerData Ctid;
2193                                 HeapTupleData tp = tuple;
2194                                 Size            tlen = tuple_len;
2195                                 VTupleMove      vtmove;
2196                                 int                     num_vtmove;
2197                                 int                     free_vtmove;
2198                                 VacPage         to_vacpage = NULL;
2199                                 int                     to_item = 0;
2200                                 int                     ti;
2201
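                                /*
                                 * Release any move-destination buffer still
                                 * pinned from moving a previous tuple; the
                                 * chain-moving code manages its own buffers.
                                 */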
2202                                 if (dst_buffer != InvalidBuffer)
2203                                 {
2204                                         ReleaseBuffer(dst_buffer);
2205                                         dst_buffer = InvalidBuffer;
2206                                 }
2207
2208                                 /* Quick exit if we have no vtlinks to search in */
2209                                 if (vacrelstats->vtlinks == NULL)
2210                                 {
2211                                         elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2212                                         break;          /* out of walk-along-page loop */
2213                                 }
2214
2215                                 /*
2216                                  * If this tuple is at the beginning or in the middle of the chain
2217                                  * then we have to move to the end of the chain.  As with any t_ctid
2218                                  * chase, we have to verify that each new tuple is really the
2219                                  * descendant of the tuple we came from; however, here we need
2220                                  * even more than the normal amount of paranoia. If t_ctid
2221                                  * links forward to a tuple determined to be DEAD, then
2222                                  * depending on where that tuple is, it might already have
2223                                  * been removed, and perhaps even replaced by a MOVED_IN
2224                                  * tuple.  We don't want to include any DEAD tuples in the
2225                                  * chain, so we have to recheck HeapTupleSatisfiesVacuum.
2226                                  */
2227                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
2228                                                                                                   HEAP_IS_LOCKED)) &&
2229                                            !(ItemPointerEquals(&(tp.t_self),
2230                                                                                    &(tp.t_data->t_ctid))))
2231                                 {
2232                                         ItemPointerData nextTid;
2233                                         TransactionId priorXmax;
2234                                         Buffer          nextBuf;
2235                                         Page            nextPage;
2236                                         OffsetNumber nextOffnum;
2237                                         ItemId          nextItemid;
2238                                         HeapTupleHeader nextTdata;
2239                                         HTSV_Result nextTstatus;
2240
2241                                         nextTid = tp.t_data->t_ctid;
2242                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
2243                                         /* assume block# is OK (see heap_fetch comments) */
2244                                         nextBuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2245                                                                                  ItemPointerGetBlockNumber(&nextTid),
2246                                                                                                  RBM_NORMAL, vac_strategy);
2247                                         nextPage = BufferGetPage(nextBuf);
2248                                         /* If bogus or unused slot, assume tp is end of chain */
2249                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
2250                                         if (nextOffnum < FirstOffsetNumber ||
2251                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
2252                                         {
2253                                                 ReleaseBuffer(nextBuf);
2254                                                 break;
2255                                         }
2256                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
2257                                         if (!ItemIdIsNormal(nextItemid))
2258                                         {
2259                                                 ReleaseBuffer(nextBuf);
2260                                                 break;
2261                                         }
2262                                         /* if not matching XMIN, assume tp is end of chain */
2263                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
2264                                                                                                                           nextItemid);
2265                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
2266                                                                                          priorXmax))
2267                                         {
2268                                                 ReleaseBuffer(nextBuf);
2269                                                 break;
2270                                         }
2271
2272                                         /*
2273                                          * Must check for DEAD or MOVED_IN tuple, too.  This could
2274                                          * potentially update hint bits, so we'd better hold the
2275                                          * buffer content lock.
2276                                          */
2277                                         LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
2278                                         nextTstatus = HeapTupleSatisfiesVacuum(nextTdata,
2279                                                                                                                    OldestXmin,
2280                                                                                                                    nextBuf);
2281                                         if (nextTstatus == HEAPTUPLE_DEAD ||
2282                                                 nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
2283                                         {
2284                                                 UnlockReleaseBuffer(nextBuf);
2285                                                 break;
2286                                         }
2287                                         LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
2288                                         /* if it's MOVED_OFF we should already have moved this one with it */
2289                                         if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
2290                                                 elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
2291                                         /* OK, switch our attention to the next tuple in chain */
2292                                         tp.t_data = nextTdata;
2293                                         tp.t_self = nextTid;
2294                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
2295                                         if (freeCbuf)
2296                                                 ReleaseBuffer(Cbuf);
2297                                         Cbuf = nextBuf;
2298                                         freeCbuf = true;
2299                                 }
2300
2301                                 /* Set up workspace for planning the chain move */
2302                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
2303                                 num_vtmove = 0;
2304                                 free_vtmove = 100;
2305
2306                                 /*
2307                                  * Now, walk backwards up the chain (towards older tuples) and
2308                                  * check if all items in chain can be moved.  We record all
2309                                  * the moves that need to be made in the vtmove array.
2310                                  */
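                                /*
                                 * Each vtmove entry records the tuple to be moved (by TID), the
                                 * destination page chosen for it, and whether that page still
                                 * needs to be cleaned with vacuum_page() before the first tuple
                                 * is moved onto it.
                                 */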
2311                                 for (;;)
2312                                 {
2313                                         Buffer          Pbuf;
2314                                         Page            Ppage;
2315                                         ItemId          Pitemid;
2316                                         HeapTupleHeader PTdata;
2317                                         VTupleLinkData vtld,
2318                                                            *vtlp;
2319
2320                                         /* Identify a target page to move this tuple to */
2321                                         if (to_vacpage == NULL ||
2322                                                 !enough_space(to_vacpage, tlen))
2323                                         {
2324                                                 for (i = 0; i < num_fraged_pages; i++)
2325                                                 {
2326                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
2327                                                                 break;
2328                                                 }
2329
2330                                                 if (i == num_fraged_pages)
2331                                                 {
2332                                                         /* can't move item anywhere */
2333                                                         chain_move_failed = true;
2334                                                         break;          /* out of check-all-items loop */
2335                                                 }
2336                                                 to_item = i;
2337                                                 to_vacpage = fraged_pages->pagedesc[to_item];
2338                                         }
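                                        /*
                                         * Charge the planned move against the target page's free
                                         * space: the aligned tuple size, plus a new line pointer if
                                         * no to-be-freed item slot is left over to reuse.
                                         */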
2339                                         to_vacpage->free -= MAXALIGN(tlen);
2340                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
2341                                                 to_vacpage->free -= sizeof(ItemIdData);
2342                                         (to_vacpage->offsets_used)++;
2343
2344                                         /* Add an entry to vtmove list */
2345                                         if (free_vtmove == 0)
2346                                         {
2347                                                 free_vtmove = 1000;
2348                                                 vtmove = (VTupleMove)
2349                                                         repalloc(vtmove,
2350                                                                          (free_vtmove + num_vtmove) *
2351                                                                          sizeof(VTupleMoveData));
2352                                         }
2353                                         vtmove[num_vtmove].tid = tp.t_self;
2354                                         vtmove[num_vtmove].vacpage = to_vacpage;
2355                                         if (to_vacpage->offsets_used == 1)
2356                                                 vtmove[num_vtmove].cleanVpd = true;
2357                                         else
2358                                                 vtmove[num_vtmove].cleanVpd = false;
2359                                         free_vtmove--;
2360                                         num_vtmove++;
2361
2362                                         /* Remember if we reached the original target tuple */
2363                                         if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
2364                                                 ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
2365                                                 moved_target = true;
2366
2367                                         /* Done if at beginning of chain */
2368                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2369                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2370                                                                                    OldestXmin))
2371                                                 break;  /* out of check-all-items loop */
2372
2373                                         /* Move to tuple with prior row version */
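                                        /*
                                         * vtlinks is sorted by new_tid (see vac_cmp_vtlinks), so a
                                         * binary search on this tuple's TID locates the parent row
                                         * version whose t_ctid points at it.
                                         */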
2374                                         vtld.new_tid = tp.t_self;
2375                                         vtlp = (VTupleLink)
2376                                                 vac_bsearch((void *) &vtld,
2377                                                                         (void *) (vacrelstats->vtlinks),
2378                                                                         vacrelstats->num_vtlinks,
2379                                                                         sizeof(VTupleLinkData),
2380                                                                         vac_cmp_vtlinks);
2381                                         if (vtlp == NULL)
2382                                         {
2383                                                 /* see discussion above */
2384                                                 elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2385                                                 chain_move_failed = true;
2386                                                 break;  /* out of check-all-items loop */
2387                                         }
2388                                         tp.t_self = vtlp->this_tid;
2389                                         Pbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2390                                                                          ItemPointerGetBlockNumber(&(tp.t_self)),
2391                                                                                           RBM_NORMAL, vac_strategy);
2392                                         Ppage = BufferGetPage(Pbuf);
2393                                         Pitemid = PageGetItemId(Ppage,
2394                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2395                                         /* this can't happen since we saw tuple earlier: */
2396                                         if (!ItemIdIsNormal(Pitemid))
2397                                                 elog(ERROR, "parent itemid marked as unused");
2398                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2399
2400                                         /* ctid should not have changed since we saved it */
2401                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2402                                                                                          &(PTdata->t_ctid)));
2403
2404                                         /*
2405                                          * See above for the cases where !ItemIdIsUsed(nextItemid)
2406                                          * (i.e., the child item has been removed)...  Because we
2407                                          * don't currently remove the useless part of an update
2408                                          * chain, it is possible to find a non-matching parent row
2409                                          * here.  As in the case that originally exposed this
2410                                          * problem, we simply stop shrinking here.  We could try to
2411                                          * find the real parent row, but we don't, since a real
2412                                          * solution will be implemented later anyway and we are too
2413                                          * close to the 6.5 release. - vadim 06/11/99
2414                                          */
2415                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2416                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2417                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2418                                         {
2419                                                 ReleaseBuffer(Pbuf);
2420                                                 elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
2421                                                 chain_move_failed = true;
2422                                                 break;  /* out of check-all-items loop */
2423                                         }
2424                                         tp.t_data = PTdata;
2425                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2426                                         if (freeCbuf)
2427                                                 ReleaseBuffer(Cbuf);
2428                                         Cbuf = Pbuf;
2429                                         freeCbuf = true;
2430                                 }                               /* end of check-all-items loop */
2431
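                                /* Done planning; release the pin on the last chain member, if any */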
2432                                 if (freeCbuf)
2433                                         ReleaseBuffer(Cbuf);
2434                                 freeCbuf = false;
2435
2436                                 /* Double-check that we will move the current target tuple */
2437                                 if (!moved_target && !chain_move_failed)
2438                                 {
2439                                         elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
2440                                         chain_move_failed = true;
2441                                 }
2442
2443                                 if (chain_move_failed)
2444                                 {
2445                                         /*
2446                                          * Undo changes to offsets_used state.  We don't bother
2447                                          * cleaning up the amount-free state, since we're not
2448                                          * going to do any further tuple motion.
2449                                          */
2450                                         for (i = 0; i < num_vtmove; i++)
2451                                         {
2452                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2453                                                 (vtmove[i].vacpage->offsets_used)--;
2454                                         }
2455                                         pfree(vtmove);
2456                                         break;          /* out of walk-along-page loop */
2457                                 }
2458
2459                                 /*
2460                                  * Okay, move the whole tuple chain in reverse order.
2461                                  *
2462                                  * Ctid tracks the new location of the previously-moved tuple.
2463                                  */
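                                /*
                                 * Processing newest-to-oldest lets each moved tuple's t_ctid be
                                 * set to the already-relocated successor, which is tracked in
                                 * Ctid.
                                 */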
2464                                 ItemPointerSetInvalid(&Ctid);
2465                                 for (ti = 0; ti < num_vtmove; ti++)
2466                                 {
2467                                         VacPage         destvacpage = vtmove[ti].vacpage;
2468                                         Page            Cpage;
2469                                         ItemId          Citemid;
2470
2471                                         /* Get page to move from */
2472                                         tuple.t_self = vtmove[ti].tid;
2473                                         Cbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
2474                                                                   ItemPointerGetBlockNumber(&(tuple.t_self)),
2475                                                                                           RBM_NORMAL, vac_strategy);
2476
2477                                         /* Get page to move to */
2478                                         dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
2479                                                                                                         destvacpage->blkno,
2480                                                                                                         RBM_NORMAL, vac_strategy);
2481
2482                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2483                                         if (dst_buffer != Cbuf)
2484                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2485
2486                                         dst_page = BufferGetPage(dst_buffer);
2487                                         Cpage = BufferGetPage(Cbuf);
2488
2489                                         Citemid = PageGetItemId(Cpage,
2490                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2491                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2492                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2493
2494                                         move_chain_tuple(vacrelstats, onerel, Cbuf, Cpage, &tuple,
2495                                                                          dst_buffer, dst_page, destvacpage,
2496                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2497
2498                                         /*
2499                                          * If the tuple we are moving is a heap-only tuple, this
2500                                          * move will generate an additional index entry, so
2501                                          * increment the rel_indexed_tuples count.
2502                                          */
2503                                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2504                                                 vacrelstats->rel_indexed_tuples++;
2505
2506                                         num_moved++;
2507                                         if (destvacpage->blkno > last_move_dest_block)
2508                                                 last_move_dest_block = destvacpage->blkno;
2509
2510                                         /*
2511                                          * Remember that we moved tuple from the current page
2512                                          * (corresponding index tuple will be cleaned).
2513                                          */
2514                                         if (Cbuf == buf)
2515                                                 vacpage->offsets[vacpage->offsets_free++] =
2516                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2517                                         else
2518                                         {
2519                                                 /*
2520                                                  * When we move tuple chains, we may need to move
2521                                                  * tuples from a block that we haven't yet scanned in
2522                                                  * the outer walk-along-the-relation loop. Note that
2523                                                  * we can't be moving a tuple from a block that we
2524                                                  * have already scanned because if such a tuple
2525                                                  * exists, then we must have moved the chain along
2526                                                  * with that tuple when we scanned that block. IOW the
2527                                                  * test of (Cbuf != buf) guarantees that the tuple we
2528                                                  * are looking at right now is in a block which is yet
2529                                                  * to be scanned.
2530                                                  *
2531                                                  * We maintain two counters to correctly count the
2532                                                  * moved-off tuples from blocks that are not yet
2533                                                  * scanned (keep_tuples) and how many of them have
2534                                                  * index pointers (keep_indexed_tuples).  The main
2535                                                  * reason to track the latter is to help verify that
2536                                                  * indexes have the expected number of entries when
2537                                                  * all the dust settles.
2538                                                  */
2539                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2540                                                         keep_indexed_tuples++;
2541                                                 keep_tuples++;
2542                                         }
2543
2544                                         ReleaseBuffer(dst_buffer);
2545                                         ReleaseBuffer(Cbuf);
2546                                 }                               /* end of move-the-tuple-chain loop */
2547
2548                                 dst_buffer = InvalidBuffer;
2549                                 pfree(vtmove);
2550                                 chain_tuple_moved = true;
2551
2552                                 /* advance to next tuple in walk-along-page loop */
2553                                 continue;
2554                         }                                       /* end of is-tuple-in-chain test */
2555
2556                         /* try to find new page for this tuple */
2557                         if (dst_buffer == InvalidBuffer ||
2558                                 !enough_space(dst_vacpage, tuple_len))
2559                         {
2560                                 if (dst_buffer != InvalidBuffer)
2561                                 {
2562                                         ReleaseBuffer(dst_buffer);
2563                                         dst_buffer = InvalidBuffer;
2564                                 }
2565                                 for (i = 0; i < num_fraged_pages; i++)
2566                                 {
2567                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2568                                                 break;
2569                                 }
2570                                 if (i == num_fraged_pages)
2571                                         break;          /* can't move item anywhere */
2572                                 dst_vacpage = fraged_pages->pagedesc[i];
2573                                 dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
2574                                                                                                 dst_vacpage->blkno,
2575                                                                                                 RBM_NORMAL, vac_strategy);
2576                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2577                                 dst_page = BufferGetPage(dst_buffer);
2578                                 /* if this page was not used before - clean it */
2579                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2580                                         vacuum_page(vacrelstats, onerel, dst_buffer, dst_vacpage);
2581                         }
2582                         else
2583                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2584
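                        /*
                         * Lock the source page too; move_plain_tuple() expects both buffers
                         * to be exclusively locked on entry and releases both locks itself.
                         */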
2585                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2586
2587                         move_plain_tuple(onerel, buf, page, &tuple,
2588                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2589
2590                         /*
2591                          * If the tuple we are moving is a heap-only tuple, this move will
2592                          * generate an additional index entry, so increment the
2593                          * rel_indexed_tuples count.
2594                          */
2595                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2596                                 vacrelstats->rel_indexed_tuples++;
2597
2598                         num_moved++;
2599                         if (dst_vacpage->blkno > last_move_dest_block)
2600                                 last_move_dest_block = dst_vacpage->blkno;
2601
2602                         /*
2603                          * Remember that we moved tuple from the current page
2604                          * (corresponding index tuple will be cleaned).
2605                          */
2606                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2607                 }                                               /* walk along page */
2608
2609                 /*
2610                  * If we broke out of the walk-along-page loop early (ie, still have
2611                  * offnum <= maxoff), then we failed to move some tuple off this page.
2612                  * No point in shrinking any more, so clean up and exit the per-page
2613                  * loop.
2614                  */
2615                 if (offnum < maxoff && keep_tuples > 0)
2616                 {
2617                         OffsetNumber off;
2618
2619                         /*
2620                          * Fix vacpage state for any unvisited tuples remaining on page
2621                          */
2622                         for (off = OffsetNumberNext(offnum);
2623                                  off <= maxoff;
2624                                  off = OffsetNumberNext(off))
2625                         {
2626                                 ItemId          itemid = PageGetItemId(page, off);
2627                                 HeapTupleHeader htup;
2628
2629                                 if (!ItemIdIsUsed(itemid))
2630                                         continue;
2631                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2632                                 Assert(ItemIdIsNormal(itemid));
2633
2634                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2635                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2636                                         continue;
2637
2638                                 /*
2639                                  * See comments in the walk-along-page loop above about why
2640                                  * only MOVED_OFF tuples should be found here.
2641                                  */
2642                                 if (htup->t_infomask & HEAP_MOVED_IN)
2643                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2644                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2645                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2646                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2647                                         elog(ERROR, "invalid XVAC in tuple header");
2648
2649                                 if (chain_tuple_moved)
2650                                 {
2651                                         /* some chains were moved while cleaning this page */
2652                                         Assert(vacpage->offsets_free > 0);
2653                                         for (i = 0; i < vacpage->offsets_free; i++)
2654                                         {
2655                                                 if (vacpage->offsets[i] == off)
2656                                                         break;
2657                                         }
2658                                         if (i >= vacpage->offsets_free)         /* not found */
2659                                         {
2660                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2661                                                 Assert(keep_tuples > 0);
2662
2663                                                 /*
2664                                                  * If this is not a heap-only tuple, there must be an
2665                                                  * index entry for this item which will be removed in
2666                                                  * the index cleanup. Decrement the
2667                                                  * keep_indexed_tuples count to remember this.
2668                                                  */
2669                                                 if (!HeapTupleHeaderIsHeapOnly(htup))
2670                                                         keep_indexed_tuples--;
2671                                                 keep_tuples--;
2672                                         }
2673                                 }
2674                                 else
2675                                 {
2676                                         vacpage->offsets[vacpage->offsets_free++] = off;
2677                                         Assert(keep_tuples > 0);
2678                                         if (!HeapTupleHeaderIsHeapOnly(htup))
2679                                                 keep_indexed_tuples--;
2680                                         keep_tuples--;
2681                                 }
2682                         }
2683                 }
2684
2685                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2686                 {
2687                         if (chain_tuple_moved)          /* else - they are ordered */
2688                         {
2689                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2690                                           sizeof(OffsetNumber), vac_cmp_offno);
2691                         }
2692                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2693                 }
2694
2695                 ReleaseBuffer(buf);
2696
2697                 if (offnum <= maxoff)
2698                         break;                          /* had to quit early, see above note */
2699
2700         }                                                       /* walk along relation */
2701
2702         blkno++;                                        /* new number of blocks */
2703
2704         if (dst_buffer != InvalidBuffer)
2705         {
2706                 Assert(num_moved > 0);
2707                 ReleaseBuffer(dst_buffer);
2708         }
2709
2710         if (num_moved > 0)
2711         {
2712                 /*
2713                  * We have to commit our tuple moves before we truncate the
2714                  * relation.  Ideally we should do Commit/StartTransactionCommand
2715                  * here, relying on the session-level table lock to protect our
2716                  * exclusive access to the relation.  However, that would require a
2717                  * lot of extra code to close and re-open the relation, indexes, etc.
2718                  * For now, a quick hack: record status of current transaction as
2719                  * committed, and continue.  We force the commit to be synchronous so
2720                  * that it's down to disk before we truncate.  (Note: tqual.c knows
2721                  * that VACUUM FULL always uses sync commit, too.)  The transaction
2722                  * continues to be shown as running in the ProcArray.
2723                  *
2724                  * XXX This desperately needs to be revisited.  Any failure after this
2725                  * point will result in a PANIC "cannot abort transaction nnn, it was
2726                  * already committed"!  As a precaution, we prevent cancel interrupts
2727                  * after this point to mitigate this problem; caller is responsible for
2728                  * re-enabling them after committing the transaction.
2729                  */
2730                 HOLD_INTERRUPTS();
2731                 heldoff = true;
2732                 ForceSyncCommit();
2733                 (void) RecordTransactionCommit(true);
2734         }
2735
2736         /*
2737          * We are not going to move any more tuples across pages, but we still
2738          * need to apply vacuum_page to compact free space in the remaining pages
2739          * in vacuum_pages list.  Note that some of these pages may also be in the
2740          * in the vacuum_pages list.  Note that some of these pages may also be in the
2741          * already did vacuum_page and needn't do it again.
2742          */
2743         for (i = 0, curpage = vacuum_pages->pagedesc;
2744                  i < vacuumed_pages;
2745                  i++, curpage++)
2746         {
2747                 vacuum_delay_point();
2748
2749                 Assert((*curpage)->blkno < blkno);
2750                 if ((*curpage)->offsets_used == 0)
2751                 {
2752                         Buffer          buf;
2753                         Page            page;
2754
2755                         /* this page was not used as a move target, so must clean it */
2756                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*curpage)->blkno,
2757                                                                          RBM_NORMAL, vac_strategy);
2758                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2759                         page = BufferGetPage(buf);
2760                         if (!PageIsEmpty(page))
2761                                 vacuum_page(vacrelstats, onerel, buf, *curpage);
2762                         UnlockReleaseBuffer(buf);
2763                 }
2764         }
2765
2766         /*
2767          * Now scan all the pages that we moved tuples onto and update tuple
2768          * status bits.  This is not really necessary, but will save time for
2769          * future transactions examining these tuples.
2770          */
2771         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2772                                          last_move_dest_block, num_moved);
2773
2774         /*
2775          * It'd be cleaner to make this report at the bottom of this routine, but
2776          * then the rusage would double-count the second pass of index vacuuming.
2777          * So do it here and ignore the relatively small amount of processing that
2778          * occurs below.
2779          */
2780         ereport(elevel,
2781                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2782                                         RelationGetRelationName(onerel),
2783                                         num_moved, nblocks, blkno),
2784                          errdetail("%s.",
2785                                            pg_rusage_show(&ru0))));
2786
2787         /*
2788          * Reflect the motion of system tuples to catalog cache here.
2789          */
2790         CommandCounterIncrement();
2791
2792         if (Nvacpagelist.num_pages > 0)
2793         {
2794                 /* vacuum indexes again if needed */
2795                 if (Irel != NULL)
2796                 {
2797                         VacPage    *vpleft,
2798                                            *vpright,
2799                                                 vpsave;
2800
2801                         /* re-sort Nvacpagelist.pagedesc */
2802                         for (vpleft = Nvacpagelist.pagedesc,
2803                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2804                                  vpleft < vpright; vpleft++, vpright--)
2805                         {
2806                                 vpsave = *vpleft;
2807                                 *vpleft = *vpright;
2808                                 *vpright = vpsave;
2809                         }
2810
2811                         /*
2812                          * keep_tuples is the number of tuples that have been moved off a
2813                          * page during chain moves but not been scanned over subsequently.
2814                          * The tuple ids of these tuples are not recorded as free offsets
2815                          * for any VacPage, so they will not be cleared from the indexes.
2816                          * keep_indexed_tuples is the portion of these that are expected
2817                          * to have index entries.
2818                          */
2819                         Assert(keep_tuples >= 0);
2820                         for (i = 0; i < nindexes; i++)
2821                                 vacuum_index(&Nvacpagelist, Irel[i],
2822                                                          vacrelstats->rel_indexed_tuples,
2823                                                          keep_indexed_tuples);
2824                 }
2825
2826                 /*
2827                  * Clean moved-off tuples from last page in Nvacpagelist list.
2828                  *
2829                  * We need only do this on this one page, because higher-numbered
2830                  * pages are going to be truncated from the relation entirely. But see
2831                  * comments for update_hint_bits().
2832                  */
2833                 if (vacpage->blkno == (blkno - 1) &&
2834                         vacpage->offsets_free > 0)
2835                 {
2836                         Buffer          buf;
2837                         Page            page;
2838                         OffsetNumber unused[MaxOffsetNumber];
2839                         OffsetNumber offnum,
2840                                                 maxoff;
2841                         int                     uncnt = 0;
2842                         int                     num_tuples = 0;
2843
2844                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, vacpage->blkno,
2845                                                                          RBM_NORMAL, vac_strategy);
2846                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2847                         page = BufferGetPage(buf);
2848                         maxoff = PageGetMaxOffsetNumber(page);
2849                         for (offnum = FirstOffsetNumber;
2850                                  offnum <= maxoff;
2851                                  offnum = OffsetNumberNext(offnum))
2852                         {
2853                                 ItemId          itemid = PageGetItemId(page, offnum);
2854                                 HeapTupleHeader htup;
2855
2856                                 if (!ItemIdIsUsed(itemid))
2857                                         continue;
2858                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2859                                 Assert(ItemIdIsNormal(itemid));
2860
2861                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2862                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2863                                         continue;
2864
2865                                 /*
2866                                  * See comments in the walk-along-page loop above about why
2867                                  * only MOVED_OFF tuples should be found here.
2868                                  */
2869                                 if (htup->t_infomask & HEAP_MOVED_IN)
2870                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2871                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2872                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2873                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2874                                         elog(ERROR, "invalid XVAC in tuple header");
2875
2876                                 ItemIdSetUnused(itemid);
2877                                 num_tuples++;
2878
2879                                 unused[uncnt++] = offnum;
2880                         }
2881                         Assert(vacpage->offsets_free == num_tuples);
2882
2883                         START_CRIT_SECTION();
2884
2885                         PageRepairFragmentation(page);
2886
2887                         MarkBufferDirty(buf);
2888
2889                         /* XLOG stuff */
2890                         if (!onerel->rd_istemp)
2891                         {
2892                                 XLogRecPtr      recptr;
2893
2894                                 recptr = log_heap_clean(onerel, buf,
2895                                                                                 NULL, 0, NULL, 0,
2896                                                                                 unused, uncnt,
2897                                                                                 vacrelstats->latestRemovedXid, false);
2898                                 PageSetLSN(page, recptr);
2899                                 PageSetTLI(page, ThisTimeLineID);
2900                         }
2901
2902                         END_CRIT_SECTION();
2903
2904                         UnlockReleaseBuffer(buf);
2905                 }
2906
2907                 /* now free the new list of reaped pages */
2908                 curpage = Nvacpagelist.pagedesc;
2909                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2910                         pfree(*curpage);
2911                 pfree(Nvacpagelist.pagedesc);
2912         }
2913
2914         /* Truncate relation, if needed */
2915         if (blkno < nblocks)
2916         {
2917                 RelationTruncate(onerel, blkno);
2918
2919                 /* force relcache inval so all backends reset their rd_targblock */
2920                 CacheInvalidateRelcache(onerel);
2921
2922                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2923         }
2924
2925         /* clean up */
2926         pfree(vacpage);
2927         if (vacrelstats->vtlinks != NULL)
2928                 pfree(vacrelstats->vtlinks);
2929
2930         ExecContext_Finish(&ec);
2931
2932         return heldoff;
2933 }
2934
2935 /*
2936  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2937  *
2938  *              This routine moves old_tup from old_page to dst_page.
2939  *              old_page and dst_page might be the same page.
2940  *              On entry old_buf and dst_buf are locked exclusively; both locks (or
2941  *              the single lock, if this is an intra-page move) are released before
2942  *              exit.
2943  *
2944  *              Yes, a routine with ten parameters is ugly, but it's still better
2945  *              than having these 120 lines of code in repair_frag() which is
2946  *              already too long and almost unreadable.
2947  */
2948 static void
2949 move_chain_tuple(VRelStats *vacrelstats, Relation rel,
2950                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2951                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2952                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2953 {
2954         TransactionId myXID = GetCurrentTransactionId();
2955         HeapTupleData newtup;
2956         OffsetNumber newoff;
2957         ItemId          newitemid;
2958         Size            tuple_len = old_tup->t_len;
2959         bool            all_visible_cleared = false;
2960         bool            all_visible_cleared_new = false;
2961
2962         /*
2963          * make a modifiable copy of the source tuple.
2964          */
2965         heap_copytuple_with_tuple(old_tup, &newtup);
2966
2967         /*
2968          * register invalidation of source tuple in catcaches.
2969          */
2970         CacheInvalidateHeapTuple(rel, old_tup);
2971
2972         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2973         START_CRIT_SECTION();
2974
2975         /*
2976          * mark the source tuple MOVED_OFF.
2977          */
2978         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2979                                                                          HEAP_XMIN_INVALID |
2980                                                                          HEAP_MOVED_IN);
2981         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2982         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2983
2984         /*
2985          * If this page was not used before - clean it.
2986          *
2987          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2988          * destination pages to be the same (since this tuple-chain member can be
2989          * on a page lower than the one we're currently processing in the outer
2990          * loop).  If that's true, then after vacuum_page() the source tuple will
2991          * have been moved, and tuple.t_data will be pointing at garbage.
2992          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2993          * step!!
2994          *
2995          * This path is different from the other callers of vacuum_page, because
2996          * we have already incremented the vacpage's offsets_used field to account
2997          * for the tuple(s) we expect to move onto the page. Therefore
2998          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2999          * good debugging check for all other callers, we work around it here
3000          * rather than remove it.
3001          */
3002         if (!PageIsEmpty(dst_page) && cleanVpd)
3003         {
3004                 int                     sv_offsets_used = dst_vacpage->offsets_used;
3005
3006                 dst_vacpage->offsets_used = 0;
3007                 vacuum_page(vacrelstats, rel, dst_buf, dst_vacpage);
3008                 dst_vacpage->offsets_used = sv_offsets_used;
3009         }
3010
3011         /*
3012          * Update the state of the copied tuple, and store it on the destination
3013          * page.  The copied tuple is never part of a HOT chain.
3014          */
3015         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3016                                                                    HEAP_XMIN_INVALID |
3017                                                                    HEAP_MOVED_OFF);
3018         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
3019         HeapTupleHeaderClearHotUpdated(newtup.t_data);
3020         HeapTupleHeaderClearHeapOnly(newtup.t_data);
3021         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
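        /*
         * Let PageAddItem pick the offset (InvalidOffsetNumber); no overwrite,
         * and mark the item as a heap tuple.
         */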
3022         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3023                                                  InvalidOffsetNumber, false, true);
3024         if (newoff == InvalidOffsetNumber)
3025                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
3026                          (unsigned long) tuple_len, dst_vacpage->blkno);
3027         newitemid = PageGetItemId(dst_page, newoff);
3028         /* drop temporary copy, and point to the version on the dest page */
3029         pfree(newtup.t_data);
3030         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
3031
3032         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
3033
3034         /*
3035          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
3036          * to next tuple in chain otherwise.  (Since we move the chain in reverse
3037          * order, this is actually the previously processed tuple.)
3038          */
3039         if (!ItemPointerIsValid(ctid))
3040                 newtup.t_data->t_ctid = newtup.t_self;
3041         else
3042                 newtup.t_data->t_ctid = *ctid;
3043         *ctid = newtup.t_self;
3044
3045         /* clear PD_ALL_VISIBLE flags */
3046         if (PageIsAllVisible(old_page))
3047         {
3048                 all_visible_cleared = true;
3049                 PageClearAllVisible(old_page);
3050         }
3051         if (dst_buf != old_buf && PageIsAllVisible(dst_page))
3052         {
3053                 all_visible_cleared_new = true;
3054                 PageClearAllVisible(dst_page);
3055         }
3056
3057         MarkBufferDirty(dst_buf);
3058         if (dst_buf != old_buf)
3059                 MarkBufferDirty(old_buf);
3060
3061         /* XLOG stuff */
3062         if (!rel->rd_istemp)
3063         {
3064                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3065                                                                                    dst_buf, &newtup,
3066                                                                                    all_visible_cleared,
3067                                                                                    all_visible_cleared_new);
3068
3069                 if (old_buf != dst_buf)
3070                 {
3071                         PageSetLSN(old_page, recptr);
3072                         PageSetTLI(old_page, ThisTimeLineID);
3073                 }
3074                 PageSetLSN(dst_page, recptr);
3075                 PageSetTLI(dst_page, ThisTimeLineID);
3076         }
3077
3078         END_CRIT_SECTION();
3079
3080         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3081         if (dst_buf != old_buf)
3082                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3083
3084         /* Clear bits in visibility map */
3085         if (all_visible_cleared)
3086                 visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
3087         if (all_visible_cleared_new)
3088                 visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
3089
3090         /* Create index entries for the moved tuple */
3091         if (ec->resultRelInfo->ri_NumIndices > 0)
3092         {
3093                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3094                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3095                 ResetPerTupleExprContext(ec->estate);
3096         }
3097 }
3098
3099 /*
3100  *      move_plain_tuple() -- move one tuple that is not part of a chain
3101  *
3102  *              This routine moves old_tup from old_page to dst_page.
3103  *              On entry old_buf and dst_buf are locked exclusively, both locks are
3104  *              On entry old_buf and dst_buf are locked exclusively; both locks are
3105  *
3106  *              Yes, a routine with eight parameters is ugly, but it's still better
3107  *              than having these 90 lines of code in repair_frag() which is already
3108  *              too long and almost unreadable.
3109  */
3110 static void
3111 move_plain_tuple(Relation rel,
3112                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
3113                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
3114                                  ExecContext ec)
3115 {
3116         TransactionId myXID = GetCurrentTransactionId();
3117         HeapTupleData newtup;
3118         OffsetNumber newoff;
3119         ItemId          newitemid;
3120         Size            tuple_len = old_tup->t_len;
3121         bool            all_visible_cleared = false;
3122         bool            all_visible_cleared_new = false;
3123
3124         /* copy tuple */
3125         heap_copytuple_with_tuple(old_tup, &newtup);
3126
3127         /*
3128          * register invalidation of source tuple in catcaches.
3129          *
3130          * (Note: we do not need to register the copied tuple, because we are not
3131          * changing the tuple contents and so there cannot be any need to flush
3132          * negative catcache entries.)
3133          */
3134         CacheInvalidateHeapTuple(rel, old_tup);
3135
3136         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
3137         START_CRIT_SECTION();
3138
3139         /*
3140          * Mark new tuple as MOVED_IN by me; also mark it not HOT.
3141          */
3142         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3143                                                                    HEAP_XMIN_INVALID |
3144                                                                    HEAP_MOVED_OFF);
3145         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
3146         HeapTupleHeaderClearHotUpdated(newtup.t_data);
3147         HeapTupleHeaderClearHeapOnly(newtup.t_data);
3148         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
3149
3150         /* add tuple to the page */
3151         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
3152                                                  InvalidOffsetNumber, false, true);
3153         if (newoff == InvalidOffsetNumber)
3154                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
3155                          (unsigned long) tuple_len,
3156                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
3157                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
3158         newitemid = PageGetItemId(dst_page, newoff);
3159         pfree(newtup.t_data);
3160         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
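        /* A plain (non-chain) tuple's t_ctid simply points at its own new location */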
3161         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
3162         newtup.t_self = newtup.t_data->t_ctid;
3163
3164         /*
3165          * Mark old tuple as MOVED_OFF by me.
3166          */
3167         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3168                                                                          HEAP_XMIN_INVALID |
3169                                                                          HEAP_MOVED_IN);
3170         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
3171         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
3172
3173         /* clear PD_ALL_VISIBLE flags */
3174         if (PageIsAllVisible(old_page))
3175         {
3176                 all_visible_cleared = true;
3177                 PageClearAllVisible(old_page);
3178         }
3179         if (PageIsAllVisible(dst_page))
3180         {
3181                 all_visible_cleared_new = true;
3182                 PageClearAllVisible(dst_page);
3183         }
3184
3185         MarkBufferDirty(dst_buf);
3186         MarkBufferDirty(old_buf);
3187
3188         /* XLOG stuff */
3189         if (!rel->rd_istemp)
3190         {
3191                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3192                                                                                    dst_buf, &newtup,
3193                                                                                    all_visible_cleared,
3194                                                                                    all_visible_cleared_new);
3195
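                /*
                 * Stamp both pages with the record's LSN while still inside the
                 * critical section, so neither page can be written out ahead of
                 * the heap-move WAL record that describes this change.
                 */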
3196                 PageSetLSN(old_page, recptr);
3197                 PageSetTLI(old_page, ThisTimeLineID);
3198                 PageSetLSN(dst_page, recptr);
3199                 PageSetTLI(dst_page, ThisTimeLineID);
3200         }
3201
3202         END_CRIT_SECTION();
3203
3204         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
3205         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3206         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3207
3208         dst_vacpage->offsets_used++;
3209
3210         /* Clear bits in visibility map */
3211         if (all_visible_cleared)
3212                 visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
3213         if (all_visible_cleared_new)
3214                 visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
3215
3216         /* insert index tuples if needed */
3217         if (ec->resultRelInfo->ri_NumIndices > 0)
3218         {
3219                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3220                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3221                 ResetPerTupleExprContext(ec->estate);
3222         }
3223 }
3224
3225 /*
3226  *      update_hint_bits() -- update hint bits in destination pages
3227  *
3228  * Scan all the pages that we moved tuples onto and update tuple status bits.
3229  * This is not really necessary, but it will save time for future transactions
3230  * examining these tuples.
3231  *
3232  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
3233  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
3234  *
3235  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
3236  * pages that were move source pages but not move dest pages.  The bulk
3237  * of the move source pages will be physically truncated from the relation,
3238  * and the last page remaining in the rel will be fixed separately in
3239  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
3240  * hint bits updated are tuples that are moved as part of a chain and were
3241  * on pages that were neither move destinations nor at the end of the rel.
3242  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
3243  * to remember and revisit those pages too.
3244  *
3245  * One wonders whether it wouldn't be better to skip this work entirely,
3246  * and let the tuple status updates happen someplace that's not holding an
3247  * exclusive lock on the relation.
3248  */
3249 static void
3250 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
3251                                  BlockNumber last_move_dest_block, int num_moved)
3252 {
3253         TransactionId myXID = GetCurrentTransactionId();
3254         int                     checked_moved = 0;
3255         int                     i;
3256         VacPage    *curpage;
3257
3258         for (i = 0, curpage = fraged_pages->pagedesc;
3259                  i < num_fraged_pages;
3260                  i++, curpage++)
3261         {
3262                 Buffer          buf;
3263                 Page            page;
3264                 OffsetNumber max_offset;
3265                 OffsetNumber off;
3266                 int                     num_tuples = 0;
3267
3268                 vacuum_delay_point();
3269
3270                 if ((*curpage)->blkno > last_move_dest_block)
3271                         break;                          /* no need to scan any further */
3272                 if ((*curpage)->offsets_used == 0)
3273                         continue;                       /* this page was never used as a move dest */
3274                 buf = ReadBufferExtended(rel, MAIN_FORKNUM, (*curpage)->blkno,
3275                                                                  RBM_NORMAL, vac_strategy);
3276                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3277                 page = BufferGetPage(buf);
3278                 max_offset = PageGetMaxOffsetNumber(page);
3279                 for (off = FirstOffsetNumber;
3280                          off <= max_offset;
3281                          off = OffsetNumberNext(off))
3282                 {
3283                         ItemId          itemid = PageGetItemId(page, off);
3284                         HeapTupleHeader htup;
3285
3286                         if (!ItemIdIsUsed(itemid))
3287                                 continue;
3288                         /* Shouldn't be any DEAD or REDIRECT items anymore */
3289                         Assert(ItemIdIsNormal(itemid));
3290
3291                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
3292                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
3293                                 continue;
3294
3295                         /*
3296                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
3297                          */
3298                         if (!(htup->t_infomask & HEAP_MOVED))
3299                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
3300                         if (HeapTupleHeaderGetXvac(htup) != myXID)
3301                                 elog(ERROR, "invalid XVAC in tuple header");
3302
3303                         if (htup->t_infomask & HEAP_MOVED_IN)
3304                         {
3305                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
3306                                 htup->t_infomask &= ~HEAP_MOVED;
3307                                 num_tuples++;
3308                         }
3309                         else
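                                /* MOVED_OFF: hint the superseded copy as dead */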
3310                                 htup->t_infomask |= HEAP_XMIN_INVALID;
3311                 }
3312                 MarkBufferDirty(buf);
3313                 UnlockReleaseBuffer(buf);
3314                 Assert((*curpage)->offsets_used == num_tuples);
3315                 checked_moved += num_tuples;
3316         }
3317         Assert(num_moved == checked_moved);
3318 }
3319
3320 /*
3321  *      vacuum_heap() -- free dead tuples
3322  *
3323  *              This routine marks dead tuples as unused and truncates relation
3324  *              if there are "empty" end-blocks.
3325  */
3326 static void
3327 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
3328 {
3329         Buffer          buf;
3330         VacPage    *vacpage;
3331         BlockNumber relblocks;
3332         int                     nblocks;
3333         int                     i;
3334
3335         nblocks = vacuum_pages->num_pages;
3336         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
3337
3338         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
3339         {
3340                 vacuum_delay_point();
3341
3342                 if ((*vacpage)->offsets_free > 0)
3343                 {
3344                         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*vacpage)->blkno,
3345                                                                          RBM_NORMAL, vac_strategy);
3346                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3347                         vacuum_page(vacrelstats, onerel, buf, *vacpage);
3348                         UnlockReleaseBuffer(buf);
3349                 }
3350         }
3351
3352         /* Truncate relation if there are some empty end-pages */
3353         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
3354         if (vacuum_pages->empty_end_pages > 0)
3355         {
3356                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
3357                 ereport(elevel,
3358                                 (errmsg("\"%s\": truncated %u to %u pages",
3359                                                 RelationGetRelationName(onerel),
3360                                                 vacrelstats->rel_pages, relblocks)));
3361                 RelationTruncate(onerel, relblocks);
3362
3363                 /* force relcache inval so all backends reset their rd_targblock */
3364                 CacheInvalidateRelcache(onerel);
3365
3366                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
3367         }
3368 }
3369
3370 /*
3371  *      vacuum_page() -- free dead tuples on a page
3372  *                                       and repair its fragmentation.
3373  *
3374  * Caller must hold pin and lock on buffer.
3375  */
3376 static void
3377 vacuum_page(VRelStats *vacrelstats, Relation onerel, Buffer buffer, VacPage vacpage)
3378 {
3379         Page            page = BufferGetPage(buffer);
3380         int                     i;
3381
3382         /* There shouldn't be any tuples moved onto the page yet! */
3383         Assert(vacpage->offsets_used == 0);
3384
3385         START_CRIT_SECTION();
3386
3387         for (i = 0; i < vacpage->offsets_free; i++)
3388         {
3389                 ItemId          itemid = PageGetItemId(page, vacpage->offsets[i]);
3390
3391                 ItemIdSetUnused(itemid);
3392         }
3393
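        /* Defragment the page now that the dead line pointers are unused */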
3394         PageRepairFragmentation(page);
3395
3396         MarkBufferDirty(buffer);
3397
3398         /* XLOG stuff */
3399         if (!onerel->rd_istemp)
3400         {
3401                 XLogRecPtr      recptr;
3402
3403                 recptr = log_heap_clean(onerel, buffer,
3404                                                                 NULL, 0, NULL, 0,
3405                                                                 vacpage->offsets, vacpage->offsets_free,
3406                                                                 vacrelstats->latestRemovedXid, false);
3407                 PageSetLSN(page, recptr);
3408                 PageSetTLI(page, ThisTimeLineID);
3409         }
3410
3411         END_CRIT_SECTION();
3412 }
3413
3414 /*
3415  *      scan_index() -- scan one index relation to update pg_class statistics.
3416  *
3417  * We use this when we have no deletions to do.
3418  */
3419 static void
3420 scan_index(Relation indrel, double num_tuples)
3421 {
3422         IndexBulkDeleteResult *stats;
3423         IndexVacuumInfo ivinfo;
3424         PGRUsage        ru0;
3425
3426         pg_rusage_init(&ru0);
3427
3428         ivinfo.index = indrel;
3429         ivinfo.vacuum_full = true;
3430         ivinfo.analyze_only = false;
3431         ivinfo.estimated_count = false;
3432         ivinfo.message_level = elevel;
3433         ivinfo.num_heap_tuples = num_tuples;
3434         ivinfo.strategy = vac_strategy;
3435
3436         stats = index_vacuum_cleanup(&ivinfo, NULL);
3437
3438         if (!stats)
3439                 return;
3440
3441         /*
3442          * Now update statistics in pg_class, but only if the index says the count
3443          * is accurate.
3444          */
3445         if (!stats->estimated_count)
3446                 vac_update_relstats(indrel,
3447                                                         stats->num_pages, stats->num_index_tuples,
3448                                                         false, InvalidTransactionId);
3449
3450         ereport(elevel,
3451                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3452                                         RelationGetRelationName(indrel),
3453                                         stats->num_index_tuples,
3454                                         stats->num_pages),
3455         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
3456                           "%s.",
3457                           stats->pages_deleted, stats->pages_free,
3458                           pg_rusage_show(&ru0))));
3459
3460         /*
3461          * Check for tuple count mismatch.  If the index is partial, then it's OK
3462          * for it to have fewer tuples than the heap; otherwise we have trouble.
3463          */
3464         if (!stats->estimated_count &&
3465                 stats->num_index_tuples != num_tuples)
3466         {
3467                 if (stats->num_index_tuples > num_tuples ||
3468                         !vac_is_partial_index(indrel))
3469                         ereport(WARNING,
3470                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3471                                                         RelationGetRelationName(indrel),
3472                                                         stats->num_index_tuples, num_tuples),
3473                                          errhint("Rebuild the index with REINDEX.")));
3474         }
3475
3476         pfree(stats);
3477 }
3478
3479 /*
3480  *      vacuum_index() -- vacuum one index relation.
3481  *
3482  *              vacpagelist is the VacPageList of the heap we're currently vacuuming
3483  *              (already exclusive-locked).  Indrel is an index on the vacuumed heap.
3484  *
3485  *              We don't bother to set locks on the index relation here, since
3486  *              the parent table is exclusive-locked already.
3487  *
3488  *              Finally, we arrange to update the index relation's statistics in
3489  *              pg_class.
3490  */
3491 static void
3492 vacuum_index(VacPageList vacpagelist, Relation indrel,
3493                          double num_tuples, int keep_tuples)
3494 {
3495         IndexBulkDeleteResult *stats;
3496         IndexVacuumInfo ivinfo;
3497         PGRUsage        ru0;
3498
3499         pg_rusage_init(&ru0);
3500
3501         ivinfo.index = indrel;
3502         ivinfo.vacuum_full = true;
3503         ivinfo.analyze_only = false;
3504         ivinfo.estimated_count = false;
3505         ivinfo.message_level = elevel;
3506         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
3507         ivinfo.strategy = vac_strategy;
3508
3509         /* Do bulk deletion */
3510         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3511
3512         /* Do post-VACUUM cleanup */
3513         stats = index_vacuum_cleanup(&ivinfo, stats);
3514
3515         if (!stats)
3516                 return;
3517
3518         /*
3519          * Now update statistics in pg_class, but only if the index says the count
3520          * is accurate.
3521          */
3522         if (!stats->estimated_count)
3523                 vac_update_relstats(indrel,
3524                                                         stats->num_pages, stats->num_index_tuples,
3525                                                         false, InvalidTransactionId);
3526
3527         ereport(elevel,
3528                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3529                                         RelationGetRelationName(indrel),
3530                                         stats->num_index_tuples,
3531                                         stats->num_pages),
3532                          errdetail("%.0f index row versions were removed.\n"
3533                          "%u index pages have been deleted, %u are currently reusable.\n"
3534                                            "%s.",
3535                                            stats->tuples_removed,
3536                                            stats->pages_deleted, stats->pages_free,
3537                                            pg_rusage_show(&ru0))));
3538
3539         /*
3540          * Check for tuple count mismatch.  If the index is partial, then it's OK
3541          * for it to have fewer tuples than the heap; otherwise we have trouble.
3542          */
3543         if (!stats->estimated_count &&
3544                 stats->num_index_tuples != num_tuples + keep_tuples)
3545         {
3546                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3547                         !vac_is_partial_index(indrel))
3548                         ereport(WARNING,
3549                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3550                                                         RelationGetRelationName(indrel),
3551                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3552                                          errhint("Rebuild the index with REINDEX.")));
3553         }
3554
3555         pfree(stats);
3556 }
3557
3558 /*
3559  *      tid_reaped() -- is a particular tid reaped?
3560  *
3561  *              This has the right signature to be an IndexBulkDeleteCallback.
3562  *
3563  *              vacpagelist->pagedesc is sorted in the right (ascending blkno) order.
3564  */
3565 static bool
3566 tid_reaped(ItemPointer itemptr, void *state)
3567 {
3568         VacPageList vacpagelist = (VacPageList) state;
3569         OffsetNumber ioffno;
3570         OffsetNumber *voff;
3571         VacPage         vp,
3572                            *vpp;
3573         VacPageData vacpage;
3574
3575         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3576         ioffno = ItemPointerGetOffsetNumber(itemptr);
3577
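        /* First, binary-search the sorted page list for this tuple's block */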
3578         vp = &vacpage;
3579         vpp = (VacPage *) vac_bsearch((void *) &vp,
3580                                                                   (void *) (vacpagelist->pagedesc),
3581                                                                   vacpagelist->num_pages,
3582                                                                   sizeof(VacPage),
3583                                                                   vac_cmp_blk);
3584
3585         if (vpp == NULL)
3586                 return false;
3587
3588         /* ok - we are on a partially or fully reaped page */
3589         vp = *vpp;
3590
3591         if (vp->offsets_free == 0)
3592         {
3593                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
3594                 return true;
3595         }
3596
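        /* Then check whether this offset is among the page's reaped items */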
3597         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3598                                                                                 (void *) (vp->offsets),
3599                                                                                 vp->offsets_free,
3600                                                                                 sizeof(OffsetNumber),
3601                                                                                 vac_cmp_offno);
3602
3603         if (voff == NULL)
3604                 return false;
3605
3606         /* tid is reaped */
3607         return true;
3608 }
3609
3610 /*
3611  * Update the Free Space Map with the info we now have about free space in
3612  * the relation.
3613  */
3614 static void
3615 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3616                            BlockNumber rel_pages)
3617 {
3618         int                     nPages = fraged_pages->num_pages;
3619         VacPage    *pagedesc = fraged_pages->pagedesc;
3620         int                     i;
3621
3622         for (i = 0; i < nPages; i++)
3623         {
3624                 /*
3625                  * fraged_pages may contain entries for pages that we later decided to
3626                  * truncate from the relation; don't enter them into the free space
3627                  * map!
3628                  */
3629                 if (pagedesc[i]->blkno >= rel_pages)
3630                         break;
3631
3632                 RecordPageWithFreeSpace(onerel, pagedesc[i]->blkno, pagedesc[i]->free);
3633         }
3634
3635 }
3636
3637 /* Copy a VacPage structure */
3638 static VacPage
3639 copy_vac_page(VacPage vacpage)
3640 {
3641         VacPage         newvacpage;
3642
3643         /* allocate a VacPageData entry */
3644         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3645                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3646
3647         /* fill it in */
3648         if (vacpage->offsets_free > 0)
3649                 memcpy(newvacpage->offsets, vacpage->offsets,
3650                            vacpage->offsets_free * sizeof(OffsetNumber));
3651         newvacpage->blkno = vacpage->blkno;
3652         newvacpage->free = vacpage->free;
3653         newvacpage->offsets_used = vacpage->offsets_used;
3654         newvacpage->offsets_free = vacpage->offsets_free;
3655
3656         return newvacpage;
3657 }
3658
3659 /*
3660  * Add a VacPage pointer to a VacPageList.
3661  *
3662  *              As a side effect of the way that scan_heap works,
3663  *              higher pages come after lower pages in the array
3664  *              (and highest tid on a page is last).
3665  */
3666 static void
3667 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3668 {
3669 #define PG_NPAGEDESC 1024
3670
3671         /* allocate a VacPage entry if needed */
3672         if (vacpagelist->num_pages == 0)
3673         {
3674                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3675                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3676         }
3677         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3678         {
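                /* double the array so repeated inserts stay amortized O(1) */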
3679                 vacpagelist->num_allocated_pages *= 2;
3680                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3681         }
3682         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3683         (vacpagelist->num_pages)++;
3684 }
3685
3686 /*
3687  * vac_bsearch: just like standard C library routine bsearch(),
3688  * except that we first test to see whether the target key is outside
3689  * the range of the table entries.      This case is handled relatively slowly
3690  * by the normal binary search algorithm (ie, no faster than any other key)
3691  * but it occurs often enough in VACUUM to be worth optimizing.
3692  */
3693 static void *
3694 vac_bsearch(const void *key, const void *base,
3695                         size_t nelem, size_t size,
3696                         int (*compar) (const void *, const void *))
3697 {
3698         int                     res;
3699         const void *last;
3700
3701         if (nelem == 0)
3702                 return NULL;
3703         res = compar(key, base);
3704         if (res < 0)
3705                 return NULL;
3706         if (res == 0)
3707                 return (void *) base;
3708         if (nelem > 1)
3709         {
3710                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3711                 res = compar(key, last);
3712                 if (res > 0)
3713                         return NULL;
3714                 if (res == 0)
3715                         return (void *) last;
3716         }
3717         if (nelem <= 2)
3718                 return NULL;                    /* already checked 'em all */
3719         return bsearch(key, base, nelem, size, compar);
3720 }
3721
3722 /*
3723  * Comparator routines for use with qsort() and bsearch().
3724  */
3725 static int
3726 vac_cmp_blk(const void *left, const void *right)
3727 {
3728         BlockNumber lblk,
3729                                 rblk;
3730
3731         lblk = (*((VacPage *) left))->blkno;
3732         rblk = (*((VacPage *) right))->blkno;
3733
3734         if (lblk < rblk)
3735                 return -1;
3736         if (lblk == rblk)
3737                 return 0;
3738         return 1;
3739 }
3740
3741 static int
3742 vac_cmp_offno(const void *left, const void *right)
3743 {
3744         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3745                 return -1;
3746         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3747                 return 0;
3748         return 1;
3749 }
3750
3751 static int
3752 vac_cmp_vtlinks(const void *left, const void *right)
3753 {
3754         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3755                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3756                 return -1;
3757         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3758                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3759                 return 1;
3760         /* bi_hi-es are equal */
3761         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3762                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3763                 return -1;
3764         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3765                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3766                 return 1;
3767         /* bi_lo-es are equal */
3768         if (((VTupleLink) left)->new_tid.ip_posid <
3769                 ((VTupleLink) right)->new_tid.ip_posid)
3770                 return -1;
3771         if (((VTupleLink) left)->new_tid.ip_posid >
3772                 ((VTupleLink) right)->new_tid.ip_posid)
3773                 return 1;
3774         return 0;
3775 }
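
/*
 * Illustrative only (not part of the original code): vac_cmp_vtlinks is the
 * sort of comparator one would hand to qsort() to order an array of
 * VTupleLinkData by new_tid, i.e. by (block hi word, block lo word, offset):
 *
 *              qsort(vtlinks, num_vtlinks, sizeof(VTupleLinkData), vac_cmp_vtlinks);
 *
 * Its real call site is elsewhere in this file.
 */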
3776
3777
3778 /*
3779  * Open all the indexes of the given relation, obtaining the specified kind
3780  * of lock on each.  Return an array of Relation pointers for the indexes
3781  * into *Irel, and the number of indexes into *nindexes.
3782  */
3783 void
3784 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3785                                  int *nindexes, Relation **Irel)
3786 {
3787         List       *indexoidlist;
3788         ListCell   *indexoidscan;
3789         int                     i;
3790
3791         Assert(lockmode != NoLock);
3792
3793         indexoidlist = RelationGetIndexList(relation);
3794
3795         *nindexes = list_length(indexoidlist);
3796
3797         if (*nindexes > 0)
3798                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3799         else
3800                 *Irel = NULL;
3801
3802         i = 0;
3803         foreach(indexoidscan, indexoidlist)
3804         {
3805                 Oid                     indexoid = lfirst_oid(indexoidscan);
3806
3807                 (*Irel)[i++] = index_open(indexoid, lockmode);
3808         }
3809
3810         list_free(indexoidlist);
3811 }
3812
3813 /*
3814  * Release the resources acquired by vac_open_indexes.  Optionally release
3815  * the locks (say NoLock to keep 'em).
3816  */
3817 void
3818 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3819 {
3820         if (Irel == NULL)
3821                 return;
3822
3823         while (nindexes--)
3824         {
3825                 Relation        ind = Irel[nindexes];
3826
3827                 index_close(ind, lockmode);
3828         }
3829         pfree(Irel);
3830 }
3831
3832
3833 /*
3834  * Is an index partial (ie, could it contain fewer tuples than the heap)?
3835  */
3836 bool
3837 vac_is_partial_index(Relation indrel)
3838 {
3839         /*
3840          * If the index's AM doesn't support nulls, it's partial for our purposes
3841          */
3842         if (!indrel->rd_am->amindexnulls)
3843                 return true;
3844
3845         /* Otherwise, look to see if there's a partial-index predicate */
3846         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3847                 return true;
3848
3849         return false;
3850 }
3851
3852
3853 static bool
3854 enough_space(VacPage vacpage, Size len)
3855 {
3856         len = MAXALIGN(len);
3857
3858         if (len > vacpage->free)
3859                 return false;
3860
3861         /* if there are free itemid(s) and len <= free_space... */
3862         if (vacpage->offsets_used < vacpage->offsets_free)
3863                 return true;
3864
3865         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
3866         if (len + sizeof(ItemIdData) <= vacpage->free)
3867                 return true;
3868
3869         return false;
3870 }
3871
3872 static Size
3873 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3874 {
3875         /*
3876          * It is correct to use PageGetExactFreeSpace() here, *not*
3877          * PageGetHeapFreeSpace().      This is because (a) we do our own, exact
3878          * accounting for whether line pointers must be added, and (b) we will
3879          * recycle any LP_DEAD line pointers before starting to add rows to a
3880          * page, but that may not have happened yet at the time this function is
3881          * applied to a page, which means PageGetHeapFreeSpace()'s protection
3882          * against too many line pointers on a page could fire incorrectly.  We do
3883          * not need that protection here: since VACUUM FULL always recycles all
3884          * dead line pointers first, it'd be physically impossible to insert more
3885          * than MaxHeapTuplesPerPage tuples anyway.
3886          */
3887         Size            freespace = PageGetExactFreeSpace(page);
3888         Size            targetfree;
3889
3890         targetfree = RelationGetTargetPageFreeSpace(relation,
3891                                                                                                 HEAP_DEFAULT_FILLFACTOR);
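
        /*
         * For example: with an 8K block and a table whose fillfactor reloption
         * is 90, targetfree is roughly 800 bytes, so only free space beyond
         * that reserve is reported; with the default fillfactor of 100 the
         * entire exact free space is usable.
         */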
3892         if (freespace > targetfree)
3893                 return freespace - targetfree;
3894         else
3895                 return 0;
3896 }
3897
3898 /*
3899  * vacuum_delay_point --- check for interrupts and cost-based delay.
3900  *
3901  * This should be called in each major loop of VACUUM processing,
3902  * typically once per page processed.
3903  */
3904 void
3905 vacuum_delay_point(void)
3906 {
3907         /* Always check for interrupts */
3908         CHECK_FOR_INTERRUPTS();
3909
3910         /* Nap if appropriate */
3911         if (VacuumCostActive && !InterruptPending &&
3912                 VacuumCostBalance >= VacuumCostLimit)
3913         {
3914                 int                     msec;
3915
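                /*
                 * Scale the nap by how far the accumulated cost balance overshot
                 * the limit (e.g. a balance of twice VacuumCostLimit sleeps for
                 * about 2 * VacuumCostDelay), capped at 4 * VacuumCostDelay.
                 */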
3916                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3917                 if (msec > VacuumCostDelay * 4)
3918                         msec = VacuumCostDelay * 4;
3919
3920                 pg_usleep(msec * 1000L);
3921
3922                 VacuumCostBalance = 0;
3923
3924                 /* update balance values for workers */
3925                 AutoVacuumUpdateDelay();
3926
3927                 /* Might have gotten an interrupt while sleeping */
3928                 CHECK_FOR_INTERRUPTS();
3929         }
3930 }
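
/*
 * A minimal sketch (not part of the original source) of the calling pattern
 * described above: each major per-page loop in VACUUM is expected to call
 * vacuum_delay_point() once per iteration, for example
 *
 *              for (blkno = 0; blkno < nblocks; blkno++)
 *              {
 *                      vacuum_delay_point();
 *                      ... read, pin, and process page blkno ...
 *              }
 *
 * as the loops in update_hint_bits() and vacuum_heap() above do.
 */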