1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.360 2007/10/24 20:55:36 alvherre Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/transam.h"
29 #include "access/xact.h"
30 #include "access/xlog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_database.h"
33 #include "commands/dbcommands.h"
34 #include "commands/vacuum.h"
35 #include "executor/executor.h"
36 #include "miscadmin.h"
37 #include "postmaster/autovacuum.h"
38 #include "storage/freespace.h"
39 #include "storage/proc.h"
40 #include "storage/procarray.h"
41 #include "utils/acl.h"
42 #include "utils/builtins.h"
43 #include "utils/flatfiles.h"
44 #include "utils/fmgroids.h"
45 #include "utils/inval.h"
46 #include "utils/lsyscache.h"
47 #include "utils/memutils.h"
48 #include "utils/pg_rusage.h"
49 #include "utils/relcache.h"
50 #include "utils/syscache.h"
51 #include "pgstat.h"
52
53
54 /*
55  * GUC parameters
56  */
57 int                     vacuum_freeze_min_age;
58
59 /*
60  * VacPage structures keep track of each page on which we find useful
61  * amounts of free space.
62  */
63 typedef struct VacPageData
64 {
65         BlockNumber blkno;                      /* BlockNumber of this Page */
66         Size            free;                   /* FreeSpace on this Page */
67         uint16          offsets_used;   /* Number of OffNums used by vacuum */
68         uint16          offsets_free;   /* Number of OffNums free or to be free */
69         OffsetNumber offsets[1];        /* Array of free OffNums */
70 } VacPageData;
71
72 typedef VacPageData *VacPage;
73
74 typedef struct VacPageListData
75 {
76         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
77         int                     num_pages;              /* Number of pages in pagedesc */
78         int                     num_allocated_pages;    /* Number of allocated pages in
79                                                                                  * pagedesc */
80         VacPage    *pagedesc;           /* Descriptions of pages */
81 } VacPageListData;
82
83 typedef VacPageListData *VacPageList;
84
85 /*
86  * The "vtlinks" array keeps information about each recently-updated tuple
87  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
88  * We store the tuple's own TID as well as its t_ctid (its link to the next
89  * newer tuple version).  Searching in this array allows us to follow update
90  * chains backwards from newer to older tuples.  When we move a member of an
91  * update chain, we must move *all* the live members of the chain, so that we
92  * can maintain their t_ctid link relationships (we must not just overwrite
93  * t_ctid in an existing tuple).
94  *
95  * Note: because t_ctid links can be stale (this would only occur if a prior
96  * VACUUM crashed partway through), it is possible that new_tid points to an
97  * empty slot or unrelated tuple.  We have to check the linkage as we follow
98  * it, just as is done in EvalPlanQual.
99  */
100 typedef struct VTupleLinkData
101 {
102         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
103         ItemPointerData this_tid;       /* t_self of the tuple */
104 } VTupleLinkData;
105
106 typedef VTupleLinkData *VTupleLink;
107
108 /*
109  * We use an array of VTupleMoveData to plan a chain tuple move fully
110  * before we do it.
111  */
112 typedef struct VTupleMoveData
113 {
114         ItemPointerData tid;            /* tuple ID */
115         VacPage         vacpage;                /* where to move it to */
116         bool            cleanVpd;               /* clean vacpage before using? */
117 } VTupleMoveData;
118
119 typedef VTupleMoveData *VTupleMove;
120
121 /*
122  * VRelStats contains the data acquired by scan_heap for use later
123  */
124 typedef struct VRelStats
125 {
126         /* miscellaneous statistics */
127         BlockNumber rel_pages;          /* pages in relation */
128         double          rel_tuples;             /* tuples that remain after vacuuming */
129         double          rel_indexed_tuples;             /* indexed tuples that remain */
130         Size            min_tlen;               /* min surviving tuple size */
131         Size            max_tlen;               /* max surviving tuple size */
132         bool            hasindex;
133         /* vtlinks array for tuple chain following - sorted by new_tid */
134         int                     num_vtlinks;
135         VTupleLink      vtlinks;
136 } VRelStats;
137
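/*
 * A minimal sketch (compiled out) of the lookup pattern described in the
 * vtlinks comment above: given the TID of a newer tuple version, find the
 * vtlinks entry whose new_tid matches it, i.e. the older tuple whose t_ctid
 * points at it.  It relies on vac_bsearch() and vac_cmp_vtlinks(), declared
 * further down in this file; the helper name itself is only illustrative.
 */
#ifdef NOT_USED
static VTupleLink
vtlinks_lookup_sketch(VRelStats *vacrelstats, ItemPointer newer_tid)
{
	VTupleLinkData key;

	key.new_tid = *newer_tid;
	return (VTupleLink) vac_bsearch((void *) &key,
									(void *) vacrelstats->vtlinks,
									vacrelstats->num_vtlinks,
									sizeof(VTupleLinkData),
									vac_cmp_vtlinks);
}
#endif
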
138 /*----------------------------------------------------------------------
139  * ExecContext:
140  *
141  * As these variables always appear together, we put them into one struct
142  * and pull initialization and cleanup into separate routines.
143  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
144  * accurately:  It is *used* only in move_xxx_tuple(), but because this
145  * routine is called many times, we initialize the struct just once in
146  * repair_frag() and pass it on to move_xxx_tuple().
147  */
148 typedef struct ExecContextData
149 {
150         ResultRelInfo *resultRelInfo;
151         EState     *estate;
152         TupleTableSlot *slot;
153 } ExecContextData;
154
155 typedef ExecContextData *ExecContext;
156
157 static void
158 ExecContext_Init(ExecContext ec, Relation rel)
159 {
160         TupleDesc       tupdesc = RelationGetDescr(rel);
161
162         /*
163          * We need a ResultRelInfo and an EState so we can use the regular
164          * executor's index-entry-making machinery.
165          */
166         ec->estate = CreateExecutorState();
167
168         ec->resultRelInfo = makeNode(ResultRelInfo);
169         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
170         ec->resultRelInfo->ri_RelationDesc = rel;
171         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
172
173         ExecOpenIndices(ec->resultRelInfo);
174
175         ec->estate->es_result_relations = ec->resultRelInfo;
176         ec->estate->es_num_result_relations = 1;
177         ec->estate->es_result_relation_info = ec->resultRelInfo;
178
179         /* Set up a tuple slot too */
180         ec->slot = MakeSingleTupleTableSlot(tupdesc);
181 }
182
183 static void
184 ExecContext_Finish(ExecContext ec)
185 {
186         ExecDropSingleTupleTableSlot(ec->slot);
187         ExecCloseIndices(ec->resultRelInfo);
188         FreeExecutorState(ec->estate);
189 }
190
191 /*
192  * End of ExecContext Implementation
193  *----------------------------------------------------------------------
194  */
195
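/*
 * Hedged sketch (compiled out) of how the ExecContext machinery above is
 * meant to be used: repair_frag() initializes it once, each per-tuple move
 * routine then uses ec->estate / ec->resultRelInfo / ec->slot to build the
 * index entries for a moved tuple, and repair_frag() tears everything down
 * at the end.  do_one_tuple_move() is a placeholder, not a function that
 * exists in this file.
 */
#ifdef NOT_USED
static void
exec_context_usage_sketch(Relation rel)
{
	ExecContextData ec;

	ExecContext_Init(&ec, rel);
	/* for each tuple selected for moving: do_one_tuple_move(&ec, ...); */
	ExecContext_Finish(&ec);
}
#endif
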
196 /* A few variables that don't seem worth passing around as parameters */
197 static MemoryContext vac_context = NULL;
198
199 static int      elevel = -1;
200
201 static TransactionId OldestXmin;
202 static TransactionId FreezeLimit;
203
204 static BufferAccessStrategy vac_strategy;
205
206
207 /* non-export function prototypes */
208 static List *get_rel_oids(List *relids, const RangeVar *vacrel,
209                          const char *stmttype);
210 static void vac_truncate_clog(TransactionId frozenXID);
211 static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
212 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
213 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
214                   VacPageList vacuum_pages, VacPageList fraged_pages);
215 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
216                         VacPageList vacuum_pages, VacPageList fraged_pages,
217                         int nindexes, Relation *Irel);
218 static void move_chain_tuple(Relation rel,
219                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
220                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
221                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
222 static void move_plain_tuple(Relation rel,
223                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
224                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
225                                  ExecContext ec);
226 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
227                                  int num_fraged_pages, BlockNumber last_move_dest_block,
228                                  int num_moved);
229 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
230                         VacPageList vacpagelist);
231 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
232 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
233                          double num_tuples, int keep_tuples);
234 static void scan_index(Relation indrel, double num_tuples);
235 static bool tid_reaped(ItemPointer itemptr, void *state);
236 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
237                            BlockNumber rel_pages);
238 static VacPage copy_vac_page(VacPage vacpage);
239 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
240 static void *vac_bsearch(const void *key, const void *base,
241                         size_t nelem, size_t size,
242                         int (*compar) (const void *, const void *));
243 static int      vac_cmp_blk(const void *left, const void *right);
244 static int      vac_cmp_offno(const void *left, const void *right);
245 static int      vac_cmp_vtlinks(const void *left, const void *right);
246 static bool enough_space(VacPage vacpage, Size len);
247 static Size PageGetFreeSpaceWithFillFactor(Relation relation, Page page);
248
249
250 /****************************************************************************
251  *                                                                                                                                                      *
252  *                      Code common to all flavors of VACUUM and ANALYZE                                *
253  *                                                                                                                                                      *
254  ****************************************************************************
255  */
256
257
258 /*
259  * Primary entry point for VACUUM and ANALYZE commands.
260  *
261  * relids is normally NIL; if it is not, then it provides the list of
262  * relation OIDs to be processed, and vacstmt->relation is ignored.
263  * (The non-NIL case is currently only used by autovacuum.)
264  *
265  * bstrategy is normally given as NULL, but in autovacuum it can be passed
266  * in to use the same buffer strategy object across multiple vacuum() calls.
267  *
268  * isTopLevel should be passed down from ProcessUtility.
269  *
270  * It is the caller's responsibility that vacstmt, relids, and bstrategy
271  * (if given) be allocated in a memory context that won't disappear
272  * at transaction commit.
273  */
274 void
275 vacuum(VacuumStmt *vacstmt, List *relids,
276            BufferAccessStrategy bstrategy, bool isTopLevel)
277 {
278         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
279         volatile MemoryContext anl_context = NULL;
280         volatile bool all_rels,
281                                 in_outer_xact,
282                                 use_own_xacts;
283         List       *relations;
284
285         if (vacstmt->verbose)
286                 elevel = INFO;
287         else
288                 elevel = DEBUG2;
289
290         /*
291          * We cannot run VACUUM inside a user transaction block; if we were inside
292          * a transaction, then our commit- and start-transaction-command calls
293          * would not have the intended effect! Furthermore, the forced commit that
294          * occurs before truncating the relation's file would have the effect of
295          * committing the rest of the user's transaction too, which would
296          * certainly not be the desired behavior.  (This only applies to VACUUM
297          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
298          * block, but we choose to disallow that case because we'd rather commit
299          * as soon as possible after finishing the vacuum.      This is mainly so that
300          * we can let go the AccessExclusiveLock that we may be holding.)
301          *
302          * ANALYZE (without VACUUM) can run either way.
303          */
304         if (vacstmt->vacuum)
305         {
306                 PreventTransactionChain(isTopLevel, stmttype);
307                 in_outer_xact = false;
308         }
309         else
310                 in_outer_xact = IsInTransactionChain(isTopLevel);
311
312         /*
313          * Send info about dead objects to the statistics collector, unless we are
314          * in autovacuum --- autovacuum.c does this for itself.
315          */
316         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
317                 pgstat_vacuum_tabstat();
318
319         /*
320          * Create special memory context for cross-transaction storage.
321          *
322          * Since it is a child of PortalContext, it will go away eventually even
323          * if we suffer an error; there's no need for special abort cleanup logic.
324          */
325         vac_context = AllocSetContextCreate(PortalContext,
326                                                                                 "Vacuum",
327                                                                                 ALLOCSET_DEFAULT_MINSIZE,
328                                                                                 ALLOCSET_DEFAULT_INITSIZE,
329                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
330
331         /*
332          * If caller didn't give us a buffer strategy object, make one in the
333          * cross-transaction memory context.
334          */
335         if (bstrategy == NULL)
336         {
337                 MemoryContext old_context = MemoryContextSwitchTo(vac_context);
338
339                 bstrategy = GetAccessStrategy(BAS_VACUUM);
340                 MemoryContextSwitchTo(old_context);
341         }
342         vac_strategy = bstrategy;
343
344         /* Remember whether we are processing everything in the DB */
345         all_rels = (relids == NIL && vacstmt->relation == NULL);
346
347         /*
348          * Build list of relations to process, unless caller gave us one. (If we
349          * build one, we put it in vac_context for safekeeping.)
350          */
351         relations = get_rel_oids(relids, vacstmt->relation, stmttype);
352
353         /*
354          * Decide whether we need to start/commit our own transactions.
355          *
356          * For VACUUM (with or without ANALYZE): always do so, so that we can
357          * release locks as soon as possible.  (We could possibly use the outer
358          * transaction for a one-table VACUUM, but handling TOAST tables would be
359          * problematic.)
360          *
361          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
362          * start/commit our own transactions.  Also, there's no need to do so if
363          * only processing one relation.  For multiple relations when not within a
364          * transaction block, and also in an autovacuum worker, use own
365          * transactions so we can release locks sooner.
366          */
367         if (vacstmt->vacuum)
368                 use_own_xacts = true;
369         else
370         {
371                 Assert(vacstmt->analyze);
372                 if (IsAutoVacuumWorkerProcess())
373                         use_own_xacts = true;
374                 else if (in_outer_xact)
375                         use_own_xacts = false;
376                 else if (list_length(relations) > 1)
377                         use_own_xacts = true;
378                 else
379                         use_own_xacts = false;
380         }
381
382         /*
383          * If we are running ANALYZE without per-table transactions, we'll need a
384          * memory context with table lifetime.
385          */
386         if (!use_own_xacts)
387                 anl_context = AllocSetContextCreate(PortalContext,
388                                                                                         "Analyze",
389                                                                                         ALLOCSET_DEFAULT_MINSIZE,
390                                                                                         ALLOCSET_DEFAULT_INITSIZE,
391                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
392
393         /*
394          * vacuum_rel expects to be entered with no transaction active; it will
395          * start and commit its own transaction.  But we are called by an SQL
396          * command, and so we are executing inside a transaction already. We
397          * commit the transaction started in PostgresMain() here, and start
398          * another one before exiting to match the commit waiting for us back in
399          * PostgresMain().
400          */
401         if (use_own_xacts)
402         {
403                 /* matches the StartTransaction in PostgresMain() */
404                 CommitTransactionCommand();
405         }
406
407         /* Turn vacuum cost accounting on or off */
408         PG_TRY();
409         {
410                 ListCell   *cur;
411
412                 VacuumCostActive = (VacuumCostDelay > 0);
413                 VacuumCostBalance = 0;
414
415                 /*
416                  * Loop to process each selected relation.
417                  */
418                 foreach(cur, relations)
419                 {
420                         Oid                     relid = lfirst_oid(cur);
421
422                         if (vacstmt->vacuum)
423                                 vacuum_rel(relid, vacstmt, RELKIND_RELATION);
424
425                         if (vacstmt->analyze)
426                         {
427                                 MemoryContext old_context = NULL;
428
429                                 /*
430                                  * If using separate xacts, start one for analyze. Otherwise,
431                                  * we can use the outer transaction, but we still need to call
432                                  * analyze_rel in a memory context that will be cleaned up on
433                                  * return (else we leak memory while processing multiple
434                                  * tables).
435                                  */
436                                 if (use_own_xacts)
437                                 {
438                                         StartTransactionCommand();
439                                         /* functions in indexes may want a snapshot set */
440                                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
441                                 }
442                                 else
443                                         old_context = MemoryContextSwitchTo(anl_context);
444
445                                 analyze_rel(relid, vacstmt, vac_strategy);
446
447                                 if (use_own_xacts)
448                                         CommitTransactionCommand();
449                                 else
450                                 {
451                                         MemoryContextSwitchTo(old_context);
452                                         MemoryContextResetAndDeleteChildren(anl_context);
453                                 }
454                         }
455                 }
456         }
457         PG_CATCH();
458         {
459                 /* Make sure cost accounting is turned off after error */
460                 VacuumCostActive = false;
461                 PG_RE_THROW();
462         }
463         PG_END_TRY();
464
465         /* Turn off vacuum cost accounting */
466         VacuumCostActive = false;
467
468         /*
469          * Finish up processing.
470          */
471         if (use_own_xacts)
472         {
473                 /* here, we are not in a transaction */
474
475                 /*
476                  * This matches the CommitTransaction waiting for us in
477                  * PostgresMain().
478                  */
479                 StartTransactionCommand();
480
481                 /*
482                  * Re-establish the transaction snapshot.  This is wasted effort when
483                  * we are called as a normal utility command, because the new
484                  * transaction will be dropped immediately by PostgresMain(); but it's
485                  * necessary if we are called from autovacuum because autovacuum might
486                  * continue on to do an ANALYZE-only call.
487                  */
488                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
489         }
490
491         if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
492         {
493                 /*
494                  * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
495                  * (autovacuum.c does this for itself.)
496                  */
497                 vac_update_datfrozenxid();
498
499                 /*
500                  * If it was a database-wide VACUUM, print FSM usage statistics (we
501                  * don't make you be superuser to see these).  We suppress this in
502                  * autovacuum, too.
503                  */
504                 if (all_rels)
505                         PrintFreeSpaceMapStatistics(elevel);
506         }
507
508         /*
509          * Clean up working storage --- note we must do this after
510          * StartTransactionCommand, else we might be trying to delete the active
511          * context!
512          */
513         MemoryContextDelete(vac_context);
514         vac_context = NULL;
515
516         if (anl_context)
517                 MemoryContextDelete(anl_context);
518 }
519
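/*
 * Hedged sketch (compiled out) of the calling convention documented at the
 * head of vacuum(): a caller such as an autovacuum worker builds the relid
 * list and buffer strategy in a memory context that survives transaction
 * commit before invoking vacuum() (vacstmt is assumed to live there
 * already).  "longlived_cxt" is only a stand-in name for such a context;
 * the real code of this kind lives in autovacuum.c.
 */
#ifdef NOT_USED
static void
vacuum_caller_sketch(MemoryContext longlived_cxt, Oid relid, VacuumStmt *vacstmt)
{
	MemoryContext old_cxt = MemoryContextSwitchTo(longlived_cxt);
	List	   *relids = list_make1_oid(relid);
	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_VACUUM);

	MemoryContextSwitchTo(old_cxt);
	vacuum(vacstmt, relids, bstrategy, true);
}
#endif
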
520 /*
521  * Build a list of Oids for each relation to be processed
522  *
523  * The list is built in vac_context so that it will survive across our
524  * per-relation transactions.
525  */
526 static List *
527 get_rel_oids(List *relids, const RangeVar *vacrel, const char *stmttype)
528 {
529         List       *oid_list = NIL;
530         MemoryContext oldcontext;
531
532         /* List supplied by VACUUM's caller? */
533         if (relids)
534                 return relids;
535
536         if (vacrel)
537         {
538                 /* Process a specific relation */
539                 Oid                     relid;
540
541                 relid = RangeVarGetRelid(vacrel, false);
542
543                 /* Make a relation list entry for this guy */
544                 oldcontext = MemoryContextSwitchTo(vac_context);
545                 oid_list = lappend_oid(oid_list, relid);
546                 MemoryContextSwitchTo(oldcontext);
547         }
548         else
549         {
550                 /* Process all plain relations listed in pg_class */
551                 Relation        pgclass;
552                 HeapScanDesc scan;
553                 HeapTuple       tuple;
554                 ScanKeyData key;
555
556                 ScanKeyInit(&key,
557                                         Anum_pg_class_relkind,
558                                         BTEqualStrategyNumber, F_CHAREQ,
559                                         CharGetDatum(RELKIND_RELATION));
560
561                 pgclass = heap_open(RelationRelationId, AccessShareLock);
562
563                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
564
565                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
566                 {
567                         /* Make a relation list entry for this guy */
568                         oldcontext = MemoryContextSwitchTo(vac_context);
569                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
570                         MemoryContextSwitchTo(oldcontext);
571                 }
572
573                 heap_endscan(scan);
574                 heap_close(pgclass, AccessShareLock);
575         }
576
577         return oid_list;
578 }
579
580 /*
581  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
582  */
583 void
584 vacuum_set_xid_limits(int freeze_min_age, bool sharedRel,
585                                           TransactionId *oldestXmin,
586                                           TransactionId *freezeLimit)
587 {
588         int                     freezemin;
589         TransactionId limit;
590         TransactionId safeLimit;
591
592         /*
593          * We can always ignore processes running lazy vacuum.  This is because we
594          * use these values only for deciding which tuples we must keep in the
595          * tables.      Since lazy vacuum doesn't write its XID anywhere, it's
596          * safe to ignore it.  In theory it could be problematic to ignore lazy
597          * vacuums on a full vacuum, but keep in mind that only one vacuum process
598          * can be working on a particular table at any time, and that each vacuum
599          * is always an independent transaction.
600          */
601         *oldestXmin = GetOldestXmin(sharedRel, true);
602
603         Assert(TransactionIdIsNormal(*oldestXmin));
604
605         /*
606          * Determine the minimum freeze age to use: as specified by the caller,
607          * or vacuum_freeze_min_age, but in any case not more than half
608          * autovacuum_freeze_max_age, so that autovacuums to prevent XID
609          * wraparound won't occur too frequently.
610          */
611         freezemin = freeze_min_age;
612         if (freezemin < 0)
613                 freezemin = vacuum_freeze_min_age;
614         freezemin = Min(freezemin, autovacuum_freeze_max_age / 2);
615         Assert(freezemin >= 0);
616
617         /*
618          * Compute the cutoff XID, being careful not to generate a "permanent" XID
619          */
620         limit = *oldestXmin - freezemin;
621         if (!TransactionIdIsNormal(limit))
622                 limit = FirstNormalTransactionId;
623
624         /*
625          * If oldestXmin is very far back (in practice, more than
626          * autovacuum_freeze_max_age / 2 XIDs old), complain and force a
627          * minimum freeze age of zero.
628          */
629         safeLimit = ReadNewTransactionId() - autovacuum_freeze_max_age;
630         if (!TransactionIdIsNormal(safeLimit))
631                 safeLimit = FirstNormalTransactionId;
632
633         if (TransactionIdPrecedes(limit, safeLimit))
634         {
635                 ereport(WARNING,
636                                 (errmsg("oldest xmin is far in the past"),
637                                  errhint("Close open transactions soon to avoid wraparound problems.")));
638                 limit = *oldestXmin;
639         }
640
641         *freezeLimit = limit;
642 }
643
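/*
 * Worked example of the arithmetic above (the numbers are illustrative and
 * the GUC values assume the shipped defaults, roughly vacuum_freeze_min_age
 * = 100 million and autovacuum_freeze_max_age = 200 million):
 *
 *		oldestXmin = 500,000,000 and the caller passes freeze_min_age = -1,
 *		so freezemin = Min(100,000,000, 200,000,000 / 2) = 100,000,000 and
 *		freezeLimit = 500,000,000 - 100,000,000 = 400,000,000.  If the next
 *		XID to be assigned is 510,000,000, safeLimit = 310,000,000; the
 *		computed limit does not precede that, so no warning is raised and
 *		the limit stands.
 */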
644
645 /*
646  *      vac_update_relstats() -- update statistics for one relation
647  *
648  *              Update the whole-relation statistics that are kept in its pg_class
649  *              row.  There are additional stats that will be updated if we are
650  *              doing ANALYZE, but we always update these stats.  This routine works
651  *              for both index and heap relation entries in pg_class.
652  *
653  *              We violate transaction semantics here by overwriting the rel's
654  *              existing pg_class tuple with the new values.  This is reasonably
655  *              safe since the new values are correct whether or not this transaction
656  *              commits.  The reason for this is that if we updated these tuples in
657  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
658  *              by the time we got done with a vacuum cycle, most of the tuples in
659  *              pg_class would've been obsoleted.  Of course, this only works for
660  *              fixed-size never-null columns, but these are.
661  *
662  *              Another reason for doing it this way is that when we are in a lazy
663  *              VACUUM and have PROC_IN_VACUUM set, we mustn't do any updates ---
664  *              somebody vacuuming pg_class might think they could delete a tuple
665  *              marked with xmin = our xid.
666  *
667  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
668  *              ANALYZE.
669  */
670 void
671 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
672                                         bool hasindex, TransactionId frozenxid)
673 {
674         Relation        rd;
675         HeapTuple       ctup;
676         Form_pg_class pgcform;
677         bool            dirty;
678
679         rd = heap_open(RelationRelationId, RowExclusiveLock);
680
681         /* Fetch a copy of the tuple to scribble on */
682         ctup = SearchSysCacheCopy(RELOID,
683                                                           ObjectIdGetDatum(relid),
684                                                           0, 0, 0);
685         if (!HeapTupleIsValid(ctup))
686                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
687                          relid);
688         pgcform = (Form_pg_class) GETSTRUCT(ctup);
689
690         /* Apply required updates, if any, to copied tuple */
691
692         dirty = false;
693         if (pgcform->relpages != (int32) num_pages)
694         {
695                 pgcform->relpages = (int32) num_pages;
696                 dirty = true;
697         }
698         if (pgcform->reltuples != (float4) num_tuples)
699         {
700                 pgcform->reltuples = (float4) num_tuples;
701                 dirty = true;
702         }
703         if (pgcform->relhasindex != hasindex)
704         {
705                 pgcform->relhasindex = hasindex;
706                 dirty = true;
707         }
708
709         /*
710          * If we have discovered that there are no indexes, then there's no
711          * primary key either.  This could be done more thoroughly...
712          */
713         if (!hasindex)
714         {
715                 if (pgcform->relhaspkey)
716                 {
717                         pgcform->relhaspkey = false;
718                         dirty = true;
719                 }
720         }
721
722         /*
723          * relfrozenxid should never go backward.  Caller can pass
724          * InvalidTransactionId if it has no new data.
725          */
726         if (TransactionIdIsNormal(frozenxid) &&
727                 TransactionIdPrecedes(pgcform->relfrozenxid, frozenxid))
728         {
729                 pgcform->relfrozenxid = frozenxid;
730                 dirty = true;
731         }
732
733         /*
734          * If anything changed, write out the tuple.  Even if nothing changed,
735          * force relcache invalidation so all backends reset their rd_targblock
736          * --- otherwise it might point to a page we truncated away.
737          */
738         if (dirty)
739         {
740                 heap_inplace_update(rd, ctup);
741                 /* the above sends a cache inval message */
742         }
743         else
744         {
745                 /* no need to change tuple, but force relcache inval anyway */
746                 CacheInvalidateRelcacheByTuple(ctup);
747         }
748
749         heap_close(rd, RowExclusiveLock);
750 }
751
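/*
 * Hedged example (compiled out) of a caller that has no new freeze
 * information, such as a stand-alone ANALYZE: it passes InvalidTransactionId
 * so that relfrozenxid is left alone, per the comment above.  The tuple
 * count below is a placeholder.
 */
#ifdef NOT_USED
static void
update_relstats_after_analyze_sketch(Relation rel)
{
	vac_update_relstats(RelationGetRelid(rel),
						RelationGetNumberOfBlocks(rel),
						1000.0,		/* estimated live tuples (placeholder) */
						rel->rd_rel->relhasindex,
						InvalidTransactionId);
}
#endif
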
752
753 /*
754  *      vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
755  *
756  *              Update pg_database's datfrozenxid entry for our database to be the
757  *              minimum of the pg_class.relfrozenxid values.  If we are able to
758  *              advance pg_database.datfrozenxid, also try to truncate pg_clog.
759  *
760  *              We violate transaction semantics here by overwriting the database's
761  *              existing pg_database tuple with the new value.  This is reasonably
762  *              safe since the new value is correct whether or not this transaction
763  *              commits.  As with vac_update_relstats, this avoids leaving dead tuples
764  *              behind after a VACUUM.
765  *
766  *              This routine is shared by full and lazy VACUUM.
767  */
768 void
769 vac_update_datfrozenxid(void)
770 {
771         HeapTuple       tuple;
772         Form_pg_database dbform;
773         Relation        relation;
774         SysScanDesc scan;
775         HeapTuple       classTup;
776         TransactionId newFrozenXid;
777         bool            dirty = false;
778
779         /*
780          * Initialize the "min" calculation with RecentGlobalXmin.  Any
781          * not-yet-committed pg_class entries for new tables must have
782          * relfrozenxid at least this high, because any other open xact must have
783          * RecentXmin >= its PGPROC.xmin >= our RecentGlobalXmin; see
784          * AddNewRelationTuple().  So we cannot produce a wrong minimum by
785          * starting with this.
786          */
787         newFrozenXid = RecentGlobalXmin;
788
789         /*
790          * We must seqscan pg_class to find the minimum Xid, because there is no
791          * index that can help us here.
792          */
793         relation = heap_open(RelationRelationId, AccessShareLock);
794
795         scan = systable_beginscan(relation, InvalidOid, false,
796                                                           SnapshotNow, 0, NULL);
797
798         while ((classTup = systable_getnext(scan)) != NULL)
799         {
800                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTup);
801
802                 /*
803                  * Only consider heap and TOAST tables (anything else should have
804                  * InvalidTransactionId in relfrozenxid anyway.)
805                  */
806                 if (classForm->relkind != RELKIND_RELATION &&
807                         classForm->relkind != RELKIND_TOASTVALUE)
808                         continue;
809
810                 Assert(TransactionIdIsNormal(classForm->relfrozenxid));
811
812                 if (TransactionIdPrecedes(classForm->relfrozenxid, newFrozenXid))
813                         newFrozenXid = classForm->relfrozenxid;
814         }
815
816         /* we're done with pg_class */
817         systable_endscan(scan);
818         heap_close(relation, AccessShareLock);
819
820         Assert(TransactionIdIsNormal(newFrozenXid));
821
822         /* Now fetch the pg_database tuple we need to update. */
823         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
824
825         /* Fetch a copy of the tuple to scribble on */
826         tuple = SearchSysCacheCopy(DATABASEOID,
827                                                            ObjectIdGetDatum(MyDatabaseId),
828                                                            0, 0, 0);
829         if (!HeapTupleIsValid(tuple))
830                 elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
831         dbform = (Form_pg_database) GETSTRUCT(tuple);
832
833         /*
834          * Don't allow datfrozenxid to go backward (probably can't happen anyway);
835          * and detect the common case where it doesn't go forward either.
836          */
837         if (TransactionIdPrecedes(dbform->datfrozenxid, newFrozenXid))
838         {
839                 dbform->datfrozenxid = newFrozenXid;
840                 dirty = true;
841         }
842
843         if (dirty)
844                 heap_inplace_update(relation, tuple);
845
846         heap_freetuple(tuple);
847         heap_close(relation, RowExclusiveLock);
848
849         /*
850          * If we were able to advance datfrozenxid, mark the flat-file copy of
851          * pg_database for update at commit, and see if we can truncate
852          * pg_clog.
853          */
854         if (dirty)
855         {
856                 database_file_update_needed();
857                 vac_truncate_clog(newFrozenXid);
858         }
859 }
860
861
862 /*
863  *      vac_truncate_clog() -- attempt to truncate the commit log
864  *
865  *              Scan pg_database to determine the system-wide oldest datfrozenxid,
866  *              and use it to truncate the transaction commit log (pg_clog).
867  *              Also update the XID wrap limit info maintained by varsup.c.
868  *
869  *              The passed XID is simply the one I just wrote into my pg_database
870  *              entry.  It's used to initialize the "min" calculation.
871  *
872  *              This routine is shared by full and lazy VACUUM.  Note that it's
873  *              only invoked when we've managed to change our DB's datfrozenxid
874  *              entry.
875  */
876 static void
877 vac_truncate_clog(TransactionId frozenXID)
878 {
879         TransactionId myXID = GetCurrentTransactionId();
880         Relation        relation;
881         HeapScanDesc scan;
882         HeapTuple       tuple;
883         NameData        oldest_datname;
884         bool            frozenAlreadyWrapped = false;
885
886         /* init oldest_datname to sync with my frozenXID */
887         namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
888
889         /*
890          * Scan pg_database to compute the minimum datfrozenxid
891          *
892          * Note: we need not worry about a race condition with new entries being
893          * inserted by CREATE DATABASE.  Any such entry will have a copy of some
894          * existing DB's datfrozenxid, and that source DB cannot be ours because
895          * of the interlock against copying a DB containing an active backend.
896          * Hence the new entry will not reduce the minimum.  Also, if two
897          * VACUUMs concurrently modify the datfrozenxid's of different databases,
898          * the worst possible outcome is that pg_clog is not truncated as
899          * aggressively as it could be.
900          */
901         relation = heap_open(DatabaseRelationId, AccessShareLock);
902
903         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
904
905         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
906         {
907                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
908
909                 Assert(TransactionIdIsNormal(dbform->datfrozenxid));
910
911                 if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
912                         frozenAlreadyWrapped = true;
913                 else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
914                 {
915                         frozenXID = dbform->datfrozenxid;
916                         namecpy(&oldest_datname, &dbform->datname);
917                 }
918         }
919
920         heap_endscan(scan);
921
922         heap_close(relation, AccessShareLock);
923
924         /*
925          * Do not truncate CLOG if we seem to have suffered wraparound already;
926          * the computed minimum XID might be bogus.  This case should now be
927          * impossible due to the defenses in GetNewTransactionId, but we keep the
928          * test anyway.
929          */
930         if (frozenAlreadyWrapped)
931         {
932                 ereport(WARNING,
933                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
934                                  errdetail("You might have already suffered transaction-wraparound data loss.")));
935                 return;
936         }
937
938         /* Truncate CLOG to the oldest frozenxid */
939         TruncateCLOG(frozenXID);
940
941         /*
942          * Update the wrap limit for GetNewTransactionId.  Note: this function
943          * will also signal the postmaster for an(other) autovac cycle if needed.
944          */
945         SetTransactionIdLimit(frozenXID, &oldest_datname);
946 }
947
948
949 /****************************************************************************
950  *                                                                                                                                                      *
951  *                      Code common to both flavors of VACUUM                                                   *
952  *                                                                                                                                                      *
953  ****************************************************************************
954  */
955
956
957 /*
958  *      vacuum_rel() -- vacuum one heap relation
959  *
960  *              Doing one heap at a time incurs extra overhead, since we need to
961  *              check that the heap exists again just before we vacuum it.      The
962  *              reason that we do this is so that vacuuming can be spread across
963  *              many small transactions.  Otherwise, two-phase locking would require
964  *              us to lock the entire database during one pass of the vacuum cleaner.
965  *
966  *              At entry and exit, we are not inside a transaction.
967  */
968 static void
969 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
970 {
971         LOCKMODE        lmode;
972         Relation        onerel;
973         LockRelId       onerelid;
974         Oid                     toast_relid;
975
976         /* Begin a transaction for vacuuming this relation */
977         StartTransactionCommand();
978
979         if (vacstmt->full)
980         {
981                 /* functions in indexes may want a snapshot set */
982                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
983         }
984         else
985         {
986                 /*
987                  * During a lazy VACUUM we do not run any user-supplied functions, and
988                  * so it should be safe to not create a transaction snapshot.
989                  *
990                  * We can furthermore set the PROC_IN_VACUUM flag, which lets other
991                  * concurrent VACUUMs know that they can ignore this one while
992                  * determining their OldestXmin.  (The reason we don't set it
993                  * during a full VACUUM is exactly that we may have to run user-
994                  * defined functions for functional indexes, and we want to make sure
995                  * that if they use the snapshot set above, any tuples it requires
996                  * can't get removed from other tables.  An index function that
997                  * depends on the contents of other tables is arguably broken, but we
998                  * won't break it here by violating transaction semantics.)
999                  *
1000                  * Note: this flag remains set until CommitTransaction or
1001                  * AbortTransaction.  We don't want to clear it until we reset
1002                  * MyProc->xid/xmin, else OldestXmin might appear to go backwards,
1003                  * which is probably Not Good.
1004                  */
1005                 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1006                 MyProc->vacuumFlags |= PROC_IN_VACUUM;
1007                 LWLockRelease(ProcArrayLock);
1008         }
1009
1010         /*
1011          * Check for user-requested abort.      Note we want this to be inside a
1012          * transaction, so xact.c doesn't issue useless WARNING.
1013          */
1014         CHECK_FOR_INTERRUPTS();
1015
1016         /*
1017          * Determine the type of lock we want --- hard exclusive lock for a FULL
1018          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
1019          * way, we can be sure that no other backend is vacuuming the same table.
1020          */
1021         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
1022
1023         /*
1024          * Open the relation and get the appropriate lock on it.
1025          *
1026          * There's a race condition here: the rel may have gone away since the
1027          * last time we saw it.  If so, we don't need to vacuum it.
1028          */
1029         onerel = try_relation_open(relid, lmode);
1030
1031         if (!onerel)
1032         {
1033                 CommitTransactionCommand();
1034                 return;
1035         }
1036
1037         /*
1038          * Check permissions.
1039          *
1040          * We allow the user to vacuum a table if he is superuser, the table
1041          * owner, or the database owner (but in the latter case, only if it's not
1042          * a shared relation).  pg_class_ownercheck includes the superuser case.
1043          *
1044          * Note we choose to treat permissions failure as a WARNING and keep
1045          * trying to vacuum the rest of the DB --- is this appropriate?
1046          */
1047         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1048                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1049         {
1050                 ereport(WARNING,
1051                                 (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1052                                                 RelationGetRelationName(onerel))));
1053                 relation_close(onerel, lmode);
1054                 CommitTransactionCommand();
1055                 return;
1056         }
1057
1058         /*
1059          * Check that it's a plain table; we used to do this in get_rel_oids() but
1060          * seems safer to check after we've locked the relation.
1061          */
1062         if (onerel->rd_rel->relkind != expected_relkind)
1063         {
1064                 ereport(WARNING,
1065                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1066                                                 RelationGetRelationName(onerel))));
1067                 relation_close(onerel, lmode);
1068                 CommitTransactionCommand();
1069                 return;
1070         }
1071
1072         /*
1073          * Silently ignore tables that are temp tables of other backends ---
1074          * trying to vacuum these will lead to great unhappiness, since their
1075          * contents are probably not up-to-date on disk.  (We don't throw a
1076          * warning here; it would just lead to chatter during a database-wide
1077          * VACUUM.)
1078          */
1079         if (isOtherTempNamespace(RelationGetNamespace(onerel)))
1080         {
1081                 relation_close(onerel, lmode);
1082                 CommitTransactionCommand();
1083                 return;
1084         }
1085
1086         /*
1087          * Get a session-level lock too. This will protect our access to the
1088          * relation across multiple transactions, so that we can vacuum the
1089          * relation's TOAST table (if any) secure in the knowledge that no one is
1090          * deleting the parent relation.
1091          *
1092          * NOTE: this cannot block, even if someone else is waiting for access,
1093          * because the lock manager knows that both lock requests are from the
1094          * same process.
1095          */
1096         onerelid = onerel->rd_lockInfo.lockRelId;
1097         LockRelationIdForSession(&onerelid, lmode);
1098
1099         /*
1100          * Remember the relation's TOAST relation for later
1101          */
1102         toast_relid = onerel->rd_rel->reltoastrelid;
1103
1104         /*
1105          * Do the actual work --- either FULL or "lazy" vacuum
1106          */
1107         if (vacstmt->full)
1108                 full_vacuum_rel(onerel, vacstmt);
1109         else
1110                 lazy_vacuum_rel(onerel, vacstmt, vac_strategy);
1111
1112         /* all done with this class, but hold lock until commit */
1113         relation_close(onerel, NoLock);
1114
1115         /*
1116          * Complete the transaction and free all temporary memory used.
1117          */
1118         CommitTransactionCommand();
1119
1120         /*
1121          * If the relation has a secondary toast rel, vacuum that too while we
1122          * still hold the session lock on the master table.  Note however that
1123          * "analyze" will not get done on the toast table.      This is good, because
1124          * the toaster always uses hardcoded index access and statistics are
1125          * totally unimportant for toast relations.
1126          */
1127         if (toast_relid != InvalidOid)
1128                 vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);
1129
1130         /*
1131          * Now release the session-level lock on the master table.
1132          */
1133         UnlockRelationIdForSession(&onerelid, lmode);
1134 }
1135
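/*
 * Hedged illustration (compiled out) of the other side of the PROC_IN_VACUUM
 * handshake discussed in vacuum_rel() above: a scanner of the ProcArray,
 * such as GetOldestXmin() when asked to ignore lazy vacuums, can simply skip
 * any backend that has advertised the flag.  This is a simplification, not a
 * copy of the real procarray.c logic.
 */
#ifdef NOT_USED
static bool
proc_is_ignorable_vacuum_sketch(PGPROC *proc)
{
	return (proc->vacuumFlags & PROC_IN_VACUUM) != 0;
}
#endif
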
1136
1137 /****************************************************************************
1138  *                                                                                                                                                      *
1139  *                      Code for VACUUM FULL (only)                                                                             *
1140  *                                                                                                                                                      *
1141  ****************************************************************************
1142  */
1143
1144
1145 /*
1146  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1147  *
1148  *              This routine vacuums a single heap, cleans out its indexes, and
1149  *              updates its num_pages and num_tuples statistics.
1150  *
1151  *              At entry, we have already established a transaction and opened
1152  *              and locked the relation.
1153  */
1154 static void
1155 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1156 {
1157         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1158                                                                                  * clean indexes */
1159         VacPageListData fraged_pages;           /* List of pages with space enough for
1160                                                                                  * re-using */
1161         Relation   *Irel;
1162         int                     nindexes,
1163                                 i;
1164         VRelStats  *vacrelstats;
1165
1166         vacuum_set_xid_limits(vacstmt->freeze_min_age, onerel->rd_rel->relisshared,
1167                                                   &OldestXmin, &FreezeLimit);
1168
1169         /*
1170          * Flush any previous async-commit transactions.  This does not guarantee
1171          * that we will be able to set hint bits for tuples they inserted, but
1172          * it improves the probability, especially in simple sequential-commands
1173          * cases.  See scan_heap() and repair_frag() for more about this.
1174          */
1175         XLogAsyncCommitFlush();
1176
1177         /*
1178          * Set up statistics-gathering machinery.
1179          */
1180         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1181         vacrelstats->rel_pages = 0;
1182         vacrelstats->rel_tuples = 0;
1183         vacrelstats->rel_indexed_tuples = 0;
1184         vacrelstats->hasindex = false;
1185
1186         /* scan the heap */
1187         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1188         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1189
1190         /* Now open all indexes of the relation */
1191         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1192         if (nindexes > 0)
1193                 vacrelstats->hasindex = true;
1194
1195         /* Clean/scan index relation(s) */
1196         if (Irel != NULL)
1197         {
1198                 if (vacuum_pages.num_pages > 0)
1199                 {
1200                         for (i = 0; i < nindexes; i++)
1201                                 vacuum_index(&vacuum_pages, Irel[i],
1202                                                          vacrelstats->rel_indexed_tuples, 0);
1203                 }
1204                 else
1205                 {
1206                         /* just scan indexes to update statistic */
1207                         for (i = 0; i < nindexes; i++)
1208                                 scan_index(Irel[i], vacrelstats->rel_indexed_tuples);
1209                 }
1210         }
1211
1212         if (fraged_pages.num_pages > 0)
1213         {
1214                 /* Try to shrink heap */
1215                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1216                                         nindexes, Irel);
1217                 vac_close_indexes(nindexes, Irel, NoLock);
1218         }
1219         else
1220         {
1221                 vac_close_indexes(nindexes, Irel, NoLock);
1222                 if (vacuum_pages.num_pages > 0)
1223                 {
1224                         /* Clean pages from vacuum_pages list */
1225                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1226                 }
1227         }
1228
1229         /* update shared free space map with final free space info */
1230         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1231
1232         /* update statistics in pg_class */
1233         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
1234                                                 vacrelstats->rel_tuples, vacrelstats->hasindex,
1235                                                 FreezeLimit);
1236
1237         /* report results to the stats collector, too */
1238         pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
1239                                                  vacstmt->analyze, vacrelstats->rel_tuples);
1240 }
1241
1242
1243 /*
1244  *      scan_heap() -- scan an open heap relation
1245  *
1246  *              This routine sets commit status bits, constructs vacuum_pages (list
1247  *              of pages we need to compact free space on and/or clean indexes of
1248  *              deleted tuples), constructs fraged_pages (list of pages with free
1249  *              space that tuples could be moved into), and calculates statistics
1250  *              on the number of live tuples in the heap.
1251  */
1252 static void
1253 scan_heap(VRelStats *vacrelstats, Relation onerel,
1254                   VacPageList vacuum_pages, VacPageList fraged_pages)
1255 {
1256         BlockNumber nblocks,
1257                                 blkno;
1258         char       *relname;
1259         VacPage         vacpage;
1260         BlockNumber empty_pages,
1261                                 empty_end_pages;
1262         double          num_tuples,
1263                                 num_indexed_tuples,
1264                                 tups_vacuumed,
1265                                 nkeep,
1266                                 nunused;
1267         double          free_space,
1268                                 usable_free_space;
1269         Size            min_tlen = MaxHeapTupleSize;
1270         Size            max_tlen = 0;
1271         bool            do_shrinking = true;
1272         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1273         int                     num_vtlinks = 0;
1274         int                     free_vtlinks = 100;
1275         PGRUsage        ru0;
1276
1277         pg_rusage_init(&ru0);
1278
1279         relname = RelationGetRelationName(onerel);
1280         ereport(elevel,
1281                         (errmsg("vacuuming \"%s.%s\"",
1282                                         get_namespace_name(RelationGetNamespace(onerel)),
1283                                         relname)));
1284
1285         empty_pages = empty_end_pages = 0;
1286         num_tuples = num_indexed_tuples = tups_vacuumed = nkeep = nunused = 0;
1287         free_space = 0;
1288
1289         nblocks = RelationGetNumberOfBlocks(onerel);
1290
1291         /*
1292          * We initially create each VacPage item in a maximal-sized workspace,
1293          * then copy the workspace into a just-large-enough copy.
1294          */
1295         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
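        /*
         * [Editor's illustrative sketch, not part of the original file.]
         * copy_vac_page() (defined elsewhere in vacuum.c) is what turns the
         * maximal-sized workspace above into the "just-large-enough copy";
         * conceptually it does something like:
         *
         *      VacPage         copy;
         *
         *      copy = (VacPage) palloc(sizeof(VacPageData) +
         *                              vacpage->offsets_free * sizeof(OffsetNumber));
         *      memcpy(copy, vacpage, sizeof(VacPageData) +
         *                              vacpage->offsets_free * sizeof(OffsetNumber));
         *
         * The exact implementation may differ; the point is only that each
         * per-page entry gets trimmed to the number of offsets actually recorded.
         */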
1296
1297         for (blkno = 0; blkno < nblocks; blkno++)
1298         {
1299                 Page            page,
1300                                         tempPage = NULL;
1301                 bool            do_reap,
1302                                         do_frag;
1303                 Buffer          buf;
1304                 OffsetNumber offnum,
1305                                         maxoff;
1306                 bool            notup;
1307                 OffsetNumber frozen[MaxOffsetNumber];
1308                 int                     nfrozen;
1309
1310                 vacuum_delay_point();
1311
1312                 buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
1313                 page = BufferGetPage(buf);
1314
1315                 /*
1316                  * Since we are holding exclusive lock on the relation, no other
1317                  * backend can be accessing the page; however it is possible that the
1318                  * background writer will try to write the page if it's already marked
1319                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1320                  * must take exclusive buffer lock wherever we potentially modify
1321                  * pages.  In fact, we insist on cleanup lock so that we can safely
1322                  * call heap_page_prune().  (This might be overkill, since the bgwriter
1323                  * pays no attention to individual tuples, but on the other hand it's
1324                  * unlikely that the bgwriter has this particular page pinned at this
1325                  * instant.  So violating the coding rule would buy us little anyway.)
1326                  */
1327                 LockBufferForCleanup(buf);
1328
1329                 vacpage->blkno = blkno;
1330                 vacpage->offsets_used = 0;
1331                 vacpage->offsets_free = 0;
1332
1333                 if (PageIsNew(page))
1334                 {
1335                         VacPage         vacpagecopy;
1336
1337                         ereport(WARNING,
1338                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1339                                            relname, blkno)));
1340                         PageInit(page, BufferGetPageSize(buf), 0);
1341                         MarkBufferDirty(buf);
1342                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1343                         free_space += vacpage->free;
1344                         empty_pages++;
1345                         empty_end_pages++;
1346                         vacpagecopy = copy_vac_page(vacpage);
1347                         vpage_insert(vacuum_pages, vacpagecopy);
1348                         vpage_insert(fraged_pages, vacpagecopy);
1349                         UnlockReleaseBuffer(buf);
1350                         continue;
1351                 }
1352
1353                 if (PageIsEmpty(page))
1354                 {
1355                         VacPage         vacpagecopy;
1356
1357                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1358                         free_space += vacpage->free;
1359                         empty_pages++;
1360                         empty_end_pages++;
1361                         vacpagecopy = copy_vac_page(vacpage);
1362                         vpage_insert(vacuum_pages, vacpagecopy);
1363                         vpage_insert(fraged_pages, vacpagecopy);
1364                         UnlockReleaseBuffer(buf);
1365                         continue;
1366                 }
1367
1368                 /* 
1369                  * Prune all HOT-update chains in this page.
1370                  *
1371                  * We use the redirect_move option so that redirecting line pointers
1372                  * get collapsed out; this allows us to not worry about them below.
1373                  *
1374                  * We count tuples removed by the pruning step as removed by VACUUM.
1375                  */
1376                 tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
1377                                                                                  true, false);
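                /*
                 * [Editor's note, illustrative assumption: in this version the two
                 * boolean arguments are presumably redirect_move (true, matching the
                 * comment above) and report_stats (false, since this VACUUM reports
                 * its own totals via pgstat_report_vacuum() at the end); see
                 * heap_page_prune's definition for the authoritative signature.]
                 */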
1378
1379                 /*
1380                  * Now scan the page to collect vacuumable items and check for
1381                  * tuples requiring freezing.
1382                  */
1383                 nfrozen = 0;
1384                 notup = true;
1385                 maxoff = PageGetMaxOffsetNumber(page);
1386                 for (offnum = FirstOffsetNumber;
1387                          offnum <= maxoff;
1388                          offnum = OffsetNumberNext(offnum))
1389                 {
1390                         ItemId          itemid = PageGetItemId(page, offnum);
1391                         bool            tupgone = false;
1392                         HeapTupleData tuple;
1393
1394                         /*
1395                          * Collect un-used items too - it's possible to have indexes
1396                          * pointing here after crash.  (That's an ancient comment and
1397                          * is likely obsolete with WAL, but we might as well continue
1398                          * to check for such problems.)
1399                          */
1400                         if (!ItemIdIsUsed(itemid))
1401                         {
1402                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1403                                 nunused += 1;
1404                                 continue;
1405                         }
1406
1407                         /*
1408                          * DEAD item pointers are to be vacuumed normally; but we don't
1409                          * count them in tups_vacuumed, else we'd be double-counting
1410                          * (at least in the common case where heap_page_prune() just
1411                          * freed up a non-HOT tuple).
1412                          */
1413                         if (ItemIdIsDead(itemid))
1414                         {
1415                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1416                                 continue;
1417                         }
1418
1419                         /* Shouldn't have any redirected items anymore */
1420                         if (!ItemIdIsNormal(itemid))
1421                                 elog(ERROR, "relation \"%s\" TID %u/%u: unexpected redirect item",
1422                                          relname, blkno, offnum);
1423
1424                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1425                         tuple.t_len = ItemIdGetLength(itemid);
1426                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1427
1428                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1429                         {
1430                                 case HEAPTUPLE_LIVE:
1431                                         /* Tuple is good --- but let's do some validity checks */
1432                                         if (onerel->rd_rel->relhasoids &&
1433                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1434                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1435                                                          relname, blkno, offnum);
1436                                         /*
1437                                          * The shrinkage phase of VACUUM FULL requires that all
1438                                          * live tuples have XMIN_COMMITTED set --- see comments in
1439                                          * repair_frag()'s walk-along-page loop.  Use of async
1440                                          * commit may prevent HeapTupleSatisfiesVacuum from
1441                                          * setting the bit for a recently committed tuple.  Rather
1442                                          * than trying to handle this corner case, we just give up
1443                                          * and don't shrink.
1444                                          */
1445                                         if (do_shrinking &&
1446                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1447                                         {
1448                                                 ereport(LOG,
1449                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1450                                                                                 relname, blkno, offnum,
1451                                                                                 HeapTupleHeaderGetXmin(tuple.t_data))));
1452                                                 do_shrinking = false;
1453                                         }
1454                                         break;
1455                                 case HEAPTUPLE_DEAD:
1456                                         /*
1457                                          * Ordinarily, DEAD tuples would have been removed by
1458                                          * heap_page_prune(), but it's possible that the tuple
1459                                          * state changed since heap_page_prune() looked.  In
1460                                          * particular an INSERT_IN_PROGRESS tuple could have
1461                                          * changed to DEAD if the inserter aborted.  So this
1462                                          * cannot be considered an error condition, though it
1463                                          * does suggest that someone released a lock early.
1464                                          *
1465                                          * If the tuple is HOT-updated then it must only be
1466                                          * removed by a prune operation; so we keep it as if it
1467                                          * were RECENTLY_DEAD, and abandon shrinking. (XXX is it
1468                                          * worth trying to make the shrinking code smart enough
1469                                          * to handle this?  It's an unusual corner case.)
1470                                          *
1471                                          * DEAD heap-only tuples can safely be removed if they
1472                                          * aren't themselves HOT-updated, although this is a bit
1473                                          * inefficient since we'll uselessly try to remove
1474                                          * index entries for them.
1475                                          */
1476                                         if (HeapTupleIsHotUpdated(&tuple))
1477                                         {
1478                                                 nkeep += 1;
1479                                                 if (do_shrinking)
1480                                                         ereport(LOG,
1481                                                                         (errmsg("relation \"%s\" TID %u/%u: dead HOT-updated tuple --- cannot shrink relation",
1482                                                                                         relname, blkno, offnum)));
1483                                                 do_shrinking = false;
1484                                         }
1485                                         else
1486                                         {
1487                                                 tupgone = true;         /* we can delete the tuple */
1488                                                 /*
1489                                                  * We need not require XMIN_COMMITTED or
1490                                                  * XMAX_COMMITTED to be set, since we will remove the
1491                                                  * tuple without any further examination of its hint
1492                                                  * bits.
1493                                                  */
1494                                         }
1495                                         break;
1496                                 case HEAPTUPLE_RECENTLY_DEAD:
1497
1498                                         /*
1499                                          * If the tuple was deleted only recently, then we must not
1500                                          * remove it from the relation.
1501                                          */
1502                                         nkeep += 1;
1503
1504                                         /*
1505                                          * As with the LIVE case, shrinkage requires XMIN_COMMITTED
1506                                          * to be set.
1507                                          */
1508                                         if (do_shrinking &&
1509                                                 !(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1510                                         {
1511                                                 ereport(LOG,
1512                                                                 (errmsg("relation \"%s\" TID %u/%u: XMIN_COMMITTED not set for transaction %u --- cannot shrink relation",
1513                                                                                 relname, blkno, offnum,
1514                                                                                 HeapTupleHeaderGetXmin(tuple.t_data))));
1515                                                 do_shrinking = false;
1516                                         }
1517
1518                                         /*
1519                                          * If we are doing shrinking and this tuple is an updated one,
1520                                          * then remember it so we can construct update-chain dependencies.
1521                                          */
1522                                         if (do_shrinking &&
1523                                                 !(ItemPointerEquals(&(tuple.t_self),
1524                                                                                         &(tuple.t_data->t_ctid))))
1525                                         {
1526                                                 if (free_vtlinks == 0)
1527                                                 {
1528                                                         free_vtlinks = 1000;
1529                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1530                                                                                            (free_vtlinks + num_vtlinks) *
1531                                                                                                          sizeof(VTupleLinkData));
1532                                                 }
1533                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1534                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1535                                                 free_vtlinks--;
1536                                                 num_vtlinks++;
1537                                         }
1538                                         break;
1539                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1540
1541                                         /*
1542                                          * This should not happen, since we hold exclusive lock on
1543                                          * the relation; shouldn't we raise an error?  (Actually,
1544                                          * it can happen in system catalogs, since we tend to
1545                                          * release write lock before commit there.)  As above,
1546                                          * we can't apply repair_frag() if the tuple state is
1547                                          * uncertain.
1548                                          */
1549                                         if (do_shrinking)
1550                                                 ereport(LOG,
1551                                                                 (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- cannot shrink relation",
1552                                                                                 relname, blkno, offnum,
1553                                                                                 HeapTupleHeaderGetXmin(tuple.t_data))));
1554                                         do_shrinking = false;
1555                                         break;
1556                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1557
1558                                         /*
1559                                          * This should not happen, since we hold exclusive lock on
1560                                          * the relation; shouldn't we raise an error?  (Actually,
1561                                          * it can happen in system catalogs, since we tend to
1562                                          * release write lock before commit there.)  As above,
1563                                          * we can't apply repair_frag() if the tuple state is
1564                                          * uncertain.
1565                                          */
1566                                         if (do_shrinking)
1567                                                 ereport(LOG,
1568                                                                 (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- cannot shrink relation",
1569                                                                                 relname, blkno, offnum,
1570                                                                                 HeapTupleHeaderGetXmax(tuple.t_data))));
1571                                         do_shrinking = false;
1572                                         break;
1573                                 default:
1574                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1575                                         break;
1576                         }
1577
1578                         if (tupgone)
1579                         {
1580                                 ItemId          lpp;
1581
1582                                 /*
1583                                  * Here we are building a temporary copy of the page with dead
1584                                  * tuples removed.      Below we will apply
1585                                  * PageRepairFragmentation to the copy, so that we can
1586                                  * determine how much space will be available after removal of
1587                                  * dead tuples.  But note we are NOT changing the real page
1588                                  * yet...
1589                                  */
1590                                 if (tempPage == NULL)
1591                                 {
1592                                         Size            pageSize;
1593
1594                                         pageSize = PageGetPageSize(page);
1595                                         tempPage = (Page) palloc(pageSize);
1596                                         memcpy(tempPage, page, pageSize);
1597                                 }
1598
1599                                 /* mark it unused on the temp page */
1600                                 lpp = PageGetItemId(tempPage, offnum);
1601                                 ItemIdSetUnused(lpp);
1602
1603                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1604                                 tups_vacuumed += 1;
1605                         }
1606                         else
1607                         {
1608                                 num_tuples += 1;
1609                                 if (!HeapTupleIsHeapOnly(&tuple))
1610                                         num_indexed_tuples += 1;
1611                                 notup = false;
1612                                 if (tuple.t_len < min_tlen)
1613                                         min_tlen = tuple.t_len;
1614                                 if (tuple.t_len > max_tlen)
1615                                         max_tlen = tuple.t_len;
1616
1617                                 /*
1618                                  * Each non-removable tuple must be checked to see if it
1619                                  * needs freezing.
1620                                  */
1621                                 if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
1622                                                                           InvalidBuffer))
1623                                         frozen[nfrozen++] = offnum;
1624                         }
1625                 }                                               /* scan along page */
1626
1627                 if (tempPage != NULL)
1628                 {
1629                         /* Some tuples are removable; figure free space after removal */
1630                         PageRepairFragmentation(tempPage);
1631                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
1632                         pfree(tempPage);
1633                         do_reap = true;
1634                 }
1635                 else
1636                 {
1637                         /* Just use current available space */
1638                         vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
1639                         /* Need to reap the page if it has UNUSED or DEAD line pointers */
1640                         do_reap = (vacpage->offsets_free > 0);
1641                 }
1642
1643                 free_space += vacpage->free;
1644
1645                 /*
1646                  * Add the page to fraged_pages if it has a useful amount of free
1647                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1648                  * don't know that accurately near the start of the relation, so add
1649                  * pages unconditionally if they have >= BLCKSZ/10 free space.
1650                  */
1651                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
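                /*
                 * [Editor's note: with the default BLCKSZ of 8192 bytes, the
                 * unconditional threshold above works out to 8192 / 10 = 819 bytes
                 * of free space; a non-default block size scales it accordingly.]
                 */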
1652
1653                 if (do_reap || do_frag)
1654                 {
1655                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1656
1657                         if (do_reap)
1658                                 vpage_insert(vacuum_pages, vacpagecopy);
1659                         if (do_frag)
1660                                 vpage_insert(fraged_pages, vacpagecopy);
1661                 }
1662
1663                 /*
1664                  * Include the page in empty_end_pages if it will be empty after
1665                  * vacuuming; this is to keep us from using it as a move destination.
1666                  */
1667                 if (notup)
1668                 {
1669                         empty_pages++;
1670                         empty_end_pages++;
1671                 }
1672                 else
1673                         empty_end_pages = 0;
1674
1675                 /*
1676                  * If we froze any tuples, mark the buffer dirty, and write a WAL
1677                  * record recording the changes.  We must log the changes to be
1678                  * crash-safe against future truncation of CLOG.
1679                  */
1680                 if (nfrozen > 0)
1681                 {
1682                         MarkBufferDirty(buf);
1683                         /* no XLOG for temp tables, though */
1684                         if (!onerel->rd_istemp)
1685                         {
1686                                 XLogRecPtr      recptr;
1687
1688                                 recptr = log_heap_freeze(onerel, buf, FreezeLimit,
1689                                                                                  frozen, nfrozen);
1690                                 PageSetLSN(page, recptr);
1691                                 PageSetTLI(page, ThisTimeLineID);
1692                         }
1693                 }
1694
1695                 UnlockReleaseBuffer(buf);
1696         }
1697
1698         pfree(vacpage);
1699
1700         /* save stats in the rel list for use later */
1701         vacrelstats->rel_tuples = num_tuples;
1702         vacrelstats->rel_indexed_tuples = num_indexed_tuples;
1703         vacrelstats->rel_pages = nblocks;
1704         if (num_tuples == 0)
1705                 min_tlen = max_tlen = 0;
1706         vacrelstats->min_tlen = min_tlen;
1707         vacrelstats->max_tlen = max_tlen;
1708
1709         vacuum_pages->empty_end_pages = empty_end_pages;
1710         fraged_pages->empty_end_pages = empty_end_pages;
1711
1712         /*
1713          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1714          * remove any "empty" end-pages from the list, and compute usable free
1715          * space = free space in remaining pages.
1716          */
1717         if (do_shrinking)
1718         {
1719                 int                     i;
1720
1721                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1722                 fraged_pages->num_pages -= empty_end_pages;
1723                 usable_free_space = 0;
1724                 for (i = 0; i < fraged_pages->num_pages; i++)
1725                         usable_free_space += fraged_pages->pagedesc[i]->free;
1726         }
1727         else
1728         {
1729                 fraged_pages->num_pages = 0;
1730                 usable_free_space = 0;
1731         }
1732
1733         /* don't bother to save vtlinks if we will not call repair_frag */
1734         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1735         {
1736                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1737                           vac_cmp_vtlinks);
1738                 vacrelstats->vtlinks = vtlinks;
1739                 vacrelstats->num_vtlinks = num_vtlinks;
1740         }
1741         else
1742         {
1743                 vacrelstats->vtlinks = NULL;
1744                 vacrelstats->num_vtlinks = 0;
1745                 pfree(vtlinks);
1746         }
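        /*
         * [Editor's illustrative sketch, not part of the original file.]
         * The qsort above (vac_cmp_vtlinks orders entries by their new_tid) is
         * what lets repair_frag() later find the parent of an update chain with
         * a binary search, roughly:
         *
         *      VTupleLinkData  key;
         *      VTupleLink              parent;
         *
         *      key.new_tid = tp.t_self;            (the TID of the tuple we hold)
         *      parent = (VTupleLink) bsearch(&key, vacrelstats->vtlinks,
         *                                    vacrelstats->num_vtlinks,
         *                                    sizeof(VTupleLinkData),
         *                                    vac_cmp_vtlinks);
         *
         * (The real code uses a local bsearch-style helper rather than the C
         * library's bsearch; the sketch only shows why the sort is worthwhile.)
         */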
1747
1748         ereport(elevel,
1749                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1750                                         RelationGetRelationName(onerel),
1751                                         tups_vacuumed, num_tuples, nblocks),
1752                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1753                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1754                                            "There were %.0f unused item pointers.\n"
1755            "Total free space (including removable row versions) is %.0f bytes.\n"
1756                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1757          "%u pages containing %.0f free bytes are potential move destinations.\n"
1758                                            "%s.",
1759                                            nkeep,
1760                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1761                                            nunused,
1762                                            free_space,
1763                                            empty_pages, empty_end_pages,
1764                                            fraged_pages->num_pages, usable_free_space,
1765                                            pg_rusage_show(&ru0))));
1766 }
1767
1768
1769 /*
1770  *      repair_frag() -- try to repair relation's fragmentation
1771  *
1772  *              This routine marks dead tuples as unused and tries to re-use dead
1773  *              space by moving tuples (and inserting index entries if needed).  It
1774  *              constructs Nvacpagelist, a list of freed pages (whose tuples were all
1775  *              moved away), and cleans their index entries after committing the
1776  *              current transaction (in hackish fashion: without releasing locks or
1777  *              freeing memory!).  It truncates the relation if end blocks have become empty.
1778  */
1779 static void
1780 repair_frag(VRelStats *vacrelstats, Relation onerel,
1781                         VacPageList vacuum_pages, VacPageList fraged_pages,
1782                         int nindexes, Relation *Irel)
1783 {
1784         TransactionId myXID = GetCurrentTransactionId();
1785         Buffer          dst_buffer = InvalidBuffer;
1786         BlockNumber nblocks,
1787                                 blkno;
1788         BlockNumber last_move_dest_block = 0,
1789                                 last_vacuum_block;
1790         Page            dst_page = NULL;
1791         ExecContextData ec;
1792         VacPageListData Nvacpagelist;
1793         VacPage         dst_vacpage = NULL,
1794                                 last_vacuum_page,
1795                                 vacpage,
1796                            *curpage;
1797         int                     i;
1798         int                     num_moved = 0,
1799                                 num_fraged_pages,
1800                                 vacuumed_pages;
1801         int                     keep_tuples = 0;
1802         int                     keep_indexed_tuples = 0;
1803         PGRUsage        ru0;
1804
1805         pg_rusage_init(&ru0);
1806
1807         ExecContext_Init(&ec, onerel);
1808
1809         Nvacpagelist.num_pages = 0;
1810         num_fraged_pages = fraged_pages->num_pages;
1811         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1812         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1813         if (vacuumed_pages > 0)
1814         {
1815                 /* get last reaped page from vacuum_pages */
1816                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1817                 last_vacuum_block = last_vacuum_page->blkno;
1818         }
1819         else
1820         {
1821                 last_vacuum_page = NULL;
1822                 last_vacuum_block = InvalidBlockNumber;
1823         }
1824
1825         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1826         vacpage->offsets_used = vacpage->offsets_free = 0;
1827
1828         /*
1829          * Scan pages backwards from the last nonempty page, trying to move tuples
1830          * down to lower pages.  Quit when we reach a page that we have moved any
1831          * tuples onto, or the first page if we haven't moved anything, or when we
1832          * find a page we cannot completely empty (this last condition is handled
1833          * by "break" statements within the loop).
1834          *
1835          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1836          * in order by blkno.
1837          */
1838         nblocks = vacrelstats->rel_pages;
1839         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1840                  blkno > last_move_dest_block;
1841                  blkno--)
1842         {
1843                 Buffer          buf;
1844                 Page            page;
1845                 OffsetNumber offnum,
1846                                         maxoff;
1847                 bool            isempty,
1848                                         chain_tuple_moved;
1849
1850                 vacuum_delay_point();
1851
1852                 /*
1853                  * Forget fraged_pages pages at or after this one; they're no longer
1854                  * useful as move targets, since we only want to move down. Note that
1855                  * since we stop the outer loop at last_move_dest_block, pages removed
1856                  * here cannot have had anything moved onto them already.
1857                  *
1858                  * Also note that we don't change the stored fraged_pages list, only
1859                  * our local variable num_fraged_pages; so the forgotten pages are
1860                  * still available to be loaded into the free space map later.
1861                  */
1862                 while (num_fraged_pages > 0 &&
1863                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1864                 {
1865                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1866                         --num_fraged_pages;
1867                 }
1868
1869                 /*
1870                  * Process this page of relation.
1871                  */
1872                 buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
1873                 page = BufferGetPage(buf);
1874
1875                 vacpage->offsets_free = 0;
1876
1877                 isempty = PageIsEmpty(page);
1878
1879                 /* Is the page in the vacuum_pages list? */
1880                 if (blkno == last_vacuum_block)
1881                 {
1882                         if (last_vacuum_page->offsets_free > 0)
1883                         {
1884                                 /* there are dead tuples on this page - clean them */
1885                                 Assert(!isempty);
1886                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1887                                 vacuum_page(onerel, buf, last_vacuum_page);
1888                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1889                         }
1890                         else
1891                                 Assert(isempty);
1892                         --vacuumed_pages;
1893                         if (vacuumed_pages > 0)
1894                         {
1895                                 /* get prev reaped page from vacuum_pages */
1896                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1897                                 last_vacuum_block = last_vacuum_page->blkno;
1898                         }
1899                         else
1900                         {
1901                                 last_vacuum_page = NULL;
1902                                 last_vacuum_block = InvalidBlockNumber;
1903                         }
1904                         if (isempty)
1905                         {
1906                                 ReleaseBuffer(buf);
1907                                 continue;
1908                         }
1909                 }
1910                 else
1911                         Assert(!isempty);
1912
1913                 chain_tuple_moved = false;              /* no chain tuple has been moved
1914                                                                                  * off this page yet */
1915                 vacpage->blkno = blkno;
1916                 maxoff = PageGetMaxOffsetNumber(page);
1917                 for (offnum = FirstOffsetNumber;
1918                          offnum <= maxoff;
1919                          offnum = OffsetNumberNext(offnum))
1920                 {
1921                         Size            tuple_len;
1922                         HeapTupleData tuple;
1923                         ItemId          itemid = PageGetItemId(page, offnum);
1924
1925                         if (!ItemIdIsUsed(itemid))
1926                                 continue;
1927
1928                         if (ItemIdIsDead(itemid))
1929                         {
1930                                 /* just remember it for vacuum_page() */
1931                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1932                                 continue;
1933                         }
1934
1935                         /* Shouldn't have any redirected items now */
1936                         Assert(ItemIdIsNormal(itemid));
1937
1938                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1939                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1940                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1941
1942                         /* ---
1943                          * VACUUM FULL has an exclusive lock on the relation.  So
1944                          * normally no other transaction can have pending INSERTs or
1945                          * DELETEs in this relation.  A tuple is either:
1946                          *              (a) live (XMIN_COMMITTED)
1947                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
1948                          *                      is visible to all active transactions)
1949                          *              (c) inserted and deleted (XMIN_COMMITTED+XMAX_COMMITTED)
1950                          *                      but at least one active transaction does not see the
1951                          *                      deleting transaction (ie, it's RECENTLY_DEAD)
1952                          *              (d) moved by the currently running VACUUM
1953                          *              (e) inserted or deleted by a not yet committed transaction,
1954                          *                      or by a transaction we couldn't set XMIN_COMMITTED for.
1955                          * In case (e) we wouldn't be in repair_frag() at all, because
1956                          * scan_heap() detects those cases and shuts off shrinking.
1957                          * We can't see case (b) here either, because such tuples were
1958                          * already removed by vacuum_page().  Cases (a) and (c) are
1959                          * normal and will have XMIN_COMMITTED set.  Case (d) is only
1960                          * possible if a whole tuple chain has been moved while
1961                          * processing this or a higher numbered block.
1962                          * ---
1963                          */
1964                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1965                         {
1966                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1967                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1968                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
1969                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
1970
1971                                 /*
1972                                  * MOVED_OFF by another VACUUM would have caused the
1973                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
1974                                  */
1975                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1976                                         elog(ERROR, "invalid XVAC in tuple header");
1977
1978                                 /*
1979                                  * If this (chain) tuple has already been moved by me, then I have
1980                                  * to check whether it is in vacpage or not, i.e. whether it was
1981                                  * moved while cleaning this page or some previous one.
1982                                  */
1983
1984                                 /* Can't we Assert(keep_tuples > 0) here? */
1985                                 if (keep_tuples == 0)
1986                                         continue;
1987                                 if (chain_tuple_moved)
1988                                 {
1989                                         /* some chains were moved while cleaning this page */
1990                                         Assert(vacpage->offsets_free > 0);
1991                                         for (i = 0; i < vacpage->offsets_free; i++)
1992                                         {
1993                                                 if (vacpage->offsets[i] == offnum)
1994                                                         break;
1995                                         }
1996                                         if (i >= vacpage->offsets_free)         /* not found */
1997                                         {
1998                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1999                                                 /*
2000                                                  * If this is not a heap-only tuple, there must be an
2001                                                  * index entry for this item which will be removed in
2002                                                  * the index cleanup. Decrement the keep_indexed_tuples
2003                                                  * count to remember this.
2004                                                  */
2005                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2006                                                         keep_indexed_tuples--;
2007                                                 keep_tuples--;
2008                                         }
2009                                 }
2010                                 else
2011                                 {
2012                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2013                                         /*
2014                                          * If this is not a heap-only tuple, there must be an
2015                                          * index entry for this item which will be removed in
2016                                          * the index cleanup. Decrement the keep_indexed_tuples
2017                                          * count to remember this.
2018                                          */
2019                                         if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2020                                                 keep_indexed_tuples--;
2021                                         keep_tuples--;
2022                                 }
2023                                 continue;
2024                         }
2025
2026                         /*
2027                          * If this tuple is in a chain of tuples created in updates by
2028                          * "recent" transactions then we have to move the whole chain of
2029                          * tuples to other places, so that we can write new t_ctid links
2030                          * that preserve the chain relationship.
2031                          *
2032                          * This test is complicated.  Read it as "if tuple is a recently
2033                          * created updated version, OR if it is an obsoleted version". (In
2034                          * the second half of the test, we needn't make any check on XMAX
2035                          * --- it must be recently obsoleted, else scan_heap would have
2036                          * deemed it removable.)
2037                          *
2038                          * NOTE: this test is not 100% accurate: it is possible for a
2039                          * tuple to be an updated one with recent xmin, and yet not match
2040                          * any new_tid entry in the vtlinks list.  Presumably there was
2041                          * once a parent tuple with xmax matching the xmin, but it's
2042                          * possible that that tuple has been removed --- for example, if
2043                          * it had xmin = xmax and wasn't itself an updated version, then
2044                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
2045                          * xmin xact completes.
2046                          *
2047                          * To be on the safe side, we abandon the repair_frag process if
2048                          * we cannot find the parent tuple in vtlinks.  This may be overly
2049                          * conservative; AFAICS it would be safe to move the chain.
2050                          *
2051                          * Also, because we distinguish DEAD and RECENTLY_DEAD tuples
2052                          * using OldestXmin, which is a rather coarse test, it is quite
2053                          * possible to have an update chain in which a tuple we think is
2054                          * RECENTLY_DEAD links forward to one that is definitely DEAD.
2055                          * In such a case the RECENTLY_DEAD tuple must actually be dead,
2056                          * but it seems too complicated to try to make VACUUM remove it.
2057                          * We treat each contiguous set of RECENTLY_DEAD tuples as a
2058                          * separately movable chain, ignoring any intervening DEAD ones.
2059                          */
2060                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
2061                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
2062                                                                                 OldestXmin)) ||
2063                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
2064                                                                                            HEAP_IS_LOCKED)) &&
2065                                  !(ItemPointerEquals(&(tuple.t_self),
2066                                                                          &(tuple.t_data->t_ctid)))))
2067                         {
2068                                 Buffer          Cbuf = buf;
2069                                 bool            freeCbuf = false;
2070                                 bool            chain_move_failed = false;
2071                                 bool            moved_target = false;
2072                                 ItemPointerData Ctid;
2073                                 HeapTupleData tp = tuple;
2074                                 Size            tlen = tuple_len;
2075                                 VTupleMove      vtmove;
2076                                 int                     num_vtmove;
2077                                 int                     free_vtmove;
2078                                 VacPage         to_vacpage = NULL;
2079                                 int                     to_item = 0;
2080                                 int                     ti;
2081
2082                                 if (dst_buffer != InvalidBuffer)
2083                                 {
2084                                         ReleaseBuffer(dst_buffer);
2085                                         dst_buffer = InvalidBuffer;
2086                                 }
2087
2088                                 /* Quick exit if we have no vtlinks to search in */
2089                                 if (vacrelstats->vtlinks == NULL)
2090                                 {
2091                                         elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2092                                         break;          /* out of walk-along-page loop */
2093                                 }
2094
2095                                 /*
2096                                  * If this tuple is at the beginning or in the middle of the chain,
2097                                  * then we have to move to the end of the chain.  As with any t_ctid
2098                                  * chase, we have to verify that each new tuple is really the
2099                                  * descendant of the tuple we came from; however, here we
2100                                  * need even more than the normal amount of paranoia.
2101                                  * If t_ctid links forward to a tuple determined to be DEAD,
2102                                  * then depending on where that tuple is, it might already
2103                                  * have been removed, and perhaps even replaced by a MOVED_IN
2104                                  * tuple.  We don't want to include any DEAD tuples in the
2105                                  * chain, so we have to recheck HeapTupleSatisfiesVacuum.
2106                                  */
2107                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
2108                                                                                                   HEAP_IS_LOCKED)) &&
2109                                            !(ItemPointerEquals(&(tp.t_self),
2110                                                                                    &(tp.t_data->t_ctid))))
2111                                 {
2112                                         ItemPointerData nextTid;
2113                                         TransactionId priorXmax;
2114                                         Buffer          nextBuf;
2115                                         Page            nextPage;
2116                                         OffsetNumber nextOffnum;
2117                                         ItemId          nextItemid;
2118                                         HeapTupleHeader nextTdata;
2119                                         HTSV_Result     nextTstatus;
2120
2121                                         nextTid = tp.t_data->t_ctid;
2122                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
2123                                         /* assume block# is OK (see heap_fetch comments) */
2124                                         nextBuf = ReadBufferWithStrategy(onerel,
2125                                                                                  ItemPointerGetBlockNumber(&nextTid),
2126                                                                                                          vac_strategy);
2127                                         nextPage = BufferGetPage(nextBuf);
2128                                         /* If bogus or unused slot, assume tp is end of chain */
2129                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
2130                                         if (nextOffnum < FirstOffsetNumber ||
2131                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
2132                                         {
2133                                                 ReleaseBuffer(nextBuf);
2134                                                 break;
2135                                         }
2136                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
2137                                         if (!ItemIdIsNormal(nextItemid))
2138                                         {
2139                                                 ReleaseBuffer(nextBuf);
2140                                                 break;
2141                                         }
2142                                         /* if not matching XMIN, assume tp is end of chain */
2143                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
2144                                                                                                                           nextItemid);
2145                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
2146                                                                                          priorXmax))
2147                                         {
2148                                                 ReleaseBuffer(nextBuf);
2149                                                 break;
2150                                         }
2151                                         /*
2152                                          * Must check for DEAD or MOVED_IN tuple, too.  This
2153                                          * could potentially update hint bits, so we'd better
2154                                          * hold the buffer content lock.
2155                                          */
2156                                         LockBuffer(nextBuf, BUFFER_LOCK_SHARE);
2157                                         nextTstatus = HeapTupleSatisfiesVacuum(nextTdata,
2158                                                                                                                    OldestXmin,
2159                                                                                                                    nextBuf);
2160                                         if (nextTstatus == HEAPTUPLE_DEAD ||
2161                                                 nextTstatus == HEAPTUPLE_INSERT_IN_PROGRESS)
2162                                         {
2163                                                 UnlockReleaseBuffer(nextBuf);
2164                                                 break;
2165                                         }
2166                                         LockBuffer(nextBuf, BUFFER_LOCK_UNLOCK);
2167                                         /* if it's MOVED_OFF we should have moved this one with it */
2168                                         if (nextTstatus == HEAPTUPLE_DELETE_IN_PROGRESS)
2169                                                 elog(ERROR, "updated tuple is already HEAP_MOVED_OFF");
2170                                         /* OK, switch our attention to the next tuple in chain */
2171                                         tp.t_data = nextTdata;
2172                                         tp.t_self = nextTid;
2173                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
2174                                         if (freeCbuf)
2175                                                 ReleaseBuffer(Cbuf);
2176                                         Cbuf = nextBuf;
2177                                         freeCbuf = true;
2178                                 }
2179
2180                                 /* Set up workspace for planning the chain move */
2181                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
2182                                 num_vtmove = 0;
2183                                 free_vtmove = 100;
2184
2185                                 /*
2186                                  * Now, walk backwards up the chain (towards older tuples) and
2187                                  * check if all items in chain can be moved.  We record all
2188                                  * the moves that need to be made in the vtmove array.
2189                                  */
2190                                 for (;;)
2191                                 {
2192                                         Buffer          Pbuf;
2193                                         Page            Ppage;
2194                                         ItemId          Pitemid;
2195                                         HeapTupleHeader PTdata;
2196                                         VTupleLinkData vtld,
2197                                                            *vtlp;
2198
2199                                         /* Identify a target page to move this tuple to */
2200                                         if (to_vacpage == NULL ||
2201                                                 !enough_space(to_vacpage, tlen))
2202                                         {
2203                                                 for (i = 0; i < num_fraged_pages; i++)
2204                                                 {
2205                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
2206                                                                 break;
2207                                                 }
2208
2209                                                 if (i == num_fraged_pages)
2210                                                 {
2211                                                         /* can't move item anywhere */
2212                                                         chain_move_failed = true;
2213                                                         break;          /* out of check-all-items loop */
2214                                                 }
2215                                                 to_item = i;
2216                                                 to_vacpage = fraged_pages->pagedesc[to_item];
2217                                         }
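                                             /*
                                              * Tentatively charge the chosen target page for this tuple:
                                              * subtract the MAXALIGN'd tuple size from its recorded free
                                              * space and, if all to-be-freed line pointers there are
                                              * already spoken for, charge for a new ItemIdData too.
                                              */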
2218                                         to_vacpage->free -= MAXALIGN(tlen);
2219                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
2220                                                 to_vacpage->free -= sizeof(ItemIdData);
2221                                         (to_vacpage->offsets_used)++;
2222
2223                                         /* Add an entry to vtmove list */
2224                                         if (free_vtmove == 0)
2225                                         {
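                                                     /* grow the vtmove array by another 1000 entries */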
2226                                                 free_vtmove = 1000;
2227                                                 vtmove = (VTupleMove)
2228                                                         repalloc(vtmove,
2229                                                                          (free_vtmove + num_vtmove) *
2230                                                                          sizeof(VTupleMoveData));
2231                                         }
2232                                         vtmove[num_vtmove].tid = tp.t_self;
2233                                         vtmove[num_vtmove].vacpage = to_vacpage;
2234                                         if (to_vacpage->offsets_used == 1)
2235                                                 vtmove[num_vtmove].cleanVpd = true;
2236                                         else
2237                                                 vtmove[num_vtmove].cleanVpd = false;
2238                                         free_vtmove--;
2239                                         num_vtmove++;
2240
2241                                         /* Remember if we reached the original target tuple */
2242                                         if (ItemPointerGetBlockNumber(&tp.t_self) == blkno &&
2243                                                 ItemPointerGetOffsetNumber(&tp.t_self) == offnum)
2244                                                 moved_target = true;
2245
2246                                         /* Done if at beginning of chain */
2247                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2248                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2249                                                                                    OldestXmin))
2250                                                 break;  /* out of check-all-items loop */
2251
2252                                         /* Move to tuple with prior row version */
2253                                         vtld.new_tid = tp.t_self;
2254                                         vtlp = (VTupleLink)
2255                                                 vac_bsearch((void *) &vtld,
2256                                                                         (void *) (vacrelstats->vtlinks),
2257                                                                         vacrelstats->num_vtlinks,
2258                                                                         sizeof(VTupleLinkData),
2259                                                                         vac_cmp_vtlinks);
2260                                         if (vtlp == NULL)
2261                                         {
2262                                                 /* see discussion above */
2263                                                 elog(DEBUG2, "parent item in update-chain not found --- cannot continue repair_frag");
2264                                                 chain_move_failed = true;
2265                                                 break;  /* out of check-all-items loop */
2266                                         }
2267                                         tp.t_self = vtlp->this_tid;
2268                                         Pbuf = ReadBufferWithStrategy(onerel,
2269                                                                         ItemPointerGetBlockNumber(&(tp.t_self)),
2270                                                                                                   vac_strategy);
2271                                         Ppage = BufferGetPage(Pbuf);
2272                                         Pitemid = PageGetItemId(Ppage,
2273                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2274                                         /* this can't happen since we saw tuple earlier: */
2275                                         if (!ItemIdIsNormal(Pitemid))
2276                                                 elog(ERROR, "parent itemid marked as unused");
2277                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2278
2279                                         /* ctid should not have changed since we saved it */
2280                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2281                                                                                          &(PTdata->t_ctid)));
2282
2283                                         /*
2284                                          * Read above about cases when !ItemIdIsUsed(nextItemid)
2285                                          * (child item is removed)... Because we do not currently
2286                                          * remove the no-longer-useful part of an update chain,
2287                                          * it is possible to find a non-matching parent row here.
2288                                          * As in the case that originally exposed this problem, we
2289                                          * simply stop shrinking here.  We could try to locate the
2290                                          * real parent row, but it is not worth doing, since a
2291                                          * proper solution will be implemented later anyway, and
2292                                          * we are too close to the 6.5 release. - vadim 06/11/99
2293                                          */
2294                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2295                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2296                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2297                                         {
2298                                                 ReleaseBuffer(Pbuf);
2299                                                 elog(DEBUG2, "too old parent tuple found --- cannot continue repair_frag");
2300                                                 chain_move_failed = true;
2301                                                 break;  /* out of check-all-items loop */
2302                                         }
2303                                         tp.t_data = PTdata;
2304                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2305                                         if (freeCbuf)
2306                                                 ReleaseBuffer(Cbuf);
2307                                         Cbuf = Pbuf;
2308                                         freeCbuf = true;
2309                                 }                               /* end of check-all-items loop */
2310
2311                                 if (freeCbuf)
2312                                         ReleaseBuffer(Cbuf);
2313                                 freeCbuf = false;
2314
2315                                 /* Double-check that we will move the current target tuple */
2316                                 if (!moved_target && !chain_move_failed)
2317                                 {
2318                                         elog(DEBUG2, "failed to chain back to target --- cannot continue repair_frag");
2319                                         chain_move_failed = true;
2320                                 }
2321
2322                                 if (chain_move_failed)
2323                                 {
2324                                         /*
2325                                          * Undo changes to offsets_used state.  We don't bother
2326                                          * cleaning up the amount-free state, since we're not
2327                                          * going to do any further tuple motion.
2328                                          */
2329                                         for (i = 0; i < num_vtmove; i++)
2330                                         {
2331                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2332                                                 (vtmove[i].vacpage->offsets_used)--;
2333                                         }
2334                                         pfree(vtmove);
2335                                         break;          /* out of walk-along-page loop */
2336                                 }
2337
2338                                 /*
2339                                  * Okay, move the whole tuple chain in reverse order.
2340                                  *
2341                                  * Ctid tracks the new location of the previously-moved tuple.
2342                                  */
2343                                 ItemPointerSetInvalid(&Ctid);
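                                     /*
                                      * Ctid starts out invalid: the first tuple moved is the newest
                                      * member of the chain, and move_chain_tuple() will make its
                                      * t_ctid point to itself.
                                      */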
2344                                 for (ti = 0; ti < num_vtmove; ti++)
2345                                 {
2346                                         VacPage         destvacpage = vtmove[ti].vacpage;
2347                                         Page            Cpage;
2348                                         ItemId          Citemid;
2349
2350                                         /* Get page to move from */
2351                                         tuple.t_self = vtmove[ti].tid;
2352                                         Cbuf = ReadBufferWithStrategy(onerel,
2353                                                                  ItemPointerGetBlockNumber(&(tuple.t_self)),
2354                                                                                                   vac_strategy);
2355
2356                                         /* Get page to move to */
2357                                         dst_buffer = ReadBufferWithStrategy(onerel,
2358                                                                                                                 destvacpage->blkno,
2359                                                                                                                 vac_strategy);
2360
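                                             /*
                                              * Exclusive-lock both pages; if source and destination are
                                              * the same buffer, take the lock only once.
                                              */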
2361                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2362                                         if (dst_buffer != Cbuf)
2363                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2364
2365                                         dst_page = BufferGetPage(dst_buffer);
2366                                         Cpage = BufferGetPage(Cbuf);
2367
2368                                         Citemid = PageGetItemId(Cpage,
2369                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2370                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2371                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2372
2373                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2374                                                                          dst_buffer, dst_page, destvacpage,
2375                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2376
2377                                         /*
2378                                          * If the tuple we are moving is a heap-only tuple,
2379                                          * this move will generate an additional index entry,
2380                                          * so increment the rel_indexed_tuples count.
2381                                          */ 
2382                                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2383                                                 vacrelstats->rel_indexed_tuples++;
2384
2385                                         num_moved++;
2386                                         if (destvacpage->blkno > last_move_dest_block)
2387                                                 last_move_dest_block = destvacpage->blkno;
2388
2389                                         /*
2390                                          * Remember that we moved tuple from the current page
2391                                          * (corresponding index tuple will be cleaned).
2392                                          */
2393                                         if (Cbuf == buf)
2394                                                 vacpage->offsets[vacpage->offsets_free++] =
2395                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2396                                         else
2397                                         {
2398                                                 /*
2399                                                  * When we move tuple chains, we may need to move
2400                                                  * tuples from a block that we haven't yet scanned in
2401                                                  * the outer walk-along-the-relation loop. Note that we
2402                                                  * can't be moving a tuple from a block that we have
2403                                                  * already scanned because if such a tuple exists, then
2404                                                  * we must have moved the chain along with that tuple
2405                                                  * when we scanned that block. IOW the test of
2406                                                  * (Cbuf != buf) guarantees that the tuple we are
2407                                                  * looking at right now is in a block which is yet to
2408                                                  * be scanned.
2409                                                  *
2410                                                  * We maintain two counters to correctly count the
2411                                                  * moved-off tuples from blocks that are not yet
2412                                                  * scanned (keep_tuples) and how many of them have
2413                                                  * index pointers (keep_indexed_tuples).  The main
2414                                                  * reason to track the latter is to help verify
2415                                                  * that indexes have the expected number of entries
2416                                                  * when all the dust settles.
2417                                                  */
2418                                                 if (!HeapTupleHeaderIsHeapOnly(tuple.t_data))
2419                                                         keep_indexed_tuples++;
2420                                                 keep_tuples++;
2421                                         }
2422
2423                                         ReleaseBuffer(dst_buffer);
2424                                         ReleaseBuffer(Cbuf);
2425                                 }                               /* end of move-the-tuple-chain loop */
2426
2427                                 dst_buffer = InvalidBuffer;
2428                                 pfree(vtmove);
2429                                 chain_tuple_moved = true;
2430
2431                                 /* advance to next tuple in walk-along-page loop */
2432                                 continue;
2433                         }                                       /* end of is-tuple-in-chain test */
2434
2435                         /* try to find new page for this tuple */
2436                         if (dst_buffer == InvalidBuffer ||
2437                                 !enough_space(dst_vacpage, tuple_len))
2438                         {
2439                                 if (dst_buffer != InvalidBuffer)
2440                                 {
2441                                         ReleaseBuffer(dst_buffer);
2442                                         dst_buffer = InvalidBuffer;
2443                                 }
2444                                 for (i = 0; i < num_fraged_pages; i++)
2445                                 {
2446                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2447                                                 break;
2448                                 }
2449                                 if (i == num_fraged_pages)
2450                                         break;          /* can't move item anywhere */
2451                                 dst_vacpage = fraged_pages->pagedesc[i];
2452                                 dst_buffer = ReadBufferWithStrategy(onerel,
2453                                                                                                         dst_vacpage->blkno,
2454                                                                                                         vac_strategy);
2455                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2456                                 dst_page = BufferGetPage(dst_buffer);
2457                                 /* if this page was not used before - clean it */
2458                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2459                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2460                         }
2461                         else
2462                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2463
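                             /* exclusive-lock the source page before moving the tuple off it */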
2464                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2465
2466                         move_plain_tuple(onerel, buf, page, &tuple,
2467                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2468
2469                         /*
2470                          * If the tuple we are moving is a heap-only tuple,
2471                          * this move will generate an additional index entry,
2472                          * so increment the rel_indexed_tuples count.
2473                          */
2474                         if (HeapTupleHeaderIsHeapOnly(tuple.t_data))
2475                                 vacrelstats->rel_indexed_tuples++;
2476
2477                         num_moved++;
2478                         if (dst_vacpage->blkno > last_move_dest_block)
2479                                 last_move_dest_block = dst_vacpage->blkno;
2480
2481                         /*
2482                          * Remember that we moved tuple from the current page
2483                          * (corresponding index tuple will be cleaned).
2484                          */
2485                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2486                 }                                               /* walk along page */
2487
2488                 /*
2489                  * If we broke out of the walk-along-page loop early (ie, still have
2490                  * offnum <= maxoff), then we failed to move some tuple off this page.
2491                  * No point in shrinking any more, so clean up and exit the per-page
2492                  * loop.
2493                  */
2494                 if (offnum < maxoff && keep_tuples > 0)
2495                 {
2496                         OffsetNumber off;
2497
2498                         /*
2499                          * Fix vacpage state for any unvisited tuples remaining on page
2500                          */
2501                         for (off = OffsetNumberNext(offnum);
2502                                  off <= maxoff;
2503                                  off = OffsetNumberNext(off))
2504                         {
2505                                 ItemId          itemid = PageGetItemId(page, off);
2506                                 HeapTupleHeader htup;
2507
2508                                 if (!ItemIdIsUsed(itemid))
2509                                         continue;
2510                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2511                                 Assert(ItemIdIsNormal(itemid));
2512
2513                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2514                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2515                                         continue;
2516
2517                                 /*
2518                                  * See comments in the walk-along-page loop above about why
2519                                  * only MOVED_OFF tuples should be found here.
2520                                  */
2521                                 if (htup->t_infomask & HEAP_MOVED_IN)
2522                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2523                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2524                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2525                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2526                                         elog(ERROR, "invalid XVAC in tuple header");
2527
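                                     /*
                                      * Record this offset in vacpage so the corresponding index
                                      * entries get cleaned, taking care not to enter it twice if a
                                      * chain move already recorded it.
                                      */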
2528                                 if (chain_tuple_moved)
2529                                 {
2530                                         /* some chains were moved while cleaning this page */
2531                                         Assert(vacpage->offsets_free > 0);
2532                                         for (i = 0; i < vacpage->offsets_free; i++)
2533                                         {
2534                                                 if (vacpage->offsets[i] == off)
2535                                                         break;
2536                                         }
2537                                         if (i >= vacpage->offsets_free)         /* not found */
2538                                         {
2539                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2540                                                 Assert(keep_tuples > 0);
2541                                                 /*
2542                                                  * If this is not a heap-only tuple, there must be an
2543                                                  * index entry for this item which will be removed in
2544                                                  * the index cleanup. Decrement the keep_indexed_tuples
2545                                                  * count to remember this.
2546                                                  */
2547                                                 if (!HeapTupleHeaderIsHeapOnly(htup))
2548                                                         keep_indexed_tuples--;
2549                                                 keep_tuples--;
2550                                         }
2551                                 }
2552                                 else
2553                                 {
2554                                         vacpage->offsets[vacpage->offsets_free++] = off;
2555                                         Assert(keep_tuples > 0);
2556                                         if (!HeapTupleHeaderIsHeapOnly(htup))
2557                                                 keep_indexed_tuples--;
2558                                         keep_tuples--;
2559                                 }
2560                         }
2561                 }
2562
2563                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2564                 {
2565                         if (chain_tuple_moved)          /* else - they are ordered */
2566                         {
2567                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2568                                           sizeof(OffsetNumber), vac_cmp_offno);
2569                         }
2570                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2571                 }
2572
2573                 ReleaseBuffer(buf);
2574
2575                 if (offnum <= maxoff)
2576                         break;                          /* had to quit early, see above note */
2577
2578         }                                                       /* walk along relation */
2579
2580         blkno++;                                        /* blkno is now the new number of blocks */
2581
2582         if (dst_buffer != InvalidBuffer)
2583         {
2584                 Assert(num_moved > 0);
2585                 ReleaseBuffer(dst_buffer);
2586         }
2587
2588         if (num_moved > 0)
2589         {
2590                 /*
2591                  * We have to commit our tuple movings before we truncate the
2592                  * relation.  Ideally we should do Commit/StartTransactionCommand
2593                  * here, relying on the session-level table lock to protect our
2594                  * exclusive access to the relation.  However, that would require a
2595                  * lot of extra code to close and re-open the relation, indexes, etc.
2596                  * For now, a quick hack: record status of current transaction as
2597                  * committed, and continue.  We force the commit to be synchronous
2598                  * so that it's down to disk before we truncate.  (Note: tqual.c
2599                  * knows that VACUUM FULL always uses sync commit, too.)  The
2600                  * transaction continues to be shown as running in the ProcArray.
2601                  *
2602                  * XXX This desperately needs to be revisited.  Any failure after
2603                  * this point will result in a PANIC "cannot abort transaction nnn,
2604                  * it was already committed"!
2605                  */
2606                 ForceSyncCommit();
2607                 (void) RecordTransactionCommit();
2608         }
2609
2610         /*
2611          * We are not going to move any more tuples across pages, but we still
2612          * need to apply vacuum_page to compact free space in the remaining pages
2613          * in vacuum_pages list.  Note that some of these pages may also be in the
2614          * fraged_pages list, and may have had tuples moved onto them; if so, we
2615          * already did vacuum_page and needn't do it again.
2616          */
2617         for (i = 0, curpage = vacuum_pages->pagedesc;
2618                  i < vacuumed_pages;
2619                  i++, curpage++)
2620         {
2621                 vacuum_delay_point();
2622
2623                 Assert((*curpage)->blkno < blkno);
2624                 if ((*curpage)->offsets_used == 0)
2625                 {
2626                         Buffer          buf;
2627                         Page            page;
2628
2629                         /* this page was not used as a move target, so must clean it */
2630                         buf = ReadBufferWithStrategy(onerel,
2631                                                                                  (*curpage)->blkno,
2632                                                                                  vac_strategy);
2633                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2634                         page = BufferGetPage(buf);
2635                         if (!PageIsEmpty(page))
2636                                 vacuum_page(onerel, buf, *curpage);
2637                         UnlockReleaseBuffer(buf);
2638                 }
2639         }
2640
2641         /*
2642          * Now scan all the pages that we moved tuples onto and update tuple
2643          * status bits.  This is not really necessary, but will save time for
2644          * future transactions examining these tuples.
2645          */
2646         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2647                                          last_move_dest_block, num_moved);
2648
2649         /*
2650          * It'd be cleaner to make this report at the bottom of this routine, but
2651          * then the rusage would double-count the second pass of index vacuuming.
2652          * So do it here and ignore the relatively small amount of processing that
2653          * occurs below.
2654          */
2655         ereport(elevel,
2656                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2657                                         RelationGetRelationName(onerel),
2658                                         num_moved, nblocks, blkno),
2659                          errdetail("%s.",
2660                                            pg_rusage_show(&ru0))));
2661
2662         /*
2663          * Reflect the motion of system tuples to catalog cache here.
2664          */
2665         CommandCounterIncrement();
2666
2667         if (Nvacpagelist.num_pages > 0)
2668         {
2669                 /* vacuum indexes again if needed */
2670                 if (Irel != NULL)
2671                 {
2672                         VacPage    *vpleft,
2673                                            *vpright,
2674                                                 vpsave;
2675
2676                         /* re-sort Nvacpagelist.pagedesc */
2677                         for (vpleft = Nvacpagelist.pagedesc,
2678                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2679                                  vpleft < vpright; vpleft++, vpright--)
2680                         {
2681                                 vpsave = *vpleft;
2682                                 *vpleft = *vpright;
2683                                 *vpright = vpsave;
2684                         }
2685
2686                         /*
2687                          * keep_tuples is the number of tuples that have been moved off a
2688                          * page during chain moves but not been scanned over subsequently.
2689                          * The tuple ids of these tuples are not recorded as free offsets
2690                          * for any VacPage, so they will not be cleared from the indexes.
2691                          * keep_indexed_tuples is the portion of these that are expected
2692                          * to have index entries.
2693                          */
2694                         Assert(keep_tuples >= 0);
2695                         for (i = 0; i < nindexes; i++)
2696                                 vacuum_index(&Nvacpagelist, Irel[i],
2697                                                          vacrelstats->rel_indexed_tuples,
2698                                                          keep_indexed_tuples);
2699                 }
2700
2701                 /*
2702                  * Clean moved-off tuples from last page in Nvacpagelist list.
2703                  *
2704                  * We need only do this for this one page, because higher-numbered
2705                  * pages are going to be truncated from the relation entirely. But see
2706                  * comments for update_hint_bits().
2707                  */
2708                 if (vacpage->blkno == (blkno - 1) &&
2709                         vacpage->offsets_free > 0)
2710                 {
2711                         Buffer          buf;
2712                         Page            page;
2713                         OffsetNumber unused[MaxOffsetNumber];
2714                         OffsetNumber offnum,
2715                                                 maxoff;
2716                         int                     uncnt = 0;
2717                         int                     num_tuples = 0;
2718
2719                         buf = ReadBufferWithStrategy(onerel, vacpage->blkno, vac_strategy);
2720                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2721                         page = BufferGetPage(buf);
2722                         maxoff = PageGetMaxOffsetNumber(page);
2723                         for (offnum = FirstOffsetNumber;
2724                                  offnum <= maxoff;
2725                                  offnum = OffsetNumberNext(offnum))
2726                         {
2727                                 ItemId          itemid = PageGetItemId(page, offnum);
2728                                 HeapTupleHeader htup;
2729
2730                                 if (!ItemIdIsUsed(itemid))
2731                                         continue;
2732                                 /* Shouldn't be any DEAD or REDIRECT items anymore */
2733                                 Assert(ItemIdIsNormal(itemid));
2734
2735                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2736                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2737                                         continue;
2738
2739                                 /*
2740                                  * See comments in the walk-along-page loop above about why
2741                                  * only MOVED_OFF tuples should be found here.
2742                                  */
2743                                 if (htup->t_infomask & HEAP_MOVED_IN)
2744                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2745                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2746                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2747                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2748                                         elog(ERROR, "invalid XVAC in tuple header");
2749
2750                                 ItemIdSetUnused(itemid);
2751                                 num_tuples++;
2752
2753                                 unused[uncnt++] = offnum;
2754                         }
2755                         Assert(vacpage->offsets_free == num_tuples);
2756
2757                         START_CRIT_SECTION();
2758
2759                         PageRepairFragmentation(page);
2760
2761                         MarkBufferDirty(buf);
2762
2763                         /* XLOG stuff */
2764                         if (!onerel->rd_istemp)
2765                         {
2766                                 XLogRecPtr      recptr;
2767
2768                                 recptr = log_heap_clean(onerel, buf,
2769                                                                                 NULL, 0, NULL, 0,
2770                                                                                 unused, uncnt,
2771                                                                                 false);
2772                                 PageSetLSN(page, recptr);
2773                                 PageSetTLI(page, ThisTimeLineID);
2774                         }
2775
2776                         END_CRIT_SECTION();
2777
2778                         UnlockReleaseBuffer(buf);
2779                 }
2780
2781                 /* now free the new list of reaped pages */
2782                 curpage = Nvacpagelist.pagedesc;
2783                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2784                         pfree(*curpage);
2785                 pfree(Nvacpagelist.pagedesc);
2786         }
2787
2788         /* Truncate relation, if needed */
2789         if (blkno < nblocks)
2790         {
2791                 RelationTruncate(onerel, blkno);
2792                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2793         }
2794
2795         /* clean up */
2796         pfree(vacpage);
2797         if (vacrelstats->vtlinks != NULL)
2798                 pfree(vacrelstats->vtlinks);
2799
2800         ExecContext_Finish(&ec);
2801 }
2802
2803 /*
2804  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2805  *
2806  *              This routine moves old_tup from old_page to dst_page.
2807  *              old_page and dst_page might be the same page.
2808  *              On entry old_buf and dst_buf are locked exclusively; both locks (or
2809  *              the single lock, if this is an intra-page move) are released before
2810  *              exit.
2811  *
2812  *              Yes, a routine with ten parameters is ugly, but it's still better
2813  *              than having these 120 lines of code in repair_frag() which is
2814  *              already too long and almost unreadable.
2815  */
2816 static void
2817 move_chain_tuple(Relation rel,
2818                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2819                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2820                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2821 {
2822         TransactionId myXID = GetCurrentTransactionId();
2823         HeapTupleData newtup;
2824         OffsetNumber newoff;
2825         ItemId          newitemid;
2826         Size            tuple_len = old_tup->t_len;
2827
2828         /*
2829          * make a modifiable copy of the source tuple.
2830          */
2831         heap_copytuple_with_tuple(old_tup, &newtup);
2832
2833         /*
2834          * register invalidation of source tuple in catcaches.
2835          */
2836         CacheInvalidateHeapTuple(rel, old_tup);
2837
2838         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2839         START_CRIT_SECTION();
2840
2841         /*
2842          * mark the source tuple MOVED_OFF.
2843          */
2844         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2845                                                                          HEAP_XMIN_INVALID |
2846                                                                          HEAP_MOVED_IN);
2847         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2848         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2849
2850         /*
2851          * If this page was not used before - clean it.
2852          *
2853          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2854          * destination pages to be the same (since this tuple-chain member can be
2855          * on a page lower than the one we're currently processing in the outer
2856          * loop).  If that's true, then after vacuum_page() the source tuple will
2857          * have been moved, and tuple.t_data will be pointing at garbage.
2858          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2859          * step!!
2860          *
2861          * This path is different from the other callers of vacuum_page, because
2862          * we have already incremented the vacpage's offsets_used field to account
2863          * for the tuple(s) we expect to move onto the page. Therefore
2864          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2865          * good debugging check for all other callers, we work around it here
2866          * rather than remove it.
2867          */
2868         if (!PageIsEmpty(dst_page) && cleanVpd)
2869         {
2870                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2871
2872                 dst_vacpage->offsets_used = 0;
2873                 vacuum_page(rel, dst_buf, dst_vacpage);
2874                 dst_vacpage->offsets_used = sv_offsets_used;
2875         }
2876
2877         /*
2878          * Update the state of the copied tuple, and store it on the destination
2879          * page.  The copied tuple is never part of a HOT chain.
2880          */
2881         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2882                                                                    HEAP_XMIN_INVALID |
2883                                                                    HEAP_MOVED_OFF);
2884         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2885         HeapTupleHeaderClearHotUpdated(newtup.t_data);
2886         HeapTupleHeaderClearHeapOnly(newtup.t_data);
2887         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2888         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2889                                                  InvalidOffsetNumber, false, true);
2890         if (newoff == InvalidOffsetNumber)
2891                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
2892                          (unsigned long) tuple_len, dst_vacpage->blkno);
2893         newitemid = PageGetItemId(dst_page, newoff);
2894         /* drop temporary copy, and point to the version on the dest page */
2895         pfree(newtup.t_data);
2896         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2897
2898         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
2899
2900         /*
2901          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
2902          * to next tuple in chain otherwise.  (Since we move the chain in reverse
2903          * order, this is actually the previously processed tuple.)
2904          */
2905         if (!ItemPointerIsValid(ctid))
2906                 newtup.t_data->t_ctid = newtup.t_self;
2907         else
2908                 newtup.t_data->t_ctid = *ctid;
2909         *ctid = newtup.t_self;
2910
2911         MarkBufferDirty(dst_buf);
2912         if (dst_buf != old_buf)
2913                 MarkBufferDirty(old_buf);
2914
2915         /* XLOG stuff */
2916         if (!rel->rd_istemp)
2917         {
2918                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2919                                                                                    dst_buf, &newtup);
2920
2921                 if (old_buf != dst_buf)
2922                 {
2923                         PageSetLSN(old_page, recptr);
2924                         PageSetTLI(old_page, ThisTimeLineID);
2925                 }
2926                 PageSetLSN(dst_page, recptr);
2927                 PageSetTLI(dst_page, ThisTimeLineID);
2928         }
2929
2930         END_CRIT_SECTION();
2931
2932         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2933         if (dst_buf != old_buf)
2934                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2935
2936         /* Create index entries for the moved tuple */
2937         if (ec->resultRelInfo->ri_NumIndices > 0)
2938         {
2939                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2940                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2941                 ResetPerTupleExprContext(ec->estate);
2942         }
2943 }
2944
2945 /*
2946  *      move_plain_tuple() -- move one tuple that is not part of a chain
2947  *
2948  *              This routine moves old_tup from old_page to dst_page.
2949  *              On entry old_buf and dst_buf are locked exclusively; both locks are
2950  *              released before exit.
2951  *
2952  *              Yes, a routine with eight parameters is ugly, but it's still better
2953  *              than having these 90 lines of code in repair_frag() which is already
2954  *              too long and almost unreadable.
2955  */
2956 static void
2957 move_plain_tuple(Relation rel,
2958                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2959                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2960                                  ExecContext ec)
2961 {
2962         TransactionId myXID = GetCurrentTransactionId();
2963         HeapTupleData newtup;
2964         OffsetNumber newoff;
2965         ItemId          newitemid;
2966         Size            tuple_len = old_tup->t_len;
2967
2968         /* copy tuple */
2969         heap_copytuple_with_tuple(old_tup, &newtup);
2970
2971         /*
2972          * register invalidation of source tuple in catcaches.
2973          *
2974          * (Note: we do not need to register the copied tuple, because we are not
2975          * changing the tuple contents and so there cannot be any need to flush
2976          * negative catcache entries.)
2977          */
2978         CacheInvalidateHeapTuple(rel, old_tup);
2979
2980         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2981         START_CRIT_SECTION();
2982
2983         /*
2984          * Mark new tuple as MOVED_IN by me; also mark it not HOT.
2985          */
2986         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2987                                                                    HEAP_XMIN_INVALID |
2988                                                                    HEAP_MOVED_OFF);
2989         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2990         HeapTupleHeaderClearHotUpdated(newtup.t_data);
2991         HeapTupleHeaderClearHeapOnly(newtup.t_data);
2992         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2993
2994         /* add tuple to the page */
2995         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2996                                                  InvalidOffsetNumber, false, true);
2997         if (newoff == InvalidOffsetNumber)
2998                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, offsets used %u, offsets free %u)",
2999                          (unsigned long) tuple_len,
3000                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
3001                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
3002         newitemid = PageGetItemId(dst_page, newoff);
3003         pfree(newtup.t_data);
3004         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
3005         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
3006         newtup.t_self = newtup.t_data->t_ctid;
3007
3008         /*
3009          * Mark old tuple as MOVED_OFF by me.
3010          */
3011         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
3012                                                                          HEAP_XMIN_INVALID |
3013                                                                          HEAP_MOVED_IN);
3014         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
3015         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
3016
3017         MarkBufferDirty(dst_buf);
3018         MarkBufferDirty(old_buf);
3019
3020         /* XLOG stuff */
3021         if (!rel->rd_istemp)
3022         {
3023                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
3024                                                                                    dst_buf, &newtup);
3025
3026                 PageSetLSN(old_page, recptr);
3027                 PageSetTLI(old_page, ThisTimeLineID);
3028                 PageSetLSN(dst_page, recptr);
3029                 PageSetTLI(dst_page, ThisTimeLineID);
3030         }
3031
3032         END_CRIT_SECTION();
3033
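         /* refresh our cached estimate of the destination page's free space */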
3034         dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
3035         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
3036         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
3037
3038         dst_vacpage->offsets_used++;
3039
3040         /* create index entries for the moved tuple, if needed */
3041         if (ec->resultRelInfo->ri_NumIndices > 0)
3042         {
3043                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
3044                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
3045                 ResetPerTupleExprContext(ec->estate);
3046         }
3047 }
3048
3049 /*
3050  *      update_hint_bits() -- update hint bits in destination pages
3051  *
3052  * Scan all the pages that we moved tuples onto and update tuple status bits.
3053  * This is not really necessary, but it will save time for future transactions
3054  * examining these tuples.
3055  *
3056  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
3057  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
3058  *
3059  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
3060  * pages that were move source pages but not move dest pages.  The bulk
3061  * of the move source pages will be physically truncated from the relation,
3062  * and the last page remaining in the rel will be fixed separately in
3063  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
3064  * hint bits updated are tuples that are moved as part of a chain and were
3065  * on pages that were neither move destinations nor at the end of the rel.
3066  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
3067  * to remember and revisit those pages too.
3068  *
3069  * One wonders whether it wouldn't be better to skip this work entirely,
3070  * and let the tuple status updates happen someplace that's not holding an
3071  * exclusive lock on the relation.
3072  */
3073 static void
3074 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
3075                                  BlockNumber last_move_dest_block, int num_moved)
3076 {
3077         TransactionId myXID = GetCurrentTransactionId();
3078         int                     checked_moved = 0;
3079         int                     i;
3080         VacPage    *curpage;
3081
3082         for (i = 0, curpage = fraged_pages->pagedesc;
3083                  i < num_fraged_pages;
3084                  i++, curpage++)
3085         {
3086                 Buffer          buf;
3087                 Page            page;
3088                 OffsetNumber max_offset;
3089                 OffsetNumber off;
3090                 int                     num_tuples = 0;
3091
3092                 vacuum_delay_point();
3093
3094                 if ((*curpage)->blkno > last_move_dest_block)
3095                         break;                          /* no need to scan any further */
3096                 if ((*curpage)->offsets_used == 0)
3097                         continue;                       /* this page was never used as a move dest */
3098                 buf = ReadBufferWithStrategy(rel, (*curpage)->blkno, vac_strategy);
3099                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3100                 page = BufferGetPage(buf);
3101                 max_offset = PageGetMaxOffsetNumber(page);
3102                 for (off = FirstOffsetNumber;
3103                          off <= max_offset;
3104                          off = OffsetNumberNext(off))
3105                 {
3106                         ItemId          itemid = PageGetItemId(page, off);
3107                         HeapTupleHeader htup;
3108
3109                         if (!ItemIdIsUsed(itemid))
3110                                 continue;
3111                         /* Shouldn't be any DEAD or REDIRECT items anymore */
3112                         Assert(ItemIdIsNormal(itemid));
3113
3114                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
3115                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
3116                                 continue;
3117
3118                         /*
3119                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
3120                          */
3121                         if (!(htup->t_infomask & HEAP_MOVED))
3122                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
3123                         if (HeapTupleHeaderGetXvac(htup) != myXID)
3124                                 elog(ERROR, "invalid XVAC in tuple header");
3125
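                             /*
                              * A MOVED_IN copy is now known good, so mark its xmin committed;
                              * a MOVED_OFF original is now obsolete, so mark its xmin invalid.
                              */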
3126                         if (htup->t_infomask & HEAP_MOVED_IN)
3127                         {
3128                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
3129                                 htup->t_infomask &= ~HEAP_MOVED;
3130                                 num_tuples++;
3131                         }
3132                         else
3133                                 htup->t_infomask |= HEAP_XMIN_INVALID;
3134                 }
3135                 MarkBufferDirty(buf);
3136                 UnlockReleaseBuffer(buf);
3137                 Assert((*curpage)->offsets_used == num_tuples);
3138                 checked_moved += num_tuples;
3139         }
3140         Assert(num_moved == checked_moved);
3141 }
3142
3143 /*
3144  *      vacuum_heap() -- free dead tuples
3145  *
3146  *              This routine marks dead tuples as unused and truncates relation
3147  *              if there are "empty" end-blocks.
3148  */
3149 static void
3150 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
3151 {
3152         Buffer          buf;
3153         VacPage    *vacpage;
3154         BlockNumber relblocks;
3155         int                     nblocks;
3156         int                     i;
3157
3158         nblocks = vacuum_pages->num_pages;
3159         nblocks -= vacuum_pages->empty_end_pages;       /* empty end-pages are simply truncated below */
3160
3161         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
3162         {
3163                 vacuum_delay_point();
3164
3165                 if ((*vacpage)->offsets_free > 0)
3166                 {
3167                         buf = ReadBufferWithStrategy(onerel,
3168                                                                                  (*vacpage)->blkno,
3169                                                                                  vac_strategy);
3170                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
3171                         vacuum_page(onerel, buf, *vacpage);
3172                         UnlockReleaseBuffer(buf);
3173                 }
3174         }
3175
3176         /* Truncate relation if there are some empty end-pages */
3177         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
3178         if (vacuum_pages->empty_end_pages > 0)
3179         {
3180                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
3181                 ereport(elevel,
3182                                 (errmsg("\"%s\": truncated %u to %u pages",
3183                                                 RelationGetRelationName(onerel),
3184                                                 vacrelstats->rel_pages, relblocks)));
3185                 RelationTruncate(onerel, relblocks);
3186                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
3187         }
3188 }
3189
3190 /*
3191  *      vacuum_page() -- free dead tuples on a page
3192  *                                       and repair its fragmentation.
3193  *
3194  * Caller must hold pin and lock on buffer.
3195  */
3196 static void
3197 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
3198 {
3199         Page            page = BufferGetPage(buffer);
3200         int                     i;
3201
3202         /* There shouldn't be any tuples moved onto the page yet! */
3203         Assert(vacpage->offsets_used == 0);
3204
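        /*
         * The page changes and the WAL record emitted below must appear atomic,
         * so do all of this work inside a critical section.
         */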
3205         START_CRIT_SECTION();
3206
3207         for (i = 0; i < vacpage->offsets_free; i++)
3208         {
3209                 ItemId          itemid = PageGetItemId(page, vacpage->offsets[i]);
3210
3211                 ItemIdSetUnused(itemid);
3212         }
3213
3214         PageRepairFragmentation(page);
3215
3216         MarkBufferDirty(buffer);
3217
3218         /* XLOG stuff */
3219         if (!onerel->rd_istemp)
3220         {
3221                 XLogRecPtr      recptr;
3222
3223                 recptr = log_heap_clean(onerel, buffer,
3224                                                                 NULL, 0, NULL, 0,
3225                                                                 vacpage->offsets, vacpage->offsets_free,
3226                                                                 false);
3227                 PageSetLSN(page, recptr);
3228                 PageSetTLI(page, ThisTimeLineID);
3229         }
3230
3231         END_CRIT_SECTION();
3232 }
3233
3234 /*
3235  *      scan_index() -- scan one index relation to update pg_class statistics.
3236  *
3237  * We use this when we have no deletions to do.
3238  */
3239 static void
3240 scan_index(Relation indrel, double num_tuples)
3241 {
3242         IndexBulkDeleteResult *stats;
3243         IndexVacuumInfo ivinfo;
3244         PGRUsage        ru0;
3245
3246         pg_rusage_init(&ru0);
3247
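        /* Fill in the IndexVacuumInfo handed to the index AM's cleanup routine */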
3248         ivinfo.index = indrel;
3249         ivinfo.vacuum_full = true;
3250         ivinfo.message_level = elevel;
3251         ivinfo.num_heap_tuples = num_tuples;
3252         ivinfo.strategy = vac_strategy;
3253
3254         stats = index_vacuum_cleanup(&ivinfo, NULL);
3255
3256         if (!stats)
3257                 return;
3258
3259         /* now update statistics in pg_class */
3260         vac_update_relstats(RelationGetRelid(indrel),
3261                                                 stats->num_pages, stats->num_index_tuples,
3262                                                 false, InvalidTransactionId);
3263
3264         ereport(elevel,
3265                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3266                                         RelationGetRelationName(indrel),
3267                                         stats->num_index_tuples,
3268                                         stats->num_pages),
3269         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
3270                           "%s.",
3271                           stats->pages_deleted, stats->pages_free,
3272                           pg_rusage_show(&ru0))));
3273
3274         /*
3275          * Check for tuple count mismatch.  If the index is partial, then it's OK
3276          * for it to have fewer tuples than the heap; otherwise we have a problem.
3277          */
3278         if (stats->num_index_tuples != num_tuples)
3279         {
3280                 if (stats->num_index_tuples > num_tuples ||
3281                         !vac_is_partial_index(indrel))
3282                         ereport(WARNING,
3283                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3284                                                         RelationGetRelationName(indrel),
3285                                                         stats->num_index_tuples, num_tuples),
3286                                          errhint("Rebuild the index with REINDEX.")));
3287         }
3288
3289         pfree(stats);
3290 }
3291
3292 /*
3293  *      vacuum_index() -- vacuum one index relation.
3294  *
3295  *              Vacpagelist is the VacPageList of the heap we're currently vacuuming;
3296  *              the heap is already locked.  Indrel is an index relation on the vacuumed heap.
3297  *
3298  *              We don't bother to set locks on the index relation here, since
3299  *              the parent table is exclusive-locked already.
3300  *
3301  *              Finally, we arrange to update the index relation's statistics in
3302  *              pg_class.
3303  */
3304 static void
3305 vacuum_index(VacPageList vacpagelist, Relation indrel,
3306                          double num_tuples, int keep_tuples)
3307 {
3308         IndexBulkDeleteResult *stats;
3309         IndexVacuumInfo ivinfo;
3310         PGRUsage        ru0;
3311
3312         pg_rusage_init(&ru0);
3313
3314         ivinfo.index = indrel;
3315         ivinfo.vacuum_full = true;
3316         ivinfo.message_level = elevel;
3317         ivinfo.num_heap_tuples = num_tuples + keep_tuples;
3318         ivinfo.strategy = vac_strategy;
3319
3320         /* Do bulk deletion */
3321         stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);
3322
3323         /* Do post-VACUUM cleanup */
3324         stats = index_vacuum_cleanup(&ivinfo, stats);
3325
3326         if (!stats)
3327                 return;
3328
3329         /* now update statistics in pg_class */
3330         vac_update_relstats(RelationGetRelid(indrel),
3331                                                 stats->num_pages, stats->num_index_tuples,
3332                                                 false, InvalidTransactionId);
3333
3334         ereport(elevel,
3335                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3336                                         RelationGetRelationName(indrel),
3337                                         stats->num_index_tuples,
3338                                         stats->num_pages),
3339                          errdetail("%.0f index row versions were removed.\n"
3340                          "%u index pages have been deleted, %u are currently reusable.\n"
3341                                            "%s.",
3342                                            stats->tuples_removed,
3343                                            stats->pages_deleted, stats->pages_free,
3344                                            pg_rusage_show(&ru0))));
3345
3346         /*
3347          * Check for tuple count mismatch.  If the index is partial, then it's OK
3348          * for it to have fewer tuples than the heap; otherwise we have a problem.
3349          */
3350         if (stats->num_index_tuples != num_tuples + keep_tuples)
3351         {
3352                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3353                         !vac_is_partial_index(indrel))
3354                         ereport(WARNING,
3355                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3356                                                         RelationGetRelationName(indrel),
3357                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3358                                          errhint("Rebuild the index with REINDEX.")));
3359         }
3360
3361         pfree(stats);
3362 }
3363
3364 /*
3365  *      tid_reaped() -- is a particular tid reaped?
3366  *
3367  *              This has the right signature to be an IndexBulkDeleteCallback.
3368  *
3369  *              vacpagelist->pagedesc is sorted in the right (block-number) order.
3370  */
3371 static bool
3372 tid_reaped(ItemPointer itemptr, void *state)
3373 {
3374         VacPageList vacpagelist = (VacPageList) state;
3375         OffsetNumber ioffno;
3376         OffsetNumber *voff;
3377         VacPage         vp,
3378                            *vpp;
3379         VacPageData vacpage;
3380
3381         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3382         ioffno = ItemPointerGetOffsetNumber(itemptr);
3383
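        /* Binary-search the per-page list for this tuple's block number */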
3384         vp = &vacpage;
3385         vpp = (VacPage *) vac_bsearch((void *) &vp,
3386                                                                   (void *) (vacpagelist->pagedesc),
3387                                                                   vacpagelist->num_pages,
3388                                                                   sizeof(VacPage),
3389                                                                   vac_cmp_blk);
3390
3391         if (vpp == NULL)
3392                 return false;
3393
3394         /* ok - we are on a partially or fully reaped page */
3395         vp = *vpp;
3396
3397         if (vp->offsets_free == 0)
3398         {
3399                 /* this is an empty page, so all tuples on it count as reaped */
3400                 return true;
3401         }
3402
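        /* The page was only partially reaped: check whether this offset was freed */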
3403         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3404                                                                                 (void *) (vp->offsets),
3405                                                                                 vp->offsets_free,
3406                                                                                 sizeof(OffsetNumber),
3407                                                                                 vac_cmp_offno);
3408
3409         if (voff == NULL)
3410                 return false;
3411
3412         /* tid is reaped */
3413         return true;
3414 }
3415
3416 /*
3417  * Update the shared Free Space Map with the info we now have about
3418  * free space in the relation, discarding any old info the map may have.
3419  */
3420 static void
3421 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3422                            BlockNumber rel_pages)
3423 {
3424         int                     nPages = fraged_pages->num_pages;
3425         VacPage    *pagedesc = fraged_pages->pagedesc;
3426         Size            threshold;
3427         PageFreeSpaceInfo *pageSpaces;
3428         int                     outPages;
3429         int                     i;
3430
3431         /*
3432          * We only report pages with free space at least equal to the average
3433          * request size --- this avoids cluttering FSM with uselessly-small bits
3434          * of space.  Although FSM would discard pages with little free space
3435          * anyway, it's important to do this prefiltering because (a) it reduces
3436          * the time spent holding the FSM lock in RecordRelationFreeSpace, and (b)
3437          * FSM uses the number of pages reported as a statistic for guiding space
3438          * management.  If we didn't threshold our reports the same way
3439          * vacuumlazy.c does, we'd be skewing that statistic.
3440          */
3441         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
3442
3443         pageSpaces = (PageFreeSpaceInfo *)
3444                 palloc(nPages * sizeof(PageFreeSpaceInfo));
3445         outPages = 0;
3446
3447         for (i = 0; i < nPages; i++)
3448         {
3449                 /*
3450                  * fraged_pages may contain entries for pages that we later decided to
3451                  * truncate from the relation; don't enter them into the free space
3452                  * map!
3453                  */
3454                 if (pagedesc[i]->blkno >= rel_pages)
3455                         break;
3456
3457                 if (pagedesc[i]->free >= threshold)
3458                 {
3459                         pageSpaces[outPages].blkno = pagedesc[i]->blkno;
3460                         pageSpaces[outPages].avail = pagedesc[i]->free;
3461                         outPages++;
3462                 }
3463         }
3464
3465         RecordRelationFreeSpace(&onerel->rd_node, outPages, outPages, pageSpaces);
3466
3467         pfree(pageSpaces);
3468 }
3469
3470 /* Copy a VacPage structure */
3471 static VacPage
3472 copy_vac_page(VacPage vacpage)
3473 {
3474         VacPage         newvacpage;
3475
3476         /* allocate a VacPageData entry */
3477         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3478                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3479
3480         /* fill it in */
3481         if (vacpage->offsets_free > 0)
3482                 memcpy(newvacpage->offsets, vacpage->offsets,
3483                            vacpage->offsets_free * sizeof(OffsetNumber));
3484         newvacpage->blkno = vacpage->blkno;
3485         newvacpage->free = vacpage->free;
3486         newvacpage->offsets_used = vacpage->offsets_used;
3487         newvacpage->offsets_free = vacpage->offsets_free;
3488
3489         return newvacpage;
3490 }
3491
3492 /*
3493  * Add a VacPage pointer to a VacPageList.
3494  *
3495  *              As a side effect of the way that scan_heap works,
3496  *              higher pages come after lower pages in the array
3497  *              (and the highest TID on a page comes last).
3498  */
3499 static void
3500 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3501 {
3502 #define PG_NPAGEDESC 1024
3503
3504         /* allocate a VacPage entry if needed */
3505         if (vacpagelist->num_pages == 0)
3506         {
3507                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3508                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3509         }
3510         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3511         {
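                /* out of room: double the size of the descriptor array */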
3512                 vacpagelist->num_allocated_pages *= 2;
3513                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3514         }
3515         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3516         (vacpagelist->num_pages)++;
3517 }
3518
3519 /*
3520  * vac_bsearch: just like standard C library routine bsearch(),
3521  * except that we first test to see whether the target key is outside
3522  * the range of the table entries.      This case is handled relatively slowly
3523  * by the normal binary search algorithm (ie, no faster than any other key)
3524  * but it occurs often enough in VACUUM to be worth optimizing.
3525  */
3526 static void *
3527 vac_bsearch(const void *key, const void *base,
3528                         size_t nelem, size_t size,
3529                         int (*compar) (const void *, const void *))
3530 {
3531         int                     res;
3532         const void *last;
3533
3534         if (nelem == 0)
3535                 return NULL;
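        /* Quick check against the first entry: a smaller key cannot be present */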
3536         res = compar(key, base);
3537         if (res < 0)
3538                 return NULL;
3539         if (res == 0)
3540                 return (void *) base;
3541         if (nelem > 1)
3542         {
3543                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3544                 res = compar(key, last);
3545                 if (res > 0)
3546                         return NULL;
3547                 if (res == 0)
3548                         return (void *) last;
3549         }
3550         if (nelem <= 2)
3551                 return NULL;                    /* already checked 'em all */
3552         return bsearch(key, base, nelem, size, compar);
3553 }
3554
3555 /*
3556  * Comparator routines for use with qsort() and bsearch().
3557  */
3558 static int
3559 vac_cmp_blk(const void *left, const void *right)
3560 {
3561         BlockNumber lblk,
3562                                 rblk;
3563
3564         lblk = (*((VacPage *) left))->blkno;
3565         rblk = (*((VacPage *) right))->blkno;
3566
3567         if (lblk < rblk)
3568                 return -1;
3569         if (lblk == rblk)
3570                 return 0;
3571         return 1;
3572 }
3573
3574 static int
3575 vac_cmp_offno(const void *left, const void *right)
3576 {
3577         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3578                 return -1;
3579         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3580                 return 0;
3581         return 1;
3582 }
3583
3584 static int
3585 vac_cmp_vtlinks(const void *left, const void *right)
3586 {
3587         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3588                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3589                 return -1;
3590         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3591                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3592                 return 1;
3593         /* bi_hi-es are equal */
3594         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3595                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3596                 return -1;
3597         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3598                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3599                 return 1;
3600         /* bi_lo-es are equal */
3601         if (((VTupleLink) left)->new_tid.ip_posid <
3602                 ((VTupleLink) right)->new_tid.ip_posid)
3603                 return -1;
3604         if (((VTupleLink) left)->new_tid.ip_posid >
3605                 ((VTupleLink) right)->new_tid.ip_posid)
3606                 return 1;
3607         return 0;
3608 }
3609
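/*
 * Illustrative sketch (not part of the original file): the comparators above
 * follow the usual qsort()/bsearch() contract.  For example, an array of
 * VTupleLinkData entries -- here a hypothetical "vtlinks" array with
 * "num_vtlinks" elements -- could be put into TID order like this:
 *
 *              qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
 *                        vac_cmp_vtlinks);
 */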
3610
3611 /*
3612  * Open all the indexes of the given relation, obtaining the specified kind
3613  * of lock on each.  Return an array of Relation pointers for the indexes
3614  * into *Irel, and the number of indexes into *nindexes.
3615  */
3616 void
3617 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3618                                  int *nindexes, Relation **Irel)
3619 {
3620         List       *indexoidlist;
3621         ListCell   *indexoidscan;
3622         int                     i;
3623
3624         Assert(lockmode != NoLock);
3625
3626         indexoidlist = RelationGetIndexList(relation);
3627
3628         *nindexes = list_length(indexoidlist);
3629
3630         if (*nindexes > 0)
3631                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3632         else
3633                 *Irel = NULL;
3634
3635         i = 0;
3636         foreach(indexoidscan, indexoidlist)
3637         {
3638                 Oid                     indexoid = lfirst_oid(indexoidscan);
3639
3640                 (*Irel)[i++] = index_open(indexoid, lockmode);
3641         }
3642
3643         list_free(indexoidlist);
3644 }
3645
3646 /*
3647  * Release the resources acquired by vac_open_indexes.  Optionally release
3648  * the locks (say NoLock to keep 'em).
3649  */
3650 void
3651 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3652 {
3653         if (Irel == NULL)
3654                 return;
3655
3656         while (nindexes--)
3657         {
3658                 Relation        ind = Irel[nindexes];
3659
3660                 index_close(ind, lockmode);
3661         }
3662         pfree(Irel);
3663 }
3664
3665
3666 /*
3667  * Is an index partial (i.e., could it contain fewer tuples than the heap)?
3668  */
3669 bool
3670 vac_is_partial_index(Relation indrel)
3671 {
3672         /*
3673          * If the index's AM doesn't support nulls, it's partial for our purposes
3674          */
3675         if (!indrel->rd_am->amindexnulls)
3676                 return true;
3677
3678         /* Otherwise, look to see if there's a partial-index predicate */
3679         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3680                 return true;
3681
3682         return false;
3683 }
3684
3685
3686 static bool
3687 enough_space(VacPage vacpage, Size len)
3688 {
3689         len = MAXALIGN(len);
3690
3691         if (len > vacpage->free)
3692                 return false;
3693
3694         /* if a freed item pointer is still available and len fits, we're done */
3695         if (vacpage->offsets_used < vacpage->offsets_free)
3696                 return true;
3697
3698         /* all freed item pointers are reused; we'd have to allocate a new one */
3699         if (len + sizeof(ItemIdData) <= vacpage->free)
3700                 return true;
3701
3702         return false;
3703 }
3704
3705 static Size
3706 PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
3707 {
3708         Size            freespace = PageGetHeapFreeSpace(page);
3709         Size            targetfree;
3710
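        /* Only space in excess of the fillfactor reservation counts as usable */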
3711         targetfree = RelationGetTargetPageFreeSpace(relation,
3712                                                                                                 HEAP_DEFAULT_FILLFACTOR);
3713         if (freespace > targetfree)
3714                 return freespace - targetfree;
3715         else
3716                 return 0;
3717 }
3718
3719 /*
3720  * vacuum_delay_point --- check for interrupts and cost-based delay.
3721  *
3722  * This should be called in each major loop of VACUUM processing,
3723  * typically once per page processed.
3724  */
3725 void
3726 vacuum_delay_point(void)
3727 {
3728         /* Always check for interrupts */
3729         CHECK_FOR_INTERRUPTS();
3730
3731         /* Nap if appropriate */
3732         if (VacuumCostActive && !InterruptPending &&
3733                 VacuumCostBalance >= VacuumCostLimit)
3734         {
3735                 int                     msec;
3736
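                /*
                 * Sleep in proportion to the accumulated cost, but never for more
                 * than four times the base delay.
                 */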
3737                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3738                 if (msec > VacuumCostDelay * 4)
3739                         msec = VacuumCostDelay * 4;
3740
3741                 pg_usleep(msec * 1000L);
3742
3743                 VacuumCostBalance = 0;
3744
3745                 /* update balance values for workers */
3746                 AutoVacuumUpdateDelay();
3747
3748                 /* Might have gotten an interrupt while sleeping */
3749                 CHECK_FOR_INTERRUPTS();
3750         }
3751 }