1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.358 2007/09/12 22:10:26 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/transam.h"
29 #include "access/xact.h"
30 #include "access/xlog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_database.h"
33 #include "commands/dbcommands.h"
34 #include "commands/vacuum.h"
35 #include "executor/executor.h"
36 #include "miscadmin.h"
37 #include "postmaster/autovacuum.h"
38 #include "storage/freespace.h"
39 #include "storage/proc.h"
40 #include "storage/procarray.h"
41 #include "utils/acl.h"
42 #include "utils/builtins.h"
43 #include "utils/flatfiles.h"
44 #include "utils/fmgroids.h"
45 #include "utils/inval.h"
46 #include "utils/lsyscache.h"
47 #include "utils/memutils.h"
48 #include "utils/pg_rusage.h"
49 #include "utils/relcache.h"
50 #include "utils/syscache.h"
51 #include "pgstat.h"
52
53
54 /*
55  * GUC parameters
56  */
57 int                     vacuum_freeze_min_age;
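/*
 * vacuum_freeze_min_age is consulted by vacuum_set_xid_limits() as the
 * default minimum freeze age whenever its caller passes a negative
 * freeze_min_age.
 */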
58
59 /*
60  * VacPage structures keep track of each page on which we find useful
61  * amounts of free space.
62  */
63 typedef struct VacPageData
64 {
65         BlockNumber blkno;                      /* BlockNumber of this Page */
66         Size            free;                   /* FreeSpace on this Page */
67         uint16          offsets_used;   /* Number of OffNums used by vacuum */
68         uint16          offsets_free;   /* Number of OffNums free or to be free */
69         OffsetNumber offsets[1];        /* Array of free OffNums */
70 } VacPageData;
71
72 typedef VacPageData *VacPage;
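/*
 * For example, a VacPage with blkno = 42, free = 512 and offsets_free = 3
 * records that block 42 has 512 usable bytes and three item offsets,
 * listed in offsets[], that are or will become free.
 */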
73
74 typedef struct VacPageListData
75 {
76         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
77         int                     num_pages;              /* Number of pages in pagedesc */
78         int                     num_allocated_pages;    /* Number of allocated pages in
79                                                                                  * pagedesc */
80         VacPage    *pagedesc;           /* Descriptions of pages */
81 } VacPageListData;
82
83 typedef VacPageListData *VacPageList;
84
85 /*
86  * The "vtlinks" array keeps information about each recently-updated tuple
87  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
88  * We store the tuple's own TID as well as its t_ctid (its link to the next
89  * newer tuple version).  Searching in this array allows us to follow update
90  * chains backwards from newer to older tuples.  When we move a member of an
91  * update chain, we must move *all* the live members of the chain, so that we
92  * can maintain their t_ctid link relationships (we must not just overwrite
93  * t_ctid in an existing tuple).
94  *
95  * Note: because t_ctid links can be stale (this would only occur if a prior
96  * VACUUM crashed partway through), it is possible that new_tid points to an
97  * empty slot or unrelated tuple.  We have to check the linkage as we follow
98  * it, just as is done in EvalPlanQual.
99  */
100 typedef struct VTupleLinkData
101 {
102         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
103         ItemPointerData this_tid;       /* t_self of the tuple */
104 } VTupleLinkData;
105
106 typedef VTupleLinkData *VTupleLink;
107
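/*
 * For example, if the tuple at TID (2,5) was updated and its successor now
 * lives at TID (7,1), the entry records this_tid = (2,5) and new_tid = (7,1).
 * Because the array is kept sorted by new_tid, repair_frag() can look up
 * (7,1) and discover that (2,5) is the older member of the chain that must
 * be moved along with it.
 */
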
108 /*
109  * We use an array of VTupleMoveData to plan a chain tuple move fully
110  * before we do it.
111  */
112 typedef struct VTupleMoveData
113 {
114         ItemPointerData tid;            /* tuple ID */
115         VacPage         vacpage;                /* where to move it to */
116         bool            cleanVpd;               /* clean vacpage before using? */
117 } VTupleMoveData;
118
119 typedef VTupleMoveData *VTupleMove;
120
121 /*
122  * VRelStats contains the data acquired by scan_heap for use later
123  */
124 typedef struct VRelStats
125 {
126         /* miscellaneous statistics */
127         BlockNumber rel_pages;
128         double          rel_tuples;
129         Size            min_tlen;
130         Size            max_tlen;
131         bool            hasindex;
132         /* vtlinks array for tuple chain following - sorted by new_tid */
133         int                     num_vtlinks;
134         VTupleLink      vtlinks;
135 } VRelStats;
136
137 /*----------------------------------------------------------------------
138  * ExecContext:
139  *
140  * As these variables always appear together, we put them into one struct
141  * and pull initialization and cleanup into separate routines.
142  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
143  * accurately:  It is *used* only in move_xxx_tuple(), but because this
144  * routine is called many times, we initialize the struct just once in
145  * repair_frag() and pass it on to move_xxx_tuple().
146  */
147 typedef struct ExecContextData
148 {
149         ResultRelInfo *resultRelInfo;
150         EState     *estate;
151         TupleTableSlot *slot;
152 } ExecContextData;
153
154 typedef ExecContextData *ExecContext;
155
156 static void
157 ExecContext_Init(ExecContext ec, Relation rel)
158 {
159         TupleDesc       tupdesc = RelationGetDescr(rel);
160
161         /*
162          * We need a ResultRelInfo and an EState so we can use the regular
163          * executor's index-entry-making machinery.
164          */
165         ec->estate = CreateExecutorState();
166
167         ec->resultRelInfo = makeNode(ResultRelInfo);
168         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
169         ec->resultRelInfo->ri_RelationDesc = rel;
170         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
171
172         ExecOpenIndices(ec->resultRelInfo);
173
174         ec->estate->es_result_relations = ec->resultRelInfo;
175         ec->estate->es_num_result_relations = 1;
176         ec->estate->es_result_relation_info = ec->resultRelInfo;
177
178         /* Set up a tuple slot too */
179         ec->slot = MakeSingleTupleTableSlot(tupdesc);
180 }
181
182 static void
183 ExecContext_Finish(ExecContext ec)
184 {
185         ExecDropSingleTupleTableSlot(ec->slot);
186         ExecCloseIndices(ec->resultRelInfo);
187         FreeExecutorState(ec->estate);
188 }
189
190 /*
191  * End of ExecContext Implementation
192  *----------------------------------------------------------------------
193  */
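/*
 * In outline, repair_frag() uses the above as:
 *
 *      ExecContextData ec;
 *
 *      ExecContext_Init(&ec, onerel);
 *      ... call move_chain_tuple()/move_plain_tuple() many times,
 *      ... each using ec->slot and ec->estate to make index entries
 *      ExecContext_Finish(&ec);
 *
 * so the executor machinery is set up only once per relation.
 */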
194
195 /* A few variables that don't seem worth passing around as parameters */
196 static MemoryContext vac_context = NULL;
197
198 static int      elevel = -1;
199
200 static TransactionId OldestXmin;
201 static TransactionId FreezeLimit;
202
203 static BufferAccessStrategy vac_strategy;
204
205
206 /* non-export function prototypes */
207 static List *get_rel_oids(List *relids, const RangeVar *vacrel,
208                          const char *stmttype);
209 static void vac_truncate_clog(TransactionId frozenXID);
210 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
211 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
212 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
213                   VacPageList vacuum_pages, VacPageList fraged_pages);
214 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
215                         VacPageList vacuum_pages, VacPageList fraged_pages,
216                         int nindexes, Relation *Irel);
217 static void move_chain_tuple(Relation rel,
218                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
219                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
220                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
221 static void move_plain_tuple(Relation rel,
222                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
223                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
224                                  ExecContext ec);
225 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
226                                  int num_fraged_pages, BlockNumber last_move_dest_block,
227                                  int num_moved);
228 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
229                         VacPageList vacpagelist);
230 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
231 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
232                          double num_tuples, int keep_tuples);
233 static void scan_index(Relation indrel, double num_tuples);
234 static bool tid_reaped(ItemPointer itemptr, void *state);
235 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
236                            BlockNumber rel_pages);
237 static VacPage copy_vac_page(VacPage vacpage);
238 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
239 static void *vac_bsearch(const void *key, const void *base,
240                         size_t nelem, size_t size,
241                         int (*compar) (const void *, const void *));
242 static int      vac_cmp_blk(const void *left, const void *right);
243 static int      vac_cmp_offno(const void *left, const void *right);
244 static int      vac_cmp_vtlinks(const void *left, const void *right);
245 static bool enough_space(VacPage vacpage, Size len);
246 static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
247
248
249 /****************************************************************************
250  *                                                                                                                                                      *
251  *                      Code common to all flavors of VACUUM and ANALYZE                                *
252  *                                                                                                                                                      *
253  ****************************************************************************
254  */
255
256
257 /*
258  * Primary entry point for VACUUM and ANALYZE commands.
259  *
260  * relids is normally NIL; if it is not, then it provides the list of
261  * relation OIDs to be processed, and vacstmt->relation is ignored.
262  * (The non-NIL case is currently only used by autovacuum.)
263  *
264  * bstrategy is normally given as NULL, but in autovacuum it can be passed
265  * in to use the same buffer strategy object across multiple vacuum() calls.
266  *
267  * isTopLevel should be passed down from ProcessUtility.
268  *
269  * It is the caller's responsibility that vacstmt, relids, and bstrategy
270  * (if given) be allocated in a memory context that won't disappear
271  * at transaction commit.
272  */
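/*
 * For example, "VACUUM VERBOSE ANALYZE foo" arrives here with
 * vacstmt->vacuum, vacstmt->analyze and vacstmt->verbose all set and
 * vacstmt->relation identifying foo, whereas autovacuum supplies a non-NIL
 * relids list instead of a relation name.
 */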
273 void
274 vacuum(VacuumStmt *vacstmt, List *relids,
275            BufferAccessStrategy bstrategy, bool isTopLevel)
276 {
277         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
278         volatile MemoryContext anl_context = NULL;
279         volatile bool all_rels,
280                                 in_outer_xact,
281                                 use_own_xacts;
282         List       *relations;
283
284         if (vacstmt->verbose)
285                 elevel = INFO;
286         else
287                 elevel = DEBUG2;
288
289         /*
290          * We cannot run VACUUM inside a user transaction block; if we were inside
291          * a transaction, then our commit- and start-transaction-command calls
292          * would not have the intended effect! Furthermore, the forced commit that
293          * occurs before truncating the relation's file would have the effect of
294          * committing the rest of the user's transaction too, which would
295          * certainly not be the desired behavior.  (This only applies to VACUUM
296          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
297          * block, but we choose to disallow that case because we'd rather commit
298          * as soon as possible after finishing the vacuum.      This is mainly so that
299          * we can let go the AccessExclusiveLock that we may be holding.)
300          *
301          * ANALYZE (without VACUUM) can run either way.
302          */
303         if (vacstmt->vacuum)
304         {
305                 PreventTransactionChain(isTopLevel, stmttype);
306                 in_outer_xact = false;
307         }
308         else
309                 in_outer_xact = IsInTransactionChain(isTopLevel);
310
311         /*
312          * Send info about dead objects to the statistics collector, unless we are
313          * in autovacuum --- autovacuum.c does this for itself.
314          */
315         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
316                 pgstat_vacuum_tabstat();
317
318         /*
319          * Create special memory context for cross-transaction storage.
320          *
321          * Since it is a child of PortalContext, it will go away eventually even
322          * if we suffer an error; there's no need for special abort cleanup logic.
323          */
324         vac_context = AllocSetContextCreate(PortalContext,
325                                                                                 "Vacuum",
326                                                                                 ALLOCSET_DEFAULT_MINSIZE,
327                                                                                 ALLOCSET_DEFAULT_INITSIZE,
328                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
329
330         /*
331          * If caller didn't give us a buffer strategy object, make one in the
332          * cross-transaction memory context.
333          */
334         if (bstrategy == NULL)
335         {
336                 MemoryContext old_context = MemoryContextSwitchTo(vac_context);
337
338                 bstrategy = GetAccessStrategy(BAS_VACUUM);
339                 MemoryContextSwitchTo(old_context);
340         }
341         vac_strategy = bstrategy;
342
343         /* Remember whether we are processing everything in the DB */
344         all_rels = (relids == NIL && vacstmt->relation == NULL);
345
346         /*
347          * Build list of relations to process, unless caller gave us one. (If we
348          * build one, we put it in vac_context for safekeeping.)
349          */
350         relations = get_rel_oids(relids, vacstmt->relation, stmttype);
351
352         /*
353          * Decide whether we need to start/commit our own transactions.
354          *
355          * For VACUUM (with or without ANALYZE): always do so, so that we can
356          * release locks as soon as possible.  (We could possibly use the outer
357          * transaction for a one-table VACUUM, but handling TOAST tables would be
358          * problematic.)
359          *
360          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
361          * start/commit our own transactions.  Also, there's no need to do so if
362          * only processing one relation.  For multiple relations when not within a
363          * transaction block, and also in an autovacuum worker, use own
364          * transactions so we can release locks sooner.
365          */
366         if (vacstmt->vacuum)
367                 use_own_xacts = true;
368         else
369         {
370                 Assert(vacstmt->analyze);
371                 if (IsAutoVacuumWorkerProcess())
372                         use_own_xacts = true;
373                 else if (in_outer_xact)
374                         use_own_xacts = false;
375                 else if (list_length(relations) > 1)
376                         use_own_xacts = true;
377                 else
378                         use_own_xacts = false;
379         }
380
381         /*
382          * If we are running ANALYZE without per-table transactions, we'll need a
383          * memory context with table lifetime.
384          */
385         if (!use_own_xacts)
386                 anl_context = AllocSetContextCreate(PortalContext,
387                                                                                         "Analyze",
388                                                                                         ALLOCSET_DEFAULT_MINSIZE,
389                                                                                         ALLOCSET_DEFAULT_INITSIZE,
390                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
391
392         /*
393          * vacuum_rel expects to be entered with no transaction active; it will
394          * start and commit its own transaction.  But we are called by an SQL
395          * command, and so we are executing inside a transaction already. We
396          * commit the transaction started in PostgresMain() here, and start
397          * another one before exiting to match the commit waiting for us back in
398          * PostgresMain().
399          */
400         if (use_own_xacts)
401         {
402                 /* matches the StartTransaction in PostgresMain() */
403                 CommitTransactionCommand();
404         }
405
406         /* Turn vacuum cost accounting on or off */
407         PG_TRY();
408         {
409                 ListCell   *cur;
410
411                 VacuumCostActive = (VacuumCostDelay > 0);
412                 VacuumCostBalance = 0;
413
414                 /*
415                  * Loop to process each selected relation.
416                  */
417                 foreach(cur, relations)
418                 {
419                         Oid                     relid = lfirst_oid(cur);
420
421                         if (vacstmt->vacuum)
422                                 vacuum_rel(relid, vacstmt, RELKIND_RELATION);
423
424                         if (vacstmt->analyze)
425                         {
426                                 MemoryContext old_context = NULL;
427
428                                 /*
429                                  * If using separate xacts, start one for analyze. Otherwise,
430                                  * we can use the outer transaction, but we still need to call
431                                  * analyze_rel in a memory context that will be cleaned up on
432                                  * return (else we leak memory while processing multiple
433                                  * tables).
434                                  */
435                                 if (use_own_xacts)
436                                 {
437                                         StartTransactionCommand();
438                                         /* functions in indexes may want a snapshot set */
439                                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
440                                 }
441                                 else
442                                         old_context = MemoryContextSwitchTo(anl_context);
443
444                                 analyze_rel(relid, vacstmt, vac_strategy);
445
446                                 if (use_own_xacts)
447                                         CommitTransactionCommand();
448                                 else
449                                 {
450                                         MemoryContextSwitchTo(old_context);
451                                         MemoryContextResetAndDeleteChildren(anl_context);
452                                 }
453                         }
454                 }
455         }
456         PG_CATCH();
457         {
458                 /* Make sure cost accounting is turned off after error */
459                 VacuumCostActive = false;
460                 PG_RE_THROW();
461         }
462         PG_END_TRY();
463
464         /* Turn off vacuum cost accounting */
465         VacuumCostActive = false;
466
467         /*
468          * Finish up processing.
469          */
470         if (use_own_xacts)
471         {
472                 /* here, we are not in a transaction */
473
474                 /*
475                  * This matches the CommitTransaction waiting for us in
476                  * PostgresMain().
477                  */
478                 StartTransactionCommand();
479
480                 /*
481                  * Re-establish the transaction snapshot.  This is wasted effort when
482                  * we are called as a normal utility command, because the new
483                  * transaction will be dropped immediately by PostgresMain(); but it's
484                  * necessary if we are called from autovacuum because autovacuum might
485                  * continue on to do an ANALYZE-only call.
486                  */
487                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
488         }
489
490         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
491         {
492                 /*
493                  * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
494                  * (autovacuum.c does this for itself.)
495                  */
496                 vac_update_datfrozenxid();
497
498                 /*
499                  * If it was a database-wide VACUUM, print FSM usage statistics (we
500                  * don't make you be superuser to see these).  We suppress this in
501                  * autovacuum, too.
502                  */
503                 if (all_rels)
504                         PrintFreeSpaceMapStatistics(elevel);
505         }
506
507         /*
508          * Clean up working storage --- note we must do this after
509          * StartTransactionCommand, else we might be trying to delete the active
510          * context!
511          */
512         MemoryContextDelete(vac_context);
513         vac_context = NULL;
514
515         if (anl_context)
516                 MemoryContextDelete(anl_context);
517 }
518
519 /*
520  * Build a list of Oids for each relation to be processed
521  *
522  * The list is built in vac_context so that it will survive across our
523  * per-relation transactions.
524  */
525 static List *
526 get_rel_oids(List *relids, const RangeVar *vacrel, const char *stmttype)
527 {
528         List       *oid_list = NIL;
529         MemoryContext oldcontext;
530
531         /* List supplied by VACUUM's caller? */
532         if (relids)
533                 return relids;
534
535         if (vacrel)
536         {
537                 /* Process a specific relation */
538                 Oid                     relid;
539
540                 relid = RangeVarGetRelid(vacrel, false);
541
542                 /* Make a relation list entry for this guy */
543                 oldcontext = MemoryContextSwitchTo(vac_context);
544                 oid_list = lappend_oid(oid_list, relid);
545                 MemoryContextSwitchTo(oldcontext);
546         }
547         else
548         {
549                 /* Process all plain relations listed in pg_class */
550                 Relation        pgclass;
551                 HeapScanDesc scan;
552                 HeapTuple       tuple;
553                 ScanKeyData key;
554
555                 ScanKeyInit(&key,
556                                         Anum_pg_class_relkind,
557                                         BTEqualStrategyNumber, F_CHAREQ,
558                                         CharGetDatum(RELKIND_RELATION));
559
560                 pgclass = heap_open(RelationRelationId, AccessShareLock);
561
562                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
563
564                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
565                 {
566                         /* Make a relation list entry for this guy */
567                         oldcontext = MemoryContextSwitchTo(vac_context);
568                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
569                         MemoryContextSwitchTo(oldcontext);
570                 }
571
572                 heap_endscan(scan);
573                 heap_close(pgclass, AccessShareLock);
574         }
575
576         return oid_list;
577 }
578
579 /*
580  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
581  */
582 void
583 vacuum_set_xid_limits(int freeze_min_age, bool sharedRel,
584                                           TransactionId *oldestXmin,
585                                           TransactionId *freezeLimit)
586 {
587         int                     freezemin;
588         TransactionId limit;
589         TransactionId safeLimit;
590
591         /*
592          * We can always ignore processes running lazy vacuum.  This is because we
593          * use these values only for deciding which tuples we must keep in the
594          * tables.      Since lazy vacuum doesn't write its XID anywhere, it's
595          * safe to ignore it.  In theory it could be problematic to ignore lazy
596          * vacuums on a full vacuum, but keep in mind that only one vacuum process
597          * can be working on a particular table at any time, and that each vacuum
598          * is always an independent transaction.
599          */
600         *oldestXmin = GetOldestXmin(sharedRel, true);
601
602         Assert(TransactionIdIsNormal(*oldestXmin));
603
604         /*
605          * Determine the minimum freeze age to use: as specified by the caller,
606          * or vacuum_freeze_min_age, but in any case not more than half
607          * autovacuum_freeze_max_age, so that autovacuums to prevent XID
608          * wraparound won't occur too frequently.
609          */
610         freezemin = freeze_min_age;
611         if (freezemin < 0)
612                 freezemin = vacuum_freeze_min_age;
613         freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
614         Assert(freezemin >= 0);
615
616         /*
617          * Compute the cutoff XID, being careful not to generate a "permanent" XID
618          */
619         limit = *oldestXmin - freezemin;
620         if (!TransactionIdIsNormal(limit))
621                 limit = FirstNormalTransactionId;
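        /*
         * For example, with *oldestXmin = 500000 and an effective freezemin of
         * 100000, the cutoff becomes 400000: tuples whose xmin precedes that
         * XID are old enough to be frozen.
         */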
622
623         /*
624          * If oldestXmin is very far back (in practice, more than
625          * autovacuum_freeze_max_age / 2 XIDs old), complain and force a
626          * minimum freeze age of zero.
627          */
628         safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
629         if (!TransactionIdIsNormal(safeLimit))
630                 safeLimit = FirstNormalTransactionId;
631
632         if (TransactionIdPrecedes(limit, safeLimit))
633         {
634                 ereport(WARNING,
635                                 (errmsg("oldest xmin is far in the past"),
636                                  errhint("Close open transactions soon to avoid wraparound problems.")));
637                 limit = *oldestXmin;
638         }
639
640         *freezeLimit = limit;
641 }
642
643
644 /*
645  *      vac_update_relstats() -- update statistics for one relation
646  *
647  *              Update the whole-relation statistics that are kept in its pg_class
648  *              row.  There are additional stats that will be updated if we are
649  *              doing ANALYZE, but we always update these stats.  This routine works
650  *              for both index and heap relation entries in pg_class.
651  *
652  *              We violate transaction semantics here by overwriting the rel's
653  *              existing pg_class tuple with the new values.  This is reasonably
654  *              safe since the new values are correct whether or not this transaction
655  *              commits.  The reason for this is that if we updated these tuples in
656  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
657  *              by the time we got done with a vacuum cycle, most of the tuples in
658  *              pg_class would've been obsoleted.  Of course, this only works for
659  *              fixed-size never-null columns, but these are.
660  *
661  *              Another reason for doing it this way is that when we are in a lazy
662  *              VACUUM and have inVacuum set, we mustn't do any updates --- somebody
663  *              vacuuming pg_class might think they could delete a tuple marked with
664  *              xmin = our xid.
665  *
666  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
667  *              ANALYZE.
668  */
669 void
670 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
671                                         bool hasindex, TransactionId frozenxid)
672 {
673         Relation        rd;
674         HeapTuple       ctup;
675         Form_pg_class pgcform;
676         bool            dirty;
677
678         rd = heap_open(RelationRelationId, RowExclusiveLock);
679
680         /* Fetch a copy of the tuple to scribble on */
681         ctup = SearchSysCacheCopy(RELOID,
682                                                           ObjectIdGetDatum(relid),
683                                                           0, 0, 0);
684         if (!HeapTupleIsValid(ctup))
685                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
686                          relid);
687         pgcform = (Form_pg_class) GETSTRUCT(ctup);
688
689         /* Apply required updates, if any, to copied tuple */
690
691         dirty = false;
692         if (pgcform->relpages != (int32) num_pages)
693         {
694                 pgcform->relpages = (int32) num_pages;
695                 dirty = true;
696         }
697         if (pgcform->reltuples != (float4) num_tuples)
698         {
699                 pgcform->reltuples = (float4) num_tuples;
700                 dirty = true;
701         }
702         if (pgcform->relhasindex != hasindex)
703         {
704                 pgcform->relhasindex = hasindex;
705                 dirty = true;
706         }
707
708         /*
709          * If we have discovered that there are no indexes, then there's no
710          * primary key either.  This could be done more thoroughly...
711          */
712         if (!hasindex)
713         {
714                 if (pgcform->relhaspkey)
715                 {
716                         pgcform->relhaspkey = false;
717                         dirty = true;
718                 }
719         }
720
721         /*
722          * relfrozenxid should never go backward.  Caller can pass
723          * InvalidTransactionId if it has no new data.
724          */
725         if (TransactionIdIsNormal(frozenxid) &&
726                 TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
727         {
728                 pgcform->relfrozenxid = frozenxid;
729                 dirty = true;
730         }
731
732         /*
733          * If anything changed, write out the tuple.  Even if nothing changed,
734          * force relcache invalidation so all backends reset their rd_targblock
735          * --- otherwise it might point to a page we truncated away.
736          */
737         if (dirty)
738         {
739                 heap_inplace_update(rd, ctup);
740                 /* the above sends a cache inval message */
741         }
742         else
743         {
744                 /* no need to change tuple, but force relcache inval anyway */
745                 CacheInvalidateRelcacheByTuple(ctup);
746         }
747
748         heap_close(rd, RowExclusiveLock);
749 }
750
751
752 /*
753  *      vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
754  *
755  *              Update pg_database's datfrozenxid entry for our database to be the
756  *              minimum of the pg_class.relfrozenxid values.  If we are able to
757  *              advance pg_database.datfrozenxid, also try to truncate pg_clog.
758  *
759  *              We violate transaction semantics here by overwriting the database's
760  *              existing pg_database tuple with the new value.  This is reasonably
761  *              safe since the new value is correct whether or not this transaction
762  *              commits.  As with vac_update_relstats, this avoids leaving dead tuples
763  *              behind after a VACUUM.
764  *
765  *              This routine is shared by full and lazy VACUUM.
766  */
767 void
768 vac_update_datfrozenxid(void)
769 {
770         HeapTuple       tuple;
771         Form_pg_database dbform;
772         Relation        relation;
773         SysScanDesc scan;
774         HeapTuple       classTup;
775         TransactionId newFrozenXid;
776         bool            dirty = false;
777
778         /*
779          * Initialize the "min" calculation with RecentGlobalXmin.  Any
780          * not-yet-committed pg_class entries for new tables must have
781          * relfrozenxid at least this high, because any other open xact must have
782          * RecentXmin >= its PGPROC.xmin >= our RecentGlobalXmin; see
783          * AddNewRelationTuple().  So we cannot produce a wrong minimum by
784          * starting with this.
785          */
786         newFrozenXid = RecentGlobalXmin;
787
788         /*
789          * We must seqscan pg_class to find the minimum Xid, because there is no
790          * index that can help us here.
791          */
792         relation = heap_open(RelationRelationId, AccessShareLock);
793
794         scan = systable_beginscan(relation, InvalidOid, false,
795                                                           SnapshotNow, 0, NULL);
796
797         while ((classTup = systable_getnext(scan)) != NULL)
798         {
799                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
800
801                 /*
802                  * Only consider heap and TOAST tables (anything else should have
803                  * InvalidTransactionId in relfrozenxid anyway.)
804                  */
805                 if (classForm->relkind != RELKIND_RELATION &&
806                         classForm->relkind != RELKIND_TOASTVALUE)
807                         continue;
808
809                 Assert(TransactionIdIsNormal(classForm->relfrozenxid));
810
811                 if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
812                         newFrozenXid = classForm->relfrozenxid;
813         }
814
815         /* we're done with pg_class */
816         systable_endscan(scan);
817         heap_close(relation, AccessShareLock);
818
819         Assert(TransactionIdIsNormal(newFrozenXid));
820
821         /* Now fetch the pg_database tuple we need to update. */
822         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
823
824         /* Fetch a copy of the tuple to scribble on */
825         tuple = SearchSysCacheCopy(DATABASEOID,
826                                                            ObjectIdGetDatum(MyDatabaseId),
827                                                            0, 0, 0);
828         if (!HeapTupleIsValid(tuple))
829                 elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
830         dbform = (Form_pg_database) GETSTRUCT(tuple);
831
832         /*
833          * Don't allow datfrozenxid to go backward (probably can't happen anyway);
834          * and detect the common case where it doesn't go forward either.
835          */
836         if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
837         {
838                 dbform->datfrozenxid = newFrozenXid;
839                 dirty = true;
840         }
841
842         if (dirty)
843                 heap_inplace_update(relation, tuple);
844
845         heap_freetuple(tuple);
846         heap_close(relation, RowExclusiveLock);
847
848         /*
849          * If we were able to advance datfrozenxid, mark the flat-file copy of
850          * pg_database for update at commit, and see if we can truncate
851          * pg_clog.
852          */
853         if (dirty)
854         {
855                 database_file_update_needed();
856                 vac_truncate_clog(newFrozenXid);
857         }
858 }
859
860
861 /*
862  *      vac_truncate_clog() -- attempt to truncate the commit log
863  *
864  *              Scan pg_database to determine the system-wide oldest datfrozenxid,
865  *              and use it to truncate the transaction commit log (pg_clog).
866  *              Also update the XID wrap limit info maintained by varsup.c.
867  *
868  *              The passed XID is simply the one I just wrote into my pg_database
869  *              entry.  It's used to initialize the "min" calculation.
870  *
871  *              This routine is shared by full and lazy VACUUM.  Note that it's
872  *              only invoked when we've managed to change our DB's datfrozenxid
873  *              entry.
874  */
875 static void
876 vac_truncate_clog(TransactionId frozenXID)
877 {
878         TransactionId myXID = GetCurrentTransactionId();
879         Relation        relation;
880         HeapScanDesc scan;
881         HeapTuple       tuple;
882         NameData        oldest_datname;
883         bool            frozenAlreadyWrapped = false;
884
885         /* init oldest_datname to sync with my frozenXID */
886         namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
887
888         /*
889          * Scan pg_database to compute the minimum datfrozenxid
890          *
891          * Note: we need not worry about a race condition with new entries being
892          * inserted by CREATE DATABASE.  Any such entry will have a copy of some
893          * existing DB's datfrozenxid, and that source DB cannot be ours because
894          * of the interlock against copying a DB containing an active backend.
895          * Hence the new entry will not reduce the minimum.  Also, if two
896          * VACUUMs concurrently modify the datfrozenxid's of different databases,
897          * the worst possible outcome is that pg_clog is not truncated as
898          * aggressively as it could be.
899          */
900         relation = heap_open(DatabaseRelationId, AccessShareLock);
901
902         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
903
904         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
905         {
906                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
907
908                 Assert(TransactionIdIsNormal(dbform->datfrozenxid));
909
910                 if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
911                         frozenAlreadyWrapped = true;
912                 else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
913                 {
914                         frozenXID = dbform->datfrozenxid;
915                         namecpy(&oldest_datname, &dbform->datname);
916                 }
917         }
918
919         heap_endscan(scan);
920
921         heap_close(relation, AccessShareLock);
922
923         /*
924          * Do not truncate CLOG if we seem to have suffered wraparound already;
925          * the computed minimum XID might be bogus.  This case should now be
926          * impossible due to the defenses in GetNewTransactionId, but we keep the
927          * test anyway.
928          */
929         if (frozenAlreadyWrapped)
930         {
931                 ereport(WARNING,
932                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
933                                  errdetail("You might have already suffered transaction-wraparound data loss.")));
934                 return;
935         }
936
937         /* Truncate CLOG to the oldest frozenxid */
938         TruncateCLOG(frozenXID);
939
940         /*
941          * Update the wrap limit for GetNewTransactionId.  Note: this function
942          * will also signal the postmaster for an(other) autovac cycle if needed.
943          */
944         SetTransactionIdLimit(frozenXID, &oldest_datname);
945 }
946
947
948 /****************************************************************************
949  *                                                                                                                                                      *
950  *                      Code common to both flavors of VACUUM                                                   *
951  *                                                                                                                                                      *
952  ****************************************************************************
953  */
954
955
956 /*
957  *      vacuum_rel() -- vacuum one heap relation
958  *
959  *              Doing one heap at a time incurs extra overhead, since we need to
960  *              check that the heap exists again just before we vacuum it.      The
961  *              reason that we do this is so that vacuuming can be spread across
962  *              many small transactions.  Otherwise, two-phase locking would require
963  *              us to lock the entire database during one pass of the vacuum cleaner.
964  *
965  *              At entry and exit, we are not inside a transaction.
966  */
967 static void
968 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
969 {
970         LOCKMODE        lmode;
971         Relation        onerel;
972         LockRelId       onerelid;
973         Oid                     toast_relid;
974
975         /* Begin a transaction for vacuuming this relation */
976         StartTransactionCommand();
977
978         if (vacstmt->full)
979         {
980                 /* functions in indexes may want a snapshot set */
981                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
982         }
983         else
984         {
985                 /*
986                  * During a lazy VACUUM we do not run any user-supplied functions, and
987                  * so it should be safe to not create a transaction snapshot.
988                  *
989                  * We can furthermore set the inVacuum flag, which lets other
990                  * concurrent VACUUMs know that they can ignore this one while
991                  * determining their OldestXmin.  (The reason we don't set inVacuum
992                  * during a full VACUUM is exactly that we may have to run user-
993                  * defined functions for functional indexes, and we want to make sure
994                  * that if they use the snapshot set above, any tuples it requires
995                  * can't get removed from other tables.  An index function that
996                  * depends on the contents of other tables is arguably broken, but we
997                  * won't break it here by violating transaction semantics.)
998                  *
999                  * Note: the inVacuum flag remains set until CommitTransaction or
1000                  * AbortTransaction.  We don't want to clear it until we reset
1001                  * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
1002                  * which is probably Not Good.
1003                  */
1004                 MyProc->inVacuum = true;
1005         }
1006
1007         /*
1008          * Check for user-requested abort.      Note we want this to be inside a
1009          * transaction, so xact.c doesn't issue useless WARNING.
1010          */
1011         CHECK_FOR_INTERRUPTS();
1012
1013         /*
1014          * Determine the type of lock we want --- hard exclusive lock for a FULL
1015          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1016          * way, we can be sure that no other backend is vacuuming the same table.
1017          */
1018         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
1019
1020         /*
1021          * Open the relation and get the appropriate lock on it.
1022          *
1023          * There's a race condition here: the rel may have gone away since the
1024          * last time we saw it.  If so, we don't need to vacuum it.
1025          */
1026         onerel = try_relation_open(relid, lmode);
1027
1028         if (!onerel)
1029         {
1030                 CommitTransactionCommand();
1031                 return;
1032         }
1033
1034         /*
1035          * Check permissions.
1036          *
1037          * We allow the user to vacuum a table if he is superuser, the table
1038          * owner, or the database owner (but in the latter case, only if it's not
1039          * a shared relation).  pg_class_ownercheck includes the superuser case.
1040          *
1041          * Note we choose to treat permissions failure as a WARNING and keep
1042          * trying to vacuum the rest of the DB --- is this appropriate?
1043          */
1044         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1045                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1046         {
1047                 ereport(WARNING,
1048                                 (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1049                                                 RelationGetRelationName(onerel))));
1050                 relation_close(onerel, lmode);
1051                 CommitTransactionCommand();
1052                 return;
1053         }
1054
1055         /*
1056          * Check that it's a plain table; we used to do this in get_rel_oids() but
1057          * seems safer to check after we've locked the relation.
1058          */
1059         if (onerel->rd_rel->relkind != expected_relkind)
1060         {
1061                 ereport(WARNING,
1062                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1063                                                 RelationGetRelationName(onerel))));
1064                 relation_close(onerel, lmode);
1065                 CommitTransactionCommand();
1066                 return;
1067         }
1068
1069         /*
1070          * Silently ignore tables that are temp tables of other backends ---
1071          * trying to vacuum these will lead to great unhappiness, since their
1072          * contents are probably not up-to-date on disk.  (We don't throw a
1073          * warning here; it would just lead to chatter during a database-wide
1074          * VACUUM.)
1075          */
1076         if (isOtherTempNamespace(RelationGetNamespace(onerel)))
1077         {
1078                 relation_close(onerel, lmode);
1079                 CommitTransactionCommand();
1080                 return;
1081         }
1082
1083         /*
1084          * Get a session-level lock too. This will protect our access to the
1085          * relation across multiple transactions, so that we can vacuum the
1086          * relation's TOAST table (if any) secure in the knowledge that no one is
1087          * deleting the parent relation.
1088          *
1089          * NOTE: this cannot block, even if someone else is waiting for access,
1090          * because the lock manager knows that both lock requests are from the
1091          * same process.
1092          */
1093         onerelid = onerel->rd_lockInfo.lockRelId;
1094         LockRelationIdForSession(&onerelid, lmode);
1095
1096         /*
1097          * Remember the relation's TOAST relation for later
1098          */
1099         toast_relid = onerel->rd_rel->reltoastrelid;
1100
1101         /*
1102          * Do the actual work --- either FULL or "lazy" vacuum
1103          */
1104         if (vacstmt->full)
1105                 full_vacuum_rel(onerel, vacstmt);
1106         else
1107                 lazy_vacuum_rel(onerel, vacstmt, vac_strategy);
1108
1109         /* all done with this class, but hold lock until commit */
1110         relation_close(onerel, NoLock);
1111
1112         /*
1113          * Complete the transaction and free all temporary memory used.
1114          */
1115         CommitTransactionCommand();
1116
1117         /*
1118          * If the relation has a secondary toast rel, vacuum that too while we
1119          * still hold the session lock on the master table.  Note however that
1120          * "analyze" will not get done on the toast table.      This is good, because
1121          * the toaster always uses hardcoded index access and statistics are
1122          * totally unimportant for toast relations.
1123          */
1124         if (toast_relid != InvalidOid)
1125                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);
1126
1127         /*
1128          * Now release the session-level lock on the master table.
1129          */
1130         UnlockRelationIdForSession(&onerelid, lmode);
1131 }
1132
1133
1134 /****************************************************************************
1135  *                                                                                                                                                      *
1136  *                      Code for VACUUM FULL (only)                                                                             *
1137  *                                                                                                                                                      *
1138  ****************************************************************************
1139  */
1140
1141
1142 /*
1143  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1144  *
1145  *              This routine vacuums a single heap, cleans out its indexes, and
1146  *              updates its num_pages and num_tuples statistics.
1147  *
1148  *              At entry, we have already established a transaction and opened
1149  *              and locked the relation.
1150  */
1151 static void
1152 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1153 {
1154         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1155                                                                                  * clean indexes */
1156         VacPageListData fraged_pages;           /* List of pages with space enough for
1157                                                                                  * re-using */
1158         Relation   *Irel;
1159         int                     nindexes,
1160                                 i;
1161         VRelStats  *vacrelstats;
1162
1163         vacuum_set_xid_limits(vacstmt->freeze_min_age, onerel->rd_rel->relisshared,
1164                                                   &OldestXmin, &FreezeLimit);
1165
1166         /*
1167          * Flush any previous async-commit transactions.  This does not guarantee
1168          * that we will be able to set hint bits for tuples they inserted, but
1169          * it improves the probability, especially in simple sequential-commands
1170          * cases.  See scan_heap() and repair_frag() for more about this.
1171          */
1172         XLogAsyncCommitFlush();
1173
1174         /*
1175          * Set up statistics-gathering machinery.
1176          */
1177         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1178         vacrelstats->rel_pages = 0;
1179         vacrelstats->rel_tuples = 0;
1180         vacrelstats->hasindex = false;
1181
1182         /* scan the heap */
1183         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1184         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1185
1186         /* Now open all indexes of the relation */
1187         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1188         if (nindexes > 0)
1189                 vacrelstats->hasindex = true;
1190
1191         /* Clean/scan index relation(s) */
1192         if (Irel != NULL)
1193         {
1194                 if (vacuum_pages.num_pages > 0)
1195                 {
1196                         for (i = 0; i < nindexes; i++)
1197                                 vacuum_index(&vacuum_pages, Irel[i],
1198                                                          vacrelstats->rel_tuples, 0);
1199                 }
1200                 else
1201                 {
1202                         /* just scan indexes to update statistic */
1203                         for (i = 0; i < nindexes; i++)
1204                                 scan_index(Irel[i], vacrelstats->rel_tuples);
1205                 }
1206         }
1207
1208         if (fraged_pages.num_pages > 0)
1209         {
1210                 /* Try to shrink heap */
1211                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1212                                         nindexes, Irel);
1213                 vac_close_indexes(nindexes, Irel, NoLock);
1214         }
1215         else
1216         {
1217                 vac_close_indexes(nindexes, Irel, NoLock);
1218                 if (vacuum_pages.num_pages > 0)
1219                 {
1220                         /* Clean pages from vacuum_pages list */
1221                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1222                 }
1223         }
1224
1225         /* update shared free space map with final free space info */
1226         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1227
1228         /* update statistics in pg_class */
1229         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
1230                                                 vacrelstats->rel_tuples, vacrelstats->hasindex,
1231                                                 FreezeLimit);
1232
1233         /* report results to the stats collector, too */
1234         pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
1235                                                  vacstmt->analyze, vacrelstats->rel_tuples);
1236 }
1237
1238
1239 /*
1240  *      scan_heap() -- scan an open heap relation
1241  *
1242  *              This routine sets commit status bits, constructs vacuum_pages (list
1243  *              of pages we need to compact free space on and/or clean indexes of
1244  *              deleted tuples), constructs fraged_pages (list of pages with free
1245  *              space that tuples could be moved into), and calculates statistics
1246  *              on the number of live tuples in the heap.
1247  */
1248 static void
1249 scan_heap(VRelStats *vacrelstats, Relation onerel,
1250                   VacPageList vacuum_pages, VacPageList fraged_pages)
1251 {
1252         BlockNumber nblocks,
1253                                 blkno;
1254         char       *relname;
1255         VacPage         vacpage;
1256         BlockNumber empty_pages,
1257                                 empty_end_pages;
1258         double          num_tuples,
1259                                 tups_vacuumed,
1260                                 nkeep,
1261                                 nunused;
1262         double          free_space,
1263                                 usable_free_space;
1264         Size            min_tlen = MaxHeapTupleSize;
1265         Size            max_tlen = 0;
1266         bool            do_shrinking = true;
1267         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1268         int                     num_vtlinks = 0;
1269         int                     free_vtlinks = 100;
1270         PGRUsage        ru0;
1271
1272         pg_rusage_init(&ru0);
1273
1274         relname = RelationGetRelationName(onerel);
1275         ereport(elevel,
1276                         (errmsg("vacuuming \"%s.%s\"",
1277                                         get_namespace_name(RelationGetNamespace(onerel)),
1278                                         relname)));
1279
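             /*
              * Counters: num_tuples counts nonremovable row versions, tups_vacuumed
              * counts removable ones, nkeep counts recently-dead versions we must
              * keep, and nunused counts unused line pointers.
              */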
1280         empty_pages = empty_end_pages = 0;
1281         num_tuples = tups_vacuumed = nkeep = nunused = 0;
1282         free_space = 0;
1283
1284         nblocks = RelationGetNumberOfBlocks(onerel);
1285
1286         /*
1287          * We initially create each VacPage item in a maximal-sized workspace,
1288          * then copy the workspace into a just-large-enough copy.
1289          */
1290         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1291
1292         for (blkno = 0; blkno < nblocks; blkno++)
1293         {
1294                 Page            page,
1295                                         tempPage = NULL;
1296                 bool            do_reap,
1297                                         do_frag;
1298                 Buffer          buf;
1299                 OffsetNumber offnum,
1300                                         maxoff;
1301                 bool            notup;
1302                 OffsetNumber frozen[MaxOffsetNumber];
1303                 int                     nfrozen;
1304
1305                 vacuum_delay_point();
1306
1307                 buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
1308                 page = BufferGetPage(buf);
1309
1310                 /*
1311                  * Since we are holding exclusive lock on the relation, no other
1312                  * backend can be accessing the page; however it is possible that the
1313                  * background writer will try to write the page if it's already marked
1314                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1315                  * must take exclusive buffer lock wherever we potentially modify
1316                  * pages.
1317                  */
1318                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1319
1320                 vacpage->blkno = blkno;
1321                 vacpage->offsets_used = 0;
1322                 vacpage->offsets_free = 0;
1323
1324                 if (PageIsNew(page))
1325                 {
1326                         VacPage         vacpagecopy;
1327
1328                         ereport(WARNING,
1329                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1330                                            relname, blkno)));
1331                         PageInit(page, BufferGetPageSize(buf), 0);
1332                         MarkBufferDirty(buf);
1333                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1334                         free_space += vacpage->free;
1335                         empty_pages++;
1336                         empty_end_pages++;
1337                         vacpagecopy = copy_vac_page(vacpage);
1338                         vpage_insert(vacuum_pages, vacpagecopy);
1339                         vpage_insert(fraged_pages, vacpagecopy);
1340                         UnlockReleaseBuffer(buf);
1341                         continue;
1342                 }
1343
1344                 if (PageIsEmpty(page))
1345                 {
1346                         VacPage         vacpagecopy;
1347
1348                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1349                         free_space += vacpage->free;
1350                         empty_pages++;
1351                         empty_end_pages++;
1352                         vacpagecopy = copy_vac_page(vacpage);
1353                         vpage_insert(vacuum_pages, vacpagecopy);
1354                         vpage_insert(fraged_pages, vacpagecopy);
1355                         UnlockReleaseBuffer(buf);
1356                         continue;
1357                 }
1358
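                     /*
                      * Scan the line pointers on this page.  Removable tuples and
                      * already-unused items are collected into vacpage->offsets[], and
                      * the offsets of tuples we freeze are recorded in frozen[] so that
                      * a single WAL record can cover the page below.
                      */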
1359                 nfrozen = 0;
1360                 notup = true;
1361                 maxoff = PageGetMaxOffsetNumber(page);
1362                 for (offnum = FirstOffsetNumber;
1363                          offnum <= maxoff;
1364                          offnum = OffsetNumberNext(offnum))
1365                 {
1366                         ItemId          itemid = PageGetItemId(page, offnum);
1367                         bool            tupgone = false;
1368                         HeapTupleData tuple;
1369
1370                         /*
1371                          * Collect unused items too --- it's possible for indexes to still
1372                          * be pointing here after a crash.
1373                          */
1374                         if (!ItemIdIsUsed(itemid))
1375                         {
1376                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1377                                 nunused += 1;
1378                                 continue;
1379                         }
1380
1381                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1382                         tuple.t_len = ItemIdGetLength(itemid);
1383                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1384
1385                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1386                         {
1387                                 case HEAPTUPLE_LIVE:
1388                                         /* Tuple is good --- but let's do some validity checks */
1389                                         if (onerel->rd_rel->relhasoids &&
1390                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1391                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1392                                                          relname, blkno, offnum);
1393                                         /*
1394                                          * The shrinkage phase of VACUUM FULL requires that all
1395                                          * live tuples have XMIN_COMMITTED set --- see comments in
1396                                          * repair_frag()'s walk-along-page loop.  Use of async
1397                                          * commit may prevent HeapTupleSatisfiesVacuum from
1398                                          * setting the bit for a recently committed tuple.  Rather
1399                                          * than trying to handle this corner case, we just give up
1400                                          * and don't shrink.
1401                                          */
1402                                         if (do_shrinking &&
1403                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1404                                         {
1405                                                 ereport(LOG,
1406                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1407                                                                                 relname, blkno, offnum,
1408                                                                                 HeapTupleHeaderGetXmin(tuple.t_data))));
1409                                                 do_shrinking = false;
1410                                         }
1411                                         break;
1412                                 case HEAPTUPLE_DEAD:
1413                                         tupgone = true;         /* we can delete the tuple */
1414                                         /*
1415                                          * We need not require XMIN_COMMITTED or XMAX_COMMITTED to
1416                                          * be set, since we will remove the tuple without any
1417                                          * further examination of its hint bits.
1418                                          */
1419                                         break;
1420                                 case HEAPTUPLE_RECENTLY_DEAD:
1421
1422                                         /*
1423                                          * If the tuple was deleted only recently, we must not remove
1424                                          * it from the relation.
1425                                          */
1426                                         nkeep += 1;
1427
1428                                         /*
1429                                          * As with the LIVE case, shrinkage requires XMIN_COMMITTED
1430                                          * to be set.
1431                                          */
1432                                         if (do_shrinking &&
1433                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1434                                         {
1435                                                 ereport(LOG,
1436                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1437                                                                                 relname, blkno, offnum,
1438                                                                                 HeapTupleHeaderGetXmin(tuple.t_data))));
1439                                                 do_shrinking = false;
1440                                         }
1441
1442                                         /*
1443                                          * If we are still planning to shrink, and this tuple is an updated
1444                                          * row version, remember it so we can construct update-chain links.
1445                                          */
1446                                         if (do_shrinking &&
1447                                                 !(ItemPointerEquals(&(tuple.t_self),
1448                                                                                         &(tuple.t_data->t_ctid))))
1449                                         {
1450                                                 if (free_vtlinks == 0)
1451                                                 {
1452                                                         free_vtlinks = 1000;
1453                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1454                                                                                            (free_vtlinks + num_vtlinks) *
1455                                                                                                          sizeof(VTupleLinkData));
1456                                                 }
1457                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1458                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1459                                                 free_vtlinks--;
1460                                                 num_vtlinks++;
1461                                         }
1462                                         break;
1463                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1464
1465                                         /*
1466                                          * This should not happen, since we hold exclusive lock on
1467                                          * the relation; shouldn't we raise an error?  (Actually,
1468                                          * it can happen in system catalogs, since we tend to
1469                                          * release write lock before commit there.)  As above,
1470                                          * we can't apply repair_frag() if the tuple state is
1471                                          * uncertain.
1472                                          */
1473                                         if (do_shrinking)
1474                                                 ereport(LOG,
1475                                                                 (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
1476                                                                                 relname, blkno, offnum,
1477                                                                                 HeapTupleHeaderGetXmin(tuple.t_data))));
1478                                         do_shrinking = false;
1479                                         break;
1480                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1481
1482                                         /*
1483                                          * This should not happen, since we hold exclusive lock on
1484                                          * the relation; shouldn't we raise an error?  (Actually,
1485                                          * it can happen in system catalogs, since we tend to
1486                                          * release write lock before commit there.)  As above,
1487                                          * we can't apply repair_frag() if the tuple state is
1488                                          * uncertain.
1489                                          */
1490                                         if (do_shrinking)
1491                                                 ereport(LOG,
1492                                                                 (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
1493                                                                                 relname, blkno, offnum,
1494                                                                                 HeapTupleHeaderGetXmax(tuple.t_data))));
1495                                         do_shrinking = false;
1496                                         break;
1497                                 default:
1498                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1499                                         break;
1500                         }
1501
1502                         if (tupgone)
1503                         {
1504                                 ItemId          lpp;
1505
1506                                 /*
1507                                  * Here we are building a temporary copy of the page with dead
1508                                  * tuples removed.      Below we will apply
1509                                  * PageRepairFragmentation to the copy, so that we can
1510                                  * determine how much space will be available after removal of
1511                                  * dead tuples.  But note we are NOT changing the real page
1512                                  * yet...
1513                                  */
1514                                 if (tempPage == NULL)
1515                                 {
1516                                         Size            pageSize;
1517
1518                                         pageSize = PageGetPageSize(page);
1519                                         tempPage = (Page) palloc(pageSize);
1520                                         memcpy(tempPage, page, pageSize);
1521                                 }
1522
1523                                 /* mark it unused on the temp page */
1524                                 lpp = PageGetItemId(tempPage, offnum);
1525                                 ItemIdSetUnused(lpp);
1526
1527                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1528                                 tups_vacuumed += 1;
1529                         }
1530                         else
1531                         {
1532                                 num_tuples += 1;
1533                                 notup = false;
1534                                 if (tuple.t_len < min_tlen)
1535                                         min_tlen = tuple.t_len;
1536                                 if (tuple.t_len > max_tlen)
1537                                         max_tlen = tuple.t_len;
1538
1539                                 /*
1540                                  * Each non-removable tuple must be checked to see if it
1541                                  * needs freezing.
1542                                  */
1543                                 if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
1544                                                                           InvalidBuffer))
1545                                         frozen[nfrozen++] = offnum;
1546                         }
1547                 }                                               /* scan along page */
1548
1549                 if (tempPage != NULL)
1550                 {
1551                         /* Some tuples are removable; figure free space after removal */
1552                         PageRepairFragmentation(tempPage, NULL);
1553                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1554                         pfree(tempPage);
1555                         do_reap = true;
1556                 }
1557                 else
1558                 {
1559                         /* Just use current available space */
1560                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1561                         /* Need to reap the page if it has LP_UNUSED line pointers */
1562                         do_reap = (vacpage->offsets_free > 0);
1563                 }
1564
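                     /* accumulate the total free space found, reported at end of scan */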
1565                 free_space += vacpage->free;
1566
1567                 /*
1568                  * Add the page to fraged_pages if it has a useful amount of free
1569                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1570                  * don't know that accurately near the start of the relation, so add
1571                  * pages unconditionally if they have >= BLCKSZ/10 free space.
1572                  */
1573                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1574
1575                 if (do_reap || do_frag)
1576                 {
1577                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1578
1579                         if (do_reap)
1580                                 vpage_insert(vacuum_pages, vacpagecopy);
1581                         if (do_frag)
1582                                 vpage_insert(fraged_pages, vacpagecopy);
1583                 }
1584
1585                 /*
1586                  * Include the page in empty_end_pages if it will be empty after
1587                  * vacuuming; this is to keep us from using it as a move destination.
1588                  */
1589                 if (notup)
1590                 {
1591                         empty_pages++;
1592                         empty_end_pages++;
1593                 }
1594                 else
1595                         empty_end_pages = 0;
1596
1597                 /*
1598                  * If we froze any tuples, mark the buffer dirty, and write a WAL
1599                  * record recording the changes.  We must log the changes to be
1600                  * crash-safe against future truncation of CLOG.
1601                  */
1602                 if (nfrozen > 0)
1603                 {
1604                         MarkBufferDirty(buf);
1605                         /* no XLOG for temp tables, though */
1606                         if (!onerel->rd_istemp)
1607                         {
1608                                 XLogRecPtr      recptr;
1609
1610                                 recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1611                                                                                  frozen, nfrozen);
1612                                 PageSetLSN(page, recptr);
1613                                 PageSetTLI(page, ThisTimeLineID);
1614                         }
1615                 }
1616
1617                 UnlockReleaseBuffer(buf);
1618         }
1619
1620         pfree(vacpage);
1621
1622         /* save stats in the rel list for use later */
1623         vacrelstats->rel_tuples = num_tuples;
1624         vacrelstats->rel_pages = nblocks;
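             /*
              * If no tuples remain, report zero tuple lengths rather than the
              * initial sentinel values (MaxHeapTupleSize and 0).
              */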
1625         if (num_tuples == 0)
1626                 min_tlen = max_tlen = 0;
1627         vacrelstats->min_tlen = min_tlen;
1628         vacrelstats->max_tlen = max_tlen;
1629
1630         vacuum_pages->empty_end_pages = empty_end_pages;
1631         fraged_pages->empty_end_pages = empty_end_pages;
1632
1633         /*
1634          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1635          * remove any "empty" end-pages from the list, and compute usable free
1636          * space = free space in remaining pages.
1637          */
1638         if (do_shrinking)
1639         {
1640                 int                     i;
1641
1642                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1643                 fraged_pages->num_pages -= empty_end_pages;
1644                 usable_free_space = 0;
1645                 for (i = 0; i < fraged_pages->num_pages; i++)
1646                         usable_free_space += fraged_pages->pagedesc[i]->free;
1647         }
1648         else
1649         {
1650                 fraged_pages->num_pages = 0;
1651                 usable_free_space = 0;
1652         }
1653
1654         /* don't bother to save vtlinks if we will not call repair_frag */
1655         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1656         {
1657                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1658                           vac_cmp_vtlinks);
1659                 vacrelstats->vtlinks = vtlinks;
1660                 vacrelstats->num_vtlinks = num_vtlinks;
1661         }
1662         else
1663         {
1664                 vacrelstats->vtlinks = NULL;
1665                 vacrelstats->num_vtlinks = 0;
1666                 pfree(vtlinks);
1667         }
1668
1669         ereport(elevel,
1670                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1671                                         RelationGetRelationName(onerel),
1672                                         tups_vacuumed, num_tuples, nblocks),
1673                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1674                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1675                                            "There were %.0f unused item pointers.\n"
1676            "Total free space (including removable row versions) is %.0f bytes.\n"
1677                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1678          "%u pages containing %.0f free bytes are potential move destinations.\n"
1679                                            "%s.",
1680                                            nkeep,
1681                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1682                                            nunused,
1683                                            free_space,
1684                                            empty_pages, empty_end_pages,
1685                                            fraged_pages->num_pages, usable_free_space,
1686                                            pg_rusage_show(&ru0))));
1687 }
1688
1689
1690 /*
1691  *      repair_frag() -- try to repair relation's fragmentation
1692  *
1693  *              This routine marks dead tuples as unused and tries to re-use dead
1694  *              space by moving tuples (and inserting index entries if needed).  It
1695  *              constructs Nvacpagelist, a list of pages freed by moving tuples away,
1696  *              and cleans their index entries after committing the current
1697  *              transaction (in a hackish manner --- without releasing locks or
1698  *              freeing memory!).  It truncates the relation if end-blocks are emptied.
1699  */
1700 static void
1701 repair_frag(VRelStats *vacrelstats, Relation onerel,
1702                         VacPageList vacuum_pages, VacPageList fraged_pages,
1703                         int nindexes, Relation *Irel)
1704 {
1705         TransactionId myXID = GetCurrentTransactionId();
1706         Buffer          dst_buffer = InvalidBuffer;
1707         BlockNumber nblocks,
1708                                 blkno;
1709         BlockNumber last_move_dest_block = 0,
1710                                 last_vacuum_block;
1711         Page            dst_page = NULL;
1712         ExecContextData ec;
1713         VacPageListData Nvacpagelist;
1714         VacPage         dst_vacpage = NULL,
1715                                 last_vacuum_page,
1716                                 vacpage,
1717                            *curpage;
1718         int                     i;
1719         int                     num_moved = 0,
1720                                 num_fraged_pages,
1721                                 vacuumed_pages;
1722         int                     keep_tuples = 0;
1723         PGRUsage        ru0;
1724
1725         pg_rusage_init(&ru0);
1726
1727         ExecContext_Init(&ec, onerel);
1728
1729         Nvacpagelist.num_pages = 0;
1730         num_fraged_pages = fraged_pages->num_pages;
1731         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1732         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
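             /*
              * Note that vacuumed_pages excludes the trailing empty pages; per the
              * header comment, those are handled by truncating the relation rather
              * than by cleaning.
              */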
1733         if (vacuumed_pages > 0)
1734         {
1735                 /* get last reaped page from vacuum_pages */
1736                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1737                 last_vacuum_block = last_vacuum_page->blkno;
1738         }
1739         else
1740         {
1741                 last_vacuum_page = NULL;
1742                 last_vacuum_block = InvalidBlockNumber;
1743         }
1744
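             /*
              * As in scan_heap(), vacpage is a maximal-sized workspace that we
              * reuse for each page we process.
              */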
1745         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1746         vacpage->offsets_used = vacpage->offsets_free = 0;
1747
1748         /*
1749          * Scan pages backwards from the last nonempty page, trying to move tuples
1750          * down to lower pages.  Quit when we reach a page that we have moved any
1751          * tuples onto, or the first page if we haven't moved anything, or when we
1752          * find a page we cannot completely empty (this last condition is handled
1753          * by "break" statements within the loop).
1754          *
1755          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1756          * in order by blkno.
1757          */
1758         nblocks = vacrelstats->rel_pages;
1759         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1760                  blkno > last_move_dest_block;
1761                  blkno--)
1762         {
1763                 Buffer          buf;
1764                 Page            page;
1765                 OffsetNumber offnum,
1766                                         maxoff;
1767                 bool            isempty,
1768                                         chain_tuple_moved;
1769
1770                 vacuum_delay_point();
1771
1772                 /*
1773                  * Forget fraged_pages pages at or after this one; they're no longer
1774                  * useful as move targets, since we only want to move down. Note that
1775                  * since we stop the outer loop at last_move_dest_block, pages removed
1776                  * here cannot have had anything moved onto them already.
1777                  *
1778                  * Also note that we don't change the stored fraged_pages list, only
1779                  * our local variable num_fraged_pages; so the forgotten pages are
1780                  * still available to be loaded into the free space map later.
1781                  */
1782                 while (num_fraged_pages > 0 &&
1783                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1784                 {
1785                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1786                         --num_fraged_pages;
1787                 }
1788
1789                 /*
1790                  * Process this page of relation.
1791                  */
1792                 buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
1793                 page = BufferGetPage(buf);
1794
1795                 vacpage->offsets_free = 0;
1796
1797                 isempty = PageIsEmpty(page);
1798
1799                 /* Is the page in the vacuum_pages list? */
1800                 if (blkno == last_vacuum_block)
1801                 {
1802                         if (last_vacuum_page->offsets_free > 0)
1803                         {
1804                                 /* there are dead tuples on this page - clean them */
1805                                 Assert(!isempty);
1806                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1807                                 vacuum_page(onerel, buf, last_vacuum_page);
1808                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1809                         }
1810                         else
1811                                 Assert(isempty);
1812                         --vacuumed_pages;
1813                         if (vacuumed_pages > 0)
1814                         {
1815                                 /* get prev reaped page from vacuum_pages */
1816                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1817                                 last_vacuum_block = last_vacuum_page->blkno;
1818                         }
1819                         else
1820                         {
1821                                 last_vacuum_page = NULL;
1822                                 last_vacuum_block = InvalidBlockNumber;
1823                         }
1824                         if (isempty)
1825                         {
1826                                 ReleaseBuffer(buf);
1827                                 continue;
1828                         }
1829                 }
1830                 else
1831                         Assert(!isempty);
1832
1833                 chain_tuple_moved = false;              /* no chain tuple has been moved
1834                                                                                  * off this page yet */
1835                 vacpage->blkno = blkno;
1836                 maxoff = PageGetMaxOffsetNumber(page);
1837                 for (offnum = FirstOffsetNumber;
1838                          offnum <= maxoff;
1839                          offnum = OffsetNumberNext(offnum))
1840                 {
1841                         Size            tuple_len;
1842                         HeapTupleData tuple;
1843                         ItemId          itemid = PageGetItemId(page, offnum);
1844
1845                         if (!ItemIdIsUsed(itemid))
1846                                 continue;
1847
1848                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1849                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1850                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1851
1852                         /* ---
1853                          * VACUUM FULL has an exclusive lock on the relation.  So
1854                          * normally no other transaction can have pending INSERTs or
1855                          * DELETEs in this relation.  A tuple is either:
1856                          *              (a) live (XMIN_COMMITTED)
1857                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
1858                          *                      is visible to all active transactions)
1859                          *              (c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
1860                          *                      but at least one active transaction does not see the
1861                          *                      deleting transaction (ie, it's RECENTLY_DEAD)
1862                          *              (d) moved by the currently running VACUUM
1863                          *              (e) inserted or deleted by a not yet committed transaction,
1864                          *                      or by a transaction we couldn't set XMIN_COMMITTED for.
1865                          * In case (e) we wouldn't be in repair_frag() at all, because
1866                          * scan_heap() detects those cases and shuts off shrinking.
1867                          * We can't see case (b) here either, because such tuples were
1868                          * already removed by vacuum_page().  Cases (a) and (c) are
1869                          * normal and will have XMIN_COMMITTED set.  Case (d) is only
1870                          * possible if a whole tuple chain has been moved while
1871                          * processing this or a higher numbered block.
1872                          * ---
1873                          */
1874                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1875                         {
1876                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1877                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1878                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
1879                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
1880
1881                                 /*
1882                                  * MOVED_OFF by another VACUUM would have caused the
1883                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
1884                                  */
1885                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1886                                         elog(ERROR, "invalid XVAC in tuple header");
1887
1888                                 /*
1889                                  * If this (chain) tuple was already moved by this VACUUM, we have
1890                                  * to check whether it is recorded in vacpage or not --- i.e.,
1891                                  * whether it was moved while cleaning this page or a previous one.
1892                                  */
1893
1894                                 /* Can't we Assert(keep_tuples > 0) here? */
1895                                 if (keep_tuples == 0)
1896                                         continue;
1897                                 if (chain_tuple_moved)
1898                                 {
1899                                         /* some chains were moved while cleaning this page */
1900                                         Assert(vacpage->offsets_free > 0);
1901                                         for (i = 0; i < vacpage->offsets_free; i++)
1902                                         {
1903                                                 if (vacpage->offsets[i] == offnum)
1904                                                         break;
1905                                         }
1906                                         if (i >= vacpage->offsets_free)         /* not found */
1907                                         {
1908                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1909                                                 keep_tuples--;
1910                                         }
1911                                 }
1912                                 else
1913                                 {
1914                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1915                                         keep_tuples--;
1916                                 }
1917                                 continue;
1918                         }
1919
1920                         /*
1921                          * If this tuple is in a chain of tuples created in updates by
1922                          * "recent" transactions then we have to move the whole chain of
1923                          * tuples to other places, so that we can write new t_ctid links
1924                          * that preserve the chain relationship.
1925                          *
1926                          * This test is complicated.  Read it as "if tuple is a recently
1927                          * created updated version, OR if it is an obsoleted version". (In
1928                          * the second half of the test, we needn't make any check on XMAX
1929                          * --- it must be recently obsoleted, else scan_heap would have
1930                          * deemed it removable.)
1931                          *
1932                          * NOTE: this test is not 100% accurate: it is possible for a
1933                          * tuple to be an updated one with recent xmin, and yet not match
1934                          * any new_tid entry in the vtlinks list.  Presumably there was
1935                          * once a parent tuple with xmax matching the xmin, but it's
1936                          * possible that that tuple has been removed --- for example, if
1937                          * it had xmin = xmax and wasn't itself an updated version, then
1938                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
1939                          * xmin xact completes.
1940                          *
1941                          * To be on the safe side, we abandon the repair_frag process if
1942                          * we cannot find the parent tuple in vtlinks.  This may be overly
1943                          * conservative; AFAICS it would be safe to move the chain.
1944                          *
1945                          * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
1946                          * using OldestXmin, which is a rather coarse test, it is quite
1947                          * possible to have an update chain in which a tuple we think is
1948                          * RECENTLY_DEAD links forward to one that is definitely DEAD.
1949                          * In such a case the RECENTLY_DEAD tuple must actually be dead,
1950                          * but it seems too complicated to try to make VACUUM remove it.
1951                          * We treat each contiguous set of RECENTLY_DEAD tuples as a
1952                          * separately movable chain, ignoring any intervening DEAD ones.
1953                          */
1954                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1955                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1956                                                                                 OldestXmin)) ||
1957                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1958                                                                                            HEAP_IS_LOCKED)) &&
1959                                  !(ItemPointerEquals(&(tuple.t_self),
1960                                                                          &(tuple.t_data->t_ctid)))))
1961                         {
1962                                 Buffer          Cbuf = buf;
1963                                 bool            freeCbuf = false;
1964                                 bool            chain_move_failed = false;
1965                                 bool            moved_target = false;
1966                                 ItemPointerData Ctid;
1967                                 HeapTupleData tp = tuple;
1968                                 Size            tlen = tuple_len;
1969                                 VTupleMove      vtmove;
1970                                 int                     num_vtmove;
1971                                 int                     free_vtmove;
1972                                 VacPage         to_vacpage = NULL;
1973                                 int                     to_item = 0;
1974                                 int                     ti;
1975
1976                                 if (dst_buffer != InvalidBuffer)
1977                                 {
1978                                         ReleaseBuffer(dst_buffer);
1979                                         dst_buffer = InvalidBuffer;
1980                                 }
1981
1982                                 /* Quick exit if we have no vtlinks to search in */
1983                                 if (vacrelstats->vtlinks == NULL)
1984                                 {
1985                                         elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
1986                                         break;          /* out of walk-along-page loop */
1987                                 }
1988
1989                                 /*
1990                                  * If this tuple is at the beginning or middle of the chain, we
1991                                  * have to walk to the end of the chain first.  As with any t_ctid
1992                                  * chase, we have to verify that each new tuple is really the
1993                                  * descendant of the tuple we came from; however, here we
1994                                  * need even more than the normal amount of paranoia.
1995                                  * If t_ctid links forward to a tuple determined to be DEAD,
1996                                  * then depending on where that tuple is, it might already
1997                                  * have been removed, and perhaps even replaced by a MOVED_IN
1998                                  * tuple.  We don't want to include any DEAD tuples in the
1999                                  * chain, so we have to recheck HeapTupleSatisfiesVacuum.
2000                                  */
2001                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
2002                                                                                                   HEAP_IS_LOCKED)) &&
2003                                            !(ItemPointerEquals(&(tp.t_self),
2004                                                                                    &(tp.t_data->t_ctid))))
2005                                 {
2006                                         ItemPointerData nextTid;
2007                                         TransactionId priorXmax;
2008                                         Buffer          nextBuf;
2009                                         Page            nextPage;
2010                                         OffsetNumber nextOffnum;
2011                                         ItemId          nextItemid;
2012                                         HeapTupleHeader nextTdata;
2013                                         HTSV_Result     nextTstatus;
2014
2015                                         nextTid = tp.t_data->t_ctid;
2016                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
2017                                         /* assume block# is OK (see heap_fetch comments) */
2018                                         nextBuf = ReadBufferWithStrategy(onerel,
2019                                                                                  ItemPointerGetBlockNumber(&nextTid),
2020                                                                                                          vac_strategy);
2021                                         nextPage = BufferGetPage(nextBuf);
2022                                         /* If bogus or unused slot, assume tp is end of chain */
2023                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
2024                                         if (nextOffnum < FirstOffsetNumber ||
2025                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
2026                                         {
2027                                                 ReleaseBuffer(nextBuf);
2028                                                 break;
2029                                         }
2030                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
2031                                         if (!ItemIdIsUsed(nextItemid))
2032                                         {
2033                                                 ReleaseBuffer(nextBuf);
2034                                                 break;
2035                                         }
2036                                         /* if not matching XMIN, assume tp is end of chain */
2037                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
2038                                                                                                                           nextItemid);
2039                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
2040                                                                                          priorXmax))
2041                                         {
2042                                                 ReleaseBuffer(nextBuf);
2043                                                 break;
2044                                         }
2045                                         /*
2046                                          * Must check for DEAD or MOVED_IN tuple, too.  This
2047                                          * could potentially update hint bits, so we'd better
2048                                          * hold the buffer content lock.
2049                                          */
2050                                         LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
2051                                         nextTstatus = HeapTupleSatisfiesVacuum(nextTdata,
2052                                                                                                                    OldestXmin,
2053                                                                                                                    nextBuf);
2054                                         if (nextTstatus == HEAPTUPLE_DEAD ||
2055                                                 nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
2056                                         {
2057                                                 UnlockReleaseBuffer(nextBuf);
2058                                                 break;
2059                                         }
2060                                         LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
2061                                         /* if it's MOVED_OFF, we should have moved this one with it */
2062                                         if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
2063                                                 elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
2064                                         /* OK, switch our attention to the next tuple in chain */
2065                                         tp.t_data = nextTdata;
2066                                         tp.t_self = nextTid;
2067                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
2068                                         if (freeCbuf)
2069                                                 ReleaseBuffer(Cbuf);
2070                                         Cbuf = nextBuf;
2071                                         freeCbuf = true;
2072                                 }
2073
2074                                 /* Set up workspace for planning the chain move */
2075                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
2076                                 num_vtmove = 0;
2077                                 free_vtmove = 100;
2078
2079                                 /*
2080                                  * Now, walk backwards up the chain (towards older tuples) and
2081                                  * check if all items in chain can be moved.  We record all
2082                                  * the moves that need to be made in the vtmove array.
2083                                  */
2084                                 for (;;)
2085                                 {
2086                                         Buffer          Pbuf;
2087                                         Page            Ppage;
2088                                         ItemId          Pitemid;
2089                                         HeapTupleHeader PTdata;
2090                                         VTupleLinkData vtld,
2091                                                            *vtlp;
2092
2093                                         /* Identify a target page to move this tuple to */
2094                                         if (to_vacpage == NULL ||
2095                                                 !enough_space(to_vacpage, tlen))
2096                                         {
2097                                                 for (i = 0; i < num_fraged_pages; i++)
2098                                                 {
2099                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
2100                                                                 break;
2101                                                 }
2102
2103                                                 if (i == num_fraged_pages)
2104                                                 {
2105                                                         /* can't move item anywhere */
2106                                                         chain_move_failed = true;
2107                                                         break;          /* out of check-all-items loop */
2108                                                 }
2109                                                 to_item = i;
2110                                                 to_vacpage = fraged_pages->pagedesc[to_item];
2111                                         }
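                                             /*
                                              * Charge the tuple's maxaligned size against the target
                                              * page's free space.  Once we have claimed as many item
                                              * slots as will be freed on that page, each further move
                                              * also needs room for a new line pointer.
                                              */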
2112                                         to_vacpage->free -= MAXALIGN(tlen);
2113                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
2114                                                 to_vacpage->free -= sizeof(ItemIdData);
2115                                         (to_vacpage->offsets_used)++;
2116
2117                                         /* Add an entry to vtmove list */
2118                                         if (free_vtmove == 0)
2119                                         {
2120                                                 free_vtmove = 1000;
2121                                                 vtmove = (VTupleMove)
2122                                                         repalloc(vtmove,
2123                                                                          (free_vtmove + num_vtmove) *
2124                                                                          sizeof(VTupleMoveData));
2125                                         }
2126                                         vtmove[num_vtmove].tid = tp.t_self;
2127                                         vtmove[num_vtmove].vacpage = to_vacpage;
2128                                         if (to_vacpage->offsets_used == 1)
2129                                                 vtmove[num_vtmove].cleanVpd = true;
2130                                         else
2131                                                 vtmove[num_vtmove].cleanVpd = false;
2132                                         free_vtmove--;
2133                                         num_vtmove++;
2134
2135                                         /* Remember if we reached the original target tuple */
2136                                         if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
2137                                                 ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
2138                                                 moved_target = true;
2139
2140                                         /* Done if at beginning of chain */
2141                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2142                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2143                                                                                    OldestXmin))
2144                                                 break;  /* out of check-all-items loop */
2145
2146                                         /* Move to tuple with prior row version */
2147                                         vtld.new_tid = tp.t_self;
2148                                         vtlp = (VTupleLink)
2149                                                 vac_bsearch((void *) &vtld,
2150                                                                         (void *) (vacrelstats->vtlinks),
2151                                                                         vacrelstats->num_vtlinks,
2152                                                                         sizeof(VTupleLinkData),
2153                                                                         vac_cmp_vtlinks);
2154                                         if (vtlp == NULL)
2155                                         {
2156                                                 /* see discussion above */
2157                                                 elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2158                                                 chain_move_failed = true;
2159                                                 break;  /* out of check-all-items loop */
2160                                         }
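                                             /*
                                              * Found the parent entry in vtlinks; fetch the parent tuple
                                              * and verify that it really links to the child.
                                              */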
2161                                         tp.t_self = vtlp->this_tid;
2162                                         Pbuf = ReadBufferWithStrategy(onerel,
2163                                                                         ItemPointerGetBlockNumber(&(tp.t_self)),
2164                                                                                                   vac_strategy);
2165                                         Ppage = BufferGetPage(Pbuf);
2166                                         Pitemid = PageGetItemId(Ppage,
2167                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2168                                         /* this can't happen since we saw tuple earlier: */
2169                                         if (!ItemIdIsUsed(Pitemid))
2170                                                 elog(ERROR, "parent itemid marked as unused");
2171                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2172
2173                                         /* ctid should not have changed since we saved it */
2174                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2175                                                                                          &(PTdata->t_ctid)));
2176
2177                                         /*
2178                                          * Read above about the cases when !ItemIdIsUsed(nextItemid)
2179                                          * (the child item has been removed)...  Because at the moment
2180                                          * we don't remove the unusable part of an update chain, it is
2181                                          * possible to find a non-matching parent row here.  As in the
2182                                          * case that caused this problem, we just stop shrinking here.
2183                                          * We could try to find the real parent row, but it's not worth
2184                                          * doing because a real solution will be implemented anyway,
2185                                          * later, and we are too close to the 6.5 release.
2186                                          * - vadim 06/11/99
2187                                          */
2188                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2189                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2190                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2191                                         {
2192                                                 ReleaseBuffer(Pbuf);
2193                                                 elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
2194                                                 chain_move_failed = true;
2195                                                 break;  /* out of check-all-items loop */
2196                                         }
2197                                         tp.t_data = PTdata;
2198                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2199                                         if (freeCbuf)
2200                                                 ReleaseBuffer(Cbuf);
2201                                         Cbuf = Pbuf;
2202                                         freeCbuf = true;
2203                                 }                               /* end of check-all-items loop */
2204
2205                                 if (freeCbuf)
2206                                         ReleaseBuffer(Cbuf);
2207                                 freeCbuf = false;
2208
2209                                 /* Double-check that we will move the current target tuple */
2210                                 if (!moved_target && !chain_move_failed)
2211                                 {
2212                                         elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
2213                                         chain_move_failed = true;
2214                                 }
2215
2216                                 if (chain_move_failed)
2217                                 {
2218                                         /*
2219                                          * Undo changes to offsets_used state.  We don't bother
2220                                          * cleaning up the amount-free state, since we're not
2221                                          * going to do any further tuple motion.
2222                                          */
2223                                         for (i = 0; i < num_vtmove; i++)
2224                                         {
2225                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2226                                                 (vtmove[i].vacpage->offsets_used)--;
2227                                         }
2228                                         pfree(vtmove);
2229                                         break;          /* out of walk-along-page loop */
2230                                 }
2231
2232                                 /*
2233                                  * Okay, move the whole tuple chain in reverse order.
2234                                  *
2235                                  * Ctid tracks the new location of the previously-moved tuple.
2236                                  */
2237                                 ItemPointerSetInvalid(&Ctid);
2238                                 for (ti = 0; ti < num_vtmove; ti++)
2239                                 {
2240                                         VacPage         destvacpage = vtmove[ti].vacpage;
2241                                         Page            Cpage;
2242                                         ItemId          Citemid;
2243
2244                                         /* Get page to move from */
2245                                         tuple.t_self = vtmove[ti].tid;
2246                                         Cbuf = ReadBufferWithStrategy(onerel,
2247                                                                  ItemPointerGetBlockNumber(&(tuple.t_self)),
2248                                                                                                   vac_strategy);
2249
2250                                         /* Get page to move to */
2251                                         dst_buffer = ReadBufferWithStrategy(onerel,
2252                                                                                                                 destvacpage->blkno,
2253                                                                                                                 vac_strategy);
2254
2255                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2256                                         if (dst_buffer != Cbuf)
2257                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2258
2259                                         dst_page = BufferGetPage(dst_buffer);
2260                                         Cpage = BufferGetPage(Cbuf);
2261
2262                                         Citemid = PageGetItemId(Cpage,
2263                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2264                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2265                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2266
2267                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2268                                                                          dst_buffer, dst_page, destvacpage,
2269                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2270
2271                                         num_moved++;
2272                                         if (destvacpage->blkno > last_move_dest_block)
2273                                                 last_move_dest_block = destvacpage->blkno;
2274
2275                                         /*
2276                                          * Remember that we moved tuple from the current page
2277                                          * (corresponding index tuple will be cleaned).
2278                                          */
2279                                         if (Cbuf == buf)
2280                                                 vacpage->offsets[vacpage->offsets_free++] =
2281                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2282                                         else
2283                                                 keep_tuples++;
2284
2285                                         ReleaseBuffer(dst_buffer);
2286                                         ReleaseBuffer(Cbuf);
2287                                 }                               /* end of move-the-tuple-chain loop */
2288
2289                                 dst_buffer = InvalidBuffer;
2290                                 pfree(vtmove);
2291                                 chain_tuple_moved = true;
2292
2293                                 /* advance to next tuple in walk-along-page loop */
2294                                 continue;
2295                         }                                       /* end of is-tuple-in-chain test */
2296
2297                         /* try to find new page for this tuple */
2298                         if (dst_buffer == InvalidBuffer ||
2299                                 !enough_space(dst_vacpage, tuple_len))
2300                         {
2301                                 if (dst_buffer != InvalidBuffer)
2302                                 {
2303                                         ReleaseBuffer(dst_buffer);
2304                                         dst_buffer = InvalidBuffer;
2305                                 }
2306                                 for (i = 0; i < num_fraged_pages; i++)
2307                                 {
2308                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2309                                                 break;
2310                                 }
2311                                 if (i == num_fraged_pages)
2312                                         break;          /* can't move item anywhere */
2313                                 dst_vacpage = fraged_pages->pagedesc[i];
2314                                 dst_buffer = ReadBufferWithStrategy(onerel,
2315                                                                                                         dst_vacpage->blkno,
2316                                                                                                         vac_strategy);
2317                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2318                                 dst_page = BufferGetPage(dst_buffer);
2319                                 /* if this page was not used before - clean it */
2320                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2321                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2322                         }
2323                         else
2324                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2325
2326                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2327
2328                         move_plain_tuple(onerel, buf, page, &tuple,
2329                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2330
2331                         num_moved++;
2332                         if (dst_vacpage->blkno > last_move_dest_block)
2333                                 last_move_dest_block = dst_vacpage->blkno;
2334
2335                         /*
2336                          * Remember that we moved tuple from the current page
2337                          * (corresponding index tuple will be cleaned).
2338                          */
2339                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2340                 }                                               /* walk along page */
2341
2342                 /*
2343                  * If we broke out of the walk-along-page loop early (ie, still have
2344                  * offnum <= maxoff), then we failed to move some tuple off this page.
2345                  * No point in shrinking any more, so clean up and exit the per-page
2346                  * loop.
2347                  */
2348                 if (offnum < maxoff && keep_tuples > 0)
2349                 {
2350                         OffsetNumber off;
2351
2352                         /*
2353                          * Fix vacpage state for any unvisited tuples remaining on page
2354                          */
2355                         for (off = OffsetNumberNext(offnum);
2356                                  off <= maxoff;
2357                                  off = OffsetNumberNext(off))
2358                         {
2359                                 ItemId          itemid = PageGetItemId(page, off);
2360                                 HeapTupleHeader htup;
2361
2362                                 if (!ItemIdIsUsed(itemid))
2363                                         continue;
2364                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2365                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2366                                         continue;
2367
2368                                 /*
2369                                  * See comments in the walk-along-page loop above about why
2370                                  * only MOVED_OFF tuples should be found here.
2371                                  */
2372                                 if (htup->t_infomask & HEAP_MOVED_IN)
2373                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2374                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2375                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2376                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2377                                         elog(ERROR, "invalid XVAC in tuple header");
2378
2379                                 if (chain_tuple_moved)
2380                                 {
2381                                         /* some chains were moved while cleaning this page */
2382                                         Assert(vacpage->offsets_free > 0);
2383                                         for (i = 0; i < vacpage->offsets_free; i++)
2384                                         {
2385                                                 if (vacpage->offsets[i] == off)
2386                                                         break;
2387                                         }
2388                                         if (i >= vacpage->offsets_free)         /* not found */
2389                                         {
2390                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2391                                                 Assert(keep_tuples > 0);
2392                                                 keep_tuples--;
2393                                         }
2394                                 }
2395                                 else
2396                                 {
2397                                         vacpage->offsets[vacpage->offsets_free++] = off;
2398                                         Assert(keep_tuples > 0);
2399                                         keep_tuples--;
2400                                 }
2401                         }
2402                 }
2403
2404                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2405                 {
2406                         if (chain_tuple_moved)          /* else - they are ordered */
2407                         {
2408                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2409                                           sizeof(OffsetNumber), vac_cmp_offno);
2410                         }
2411                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2412                 }
2413
2414                 ReleaseBuffer(buf);
2415
2416                 if (offnum <= maxoff)
2417                         break;                          /* had to quit early, see above note */
2418
2419         }                                                       /* walk along relation */
2420
2421         blkno++;                                        /* new number of blocks */
2422
2423         if (dst_buffer != InvalidBuffer)
2424         {
2425                 Assert(num_moved > 0);
2426                 ReleaseBuffer(dst_buffer);
2427         }
2428
2429         if (num_moved > 0)
2430         {
2431                 /*
2432                  * We have to commit our tuple movings before we truncate the
2433                  * relation.  Ideally we should do Commit/StartTransactionCommand
2434                  * here, relying on the session-level table lock to protect our
2435                  * exclusive access to the relation.  However, that would require a
2436                  * lot of extra code to close and re-open the relation, indexes, etc.
2437                  * For now, a quick hack: record status of current transaction as
2438                  * committed, and continue.  We force the commit to be synchronous
2439                  * so that it's down to disk before we truncate.  (Note: tqual.c
2440                  * knows that VACUUM FULL always uses sync commit, too.)  The
2441                  * transaction continues to be shown as running in the ProcArray.
2442                  *
2443                  * XXX This desperately needs to be revisited.  Any failure after
2444                  * this point will result in a PANIC "cannot abort transaction nnn,
2445                  * it was already committed"!
2446                  */
2447                 ForceSyncCommit();
2448                 (void) RecordTransactionCommit();
2449         }
2450
2451         /*
2452          * We are not going to move any more tuples across pages, but we still
2453          * need to apply vacuum_page to compact free space in the remaining pages
2454          * in the vacuum_pages list.  Note that some of these pages may also be in the
2455          * fraged_pages list, and may have had tuples moved onto them; if so, we
2456          * already did vacuum_page and needn't do it again.
2457          */
2458         for (i = 0, curpage = vacuum_pages->pagedesc;
2459                  i < vacuumed_pages;
2460                  i++, curpage++)
2461         {
2462                 vacuum_delay_point();
2463
2464                 Assert((*curpage)->blkno < blkno);
2465                 if ((*curpage)->offsets_used == 0)
2466                 {
2467                         Buffer          buf;
2468                         Page            page;
2469
2470                         /* this page was not used as a move target, so must clean it */
2471                         buf = ReadBufferWithStrategy(onerel,
2472                                                                                  (*curpage)->blkno,
2473                                                                                  vac_strategy);
2474                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2475                         page = BufferGetPage(buf);
2476                         if (!PageIsEmpty(page))
2477                                 vacuum_page(onerel, buf, *curpage);
2478                         UnlockReleaseBuffer(buf);
2479                 }
2480         }
2481
2482         /*
2483          * Now scan all the pages that we moved tuples onto and update tuple
2484          * status bits.  This is not really necessary, but will save time for
2485          * future transactions examining these tuples.
2486          */
2487         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2488                                          last_move_dest_block, num_moved);
2489
2490         /*
2491          * It'd be cleaner to make this report at the bottom of this routine, but
2492          * then the rusage would double-count the second pass of index vacuuming.
2493          * So do it here and ignore the relatively small amount of processing that
2494          * occurs below.
2495          */
2496         ereport(elevel,
2497                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2498                                         RelationGetRelationName(onerel),
2499                                         num_moved, nblocks, blkno),
2500                          errdetail("%s.",
2501                                            pg_rusage_show(&ru0))));
2502
2503         /*
2504          * Reflect the motion of system tuples to the catalog cache here.
2505          */
2506         CommandCounterIncrement();
2507
2508         if (Nvacpagelist.num_pages > 0)
2509         {
2510                 /* vacuum indexes again if needed */
2511                 if (Irel != NULL)
2512                 {
2513                         VacPage    *vpleft,
2514                                            *vpright,
2515                                                 vpsave;
2516
2517                         /* re-sort Nvacpagelist.pagedesc */
2518                         for (vpleft = Nvacpagelist.pagedesc,
2519                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2520                                  vpleft < vpright; vpleft++, vpright--)
2521                         {
2522                                 vpsave = *vpleft;
2523                                 *vpleft = *vpright;
2524                                 *vpright = vpsave;
2525                         }
2526
2527                         /*
2528                          * keep_tuples is the number of tuples that have been moved off a
2529                          * page during chain moves but have not yet been scanned over.
2530                          * The tuple ids of these tuples are not recorded as free offsets
2531                          * for any VacPage, so they will not be cleared from the indexes.
2532                          */
2533                         Assert(keep_tuples >= 0);
2534                         for (i = 0; i < nindexes; i++)
2535                                 vacuum_index(&Nvacpagelist, Irel[i],
2536                                                          vacrelstats->rel_tuples, keep_tuples);
2537                 }
2538
2539                 /*
2540                  * Clean moved-off tuples from last page in Nvacpagelist list.
2541                  *
2542                  * We need only do this in this one page, because higher-numbered
2543                  * pages are going to be truncated from the relation entirely. But see
2544                  * comments for update_hint_bits().
2545                  */
2546                 if (vacpage->blkno == (blkno - 1) &&
2547                         vacpage->offsets_free > 0)
2548                 {
2549                         Buffer          buf;
2550                         Page            page;
2551                         OffsetNumber unused[MaxOffsetNumber];
2552                         OffsetNumber offnum,
2553                                                 maxoff;
2554                         int                     uncnt;
2555                         int                     num_tuples = 0;
2556
2557                         buf = ReadBufferWithStrategy(onerel, vacpage->blkno, vac_strategy);
2558                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2559                         page = BufferGetPage(buf);
2560                         maxoff = PageGetMaxOffsetNumber(page);
2561                         for (offnum = FirstOffsetNumber;
2562                                  offnum <= maxoff;
2563                                  offnum = OffsetNumberNext(offnum))
2564                         {
2565                                 ItemId          itemid = PageGetItemId(page, offnum);
2566                                 HeapTupleHeader htup;
2567
2568                                 if (!ItemIdIsUsed(itemid))
2569                                         continue;
2570                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2571                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2572                                         continue;
2573
2574                                 /*
2575                                  * See comments in the walk-along-page loop above about why
2576                                  * only MOVED_OFF tuples should be found here.
2577                                  */
2578                                 if (htup->t_infomask & HEAP_MOVED_IN)
2579                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2580                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2581                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2582                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2583                                         elog(ERROR, "invalid XVAC in tuple header");
2584
2585                                 ItemIdSetUnused(itemid);
2586                                 num_tuples++;
2587                         }
2588                         Assert(vacpage->offsets_free == num_tuples);
2589
2590                         START_CRIT_SECTION();
2591
2592                         uncnt = PageRepairFragmentation(page, unused);
2593
2594                         MarkBufferDirty(buf);
2595
2596                         /* XLOG stuff */
2597                         if (!onerel->rd_istemp)
2598                         {
2599                                 XLogRecPtr      recptr;
2600
2601                                 recptr = log_heap_clean(onerel, buf, unused, uncnt);
2602                                 PageSetLSN(page, recptr);
2603                                 PageSetTLI(page, ThisTimeLineID);
2604                         }
2605
2606                         END_CRIT_SECTION();
2607
2608                         UnlockReleaseBuffer(buf);
2609                 }
2610
2611                 /* now free the new list of reaped pages */
2612                 curpage = Nvacpagelist.pagedesc;
2613                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2614                         pfree(*curpage);
2615                 pfree(Nvacpagelist.pagedesc);
2616         }
2617
2618         /* Truncate relation, if needed */
2619         if (blkno < nblocks)
2620         {
2621                 RelationTruncate(onerel, blkno);
2622                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2623         }
2624
2625         /* clean up */
2626         pfree(vacpage);
2627         if (vacrelstats->vtlinks != NULL)
2628                 pfree(vacrelstats->vtlinks);
2629
2630         ExecContext_Finish(&ec);
2631 }
2632
2633 /*
2634  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2635  *
2636  *              This routine moves old_tup from old_page to dst_page.
2637  *              old_page and dst_page might be the same page.
2638  *              On entry old_buf and dst_buf are locked exclusively; both locks (or
2639  *              the single lock, if this is an intra-page move) are released before
2640  *              exit.
2641  *
2642  *              Yes, a routine with ten parameters is ugly, but it's still better
2643  *              than having these 120 lines of code in repair_frag(), which is
2644  *              already too long and almost unreadable.
2645  */
2646 static void
2647 move_chain_tuple(Relation rel,
2648                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2649                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2650                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2651 {
2652         TransactionId myXID = GetCurrentTransactionId();
2653         HeapTupleData newtup;
2654         OffsetNumber newoff;
2655         ItemId          newitemid;
2656         Size            tuple_len = old_tup->t_len;
2657
2658         /*
2659          * make a modifiable copy of the source tuple.
2660          */
2661         heap_copytuple_with_tuple(old_tup, &newtup);
2662
2663         /*
2664          * register invalidation of source tuple in catcaches.
2665          */
2666         CacheInvalidateHeapTuple(rel, old_tup);
2667
2668         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2669         START_CRIT_SECTION();
2670
2671         /*
2672          * mark the source tuple MOVED_OFF.
2673          */
2674         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2675                                                                          HEAP_XMIN_INVALID |
2676                                                                          HEAP_MOVED_IN);
2677         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2678         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2679
2680         /*
2681          * If this page was not used before - clean it.
2682          *
2683          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2684          * destination pages to be the same (since this tuple-chain member can be
2685          * on a page lower than the one we're currently processing in the outer
2686          * loop).  If that's true, then after vacuum_page() the source tuple will
2687          * have been moved, and old_tup->t_data will be pointing at garbage.
2688          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2689          * step!!
2690          *
2691          * This path is different from the other callers of vacuum_page, because
2692          * we have already incremented the vacpage's offsets_used field to account
2693          * for the tuple(s) we expect to move onto the page. Therefore
2694          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2695          * good debugging check for all other callers, we work around it here
2696          * rather than remove it.
2697          */
2698         if (!PageIsEmpty(dst_page) && cleanVpd)
2699         {
2700                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2701
2702                 dst_vacpage->offsets_used = 0;
2703                 vacuum_page(rel, dst_buf, dst_vacpage);
2704                 dst_vacpage->offsets_used = sv_offsets_used;
2705         }
2706
2707         /*
2708          * Update the state of the copied tuple, and store it on the destination
2709          * page.
2710          */
2711         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2712                                                                    HEAP_XMIN_INVALID |
2713                                                                    HEAP_MOVED_OFF);
2714         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2715         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2716         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2717                                                  InvalidOffsetNumber, false);
2718         if (newoff == InvalidOffsetNumber)
2719                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
2720                          (unsigned long) tuple_len, dst_vacpage->blkno);
2721         newitemid = PageGetItemId(dst_page, newoff);
2722         /* drop temporary copy, and point to the version on the dest page */
2723         pfree(newtup.t_data);
2724         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2725
2726         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
2727
2728         /*
2729          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
2730          * to next tuple in chain otherwise.  (Since we move the chain in reverse
2731          * order, this is actually the previously processed tuple.)
2732          */
2733         if (!ItemPointerIsValid(ctid))
2734                 newtup.t_data->t_ctid = newtup.t_self;
2735         else
2736                 newtup.t_data->t_ctid = *ctid;
2737         *ctid = newtup.t_self;
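
        /*
         * A worked example may help here (the versions v1 -> v2 -> v3 are
         * hypothetical, with v3 the newest; the chain is processed in the
         * order v3, v2, v1):
         *
         *              move v3: ctid is invalid, so v3's new t_ctid = v3's new TID
         *              move v2: v2's new t_ctid = v3's new TID
         *              move v1: v1's new t_ctid = v2's new TID
         *
         * so the forward t_ctid links of the chain are re-created at the
         * tuples' new locations.
         */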
2738
2739         MarkBufferDirty(dst_buf);
2740         if (dst_buf != old_buf)
2741                 MarkBufferDirty(old_buf);
2742
2743         /* XLOG stuff */
2744         if (!rel->rd_istemp)
2745         {
2746                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2747                                                                                    dst_buf, &newtup);
2748
2749                 if (old_buf != dst_buf)
2750                 {
2751                         PageSetLSN(old_page, recptr);
2752                         PageSetTLI(old_page, ThisTimeLineID);
2753                 }
2754                 PageSetLSN(dst_page, recptr);
2755                 PageSetTLI(dst_page, ThisTimeLineID);
2756         }
2757
2758         END_CRIT_SECTION();
2759
2760         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2761         if (dst_buf != old_buf)
2762                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2763
2764         /* Create index entries for the moved tuple */
2765         if (ec->resultRelInfo->ri_NumIndices > 0)
2766         {
2767                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2768                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2769                 ResetPerTupleExprContext(ec->estate);
2770         }
2771 }
2772
2773 /*
2774  *      move_plain_tuple() -- move one tuple that is not part of a chain
2775  *
2776  *              This routine moves old_tup from old_page to dst_page.
2777  *              On entry old_buf and dst_buf are locked exclusively; both locks are
2778  *              released before exit.
2779  *
2780  *              Yes, a routine with eight parameters is ugly, but it's still better
2781  *              than having these 90 lines of code in repair_frag(), which is already
2782  *              too long and almost unreadable.
2783  */
2784 static void
2785 move_plain_tuple(Relation rel,
2786                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2787                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2788                                  ExecContext ec)
2789 {
2790         TransactionId myXID = GetCurrentTransactionId();
2791         HeapTupleData newtup;
2792         OffsetNumber newoff;
2793         ItemId          newitemid;
2794         Size            tuple_len = old_tup->t_len;
2795
2796         /* copy tuple */
2797         heap_copytuple_with_tuple(old_tup, &newtup);
2798
2799         /*
2800          * register invalidation of source tuple in catcaches.
2801          *
2802          * (Note: we do not need to register the copied tuple, because we are not
2803          * changing the tuple contents and so there cannot be any need to flush
2804          * negative catcache entries.)
2805          */
2806         CacheInvalidateHeapTuple(rel, old_tup);
2807
2808         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2809         START_CRIT_SECTION();
2810
2811         /*
2812          * Mark new tuple as MOVED_IN by me.
2813          */
2814         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2815                                                                    HEAP_XMIN_INVALID |
2816                                                                    HEAP_MOVED_OFF);
2817         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2818         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2819
2820         /* add tuple to the page */
2821         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2822                                                  InvalidOffsetNumber, false);
2823         if (newoff == InvalidOffsetNumber)
2824                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2825                          (unsigned long) tuple_len,
2826                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
2827                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
2828         newitemid = PageGetItemId(dst_page, newoff);
2829         pfree(newtup.t_data);
2830         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2831         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
2832         newtup.t_self = newtup.t_data->t_ctid;
2833
2834         /*
2835          * Mark old tuple as MOVED_OFF by me.
2836          */
2837         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2838                                                                          HEAP_XMIN_INVALID |
2839                                                                          HEAP_MOVED_IN);
2840         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2841         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2842
2843         MarkBufferDirty(dst_buf);
2844         MarkBufferDirty(old_buf);
2845
2846         /* XLOG stuff */
2847         if (!rel->rd_istemp)
2848         {
2849                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2850                                                                                    dst_buf, &newtup);
2851
2852                 PageSetLSN(old_page, recptr);
2853                 PageSetTLI(old_page, ThisTimeLineID);
2854                 PageSetLSN(dst_page, recptr);
2855                 PageSetTLI(dst_page, ThisTimeLineID);
2856         }
2857
2858         END_CRIT_SECTION();
2859
2860         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
2861         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2862         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2863
2864         dst_vacpage->offsets_used++;
2865
2866         /* insert index tuples if needed */
2867         if (ec->resultRelInfo->ri_NumIndices > 0)
2868         {
2869                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2870                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2871                 ResetPerTupleExprContext(ec->estate);
2872         }
2873 }
2874
2875 /*
2876  *      update_hint_bits() -- update hint bits in destination pages
2877  *
2878  * Scan all the pages that we moved tuples onto and update tuple status bits.
2879  * This is not really necessary, but it will save time for future transactions
2880  * examining these tuples.
2881  *
2882  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
2883  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
2884  *
2885  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
2886  * pages that were move source pages but not move dest pages.  The bulk
2887  * of the move source pages will be physically truncated from the relation,
2888  * and the last page remaining in the rel will be fixed separately in
2889  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
2890  * hint bits updated are tuples that are moved as part of a chain and were
2891  * hint bits updated are tuples that were moved as part of a chain and were
2892  * on pages that were neither move destinations nor at the end of the rel.
2893  * to remember and revisit those pages too.
2894  *
2895  * One wonders whether it wouldn't be better to skip this work entirely,
2896  * and let the tuple status updates happen someplace that's not holding an
2897  * exclusive lock on the relation.
2898  */
2899 static void
2900 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
2901                                  BlockNumber last_move_dest_block, int num_moved)
2902 {
2903         TransactionId myXID = GetCurrentTransactionId();
2904         int                     checked_moved = 0;
2905         int                     i;
2906         VacPage    *curpage;
2907
2908         for (i = 0, curpage = fraged_pages->pagedesc;
2909                  i < num_fraged_pages;
2910                  i++, curpage++)
2911         {
2912                 Buffer          buf;
2913                 Page            page;
2914                 OffsetNumber max_offset;
2915                 OffsetNumber off;
2916                 int                     num_tuples = 0;
2917
2918                 vacuum_delay_point();
2919
2920                 if ((*curpage)->blkno > last_move_dest_block)
2921                         break;                          /* no need to scan any further */
2922                 if ((*curpage)->offsets_used == 0)
2923                         continue;                       /* this page was never used as a move dest */
2924                 buf = ReadBufferWithStrategy(rel, (*curpage)->blkno, vac_strategy);
2925                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2926                 page = BufferGetPage(buf);
2927                 max_offset = PageGetMaxOffsetNumber(page);
2928                 for (off = FirstOffsetNumber;
2929                          off <= max_offset;
2930                          off = OffsetNumberNext(off))
2931                 {
2932                         ItemId          itemid = PageGetItemId(page, off);
2933                         HeapTupleHeader htup;
2934
2935                         if (!ItemIdIsUsed(itemid))
2936                                 continue;
2937                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
2938                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2939                                 continue;
2940
2941                         /*
2942                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
2943                          */
2944                         if (!(htup->t_infomask & HEAP_MOVED))
2945                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2946                         if (HeapTupleHeaderGetXvac(htup) != myXID)
2947                                 elog(ERROR, "invalid XVAC in tuple header");
2948
2949                         if (htup->t_infomask & HEAP_MOVED_IN)
2950                         {
2951                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
2952                                 htup->t_infomask &= ~HEAP_MOVED;
2953                                 num_tuples++;
2954                         }
2955                         else
2956                                 htup->t_infomask |= HEAP_XMIN_INVALID;
2957                 }
2958                 MarkBufferDirty(buf);
2959                 UnlockReleaseBuffer(buf);
2960                 Assert((*curpage)->offsets_used == num_tuples);
2961                 checked_moved += num_tuples;
2962         }
2963         Assert(num_moved == checked_moved);
2964 }
2965
2966 /*
2967  *      vacuum_heap() -- free dead tuples
2968  *
2969  *              This routine marks dead tuples as unused and truncates relation
2970  *              if there are "empty" end-blocks.
2971  */
2972 static void
2973 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2974 {
2975         Buffer          buf;
2976         VacPage    *vacpage;
2977         BlockNumber relblocks;
2978         int                     nblocks;
2979         int                     i;
2980
2981         nblocks = vacuum_pages->num_pages;
2982         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2983
2984         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2985         {
2986                 vacuum_delay_point();
2987
2988                 if ((*vacpage)->offsets_free > 0)
2989                 {
2990                         buf = ReadBufferWithStrategy(onerel,
2991                                                                                  (*vacpage)->blkno,
2992                                                                                  vac_strategy);
2993                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2994                         vacuum_page(onerel, buf, *vacpage);
2995                         UnlockReleaseBuffer(buf);
2996                 }
2997         }
2998
2999         /* Truncate relation if there are some empty end-pages */
3000         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
3001         if (vacuum_pages->empty_end_pages > 0)
3002         {
3003                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
3004                 ereport(elevel,
3005                                 (errmsg("\"%s\": truncated %u to %u pages",
3006                                                 RelationGetRelationName(onerel),
3007                                                 vacrelstats->rel_pages, relblocks)));
3008                 RelationTruncate(onerel, relblocks);
3009                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
3010         }
3011 }
3012
3013 /*
3014  *      vacuum_page() -- free dead tuples on a page
3015  *                                       and repair its fragmentation.
3016  *
3017  * Caller must hold pin and lock on buffer.
3018  */
3019 static void
3020 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
3021 {
3022         OffsetNumber unused[MaxOffsetNumber];
3023         int                     uncnt;
3024         Page            page = BufferGetPage(buffer);
3025         ItemId          itemid;
3026         int                     i;
3027
3028         /* There shouldn't be any tuples moved onto the page yet! */
3029         Assert(vacpage->offsets_used == 0);
3030
3031         START_CRIT_SECTION();
3032
3033         for (i = 0; i < vacpage->offsets_free; i++)
3034         {
3035                 itemid = PageGetItemId(page, vacpage->offsets[i]);
3036                 ItemIdSetUnused(itemid);
3037         }
3038
3039         uncnt = PageRepairFragmentation(page, unused);
3040
3041         MarkBufferDirty(buffer);
3042
3043         /* XLOG stuff */
3044         if (!onerel->rd_istemp)
3045         {
3046                 XLogRecPtr      recptr;
3047
3048                 recptr = log_heap_clean(onerel, buffer, unused, uncnt);
3049                 PageSetLSN(page, recptr);
3050                 PageSetTLI(page, ThisTimeLineID);
3051         }
3052
3053         END_CRIT_SECTION();
3054 }
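
/*
 * A minimal usage sketch, mirroring the callers in repair_frag() and
 * vacuum_heap() above ("vp" is a hypothetical VacPage describing the page):
 *
 *              Buffer  buf;
 *
 *              buf = ReadBufferWithStrategy(onerel, vp->blkno, vac_strategy);
 *              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *              vacuum_page(onerel, buf, vp);
 *              UnlockReleaseBuffer(buf);
 */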
3055
3056 /*
3057  *      scan_index() -- scan one index relation to update pg_class statistics.
3058  *
3059  * We use this when we have no deletions to do.
3060  */
3061 static void
3062 scan_index(Relation indrel, double num_tuples)
3063 {
3064         IndexBulkDeleteResult *stats;
3065         IndexVacuumInfo ivinfo;
3066         PGRUsage        ru0;
3067
3068         pg_rusage_init(&ru0);
3069
3070         ivinfo.index = indrel;
3071         ivinfo.vacuum_full = true;
3072         ivinfo.message_level = elevel;
3073         ivinfo.num_heap_tuples = num_tuples;
3074         ivinfo.strategy = vac_strategy;
3075
3076         stats = index_vacuum_cleanup(&ivinfo, NULL);
3077
3078         if (!stats)
3079                 return;
3080
3081         /* now update statistics in pg_class */
3082         vac_update_relstats(RelationGetRelid(indrel),
3083                                                 stats->num_pages, stats->num_index_tuples,
3084                                                 false, InvalidTransactionId);
3085
3086         ereport(elevel,
3087                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3088                                         RelationGetRelationName(indrel),
3089                                         stats->num_index_tuples,
3090                                         stats->num_pages),
3091         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
3092                           "%s.",
3093                           stats->pages_deleted, stats->pages_free,
3094                           pg_rusage_show(&ru0))));
3095
3096         /*
3097          * Check for a tuple count mismatch.  If the index is partial, then it's OK
3098          * for it to have fewer tuples than the heap; otherwise we have a problem.
3099          */
3100         if (stats->num_index_tuples != num_tuples)
3101         {
3102                 if (stats->num_index_tuples > num_tuples ||
3103                         !vac_is_partial_index(indrel))
3104                         ereport(WARNING,
3105                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3106                                                         RelationGetRelationName(indrel),
3107                                                         stats->num_index_tuples, num_tuples),
3108                                          errhint("Rebuild the index with REINDEX.")));
3109         }
3110
3111         pfree(stats);
3112 }
3113
3114 /*
3115  *      vacuum_index() -- vacuum one index relation.
3116  *
3117  *              Vpl is the VacPageList of the heap we're currently vacuuming.
3118  *              It's locked. Indrel is an index relation on the vacuumed heap.
3119  *
3120  *              We don't bother to set locks on the index relation here, since
3121  *              the parent table is exclusive-locked already.
3122  *
3123  *              Finally, we arrange to update the index relation's statistics in
3124  *              pg_class.
3125  */
3126 static void
3127 vacuum_index(VacPageList vacpagelist, Relation indrel,
3128                          double num_tuples, int keep_tuples)
3129 {
3130         IndexBulkDeleteResult *stats;
3131         IndexVacuumInfo ivinfo;
3132         PGRUsage        ru0;
3133
3134         pg_rusage_init(&ru0);
3135
3136         ivinfo.index = indrel;
3137         ivinfo.vacuum_full = true;
3138         ivinfo.message_level = elevel;
3139         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
3140         ivinfo.strategy = vac_strategy;
3141
3142         /* Do bulk deletion */
3143         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3144
3145         /* Do post-VACUUM cleanup */
3146         stats = index_vacuum_cleanup(&ivinfo, stats);
3147
3148         if (!stats)
3149                 return;
3150
3151         /* now update statistics in pg_class */
3152         vac_update_relstats(RelationGetRelid(indrel),
3153                                                 stats->num_pages, stats->num_index_tuples,
3154                                                 false, InvalidTransactionId);
3155
3156         ereport(elevel,
3157                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3158                                         RelationGetRelationName(indrel),
3159                                         stats->num_index_tuples,
3160                                         stats->num_pages),
3161                          errdetail("%.0f index row versions were removed.\n"
3162                          "%u index pages have been deleted, %u are currently reusable.\n"
3163                                            "%s.",
3164                                            stats->tuples_removed,
3165                                            stats->pages_deleted, stats->pages_free,
3166                                            pg_rusage_show(&ru0))));
3167
3168         /*
3169          * Check for a tuple count mismatch.  If the index is partial, then it's OK
3170          * for it to have fewer tuples than the heap; otherwise we have a problem.
3171          */
3172         if (stats->num_index_tuples != num_tuples + keep_tuples)
3173         {
3174                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3175                         !vac_is_partial_index(indrel))
3176                         ereport(WARNING,
3177                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3178                                                         RelationGetRelationName(indrel),
3179                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3180                                          errhint("Rebuild the index with REINDEX.")));
3181         }
3182
3183         pfree(stats);
3184 }
3185
3186 /*
3187  *      tid_reaped() -- is a particular tid reaped?
3188  *
3189  *              This has the right signature to be an IndexBulkDeleteCallback.
3190  *
3191  *              vacpagelist->pagedesc is sorted in the right order.
3192  */
3193 static bool
3194 tid_reaped(ItemPointer itemptr, void *state)
3195 {
3196         VacPageList vacpagelist = (VacPageList) state;
3197         OffsetNumber ioffno;
3198         OffsetNumber *voff;
3199         VacPage         vp,
3200                            *vpp;
3201         VacPageData vacpage;
3202
3203         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3204         ioffno = ItemPointerGetOffsetNumber(itemptr);
3205
3206         vp = &vacpage;
3207         vpp = (VacPage *) vac_bsearch((void *) &vp,
3208                                                                   (void *) (vacpagelist->pagedesc),
3209                                                                   vacpagelist->num_pages,
3210                                                                   sizeof(VacPage),
3211                                                                   vac_cmp_blk);
3212
3213         if (vpp == NULL)
3214                 return false;
3215
3216         /* ok - we are on a partially or fully reaped page */
3217         vp = *vpp;
3218
3219         if (vp->offsets_free == 0)
3220         {
3221                 /* this is an empty page, so claim all tuples on it are reaped!!! */
3222                 return true;
3223         }
3224
3225         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3226                                                                                 (void *) (vp->offsets),
3227                                                                                 vp->offsets_free,
3228                                                                                 sizeof(OffsetNumber),
3229                                                                                 vac_cmp_offno);
3230
3231         if (voff == NULL)
3232                 return false;
3233
3234         /* tid is reaped */
3235         return true;
3236 }
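
/*
 * For reference, vacuum_index() above hands this routine to the index AM as
 * the deletion callback (a sketch of the existing call, not a new API):
 *
 *              stats = index_bulk_delete(&ivinfo, NULL,
 *                                                                tid_reaped, (void *) vacpagelist);
 *
 * The index AM then invokes tid_reaped() for each index entry to decide
 * whether that entry's heap TID was reaped and the entry should be deleted.
 */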
3237
3238 /*
3239  * Update the shared Free Space Map with the info we now have about
3240  * free space in the relation, discarding any old info the map may have.
3241  */
3242 static void
3243 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3244                            BlockNumber rel_pages)
3245 {
3246         int                     nPages = fraged_pages->num_pages;
3247         VacPage    *pagedesc = fraged_pages->pagedesc;
3248         Size            threshold;
3249         PageFreeSpaceInfo *pageSpaces;
3250         int                     outPages;
3251         int                     i;
3252
3253         /*
3254          * We only report pages with free space at least equal to the average
3255          * request size --- this avoids cluttering FSM with uselessly-small bits
3256          * of space.  Although FSM would discard pages with little free space
3257          * anyway, it's important to do this prefiltering because (a) it reduces
3258          * the time spent holding the FSM lock in RecordRelationFreeSpace, and (b)
3259          * FSM uses the number of pages reported as a statistic for guiding space
3260          * management.  If we didn't threshold our reports the same way
3261          * vacuumlazy.c does, we'd be skewing that statistic.
3262          */
3263         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
3264
3265         pageSpaces = (PageFreeSpaceInfo *)
3266                 palloc(nPages * sizeof(PageFreeSpaceInfo));
3267         outPages = 0;
3268
3269         for (i = 0; i < nPages; i++)
3270         {
3271                 /*
3272                  * fraged_pages may contain entries for pages that we later decided to
3273                  * truncate from the relation; don't enter them into the free space
3274                  * map!
3275                  */
3276                 if (pagedesc[i]->blkno >= rel_pages)
3277                         break;
3278
3279                 if (pagedesc[i]->free >= threshold)
3280                 {
3281                         pageSpaces[outPages].blkno = pagedesc[i]->blkno;
3282                         pageSpaces[outPages].avail = pagedesc[i]->free;
3283                         outPages++;
3284                 }
3285         }
3286
3287         RecordRelationFreeSpace(&onerel->rd_node, outPages, outPages, pageSpaces);
3288
3289         pfree(pageSpaces);
3290 }
3291
3292 /* Copy a VacPage structure */
3293 static VacPage
3294 copy_vac_page(VacPage vacpage)
3295 {
3296         VacPage         newvacpage;
3297
3298         /* allocate a VacPageData entry */
3299         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3300                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3301
3302         /* fill it in */
3303         if (vacpage->offsets_free > 0)
3304                 memcpy(newvacpage->offsets, vacpage->offsets,
3305                            vacpage->offsets_free * sizeof(OffsetNumber));
3306         newvacpage->blkno = vacpage->blkno;
3307         newvacpage->free = vacpage->free;
3308         newvacpage->offsets_used = vacpage->offsets_used;
3309         newvacpage->offsets_free = vacpage->offsets_free;
3310
3311         return newvacpage;
3312 }
3313
3314 /*
3315  * Add a VacPage pointer to a VacPageList.
3316  *
3317  *              As a side effect of the way that scan_heap works,
3318  *              higher pages come after lower pages in the array
3319  *              (and the highest tid on a page is last).
3320  */
3321 static void
3322 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3323 {
3324 #define PG_NPAGEDESC 1024
3325
3326         /* allocate or enlarge the pagedesc array if needed */
3327         if (vacpagelist->num_pages == 0)
3328         {
3329                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3330                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3331         }
3332         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3333         {
3334                 vacpagelist->num_allocated_pages *= 2;
3335                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3336         }
3337         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3338         (vacpagelist->num_pages)++;
3339 }
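
/*
 * Minimal usage sketch (illustrative only; "pages" and "vacpage" stand in
 * for the caller's variables):
 *
 *              VacPageListData pages;
 *
 *              pages.num_pages = pages.num_allocated_pages = 0;
 *              pages.pagedesc = NULL;
 *              ...
 *              -- once per heap page, after filling in vacpage->blkno etc.
 *              vpage_insert(&pages, copy_vac_page(vacpage));
 *
 * The array grows geometrically (1024 slots, then doubling), and because
 * the caller visits blocks in ascending order the pagedesc array ends up
 * sorted by block number -- which is what vac_bsearch with vac_cmp_blk
 * (below) depends on.
 */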
3340
3341 /*
3342  * vac_bsearch: just like standard C library routine bsearch(),
3343  * except that we first test to see whether the target key is outside
3344  * the range of the table entries.  This case is handled relatively slowly
3345  * by the normal binary search algorithm (ie, no faster than any other key)
3346  * but it occurs often enough in VACUUM to be worth optimizing.
3347  */
3348 static void *
3349 vac_bsearch(const void *key, const void *base,
3350                         size_t nelem, size_t size,
3351                         int (*compar) (const void *, const void *))
3352 {
3353         int                     res;
3354         const void *last;
3355
3356         if (nelem == 0)
3357                 return NULL;
3358         res = compar(key, base);
3359         if (res < 0)
3360                 return NULL;
3361         if (res == 0)
3362                 return (void *) base;
3363         if (nelem > 1)
3364         {
3365                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3366                 res = compar(key, last);
3367                 if (res > 0)
3368                         return NULL;
3369                 if (res == 0)
3370                         return (void *) last;
3371         }
3372         if (nelem <= 2)
3373                 return NULL;                    /* already checked 'em all */
3374         return bsearch(key, base, nelem, size, compar);
3375 }
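
/*
 * Typical call pattern (a sketch of the probe done by callers such as
 * tid_reaped; "vacpagelist" is assumed to hold entries sorted by block
 * number, and "itemptr" is the tuple TID being looked up):
 *
 *              VacPageData vacpage;            -- stack dummy used only as a search key
 *              VacPage         vp = &vacpage;
 *              VacPage    *vpp;
 *
 *              vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
 *              vpp = (VacPage *) vac_bsearch((void *) &vp,
 *                                                                        (void *) (vacpagelist->pagedesc),
 *                                                                        vacpagelist->num_pages,
 *                                                                        sizeof(VacPage),
 *                                                                        vac_cmp_blk);
 *              if (vpp != NULL)
 *                      ...                                             -- page is in the list; check its offsets
 *
 * The endpoint pre-checks pay off because VACUUM frequently probes for
 * blocks that fall outside the (often short) list of interesting pages.
 */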
3376
3377 /*
3378  * Comparator routines for use with qsort() and bsearch().
3379  */
3380 static int
3381 vac_cmp_blk(const void *left, const void *right)
3382 {
3383         BlockNumber lblk,
3384                                 rblk;
3385
3386         lblk = (*((VacPage *) left))->blkno;
3387         rblk = (*((VacPage *) right))->blkno;
3388
3389         if (lblk < rblk)
3390                 return -1;
3391         if (lblk == rblk)
3392                 return 0;
3393         return 1;
3394 }
3395
3396 static int
3397 vac_cmp_offno(const void *left, const void *right)
3398 {
3399         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3400                 return -1;
3401         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3402                 return 0;
3403         return 1;
3404 }
3405
3406 static int
3407 vac_cmp_vtlinks(const void *left, const void *right)
3408 {
3409         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3410                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3411                 return -1;
3412         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3413                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3414                 return 1;
3415         /* bi_hi-es are equal */
3416         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3417                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3418                 return -1;
3419         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3420                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3421                 return 1;
3422         /* bi_lo-es are equal */
3423         if (((VTupleLink) left)->new_tid.ip_posid <
3424                 ((VTupleLink) right)->new_tid.ip_posid)
3425                 return -1;
3426         if (((VTupleLink) left)->new_tid.ip_posid >
3427                 ((VTupleLink) right)->new_tid.ip_posid)
3428                 return 1;
3429         return 0;
3430 }
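
/*
 * These comparators keep the VacPage, OffsetNumber, and VTupleLink arrays
 * in a form vac_bsearch can consume.  A sketch of the sorting calls
 * (mirroring the way the heap-scanning code uses them; variable names are
 * illustrative):
 *
 *              -- order a page's free-offset array before searching it
 *              qsort(vacpage->offsets, vacpage->offsets_free,
 *                        sizeof(OffsetNumber), vac_cmp_offno);
 *
 *              -- order the update-chain link array by new_tid
 *              qsort(vtlinks, num_vtlinks, sizeof(VTupleLinkData), vac_cmp_vtlinks);
 */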
3431
3432
3433 /*
3434  * Open all the indexes of the given relation, obtaining the specified kind
3435  * of lock on each.  Return an array of Relation pointers for the indexes
3436  * into *Irel, and the number of indexes into *nindexes.
3437  */
3438 void
3439 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3440                                  int *nindexes, Relation **Irel)
3441 {
3442         List       *indexoidlist;
3443         ListCell   *indexoidscan;
3444         int                     i;
3445
3446         Assert(lockmode != NoLock);
3447
3448         indexoidlist = RelationGetIndexList(relation);
3449
3450         *nindexes = list_length(indexoidlist);
3451
3452         if (*nindexes > 0)
3453                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3454         else
3455                 *Irel = NULL;
3456
3457         i = 0;
3458         foreach(indexoidscan, indexoidlist)
3459         {
3460                 Oid                     indexoid = lfirst_oid(indexoidscan);
3461
3462                 (*Irel)[i++] = index_open(indexoid, lockmode);
3463         }
3464
3465         list_free(indexoidlist);
3466 }
3467
3468 /*
3469  * Release the resources acquired by vac_open_indexes.  Optionally release
3470  * the locks (say NoLock to keep 'em).
3471  */
3472 void
3473 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3474 {
3475         if (Irel == NULL)
3476                 return;
3477
3478         while (nindexes--)
3479         {
3480                 Relation        ind = Irel[nindexes];
3481
3482                 index_close(ind, lockmode);
3483         }
3484         pfree(Irel);
3485 }
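
/*
 * The two routines above are meant to be used as a bracket pair.  A
 * minimal sketch (assumes "onerel" is an already-opened and suitably
 * locked heap relation; the lock mode and per-index work shown are
 * illustrative only):
 *
 *              Relation   *Irel;
 *              int                     nindexes,
 *                                      i;
 *
 *              vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
 *              for (i = 0; i < nindexes; i++)
 *              {
 *                      -- per-index work goes here
 *              }
 *              -- pass NoLock to hold the index locks until end of transaction
 *              vac_close_indexes(nindexes, Irel, NoLock);
 */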
3486
3487
3488 /*
3489  * Is an index partial (ie, could it contain fewer tuples than the heap)?
3490  */
3491 bool
3492 vac_is_partial_index(Relation indrel)
3493 {
3494         /*
3495          * If the index's AM doesn't support nulls, it's partial for our purposes
3496          */
3497         if (!indrel->rd_am->amindexnulls)
3498                 return true;
3499
3500         /* Otherwise, look to see if there's a partial-index predicate */
3501         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3502                 return true;
3503
3504         return false;
3505 }
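
/*
 * Callers use this to decide whether an index's tuple count can be
 * compared against the heap's.  A sketch of the typical check
 * (hypothetical variable names):
 *
 *              if (!vac_is_partial_index(indrel))
 *              {
 *                      -- a non-partial index should have one entry per heap tuple,
 *                      -- so a tuple-count mismatch is worth reporting
 *              }
 */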
3506
3507
3508 static bool
3509 enough_space(VacPage vacpage, Size len)
3510 {
3511         len = MAXALIGN(len);
3512
3513         if (len > vacpage->free)
3514                 return false;
3515
3516         /* if there are free itemid(s) and len <= free_space... */
3517         if (vacpage->offsets_used < vacpage->offsets_free)
3518                 return true;
3519
3520         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
3521         if (len + sizeof(ItemIdData) <= vacpage->free)
3522                 return true;
3523
3524         return false;
3525 }
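
/*
 * Worked example (sizes are illustrative; assumes 8-byte MAXALIGN): a
 * 61-byte tuple rounds up to 64 bytes.  A page showing free = 66 with at
 * least one reusable line pointer (offsets_used < offsets_free) accepts
 * it; the same page with no reusable line pointer needs
 * 64 + sizeof(ItemIdData) = 68 bytes and therefore rejects it.
 */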
3526
3527 static Size
3528 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3529 {
3530         PageHeader      pd = (PageHeader) page;
3531         Size            freespace = pd->pd_upper - pd->pd_lower;
3532         Size            targetfree;
3533
3534         targetfree = RelationGetTargetPageFreeSpace(relation,
3535                                                                                                 HEAP_DEFAULT_FILLFACTOR);
3536         if (freespace > targetfree)
3537                 return freespace - targetfree;
3538         else
3539                 return 0;
3540 }
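
/*
 * Worked example (assuming the default 8K block size and a relation with
 * fillfactor = 90): RelationGetTargetPageFreeSpace asks that about 819
 * bytes (10% of BLCKSZ) be kept free, so a page whose pd_upper - pd_lower
 * gap is 1000 bytes reports only ~181 bytes as usable for tuple moving,
 * and a page with a 500-byte gap reports zero.
 */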
3541
3542 /*
3543  * vacuum_delay_point --- check for interrupts and cost-based delay.
3544  *
3545  * This should be called in each major loop of VACUUM processing,
3546  * typically once per page processed.
3547  */
3548 void
3549 vacuum_delay_point(void)
3550 {
3551         /* Always check for interrupts */
3552         CHECK_FOR_INTERRUPTS();
3553
3554         /* Nap if appropriate */
3555         if (VacuumCostActive && !InterruptPending &&
3556                 VacuumCostBalance >= VacuumCostLimit)
3557         {
3558                 int                     msec;
3559
3560                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3561                 if (msec > VacuumCostDelay * 4)
3562                         msec = VacuumCostDelay * 4;
3563
3564                 pg_usleep(msec * 1000L);
3565
3566                 VacuumCostBalance = 0;
3567
3568                 /* update balance values for workers */
3569                 AutoVacuumUpdateDelay();
3570
3571                 /* Might have gotten an interrupt while sleeping */
3572                 CHECK_FOR_INTERRUPTS();
3573         }
3574 }
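
/*
 * Callers simply drop this into their per-page loops, e.g. (a sketch;
 * loop variables are illustrative):
 *
 *              for (blkno = 0; blkno < nblocks; blkno++)
 *              {
 *                      vacuum_delay_point();
 *                      -- ... process one heap or index page ...
 *              }
 *
 * Worked example of the nap length: with vacuum_cost_delay = 20 ms,
 * vacuum_cost_limit = 200, and an accumulated balance of 350, the sleep is
 * 20 * 350 / 200 = 35 ms; the 4x cap (80 ms here) only matters when the
 * balance has overshot the limit by a large margin.
 */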