]> granicus.if.org Git - postgresql/blob - src/backend/commands/vacuum.c
Skip ambulkdelete scan if there's nothing to delete and the index is not
[postgresql] / src / backend / commands / vacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.324 2006/02/11 23:31:33 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/subtrans.h"
29 #include "access/xlog.h"
30 #include "catalog/catalog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_database.h"
33 #include "catalog/pg_index.h"
34 #include "commands/dbcommands.h"
35 #include "commands/vacuum.h"
36 #include "executor/executor.h"
37 #include "miscadmin.h"
38 #include "postmaster/autovacuum.h"
39 #include "storage/freespace.h"
40 #include "storage/procarray.h"
41 #include "storage/smgr.h"
42 #include "tcop/pquery.h"
43 #include "utils/acl.h"
44 #include "utils/builtins.h"
45 #include "utils/flatfiles.h"
46 #include "utils/fmgroids.h"
47 #include "utils/inval.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/pg_rusage.h"
51 #include "utils/relcache.h"
52 #include "utils/syscache.h"
53 #include "pgstat.h"
54
55
56 /*
57  * VacPage structures keep track of each page on which we find useful
58  * amounts of free space.
59  */
60 typedef struct VacPageData
61 {
62         BlockNumber blkno;                      /* BlockNumber of this Page */
63         Size            free;                   /* FreeSpace on this Page */
64         uint16          offsets_used;   /* Number of OffNums used by vacuum */
65         uint16          offsets_free;   /* Number of OffNums free or to be free */
66         OffsetNumber offsets[1];        /* Array of free OffNums */
67 } VacPageData;
68
69 typedef VacPageData *VacPage;
70
71 typedef struct VacPageListData
72 {
73         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
74         int                     num_pages;              /* Number of pages in pagedesc */
75         int                     num_allocated_pages;    /* Number of allocated pages in
76                                                                                  * pagedesc */
77         VacPage    *pagedesc;           /* Descriptions of pages */
78 } VacPageListData;
79
80 typedef VacPageListData *VacPageList;
81
82 /*
83  * The "vtlinks" array keeps information about each recently-updated tuple
84  * ("recent" meaning its XMAX is too new to let us recycle the tuple).
85  * We store the tuple's own TID as well as its t_ctid (its link to the next
86  * newer tuple version).  Searching in this array allows us to follow update
87  * chains backwards from newer to older tuples.  When we move a member of an
88  * update chain, we must move *all* the live members of the chain, so that we
89  * can maintain their t_ctid link relationships (we must not just overwrite
90  * t_ctid in an existing tuple).
91  *
92  * Note: because t_ctid links can be stale (this would only occur if a prior
93  * VACUUM crashed partway through), it is possible that new_tid points to an
94  * empty slot or unrelated tuple.  We have to check the linkage as we follow
95  * it, just as is done in EvalPlanQual.
96  */
97 typedef struct VTupleLinkData
98 {
99         ItemPointerData new_tid;        /* t_ctid of an updated tuple */
100         ItemPointerData this_tid;       /* t_self of the tuple */
101 } VTupleLinkData;
102
103 typedef VTupleLinkData *VTupleLink;
104
105 /*
106  * We use an array of VTupleMoveData to plan a chain tuple move fully
107  * before we do it.
108  */
109 typedef struct VTupleMoveData
110 {
111         ItemPointerData tid;            /* tuple ID */
112         VacPage         vacpage;                /* where to move it to */
113         bool            cleanVpd;               /* clean vacpage before using? */
114 } VTupleMoveData;
115
116 typedef VTupleMoveData *VTupleMove;
117
118 /*
119  * VRelStats contains the data acquired by scan_heap for use later
120  */
121 typedef struct VRelStats
122 {
123         /* miscellaneous statistics */
124         BlockNumber rel_pages;
125         double          rel_tuples;
126         Size            min_tlen;
127         Size            max_tlen;
128         bool            hasindex;
129         /* vtlinks array for tuple chain following - sorted by new_tid */
130         int                     num_vtlinks;
131         VTupleLink      vtlinks;
132 } VRelStats;
133
134 /*----------------------------------------------------------------------
135  * ExecContext:
136  *
137  * As these variables always appear together, we put them into one struct
138  * and pull initialization and cleanup into separate routines.
139  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
140  * accurately:  It is *used* only in move_xxx_tuple(), but because this
141  * routine is called many times, we initialize the struct just once in
142  * repair_frag() and pass it on to move_xxx_tuple().
143  */
144 typedef struct ExecContextData
145 {
146         ResultRelInfo *resultRelInfo;
147         EState     *estate;
148         TupleTableSlot *slot;
149 } ExecContextData;
150
151 typedef ExecContextData *ExecContext;
152
153 static void
154 ExecContext_Init(ExecContext ec, Relation rel)
155 {
156         TupleDesc       tupdesc = RelationGetDescr(rel);
157
158         /*
159          * We need a ResultRelInfo and an EState so we can use the regular
160          * executor's index-entry-making machinery.
161          */
162         ec->estate = CreateExecutorState();
163
164         ec->resultRelInfo = makeNode(ResultRelInfo);
165         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
166         ec->resultRelInfo->ri_RelationDesc = rel;
167         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
168
169         ExecOpenIndices(ec->resultRelInfo);
170
171         ec->estate->es_result_relations = ec->resultRelInfo;
172         ec->estate->es_num_result_relations = 1;
173         ec->estate->es_result_relation_info = ec->resultRelInfo;
174
175         /* Set up a tuple slot too */
176         ec->slot = MakeSingleTupleTableSlot(tupdesc);
177 }
178
179 static void
180 ExecContext_Finish(ExecContext ec)
181 {
182         ExecDropSingleTupleTableSlot(ec->slot);
183         ExecCloseIndices(ec->resultRelInfo);
184         FreeExecutorState(ec->estate);
185 }
186
187 /*
188  * End of ExecContext Implementation
189  *----------------------------------------------------------------------
190  */
191
192 static MemoryContext vac_context = NULL;
193
194 static int      elevel = -1;
195
196 static TransactionId OldestXmin;
197 static TransactionId FreezeLimit;
198
199
200 /* non-export function prototypes */
201 static List *get_rel_oids(List *relids, const RangeVar *vacrel,
202                          const char *stmttype);
203 static void vac_update_dbstats(Oid dbid,
204                                    TransactionId vacuumXID,
205                                    TransactionId frozenXID);
206 static void vac_truncate_clog(TransactionId vacuumXID,
207                                   TransactionId frozenXID);
208 static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
209 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
210 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
211                   VacPageList vacuum_pages, VacPageList fraged_pages);
212 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
213                         VacPageList vacuum_pages, VacPageList fraged_pages,
214                         int nindexes, Relation *Irel);
215 static void move_chain_tuple(Relation rel,
216                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
217                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
218                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
219 static void move_plain_tuple(Relation rel,
220                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
221                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
222                                  ExecContext ec);
223 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
224                                  int num_fraged_pages, BlockNumber last_move_dest_block,
225                                  int num_moved);
226 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
227                         VacPageList vacpagelist);
228 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
229 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
230                          double num_tuples, int keep_tuples);
231 static void scan_index(Relation indrel, double num_tuples);
232 static bool tid_reaped(ItemPointer itemptr, void *state);
233 static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
234 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
235                            BlockNumber rel_pages);
236 static VacPage copy_vac_page(VacPage vacpage);
237 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
238 static void *vac_bsearch(const void *key, const void *base,
239                         size_t nelem, size_t size,
240                         int (*compar) (const void *, const void *));
241 static int      vac_cmp_blk(const void *left, const void *right);
242 static int      vac_cmp_offno(const void *left, const void *right);
243 static int      vac_cmp_vtlinks(const void *left, const void *right);
244 static bool enough_space(VacPage vacpage, Size len);
245
246
247 /****************************************************************************
248  *                                                                                                                                                      *
249  *                      Code common to all flavors of VACUUM and ANALYZE                                *
250  *                                                                                                                                                      *
251  ****************************************************************************
252  */
253
254
255 /*
256  * Primary entry point for VACUUM and ANALYZE commands.
257  *
258  * relids is normally NIL; if it is not, then it provides the list of
259  * relation OIDs to be processed, and vacstmt->relation is ignored.
260  * (The non-NIL case is currently only used by autovacuum.)
261  *
262  * It is the caller's responsibility that both vacstmt and relids
263  * (if given) be allocated in a memory context that won't disappear
264  * at transaction commit.  In fact this context must be QueryContext
265  * to avoid complaints from PreventTransactionChain.
266  */
267 void
268 vacuum(VacuumStmt *vacstmt, List *relids)
269 {
270         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
271         TransactionId initialOldestXmin = InvalidTransactionId;
272         TransactionId initialFreezeLimit = InvalidTransactionId;
273         volatile MemoryContext anl_context = NULL;
274         volatile bool all_rels,
275                                 in_outer_xact,
276                                 use_own_xacts;
277         List       *relations;
278
279         if (vacstmt->verbose)
280                 elevel = INFO;
281         else
282                 elevel = DEBUG2;
283
284         /*
285          * We cannot run VACUUM inside a user transaction block; if we were inside
286          * a transaction, then our commit- and start-transaction-command calls
287          * would not have the intended effect! Furthermore, the forced commit that
288          * occurs before truncating the relation's file would have the effect of
289          * committing the rest of the user's transaction too, which would
290          * certainly not be the desired behavior.  (This only applies to VACUUM
291          * FULL, though.  We could in theory run lazy VACUUM inside a transaction
292          * block, but we choose to disallow that case because we'd rather commit
293          * as soon as possible after finishing the vacuum.      This is mainly so that
294          * we can let go the AccessExclusiveLock that we may be holding.)
295          *
296          * ANALYZE (without VACUUM) can run either way.
297          */
298         if (vacstmt->vacuum)
299         {
300                 PreventTransactionChain((void *) vacstmt, stmttype);
301                 in_outer_xact = false;
302         }
303         else
304                 in_outer_xact = IsInTransactionChain((void *) vacstmt);
305
306         /*
307          * Disallow the combination VACUUM FULL FREEZE; although it would mostly
308          * work, VACUUM FULL's ability to move tuples around means that it is
309          * injecting its own XID into tuple visibility checks.  We'd have to
310          * guarantee that every moved tuple is properly marked XMIN_COMMITTED or
311          * XMIN_INVALID before the end of the operation.  There are corner cases
312          * where this does not happen, and getting rid of them all seems hard (not
313          * to mention fragile to maintain).  On the whole it's not worth it
314          * compared to telling people to use two operations.  See pgsql-hackers
315          * discussion of 27-Nov-2004, and comments below for update_hint_bits().
316          *
317          * Note: this is enforced here, and not in the grammar, since (a) we can
318          * give a better error message, and (b) we might want to allow it again
319          * someday.
320          */
321         if (vacstmt->vacuum && vacstmt->full && vacstmt->freeze)
322                 ereport(ERROR,
323                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
324                                  errmsg("VACUUM FULL FREEZE is not supported"),
325                                  errhint("Use VACUUM FULL, then VACUUM FREEZE.")));
326
327         /*
328          * Send info about dead objects to the statistics collector, unless
329          * we are in autovacuum --- autovacuum.c does this for itself.
330          */
331         if (vacstmt->vacuum && !IsAutoVacuumProcess())
332                 pgstat_vacuum_tabstat();
333
334         /*
335          * Create special memory context for cross-transaction storage.
336          *
337          * Since it is a child of PortalContext, it will go away eventually even
338          * if we suffer an error; there's no need for special abort cleanup logic.
339          */
340         vac_context = AllocSetContextCreate(PortalContext,
341                                                                                 "Vacuum",
342                                                                                 ALLOCSET_DEFAULT_MINSIZE,
343                                                                                 ALLOCSET_DEFAULT_INITSIZE,
344                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
345
346         /* Remember whether we are processing everything in the DB */
347         all_rels = (relids == NIL && vacstmt->relation == NULL);
348
349         /*
350          * Build list of relations to process, unless caller gave us one. (If we
351          * build one, we put it in vac_context for safekeeping.)
352          */
353         relations = get_rel_oids(relids, vacstmt->relation, stmttype);
354
355         if (vacstmt->vacuum && all_rels)
356         {
357                 /*
358                  * It's a database-wide VACUUM.
359                  *
360                  * Compute the initially applicable OldestXmin and FreezeLimit XIDs,
361                  * so that we can record these values at the end of the VACUUM. Note
362                  * that individual tables may well be processed with newer values, but
363                  * we can guarantee that no (non-shared) relations are processed with
364                  * older ones.
365                  *
366                  * It is okay to record non-shared values in pg_database, even though
367                  * we may vacuum shared relations with older cutoffs, because only the
368                  * minimum of the values present in pg_database matters.  We can be
369                  * sure that shared relations have at some time been vacuumed with
370                  * cutoffs no worse than the global minimum; for, if there is a
371                  * backend in some other DB with xmin = OLDXMIN that's determining the
372                  * cutoff with which we vacuum shared relations, it is not possible
373                  * for that database to have a cutoff newer than OLDXMIN recorded in
374                  * pg_database.
375                  */
376                 vacuum_set_xid_limits(vacstmt, false,
377                                                           &initialOldestXmin,
378                                                           &initialFreezeLimit);
379         }
380
381         /*
382          * Decide whether we need to start/commit our own transactions.
383          *
384          * For VACUUM (with or without ANALYZE): always do so, so that we can
385          * release locks as soon as possible.  (We could possibly use the outer
386          * transaction for a one-table VACUUM, but handling TOAST tables would be
387          * problematic.)
388          *
389          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
390          * start/commit our own transactions.  Also, there's no need to do so if
391          * only processing one relation.  For multiple relations when not within a
392          * transaction block, use own transactions so we can release locks sooner.
393          */
394         if (vacstmt->vacuum)
395                 use_own_xacts = true;
396         else
397         {
398                 Assert(vacstmt->analyze);
399                 if (in_outer_xact)
400                         use_own_xacts = false;
401                 else if (list_length(relations) > 1)
402                         use_own_xacts = true;
403                 else
404                         use_own_xacts = false;
405         }
406
407         /*
408          * If we are running ANALYZE without per-table transactions, we'll need a
409          * memory context with table lifetime.
410          */
411         if (!use_own_xacts)
412                 anl_context = AllocSetContextCreate(PortalContext,
413                                                                                         "Analyze",
414                                                                                         ALLOCSET_DEFAULT_MINSIZE,
415                                                                                         ALLOCSET_DEFAULT_INITSIZE,
416                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
417
418         /*
419          * vacuum_rel expects to be entered with no transaction active; it will
420          * start and commit its own transaction.  But we are called by an SQL
421          * command, and so we are executing inside a transaction already. We
422          * commit the transaction started in PostgresMain() here, and start
423          * another one before exiting to match the commit waiting for us back in
424          * PostgresMain().
425          */
426         if (use_own_xacts)
427         {
428                 /* matches the StartTransaction in PostgresMain() */
429                 CommitTransactionCommand();
430         }
431
432         /* Turn vacuum cost accounting on or off */
433         PG_TRY();
434         {
435                 ListCell   *cur;
436
437                 VacuumCostActive = (VacuumCostDelay > 0);
438                 VacuumCostBalance = 0;
439
440                 /*
441                  * Loop to process each selected relation.
442                  */
443                 foreach(cur, relations)
444                 {
445                         Oid                     relid = lfirst_oid(cur);
446
447                         if (vacstmt->vacuum)
448                         {
449                                 if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
450                                         all_rels = false;       /* forget about updating dbstats */
451                         }
452                         if (vacstmt->analyze)
453                         {
454                                 MemoryContext old_context = NULL;
455
456                                 /*
457                                  * If using separate xacts, start one for analyze. Otherwise,
458                                  * we can use the outer transaction, but we still need to call
459                                  * analyze_rel in a memory context that will be cleaned up on
460                                  * return (else we leak memory while processing multiple
461                                  * tables).
462                                  */
463                                 if (use_own_xacts)
464                                 {
465                                         StartTransactionCommand();
466                                         /* functions in indexes may want a snapshot set */
467                                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
468                                 }
469                                 else
470                                         old_context = MemoryContextSwitchTo(anl_context);
471
472                                 /*
473                                  * Tell the buffer replacement strategy that vacuum is causing
474                                  * the IO
475                                  */
476                                 StrategyHintVacuum(true);
477
478                                 analyze_rel(relid, vacstmt);
479
480                                 StrategyHintVacuum(false);
481
482                                 if (use_own_xacts)
483                                         CommitTransactionCommand();
484                                 else
485                                 {
486                                         MemoryContextSwitchTo(old_context);
487                                         MemoryContextResetAndDeleteChildren(anl_context);
488                                 }
489                         }
490                 }
491         }
492         PG_CATCH();
493         {
494                 /* Make sure cost accounting is turned off after error */
495                 VacuumCostActive = false;
496                 PG_RE_THROW();
497         }
498         PG_END_TRY();
499
500         /* Turn off vacuum cost accounting */
501         VacuumCostActive = false;
502
503         /*
504          * Finish up processing.
505          */
506         if (use_own_xacts)
507         {
508                 /* here, we are not in a transaction */
509
510                 /*
511                  * This matches the CommitTransaction waiting for us in
512                  * PostgresMain().
513                  */
514                 StartTransactionCommand();
515                 /*
516                  * Re-establish the transaction snapshot.  This is wasted effort
517                  * when we are called as a normal utility command, because the
518                  * new transaction will be dropped immediately by PostgresMain();
519                  * but it's necessary if we are called from autovacuum because
520                  * autovacuum might continue on to do an ANALYZE-only call.
521                  */
522                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
523         }
524
525         if (vacstmt->vacuum)
526         {
527                 /*
528                  * If it was a database-wide VACUUM, print FSM usage statistics (we
529                  * don't make you be superuser to see these).
530                  */
531                 if (all_rels)
532                         PrintFreeSpaceMapStatistics(elevel);
533
534                 /*
535                  * If we completed a database-wide VACUUM without skipping any
536                  * relations, update the database's pg_database row with info about
537                  * the transaction IDs used, and try to truncate pg_clog.
538                  */
539                 if (all_rels)
540                 {
541                         vac_update_dbstats(MyDatabaseId,
542                                                            initialOldestXmin, initialFreezeLimit);
543                         vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
544                 }
545         }
546
547         /*
548          * Clean up working storage --- note we must do this after
549          * StartTransactionCommand, else we might be trying to delete the active
550          * context!
551          */
552         MemoryContextDelete(vac_context);
553         vac_context = NULL;
554
555         if (anl_context)
556                 MemoryContextDelete(anl_context);
557 }
558
559 /*
560  * Build a list of Oids for each relation to be processed
561  *
562  * The list is built in vac_context so that it will survive across our
563  * per-relation transactions.
564  */
565 static List *
566 get_rel_oids(List *relids, const RangeVar *vacrel, const char *stmttype)
567 {
568         List       *oid_list = NIL;
569         MemoryContext oldcontext;
570
571         /* List supplied by VACUUM's caller? */
572         if (relids)
573                 return relids;
574
575         if (vacrel)
576         {
577                 /* Process a specific relation */
578                 Oid                     relid;
579
580                 relid = RangeVarGetRelid(vacrel, false);
581
582                 /* Make a relation list entry for this guy */
583                 oldcontext = MemoryContextSwitchTo(vac_context);
584                 oid_list = lappend_oid(oid_list, relid);
585                 MemoryContextSwitchTo(oldcontext);
586         }
587         else
588         {
589                 /* Process all plain relations listed in pg_class */
590                 Relation        pgclass;
591                 HeapScanDesc scan;
592                 HeapTuple       tuple;
593                 ScanKeyData key;
594
595                 ScanKeyInit(&key,
596                                         Anum_pg_class_relkind,
597                                         BTEqualStrategyNumber, F_CHAREQ,
598                                         CharGetDatum(RELKIND_RELATION));
599
600                 pgclass = heap_open(RelationRelationId, AccessShareLock);
601
602                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
603
604                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
605                 {
606                         /* Make a relation list entry for this guy */
607                         oldcontext = MemoryContextSwitchTo(vac_context);
608                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
609                         MemoryContextSwitchTo(oldcontext);
610                 }
611
612                 heap_endscan(scan);
613                 heap_close(pgclass, AccessShareLock);
614         }
615
616         return oid_list;
617 }
618
619 /*
620  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
621  */
622 void
623 vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
624                                           TransactionId *oldestXmin,
625                                           TransactionId *freezeLimit)
626 {
627         TransactionId limit;
628
629         *oldestXmin = GetOldestXmin(sharedRel);
630
631         Assert(TransactionIdIsNormal(*oldestXmin));
632
633         if (vacstmt->freeze)
634         {
635                 /* FREEZE option: use oldest Xmin as freeze cutoff too */
636                 limit = *oldestXmin;
637         }
638         else
639         {
640                 /*
641                  * Normal case: freeze cutoff is well in the past, to wit, about
642                  * halfway to the wrap horizon
643                  */
644                 limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
645         }
646
647         /*
648          * Be careful not to generate a "permanent" XID
649          */
650         if (!TransactionIdIsNormal(limit))
651                 limit = FirstNormalTransactionId;
652
653         /*
654          * Ensure sane relationship of limits
655          */
656         if (TransactionIdFollows(limit, *oldestXmin))
657         {
658                 ereport(WARNING,
659                                 (errmsg("oldest xmin is far in the past"),
660                                  errhint("Close open transactions soon to avoid wraparound problems.")));
661                 limit = *oldestXmin;
662         }
663
664         *freezeLimit = limit;
665 }
666
667
668 /*
669  *      vac_update_relstats() -- update statistics for one relation
670  *
671  *              Update the whole-relation statistics that are kept in its pg_class
672  *              row.  There are additional stats that will be updated if we are
673  *              doing ANALYZE, but we always update these stats.  This routine works
674  *              for both index and heap relation entries in pg_class.
675  *
676  *              We violate no-overwrite semantics here by storing new values for the
677  *              statistics columns directly into the pg_class tuple that's already on
678  *              the page.  The reason for this is that if we updated these tuples in
679  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
680  *              by the time we got done with a vacuum cycle, most of the tuples in
681  *              pg_class would've been obsoleted.  Of course, this only works for
682  *              fixed-size never-null columns, but these are.
683  *
684  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
685  *              ANALYZE.
686  */
687 void
688 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
689                                         bool hasindex)
690 {
691         Relation        rd;
692         HeapTupleData rtup;
693         HeapTuple       ctup;
694         Form_pg_class pgcform;
695         Buffer          buffer;
696
697         /*
698          * update number of tuples and number of pages in pg_class
699          */
700         rd = heap_open(RelationRelationId, RowExclusiveLock);
701
702         ctup = SearchSysCache(RELOID,
703                                                   ObjectIdGetDatum(relid),
704                                                   0, 0, 0);
705         if (!HeapTupleIsValid(ctup))
706                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
707                          relid);
708
709         /* get the buffer cache tuple */
710         rtup.t_self = ctup->t_self;
711         ReleaseSysCache(ctup);
712         if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
713                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
714                          relid);
715
716         /* ensure no one else does this at the same time */
717         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
718
719         /* overwrite the existing statistics in the tuple */
720         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
721         pgcform->relpages = (int32) num_pages;
722         pgcform->reltuples = (float4) num_tuples;
723         pgcform->relhasindex = hasindex;
724
725         /*
726          * If we have discovered that there are no indexes, then there's no
727          * primary key either.  This could be done more thoroughly...
728          */
729         if (!hasindex)
730                 pgcform->relhaspkey = false;
731
732         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
733
734         /*
735          * Invalidate the tuple in the catcaches; this also arranges to flush the
736          * relation's relcache entry.  (If we fail to commit for some reason, no
737          * flush will occur, but no great harm is done since there are no
738          * noncritical state updates here.)
739          */
740         CacheInvalidateHeapTuple(rd, &rtup);
741
742         /* Write the buffer */
743         WriteBuffer(buffer);
744
745         heap_close(rd, RowExclusiveLock);
746 }
747
748
749 /*
750  *      vac_update_dbstats() -- update statistics for one database
751  *
752  *              Update the whole-database statistics that are kept in its pg_database
753  *              row, and the flat-file copy of pg_database.
754  *
755  *              We violate no-overwrite semantics here by storing new values for the
756  *              statistics columns directly into the tuple that's already on the page.
757  *              As with vac_update_relstats, this avoids leaving dead tuples behind
758  *              after a VACUUM.
759  *
760  *              This routine is shared by full and lazy VACUUM.  Note that it is only
761  *              applied after a database-wide VACUUM operation.
762  */
763 static void
764 vac_update_dbstats(Oid dbid,
765                                    TransactionId vacuumXID,
766                                    TransactionId frozenXID)
767 {
768         Relation        relation;
769         ScanKeyData entry[1];
770         HeapScanDesc scan;
771         HeapTuple       tuple;
772         Form_pg_database dbform;
773
774         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
775
776         /* Must use a heap scan, since there's no syscache for pg_database */
777         ScanKeyInit(&entry[0],
778                                 ObjectIdAttributeNumber,
779                                 BTEqualStrategyNumber, F_OIDEQ,
780                                 ObjectIdGetDatum(dbid));
781
782         scan = heap_beginscan(relation, SnapshotNow, 1, entry);
783
784         tuple = heap_getnext(scan, ForwardScanDirection);
785
786         if (!HeapTupleIsValid(tuple))
787                 elog(ERROR, "could not find tuple for database %u", dbid);
788
789         /* ensure no one else does this at the same time */
790         LockBuffer(scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE);
791
792         dbform = (Form_pg_database) GETSTRUCT(tuple);
793
794         /* overwrite the existing statistics in the tuple */
795         dbform->datvacuumxid = vacuumXID;
796         dbform->datfrozenxid = frozenXID;
797
798         LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
799
800         /* invalidate the tuple in the cache and write the buffer */
801         CacheInvalidateHeapTuple(relation, tuple);
802         WriteNoReleaseBuffer(scan->rs_cbuf);
803
804         heap_endscan(scan);
805
806         heap_close(relation, RowExclusiveLock);
807
808         /* Mark the flat-file copy of pg_database for update at commit */
809         database_file_update_needed();
810 }
811
812
813 /*
814  *      vac_truncate_clog() -- attempt to truncate the commit log
815  *
816  *              Scan pg_database to determine the system-wide oldest datvacuumxid,
817  *              and use it to truncate the transaction commit log (pg_clog).
818  *              Also update the XID wrap limit point maintained by varsup.c.
819  *
820  *              We also generate a warning if the system-wide oldest datfrozenxid
821  *              seems to be in danger of wrapping around.  This is a long-in-advance
822  *              warning; if we start getting uncomfortably close, GetNewTransactionId
823  *              will generate more-annoying warnings, and ultimately refuse to issue
824  *              any more new XIDs.
825  *
826  *              The passed XIDs are simply the ones I just wrote into my pg_database
827  *              entry.  They're used to initialize the "min" calculations.
828  *
829  *              This routine is shared by full and lazy VACUUM.  Note that it is only
830  *              applied after a database-wide VACUUM operation.
831  */
832 static void
833 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
834 {
835         TransactionId myXID = GetCurrentTransactionId();
836         Relation        relation;
837         HeapScanDesc scan;
838         HeapTuple       tuple;
839         int32           age;
840         NameData        oldest_datname;
841         bool            vacuumAlreadyWrapped = false;
842         bool            frozenAlreadyWrapped = false;
843
844         /* init oldest_datname to sync with my frozenXID */
845         namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
846
847         /*
848          * Note: the "already wrapped" cases should now be impossible due to the
849          * defenses in GetNewTransactionId, but we keep them anyway.
850          */
851         relation = heap_open(DatabaseRelationId, AccessShareLock);
852
853         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
854
855         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
856         {
857                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
858
859                 /* Ignore non-connectable databases (eg, template0) */
860                 /* It's assumed that these have been frozen correctly */
861                 if (!dbform->datallowconn)
862                         continue;
863
864                 if (TransactionIdIsNormal(dbform->datvacuumxid))
865                 {
866                         if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
867                                 vacuumAlreadyWrapped = true;
868                         else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
869                                 vacuumXID = dbform->datvacuumxid;
870                 }
871                 if (TransactionIdIsNormal(dbform->datfrozenxid))
872                 {
873                         if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
874                                 frozenAlreadyWrapped = true;
875                         else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
876                         {
877                                 frozenXID = dbform->datfrozenxid;
878                                 namecpy(&oldest_datname, &dbform->datname);
879                         }
880                 }
881         }
882
883         heap_endscan(scan);
884
885         heap_close(relation, AccessShareLock);
886
887         /*
888          * Do not truncate CLOG if we seem to have suffered wraparound already;
889          * the computed minimum XID might be bogus.
890          */
891         if (vacuumAlreadyWrapped)
892         {
893                 ereport(WARNING,
894                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
895                                  errdetail("You may have already suffered transaction-wraparound data loss.")));
896                 return;
897         }
898
899         /* Truncate CLOG to the oldest vacuumxid */
900         TruncateCLOG(vacuumXID);
901
902         /*
903          * Do not update varsup.c if we seem to have suffered wraparound already;
904          * the computed XID might be bogus.
905          */
906         if (frozenAlreadyWrapped)
907         {
908                 ereport(WARNING,
909                                 (errmsg("some databases have not been vacuumed in over 1 billion transactions"),
910                                  errhint("Better vacuum them soon, or you may have a wraparound failure.")));
911                 return;
912         }
913
914         /* Update the wrap limit for GetNewTransactionId */
915         SetTransactionIdLimit(frozenXID, &oldest_datname);
916
917         /* Give warning about impending wraparound problems */
918         age = (int32) (myXID - frozenXID);
919         if (age > (int32) ((MaxTransactionId >> 3) * 3))
920                 ereport(WARNING,
921                    (errmsg("database \"%s\" must be vacuumed within %u transactions",
922                                    NameStr(oldest_datname),
923                                    (MaxTransactionId >> 1) - age),
924                         errhint("To avoid a database shutdown, execute a full-database VACUUM in \"%s\".",
925                                         NameStr(oldest_datname))));
926 }
927
928
929 /****************************************************************************
930  *                                                                                                                                                      *
931  *                      Code common to both flavors of VACUUM                                                   *
932  *                                                                                                                                                      *
933  ****************************************************************************
934  */
935
936
937 /*
938  *      vacuum_rel() -- vacuum one heap relation
939  *
940  *              Returns TRUE if we actually processed the relation (or can ignore it
941  *              for some reason), FALSE if we failed to process it due to permissions
942  *              or other reasons.  (A FALSE result really means that some data
943  *              may have been left unvacuumed, so we can't update XID stats.)
944  *
945  *              Doing one heap at a time incurs extra overhead, since we need to
946  *              check that the heap exists again just before we vacuum it.      The
947  *              reason that we do this is so that vacuuming can be spread across
948  *              many small transactions.  Otherwise, two-phase locking would require
949  *              us to lock the entire database during one pass of the vacuum cleaner.
950  *
951  *              At entry and exit, we are not inside a transaction.
952  */
953 static bool
954 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
955 {
956         LOCKMODE        lmode;
957         Relation        onerel;
958         LockRelId       onerelid;
959         Oid                     toast_relid;
960         bool            result;
961
962         /* Begin a transaction for vacuuming this relation */
963         StartTransactionCommand();
964         /* functions in indexes may want a snapshot set */
965         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
966
967         /*
968          * Tell the cache replacement strategy that vacuum is causing all
969          * following IO
970          */
971         StrategyHintVacuum(true);
972
973         /*
974          * Check for user-requested abort.      Note we want this to be inside a
975          * transaction, so xact.c doesn't issue useless WARNING.
976          */
977         CHECK_FOR_INTERRUPTS();
978
979         /*
980          * Race condition -- if the pg_class tuple has gone away since the last
981          * time we saw it, we don't need to vacuum it.
982          */
983         if (!SearchSysCacheExists(RELOID,
984                                                           ObjectIdGetDatum(relid),
985                                                           0, 0, 0))
986         {
987                 StrategyHintVacuum(false);
988                 CommitTransactionCommand();
989                 return true;                    /* okay 'cause no data there */
990         }
991
992         /*
993          * Determine the type of lock we want --- hard exclusive lock for a FULL
994          * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
995          * way, we can be sure that no other backend is vacuuming the same table.
996          */
997         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
998
999         /*
1000          * Open the class, get an appropriate lock on it, and check permissions.
1001          *
1002          * We allow the user to vacuum a table if he is superuser, the table
1003          * owner, or the database owner (but in the latter case, only if it's not
1004          * a shared relation).  pg_class_ownercheck includes the superuser case.
1005          *
1006          * Note we choose to treat permissions failure as a WARNING and keep
1007          * trying to vacuum the rest of the DB --- is this appropriate?
1008          */
1009         onerel = relation_open(relid, lmode);
1010
1011         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
1012                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
1013         {
1014                 ereport(WARNING,
1015                                 (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
1016                                                 RelationGetRelationName(onerel))));
1017                 relation_close(onerel, lmode);
1018                 StrategyHintVacuum(false);
1019                 CommitTransactionCommand();
1020                 return false;
1021         }
1022
1023         /*
1024          * Check that it's a plain table; we used to do this in get_rel_oids() but
1025          * seems safer to check after we've locked the relation.
1026          */
1027         if (onerel->rd_rel->relkind != expected_relkind)
1028         {
1029                 ereport(WARNING,
1030                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
1031                                                 RelationGetRelationName(onerel))));
1032                 relation_close(onerel, lmode);
1033                 StrategyHintVacuum(false);
1034                 CommitTransactionCommand();
1035                 return false;
1036         }
1037
1038         /*
1039          * Silently ignore tables that are temp tables of other backends ---
1040          * trying to vacuum these will lead to great unhappiness, since their
1041          * contents are probably not up-to-date on disk.  (We don't throw a
1042          * warning here; it would just lead to chatter during a database-wide
1043          * VACUUM.)
1044          */
1045         if (isOtherTempNamespace(RelationGetNamespace(onerel)))
1046         {
1047                 relation_close(onerel, lmode);
1048                 StrategyHintVacuum(false);
1049                 CommitTransactionCommand();
1050                 return true;                    /* assume no long-lived data in temp tables */
1051         }
1052
1053         /*
1054          * Get a session-level lock too. This will protect our access to the
1055          * relation across multiple transactions, so that we can vacuum the
1056          * relation's TOAST table (if any) secure in the knowledge that no one is
1057          * deleting the parent relation.
1058          *
1059          * NOTE: this cannot block, even if someone else is waiting for access,
1060          * because the lock manager knows that both lock requests are from the
1061          * same process.
1062          */
1063         onerelid = onerel->rd_lockInfo.lockRelId;
1064         LockRelationForSession(&onerelid, onerel->rd_istemp, lmode);
1065
1066         /*
1067          * Remember the relation's TOAST relation for later
1068          */
1069         toast_relid = onerel->rd_rel->reltoastrelid;
1070
1071         /*
1072          * Do the actual work --- either FULL or "lazy" vacuum
1073          */
1074         if (vacstmt->full)
1075                 full_vacuum_rel(onerel, vacstmt);
1076         else
1077                 lazy_vacuum_rel(onerel, vacstmt);
1078
1079         result = true;                          /* did the vacuum */
1080
1081         /* all done with this class, but hold lock until commit */
1082         relation_close(onerel, NoLock);
1083
1084         /*
1085          * Complete the transaction and free all temporary memory used.
1086          */
1087         StrategyHintVacuum(false);
1088         CommitTransactionCommand();
1089
1090         /*
1091          * If the relation has a secondary toast rel, vacuum that too while we
1092          * still hold the session lock on the master table.  Note however that
1093          * "analyze" will not get done on the toast table.      This is good, because
1094          * the toaster always uses hardcoded index access and statistics are
1095          * totally unimportant for toast relations.
1096          */
1097         if (toast_relid != InvalidOid)
1098         {
1099                 if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
1100                         result = false;         /* failed to vacuum the TOAST table? */
1101         }
1102
1103         /*
1104          * Now release the session-level lock on the master table.
1105          */
1106         UnlockRelationForSession(&onerelid, lmode);
1107
1108         return result;
1109 }
1110
1111
1112 /****************************************************************************
1113  *                                                                                                                                                      *
1114  *                      Code for VACUUM FULL (only)                                                                             *
1115  *                                                                                                                                                      *
1116  ****************************************************************************
1117  */
1118
1119
1120 /*
1121  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1122  *
1123  *              This routine vacuums a single heap, cleans out its indexes, and
1124  *              updates its num_pages and num_tuples statistics.
1125  *
1126  *              At entry, we have already established a transaction and opened
1127  *              and locked the relation.
1128  */
1129 static void
1130 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1131 {
1132         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1133                                                                                  * clean indexes */
1134         VacPageListData fraged_pages;           /* List of pages with space enough for
1135                                                                                  * re-using */
1136         Relation   *Irel;
1137         int                     nindexes,
1138                                 i;
1139         VRelStats  *vacrelstats;
1140
1141         vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
1142                                                   &OldestXmin, &FreezeLimit);
1143
1144         /*
1145          * Set up statistics-gathering machinery.
1146          */
1147         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1148         vacrelstats->rel_pages = 0;
1149         vacrelstats->rel_tuples = 0;
1150         vacrelstats->hasindex = false;
1151
1152         /* scan the heap */
1153         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1154         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1155
1156         /* Now open all indexes of the relation */
1157         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1158         if (nindexes > 0)
1159                 vacrelstats->hasindex = true;
1160
1161         /* Clean/scan index relation(s) */
1162         if (Irel != NULL)
1163         {
1164                 if (vacuum_pages.num_pages > 0)
1165                 {
1166                         for (i = 0; i < nindexes; i++)
1167                                 vacuum_index(&vacuum_pages, Irel[i],
1168                                                          vacrelstats->rel_tuples, 0);
1169                 }
1170                 else
1171                 {
1172                         /* just scan indexes to update statistic */
1173                         for (i = 0; i < nindexes; i++)
1174                                 scan_index(Irel[i], vacrelstats->rel_tuples);
1175                 }
1176         }
1177
1178         if (fraged_pages.num_pages > 0)
1179         {
1180                 /* Try to shrink heap */
1181                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1182                                         nindexes, Irel);
1183                 vac_close_indexes(nindexes, Irel, NoLock);
1184         }
1185         else
1186         {
1187                 vac_close_indexes(nindexes, Irel, NoLock);
1188                 if (vacuum_pages.num_pages > 0)
1189                 {
1190                         /* Clean pages from vacuum_pages list */
1191                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1192                 }
1193         }
1194
1195         /* update shared free space map with final free space info */
1196         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1197
1198         /* update statistics in pg_class */
1199         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
1200                                                 vacrelstats->rel_tuples, vacrelstats->hasindex);
1201
1202         /* report results to the stats collector, too */
1203         pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
1204                                                  vacstmt->analyze, vacrelstats->rel_tuples);
1205 }
1206
1207
1208 /*
1209  *      scan_heap() -- scan an open heap relation
1210  *
1211  *              This routine sets commit status bits, constructs vacuum_pages (list
1212  *              of pages we need to compact free space on and/or clean indexes of
1213  *              deleted tuples), constructs fraged_pages (list of pages with free
1214  *              space that tuples could be moved into), and calculates statistics
1215  *              on the number of live tuples in the heap.
1216  */
1217 static void
1218 scan_heap(VRelStats *vacrelstats, Relation onerel,
1219                   VacPageList vacuum_pages, VacPageList fraged_pages)
1220 {
1221         BlockNumber nblocks,
1222                                 blkno;
1223         HeapTupleData tuple;
1224         char       *relname;
1225         VacPage         vacpage;
1226         BlockNumber empty_pages,
1227                                 empty_end_pages;
1228         double          num_tuples,
1229                                 tups_vacuumed,
1230                                 nkeep,
1231                                 nunused;
1232         double          free_space,
1233                                 usable_free_space;
1234         Size            min_tlen = MaxTupleSize;
1235         Size            max_tlen = 0;
1236         bool            do_shrinking = true;
1237         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1238         int                     num_vtlinks = 0;
1239         int                     free_vtlinks = 100;
1240         PGRUsage        ru0;
1241
1242         pg_rusage_init(&ru0);
1243
1244         relname = RelationGetRelationName(onerel);
1245         ereport(elevel,
1246                         (errmsg("vacuuming \"%s.%s\"",
1247                                         get_namespace_name(RelationGetNamespace(onerel)),
1248                                         relname)));
1249
1250         empty_pages = empty_end_pages = 0;
1251         num_tuples = tups_vacuumed = nkeep = nunused = 0;
1252         free_space = 0;
1253
1254         nblocks = RelationGetNumberOfBlocks(onerel);
1255
1256         /*
1257          * We initially create each VacPage item in a maximal-sized workspace,
1258          * then copy the workspace into a just-large-enough copy.
1259          */
1260         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1261
1262         for (blkno = 0; blkno < nblocks; blkno++)
1263         {
1264                 Page            page,
1265                                         tempPage = NULL;
1266                 bool            do_reap,
1267                                         do_frag;
1268                 Buffer          buf;
1269                 OffsetNumber offnum,
1270                                         maxoff;
1271                 bool            pgchanged,
1272                                         notup;
1273
1274                 vacuum_delay_point();
1275
1276                 buf = ReadBuffer(onerel, blkno);
1277                 page = BufferGetPage(buf);
1278
1279                 /*
1280                  * Since we are holding exclusive lock on the relation, no other
1281                  * backend can be accessing the page; however it is possible that the
1282                  * background writer will try to write the page if it's already marked
1283                  * dirty.  To ensure that invalid data doesn't get written to disk, we
1284                  * must take exclusive buffer lock wherever we potentially modify
1285                  * pages.
1286                  */
1287                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1288
1289                 vacpage->blkno = blkno;
1290                 vacpage->offsets_used = 0;
1291                 vacpage->offsets_free = 0;
1292
1293                 if (PageIsNew(page))
1294                 {
1295                         VacPage         vacpagecopy;
1296
1297                         ereport(WARNING,
1298                            (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1299                                            relname, blkno)));
1300                         PageInit(page, BufferGetPageSize(buf), 0);
1301                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1302                         free_space += vacpage->free;
1303                         empty_pages++;
1304                         empty_end_pages++;
1305                         vacpagecopy = copy_vac_page(vacpage);
1306                         vpage_insert(vacuum_pages, vacpagecopy);
1307                         vpage_insert(fraged_pages, vacpagecopy);
1308                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1309                         WriteBuffer(buf);
1310                         continue;
1311                 }
1312
1313                 if (PageIsEmpty(page))
1314                 {
1315                         VacPage         vacpagecopy;
1316
1317                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1318                         free_space += vacpage->free;
1319                         empty_pages++;
1320                         empty_end_pages++;
1321                         vacpagecopy = copy_vac_page(vacpage);
1322                         vpage_insert(vacuum_pages, vacpagecopy);
1323                         vpage_insert(fraged_pages, vacpagecopy);
1324                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1325                         ReleaseBuffer(buf);
1326                         continue;
1327                 }
1328
1329                 pgchanged = false;
1330                 notup = true;
1331                 maxoff = PageGetMaxOffsetNumber(page);
1332                 for (offnum = FirstOffsetNumber;
1333                          offnum <= maxoff;
1334                          offnum = OffsetNumberNext(offnum))
1335                 {
1336                         ItemId          itemid = PageGetItemId(page, offnum);
1337                         bool            tupgone = false;
1338
1339                         /*
1340                          * Collect un-used items too - it's possible to have indexes
1341                          * pointing here after crash.
1342                          */
1343                         if (!ItemIdIsUsed(itemid))
1344                         {
1345                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1346                                 nunused += 1;
1347                                 continue;
1348                         }
1349
1350                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1351                         tuple.t_len = ItemIdGetLength(itemid);
1352                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1353
1354                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1355                         {
1356                                 case HEAPTUPLE_DEAD:
1357                                         tupgone = true;         /* we can delete the tuple */
1358                                         break;
1359                                 case HEAPTUPLE_LIVE:
1360
1361                                         /*
1362                                          * Tuple is good.  Consider whether to replace its xmin
1363                                          * value with FrozenTransactionId.
1364                                          */
1365                                         if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
1366                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1367                                                                                           FreezeLimit))
1368                                         {
1369                                                 HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
1370                                                 /* infomask should be okay already */
1371                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1372                                                 pgchanged = true;
1373                                         }
1374
1375                                         /*
1376                                          * Other checks...
1377                                          */
1378                                         if (onerel->rd_rel->relhasoids &&
1379                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1380                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1381                                                          relname, blkno, offnum);
1382                                         break;
1383                                 case HEAPTUPLE_RECENTLY_DEAD:
1384
1385                                         /*
1386                                          * If tuple is recently deleted then we must not remove it
1387                                          * from relation.
1388                                          */
1389                                         nkeep += 1;
1390
1391                                         /*
1392                                          * If we do shrinking and this tuple is updated one then
1393                                          * remember it to construct updated tuple dependencies.
1394                                          */
1395                                         if (do_shrinking &&
1396                                                 !(ItemPointerEquals(&(tuple.t_self),
1397                                                                                         &(tuple.t_data->t_ctid))))
1398                                         {
1399                                                 if (free_vtlinks == 0)
1400                                                 {
1401                                                         free_vtlinks = 1000;
1402                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1403                                                                                            (free_vtlinks + num_vtlinks) *
1404                                                                                                          sizeof(VTupleLinkData));
1405                                                 }
1406                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1407                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1408                                                 free_vtlinks--;
1409                                                 num_vtlinks++;
1410                                         }
1411                                         break;
1412                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1413
1414                                         /*
1415                                          * This should not happen, since we hold exclusive lock on
1416                                          * the relation; shouldn't we raise an error? (Actually,
1417                                          * it can happen in system catalogs, since we tend to
1418                                          * release write lock before commit there.)
1419                                          */
1420                                         ereport(NOTICE,
1421                                                         (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
1422                                                                         relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
1423                                         do_shrinking = false;
1424                                         break;
1425                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1426
1427                                         /*
1428                                          * This should not happen, since we hold exclusive lock on
1429                                          * the relation; shouldn't we raise an error? (Actually,
1430                                          * it can happen in system catalogs, since we tend to
1431                                          * release write lock before commit there.)
1432                                          */
1433                                         ereport(NOTICE,
1434                                                         (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
1435                                                                         relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
1436                                         do_shrinking = false;
1437                                         break;
1438                                 default:
1439                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1440                                         break;
1441                         }
1442
1443                         if (tupgone)
1444                         {
1445                                 ItemId          lpp;
1446
1447                                 /*
1448                                  * Here we are building a temporary copy of the page with dead
1449                                  * tuples removed.      Below we will apply
1450                                  * PageRepairFragmentation to the copy, so that we can
1451                                  * determine how much space will be available after removal of
1452                                  * dead tuples.  But note we are NOT changing the real page
1453                                  * yet...
1454                                  */
1455                                 if (tempPage == NULL)
1456                                 {
1457                                         Size            pageSize;
1458
1459                                         pageSize = PageGetPageSize(page);
1460                                         tempPage = (Page) palloc(pageSize);
1461                                         memcpy(tempPage, page, pageSize);
1462                                 }
1463
1464                                 /* mark it unused on the temp page */
1465                                 lpp = PageGetItemId(tempPage, offnum);
1466                                 lpp->lp_flags &= ~LP_USED;
1467
1468                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1469                                 tups_vacuumed += 1;
1470                         }
1471                         else
1472                         {
1473                                 num_tuples += 1;
1474                                 notup = false;
1475                                 if (tuple.t_len < min_tlen)
1476                                         min_tlen = tuple.t_len;
1477                                 if (tuple.t_len > max_tlen)
1478                                         max_tlen = tuple.t_len;
1479                         }
1480                 }                                               /* scan along page */
1481
1482                 if (tempPage != NULL)
1483                 {
1484                         /* Some tuples are removable; figure free space after removal */
1485                         PageRepairFragmentation(tempPage, NULL);
1486                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1487                         pfree(tempPage);
1488                         do_reap = true;
1489                 }
1490                 else
1491                 {
1492                         /* Just use current available space */
1493                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1494                         /* Need to reap the page if it has ~LP_USED line pointers */
1495                         do_reap = (vacpage->offsets_free > 0);
1496                 }
1497
1498                 free_space += vacpage->free;
1499
1500                 /*
1501                  * Add the page to fraged_pages if it has a useful amount of free
1502                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1503                  * don't know that accurately near the start of the relation, so add
1504                  * pages unconditionally if they have >= BLCKSZ/10 free space.
1505                  */
1506                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1507
1508                 if (do_reap || do_frag)
1509                 {
1510                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1511
1512                         if (do_reap)
1513                                 vpage_insert(vacuum_pages, vacpagecopy);
1514                         if (do_frag)
1515                                 vpage_insert(fraged_pages, vacpagecopy);
1516                 }
1517
1518                 /*
1519                  * Include the page in empty_end_pages if it will be empty after
1520                  * vacuuming; this is to keep us from using it as a move destination.
1521                  */
1522                 if (notup)
1523                 {
1524                         empty_pages++;
1525                         empty_end_pages++;
1526                 }
1527                 else
1528                         empty_end_pages = 0;
1529
1530                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1531                 if (pgchanged)
1532                         WriteBuffer(buf);
1533                 else
1534                         ReleaseBuffer(buf);
1535         }
1536
1537         pfree(vacpage);
1538
1539         /* save stats in the rel list for use later */
1540         vacrelstats->rel_tuples = num_tuples;
1541         vacrelstats->rel_pages = nblocks;
1542         if (num_tuples == 0)
1543                 min_tlen = max_tlen = 0;
1544         vacrelstats->min_tlen = min_tlen;
1545         vacrelstats->max_tlen = max_tlen;
1546
1547         vacuum_pages->empty_end_pages = empty_end_pages;
1548         fraged_pages->empty_end_pages = empty_end_pages;
1549
1550         /*
1551          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1552          * remove any "empty" end-pages from the list, and compute usable free
1553          * space = free space in remaining pages.
1554          */
1555         if (do_shrinking)
1556         {
1557                 int                     i;
1558
1559                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1560                 fraged_pages->num_pages -= empty_end_pages;
1561                 usable_free_space = 0;
1562                 for (i = 0; i < fraged_pages->num_pages; i++)
1563                         usable_free_space += fraged_pages->pagedesc[i]->free;
1564         }
1565         else
1566         {
1567                 fraged_pages->num_pages = 0;
1568                 usable_free_space = 0;
1569         }
1570
1571         /* don't bother to save vtlinks if we will not call repair_frag */
1572         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1573         {
1574                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1575                           vac_cmp_vtlinks);
1576                 vacrelstats->vtlinks = vtlinks;
1577                 vacrelstats->num_vtlinks = num_vtlinks;
1578         }
1579         else
1580         {
1581                 vacrelstats->vtlinks = NULL;
1582                 vacrelstats->num_vtlinks = 0;
1583                 pfree(vtlinks);
1584         }
1585
1586         ereport(elevel,
1587                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1588                                         RelationGetRelationName(onerel),
1589                                         tups_vacuumed, num_tuples, nblocks),
1590                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1591                           "Nonremovable row versions range from %lu to %lu bytes long.\n"
1592                                            "There were %.0f unused item pointers.\n"
1593            "Total free space (including removable row versions) is %.0f bytes.\n"
1594                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1595          "%u pages containing %.0f free bytes are potential move destinations.\n"
1596                                            "%s.",
1597                                            nkeep,
1598                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1599                                            nunused,
1600                                            free_space,
1601                                            empty_pages, empty_end_pages,
1602                                            fraged_pages->num_pages, usable_free_space,
1603                                            pg_rusage_show(&ru0))));
1604 }
1605
1606
1607 /*
1608  *      repair_frag() -- try to repair relation's fragmentation
1609  *
1610  *              This routine marks dead tuples as unused and tries re-use dead space
1611  *              by moving tuples (and inserting indexes if needed). It constructs
1612  *              Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
1613  *              for them after committing (in hack-manner - without losing locks
1614  *              and freeing memory!) current transaction. It truncates relation
1615  *              if some end-blocks are gone away.
1616  */
1617 static void
1618 repair_frag(VRelStats *vacrelstats, Relation onerel,
1619                         VacPageList vacuum_pages, VacPageList fraged_pages,
1620                         int nindexes, Relation *Irel)
1621 {
1622         TransactionId myXID = GetCurrentTransactionId();
1623         Buffer          dst_buffer = InvalidBuffer;
1624         BlockNumber nblocks,
1625                                 blkno;
1626         BlockNumber last_move_dest_block = 0,
1627                                 last_vacuum_block;
1628         Page            dst_page = NULL;
1629         ExecContextData ec;
1630         VacPageListData Nvacpagelist;
1631         VacPage         dst_vacpage = NULL,
1632                                 last_vacuum_page,
1633                                 vacpage,
1634                            *curpage;
1635         int                     i;
1636         int                     num_moved = 0,
1637                                 num_fraged_pages,
1638                                 vacuumed_pages;
1639         int                     keep_tuples = 0;
1640         PGRUsage        ru0;
1641
1642         pg_rusage_init(&ru0);
1643
1644         ExecContext_Init(&ec, onerel);
1645
1646         Nvacpagelist.num_pages = 0;
1647         num_fraged_pages = fraged_pages->num_pages;
1648         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1649         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1650         if (vacuumed_pages > 0)
1651         {
1652                 /* get last reaped page from vacuum_pages */
1653                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1654                 last_vacuum_block = last_vacuum_page->blkno;
1655         }
1656         else
1657         {
1658                 last_vacuum_page = NULL;
1659                 last_vacuum_block = InvalidBlockNumber;
1660         }
1661
1662         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1663         vacpage->offsets_used = vacpage->offsets_free = 0;
1664
1665         /*
1666          * Scan pages backwards from the last nonempty page, trying to move tuples
1667          * down to lower pages.  Quit when we reach a page that we have moved any
1668          * tuples onto, or the first page if we haven't moved anything, or when we
1669          * find a page we cannot completely empty (this last condition is handled
1670          * by "break" statements within the loop).
1671          *
1672          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1673          * in order by blkno.
1674          */
1675         nblocks = vacrelstats->rel_pages;
1676         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1677                  blkno > last_move_dest_block;
1678                  blkno--)
1679         {
1680                 Buffer          buf;
1681                 Page            page;
1682                 OffsetNumber offnum,
1683                                         maxoff;
1684                 bool            isempty,
1685                                         dowrite,
1686                                         chain_tuple_moved;
1687
1688                 vacuum_delay_point();
1689
1690                 /*
1691                  * Forget fraged_pages pages at or after this one; they're no longer
1692                  * useful as move targets, since we only want to move down. Note that
1693                  * since we stop the outer loop at last_move_dest_block, pages removed
1694                  * here cannot have had anything moved onto them already.
1695                  *
1696                  * Also note that we don't change the stored fraged_pages list, only
1697                  * our local variable num_fraged_pages; so the forgotten pages are
1698                  * still available to be loaded into the free space map later.
1699                  */
1700                 while (num_fraged_pages > 0 &&
1701                            fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1702                 {
1703                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1704                         --num_fraged_pages;
1705                 }
1706
1707                 /*
1708                  * Process this page of relation.
1709                  */
1710                 buf = ReadBuffer(onerel, blkno);
1711                 page = BufferGetPage(buf);
1712
1713                 vacpage->offsets_free = 0;
1714
1715                 isempty = PageIsEmpty(page);
1716
1717                 dowrite = false;
1718
1719                 /* Is the page in the vacuum_pages list? */
1720                 if (blkno == last_vacuum_block)
1721                 {
1722                         if (last_vacuum_page->offsets_free > 0)
1723                         {
1724                                 /* there are dead tuples on this page - clean them */
1725                                 Assert(!isempty);
1726                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1727                                 vacuum_page(onerel, buf, last_vacuum_page);
1728                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1729                                 dowrite = true;
1730                         }
1731                         else
1732                                 Assert(isempty);
1733                         --vacuumed_pages;
1734                         if (vacuumed_pages > 0)
1735                         {
1736                                 /* get prev reaped page from vacuum_pages */
1737                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1738                                 last_vacuum_block = last_vacuum_page->blkno;
1739                         }
1740                         else
1741                         {
1742                                 last_vacuum_page = NULL;
1743                                 last_vacuum_block = InvalidBlockNumber;
1744                         }
1745                         if (isempty)
1746                         {
1747                                 ReleaseBuffer(buf);
1748                                 continue;
1749                         }
1750                 }
1751                 else
1752                         Assert(!isempty);
1753
1754                 chain_tuple_moved = false;              /* no one chain-tuple was moved off
1755                                                                                  * this page, yet */
1756                 vacpage->blkno = blkno;
1757                 maxoff = PageGetMaxOffsetNumber(page);
1758                 for (offnum = FirstOffsetNumber;
1759                          offnum <= maxoff;
1760                          offnum = OffsetNumberNext(offnum))
1761                 {
1762                         Size            tuple_len;
1763                         HeapTupleData tuple;
1764                         ItemId          itemid = PageGetItemId(page, offnum);
1765
1766                         if (!ItemIdIsUsed(itemid))
1767                                 continue;
1768
1769                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1770                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1771                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1772
1773                         /* ---
1774                          * VACUUM FULL has an exclusive lock on the relation.  So
1775                          * normally no other transaction can have pending INSERTs or
1776                          * DELETEs in this relation.  A tuple is either:
1777                          *              (a) a tuple in a system catalog, inserted or deleted
1778                          *                      by a not yet committed transaction
1779                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
1780                          *                      is visible to all active transactions)
1781                          *              (c) inserted by a committed xact (XMIN_COMMITTED)
1782                          *              (d) moved by the currently running VACUUM.
1783                          *              (e) deleted (XMAX_COMMITTED) but at least one active
1784                          *                      transaction does not see the deleting transaction
1785                          * In case (a) we wouldn't be in repair_frag() at all.
1786                          * In case (b) we cannot be here, because scan_heap() has
1787                          * already marked the item as unused, see continue above. Case
1788                          * (c) is what normally is to be expected. Case (d) is only
1789                          * possible, if a whole tuple chain has been moved while
1790                          * processing this or a higher numbered block.
1791                          * ---
1792                          */
1793                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1794                         {
1795                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1796                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1797                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
1798                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
1799
1800                                 /*
1801                                  * MOVED_OFF by another VACUUM would have caused the
1802                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
1803                                  */
1804                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1805                                         elog(ERROR, "invalid XVAC in tuple header");
1806
1807                                 /*
1808                                  * If this (chain) tuple is moved by me already then I have to
1809                                  * check is it in vacpage or not - i.e. is it moved while
1810                                  * cleaning this page or some previous one.
1811                                  */
1812
1813                                 /* Can't we Assert(keep_tuples > 0) here? */
1814                                 if (keep_tuples == 0)
1815                                         continue;
1816                                 if (chain_tuple_moved)
1817                                 {
1818                                         /* some chains were moved while cleaning this page */
1819                                         Assert(vacpage->offsets_free > 0);
1820                                         for (i = 0; i < vacpage->offsets_free; i++)
1821                                         {
1822                                                 if (vacpage->offsets[i] == offnum)
1823                                                         break;
1824                                         }
1825                                         if (i >= vacpage->offsets_free)         /* not found */
1826                                         {
1827                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1828                                                 keep_tuples--;
1829                                         }
1830                                 }
1831                                 else
1832                                 {
1833                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1834                                         keep_tuples--;
1835                                 }
1836                                 continue;
1837                         }
1838
1839                         /*
1840                          * If this tuple is in a chain of tuples created in updates by
1841                          * "recent" transactions then we have to move the whole chain of
1842                          * tuples to other places, so that we can write new t_ctid links
1843                          * that preserve the chain relationship.
1844                          *
1845                          * This test is complicated.  Read it as "if tuple is a recently
1846                          * created updated version, OR if it is an obsoleted version". (In
1847                          * the second half of the test, we needn't make any check on XMAX
1848                          * --- it must be recently obsoleted, else scan_heap would have
1849                          * deemed it removable.)
1850                          *
1851                          * NOTE: this test is not 100% accurate: it is possible for a
1852                          * tuple to be an updated one with recent xmin, and yet not match
1853                          * any new_tid entry in the vtlinks list.  Presumably there was
1854                          * once a parent tuple with xmax matching the xmin, but it's
1855                          * possible that that tuple has been removed --- for example, if
1856                          * it had xmin = xmax and wasn't itself an updated version, then
1857                          * HeapTupleSatisfiesVacuum would deem it removable as soon as the
1858                          * xmin xact completes.
1859                          *
1860                          * To be on the safe side, we abandon the repair_frag process if
1861                          * we cannot find the parent tuple in vtlinks.  This may be overly
1862                          * conservative; AFAICS it would be safe to move the chain.
1863                          */
1864                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1865                                  !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1866                                                                                 OldestXmin)) ||
1867                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1868                                                                                            HEAP_IS_LOCKED)) &&
1869                                  !(ItemPointerEquals(&(tuple.t_self),
1870                                                                          &(tuple.t_data->t_ctid)))))
1871                         {
1872                                 Buffer          Cbuf = buf;
1873                                 bool            freeCbuf = false;
1874                                 bool            chain_move_failed = false;
1875                                 ItemPointerData Ctid;
1876                                 HeapTupleData tp = tuple;
1877                                 Size            tlen = tuple_len;
1878                                 VTupleMove      vtmove;
1879                                 int                     num_vtmove;
1880                                 int                     free_vtmove;
1881                                 VacPage         to_vacpage = NULL;
1882                                 int                     to_item = 0;
1883                                 int                     ti;
1884
1885                                 if (dst_buffer != InvalidBuffer)
1886                                 {
1887                                         WriteBuffer(dst_buffer);
1888                                         dst_buffer = InvalidBuffer;
1889                                 }
1890
1891                                 /* Quick exit if we have no vtlinks to search in */
1892                                 if (vacrelstats->vtlinks == NULL)
1893                                 {
1894                                         elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
1895                                         break;          /* out of walk-along-page loop */
1896                                 }
1897
1898                                 /*
1899                                  * If this tuple is in the begin/middle of the chain then we
1900                                  * have to move to the end of chain.  As with any t_ctid
1901                                  * chase, we have to verify that each new tuple is really the
1902                                  * descendant of the tuple we came from.
1903                                  */
1904                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1905                                                                                                   HEAP_IS_LOCKED)) &&
1906                                            !(ItemPointerEquals(&(tp.t_self),
1907                                                                                    &(tp.t_data->t_ctid))))
1908                                 {
1909                                         ItemPointerData nextTid;
1910                                         TransactionId priorXmax;
1911                                         Buffer          nextBuf;
1912                                         Page            nextPage;
1913                                         OffsetNumber nextOffnum;
1914                                         ItemId          nextItemid;
1915                                         HeapTupleHeader nextTdata;
1916
1917                                         nextTid = tp.t_data->t_ctid;
1918                                         priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
1919                                         /* assume block# is OK (see heap_fetch comments) */
1920                                         nextBuf = ReadBuffer(onerel,
1921                                                                                  ItemPointerGetBlockNumber(&nextTid));
1922                                         nextPage = BufferGetPage(nextBuf);
1923                                         /* If bogus or unused slot, assume tp is end of chain */
1924                                         nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
1925                                         if (nextOffnum < FirstOffsetNumber ||
1926                                                 nextOffnum > PageGetMaxOffsetNumber(nextPage))
1927                                         {
1928                                                 ReleaseBuffer(nextBuf);
1929                                                 break;
1930                                         }
1931                                         nextItemid = PageGetItemId(nextPage, nextOffnum);
1932                                         if (!ItemIdIsUsed(nextItemid))
1933                                         {
1934                                                 ReleaseBuffer(nextBuf);
1935                                                 break;
1936                                         }
1937                                         /* if not matching XMIN, assume tp is end of chain */
1938                                         nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
1939                                                                                                                           nextItemid);
1940                                         if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
1941                                                                                          priorXmax))
1942                                         {
1943                                                 ReleaseBuffer(nextBuf);
1944                                                 break;
1945                                         }
1946                                         /* OK, switch our attention to the next tuple in chain */
1947                                         tp.t_data = nextTdata;
1948                                         tp.t_self = nextTid;
1949                                         tlen = tp.t_len = ItemIdGetLength(nextItemid);
1950                                         if (freeCbuf)
1951                                                 ReleaseBuffer(Cbuf);
1952                                         Cbuf = nextBuf;
1953                                         freeCbuf = true;
1954                                 }
1955
1956                                 /* Set up workspace for planning the chain move */
1957                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
1958                                 num_vtmove = 0;
1959                                 free_vtmove = 100;
1960
1961                                 /*
1962                                  * Now, walk backwards up the chain (towards older tuples) and
1963                                  * check if all items in chain can be moved.  We record all
1964                                  * the moves that need to be made in the vtmove array.
1965                                  */
1966                                 for (;;)
1967                                 {
1968                                         Buffer          Pbuf;
1969                                         Page            Ppage;
1970                                         ItemId          Pitemid;
1971                                         HeapTupleHeader PTdata;
1972                                         VTupleLinkData vtld,
1973                                                            *vtlp;
1974
1975                                         /* Identify a target page to move this tuple to */
1976                                         if (to_vacpage == NULL ||
1977                                                 !enough_space(to_vacpage, tlen))
1978                                         {
1979                                                 for (i = 0; i < num_fraged_pages; i++)
1980                                                 {
1981                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1982                                                                 break;
1983                                                 }
1984
1985                                                 if (i == num_fraged_pages)
1986                                                 {
1987                                                         /* can't move item anywhere */
1988                                                         chain_move_failed = true;
1989                                                         break;          /* out of check-all-items loop */
1990                                                 }
1991                                                 to_item = i;
1992                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1993                                         }
1994                                         to_vacpage->free -= MAXALIGN(tlen);
1995                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1996                                                 to_vacpage->free -= sizeof(ItemIdData);
1997                                         (to_vacpage->offsets_used)++;
1998
1999                                         /* Add an entry to vtmove list */
2000                                         if (free_vtmove == 0)
2001                                         {
2002                                                 free_vtmove = 1000;
2003                                                 vtmove = (VTupleMove)
2004                                                         repalloc(vtmove,
2005                                                                          (free_vtmove + num_vtmove) *
2006                                                                          sizeof(VTupleMoveData));
2007                                         }
2008                                         vtmove[num_vtmove].tid = tp.t_self;
2009                                         vtmove[num_vtmove].vacpage = to_vacpage;
2010                                         if (to_vacpage->offsets_used == 1)
2011                                                 vtmove[num_vtmove].cleanVpd = true;
2012                                         else
2013                                                 vtmove[num_vtmove].cleanVpd = false;
2014                                         free_vtmove--;
2015                                         num_vtmove++;
2016
2017                                         /* Done if at beginning of chain */
2018                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
2019                                          TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
2020                                                                                    OldestXmin))
2021                                                 break;  /* out of check-all-items loop */
2022
2023                                         /* Move to tuple with prior row version */
2024                                         vtld.new_tid = tp.t_self;
2025                                         vtlp = (VTupleLink)
2026                                                 vac_bsearch((void *) &vtld,
2027                                                                         (void *) (vacrelstats->vtlinks),
2028                                                                         vacrelstats->num_vtlinks,
2029                                                                         sizeof(VTupleLinkData),
2030                                                                         vac_cmp_vtlinks);
2031                                         if (vtlp == NULL)
2032                                         {
2033                                                 /* see discussion above */
2034                                                 elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
2035                                                 chain_move_failed = true;
2036                                                 break;  /* out of check-all-items loop */
2037                                         }
2038                                         tp.t_self = vtlp->this_tid;
2039                                         Pbuf = ReadBuffer(onerel,
2040                                                                         ItemPointerGetBlockNumber(&(tp.t_self)));
2041                                         Ppage = BufferGetPage(Pbuf);
2042                                         Pitemid = PageGetItemId(Ppage,
2043                                                                    ItemPointerGetOffsetNumber(&(tp.t_self)));
2044                                         /* this can't happen since we saw tuple earlier: */
2045                                         if (!ItemIdIsUsed(Pitemid))
2046                                                 elog(ERROR, "parent itemid marked as unused");
2047                                         PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
2048
2049                                         /* ctid should not have changed since we saved it */
2050                                         Assert(ItemPointerEquals(&(vtld.new_tid),
2051                                                                                          &(PTdata->t_ctid)));
2052
2053                                         /*
2054                                          * Read above about cases when !ItemIdIsUsed(nextItemid)
2055                                          * (child item is removed)... Due to the fact that at the
2056                                          * moment we don't remove unuseful part of update-chain,
2057                                          * it's possible to get non-matching parent row here. Like
2058                                          * as in the case which caused this problem, we stop
2059                                          * shrinking here. I could try to find real parent row but
2060                                          * want not to do it because of real solution will be
2061                                          * implemented anyway, later, and we are too close to 6.5
2062                                          * release. - vadim 06/11/99
2063                                          */
2064                                         if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
2065                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
2066                                                                                  HeapTupleHeaderGetXmin(tp.t_data))))
2067                                         {
2068                                                 ReleaseBuffer(Pbuf);
2069                                                 elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
2070                                                 chain_move_failed = true;
2071                                                 break;  /* out of check-all-items loop */
2072                                         }
2073                                         tp.t_data = PTdata;
2074                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2075                                         if (freeCbuf)
2076                                                 ReleaseBuffer(Cbuf);
2077                                         Cbuf = Pbuf;
2078                                         freeCbuf = true;
2079                                 }                               /* end of check-all-items loop */
2080
2081                                 if (freeCbuf)
2082                                         ReleaseBuffer(Cbuf);
2083                                 freeCbuf = false;
2084
2085                                 if (chain_move_failed)
2086                                 {
2087                                         /*
2088                                          * Undo changes to offsets_used state.  We don't bother
2089                                          * cleaning up the amount-free state, since we're not
2090                                          * going to do any further tuple motion.
2091                                          */
2092                                         for (i = 0; i < num_vtmove; i++)
2093                                         {
2094                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2095                                                 (vtmove[i].vacpage->offsets_used)--;
2096                                         }
2097                                         pfree(vtmove);
2098                                         break;          /* out of walk-along-page loop */
2099                                 }
2100
2101                                 /*
2102                                  * Okay, move the whole tuple chain in reverse order.
2103                                  *
2104                                  * Ctid tracks the new location of the previously-moved tuple.
2105                                  */
2106                                 ItemPointerSetInvalid(&Ctid);
2107                                 for (ti = 0; ti < num_vtmove; ti++)
2108                                 {
2109                                         VacPage         destvacpage = vtmove[ti].vacpage;
2110                                         Page            Cpage;
2111                                         ItemId          Citemid;
2112
2113                                         /* Get page to move from */
2114                                         tuple.t_self = vtmove[ti].tid;
2115                                         Cbuf = ReadBuffer(onerel,
2116                                                                  ItemPointerGetBlockNumber(&(tuple.t_self)));
2117
2118                                         /* Get page to move to */
2119                                         dst_buffer = ReadBuffer(onerel, destvacpage->blkno);
2120
2121                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2122                                         if (dst_buffer != Cbuf)
2123                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2124
2125                                         dst_page = BufferGetPage(dst_buffer);
2126                                         Cpage = BufferGetPage(Cbuf);
2127
2128                                         Citemid = PageGetItemId(Cpage,
2129                                                                 ItemPointerGetOffsetNumber(&(tuple.t_self)));
2130                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2131                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2132
2133                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2134                                                                          dst_buffer, dst_page, destvacpage,
2135                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2136
2137                                         num_moved++;
2138                                         if (destvacpage->blkno > last_move_dest_block)
2139                                                 last_move_dest_block = destvacpage->blkno;
2140
2141                                         /*
2142                                          * Remember that we moved tuple from the current page
2143                                          * (corresponding index tuple will be cleaned).
2144                                          */
2145                                         if (Cbuf == buf)
2146                                                 vacpage->offsets[vacpage->offsets_free++] =
2147                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2148                                         else
2149                                                 keep_tuples++;
2150
2151                                         WriteBuffer(dst_buffer);
2152                                         WriteBuffer(Cbuf);
2153                                 }                               /* end of move-the-tuple-chain loop */
2154
2155                                 dst_buffer = InvalidBuffer;
2156                                 pfree(vtmove);
2157                                 chain_tuple_moved = true;
2158
2159                                 /* advance to next tuple in walk-along-page loop */
2160                                 continue;
2161                         }                                       /* end of is-tuple-in-chain test */
2162
2163                         /* try to find new page for this tuple */
2164                         if (dst_buffer == InvalidBuffer ||
2165                                 !enough_space(dst_vacpage, tuple_len))
2166                         {
2167                                 if (dst_buffer != InvalidBuffer)
2168                                 {
2169                                         WriteBuffer(dst_buffer);
2170                                         dst_buffer = InvalidBuffer;
2171                                 }
2172                                 for (i = 0; i < num_fraged_pages; i++)
2173                                 {
2174                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2175                                                 break;
2176                                 }
2177                                 if (i == num_fraged_pages)
2178                                         break;          /* can't move item anywhere */
2179                                 dst_vacpage = fraged_pages->pagedesc[i];
2180                                 dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
2181                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2182                                 dst_page = BufferGetPage(dst_buffer);
2183                                 /* if this page was not used before - clean it */
2184                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2185                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2186                         }
2187                         else
2188                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2189
2190                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2191
2192                         move_plain_tuple(onerel, buf, page, &tuple,
2193                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2194
2195                         num_moved++;
2196                         if (dst_vacpage->blkno > last_move_dest_block)
2197                                 last_move_dest_block = dst_vacpage->blkno;
2198
2199                         /*
2200                          * Remember that we moved tuple from the current page
2201                          * (corresponding index tuple will be cleaned).
2202                          */
2203                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2204                 }                                               /* walk along page */
2205
2206                 /*
2207                  * If we broke out of the walk-along-page loop early (ie, still have
2208                  * offnum <= maxoff), then we failed to move some tuple off this page.
2209                  * No point in shrinking any more, so clean up and exit the per-page
2210                  * loop.
2211                  */
2212                 if (offnum < maxoff && keep_tuples > 0)
2213                 {
2214                         OffsetNumber off;
2215
2216                         /*
2217                          * Fix vacpage state for any unvisited tuples remaining on page
2218                          */
2219                         for (off = OffsetNumberNext(offnum);
2220                                  off <= maxoff;
2221                                  off = OffsetNumberNext(off))
2222                         {
2223                                 ItemId          itemid = PageGetItemId(page, off);
2224                                 HeapTupleHeader htup;
2225
2226                                 if (!ItemIdIsUsed(itemid))
2227                                         continue;
2228                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2229                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2230                                         continue;
2231
2232                                 /*
2233                                  * See comments in the walk-along-page loop above about why
2234                                  * only MOVED_OFF tuples should be found here.
2235                                  */
2236                                 if (htup->t_infomask & HEAP_MOVED_IN)
2237                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2238                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2239                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2240                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2241                                         elog(ERROR, "invalid XVAC in tuple header");
2242
2243                                 if (chain_tuple_moved)
2244                                 {
2245                                         /* some chains were moved while cleaning this page */
2246                                         Assert(vacpage->offsets_free > 0);
2247                                         for (i = 0; i < vacpage->offsets_free; i++)
2248                                         {
2249                                                 if (vacpage->offsets[i] == off)
2250                                                         break;
2251                                         }
2252                                         if (i >= vacpage->offsets_free)         /* not found */
2253                                         {
2254                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2255                                                 Assert(keep_tuples > 0);
2256                                                 keep_tuples--;
2257                                         }
2258                                 }
2259                                 else
2260                                 {
2261                                         vacpage->offsets[vacpage->offsets_free++] = off;
2262                                         Assert(keep_tuples > 0);
2263                                         keep_tuples--;
2264                                 }
2265                         }
2266                 }
2267
2268                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2269                 {
2270                         if (chain_tuple_moved)          /* else - they are ordered */
2271                         {
2272                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2273                                           sizeof(OffsetNumber), vac_cmp_offno);
2274                         }
2275                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2276                         WriteBuffer(buf);
2277                 }
2278                 else if (dowrite)
2279                         WriteBuffer(buf);
2280                 else
2281                         ReleaseBuffer(buf);
2282
2283                 if (offnum <= maxoff)
2284                         break;                          /* had to quit early, see above note */
2285
2286         }                                                       /* walk along relation */
2287
2288         blkno++;                                        /* new number of blocks */
2289
2290         if (dst_buffer != InvalidBuffer)
2291         {
2292                 Assert(num_moved > 0);
2293                 WriteBuffer(dst_buffer);
2294         }
2295
2296         if (num_moved > 0)
2297         {
2298                 /*
2299                  * We have to commit our tuple movings before we truncate the
2300                  * relation.  Ideally we should do Commit/StartTransactionCommand
2301                  * here, relying on the session-level table lock to protect our
2302                  * exclusive access to the relation.  However, that would require a
2303                  * lot of extra code to close and re-open the relation, indexes, etc.
2304                  * For now, a quick hack: record status of current transaction as
2305                  * committed, and continue.
2306                  */
2307                 RecordTransactionCommit();
2308         }
2309
2310         /*
2311          * We are not going to move any more tuples across pages, but we still
2312          * need to apply vacuum_page to compact free space in the remaining pages
2313          * in vacuum_pages list.  Note that some of these pages may also be in the
2314          * fraged_pages list, and may have had tuples moved onto them; if so, we
2315          * already did vacuum_page and needn't do it again.
2316          */
2317         for (i = 0, curpage = vacuum_pages->pagedesc;
2318                  i < vacuumed_pages;
2319                  i++, curpage++)
2320         {
2321                 vacuum_delay_point();
2322
2323                 Assert((*curpage)->blkno < blkno);
2324                 if ((*curpage)->offsets_used == 0)
2325                 {
2326                         Buffer          buf;
2327                         Page            page;
2328
2329                         /* this page was not used as a move target, so must clean it */
2330                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2331                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2332                         page = BufferGetPage(buf);
2333                         if (!PageIsEmpty(page))
2334                                 vacuum_page(onerel, buf, *curpage);
2335                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2336                         WriteBuffer(buf);
2337                 }
2338         }
2339
2340         /*
2341          * Now scan all the pages that we moved tuples onto and update tuple
2342          * status bits.  This is not really necessary, but will save time for
2343          * future transactions examining these tuples.
2344          */
2345         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2346                                          last_move_dest_block, num_moved);
2347
2348         /*
2349          * It'd be cleaner to make this report at the bottom of this routine, but
2350          * then the rusage would double-count the second pass of index vacuuming.
2351          * So do it here and ignore the relatively small amount of processing that
2352          * occurs below.
2353          */
2354         ereport(elevel,
2355                         (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2356                                         RelationGetRelationName(onerel),
2357                                         num_moved, nblocks, blkno),
2358                          errdetail("%s.",
2359                                            pg_rusage_show(&ru0))));
2360
2361         /*
2362          * Reflect the motion of system tuples to catalog cache here.
2363          */
2364         CommandCounterIncrement();
2365
2366         if (Nvacpagelist.num_pages > 0)
2367         {
2368                 /* vacuum indexes again if needed */
2369                 if (Irel != NULL)
2370                 {
2371                         VacPage    *vpleft,
2372                                            *vpright,
2373                                                 vpsave;
2374
2375                         /* re-sort Nvacpagelist.pagedesc */
2376                         for (vpleft = Nvacpagelist.pagedesc,
2377                                  vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2378                                  vpleft < vpright; vpleft++, vpright--)
2379                         {
2380                                 vpsave = *vpleft;
2381                                 *vpleft = *vpright;
2382                                 *vpright = vpsave;
2383                         }
2384
2385                         /*
2386                          * keep_tuples is the number of tuples that have been moved off a
2387                          * page during chain moves but not been scanned over subsequently.
2388                          * The tuple ids of these tuples are not recorded as free offsets
2389                          * for any VacPage, so they will not be cleared from the indexes.
2390                          */
2391                         Assert(keep_tuples >= 0);
2392                         for (i = 0; i < nindexes; i++)
2393                                 vacuum_index(&Nvacpagelist, Irel[i],
2394                                                          vacrelstats->rel_tuples, keep_tuples);
2395                 }
2396
2397                 /*
2398                  * Clean moved-off tuples from last page in Nvacpagelist list.
2399                  *
2400                  * We need only do this in this one page, because higher-numbered
2401                  * pages are going to be truncated from the relation entirely. But see
2402                  * comments for update_hint_bits().
2403                  */
2404                 if (vacpage->blkno == (blkno - 1) &&
2405                         vacpage->offsets_free > 0)
2406                 {
2407                         Buffer          buf;
2408                         Page            page;
2409                         OffsetNumber unused[MaxOffsetNumber];
2410                         OffsetNumber offnum,
2411                                                 maxoff;
2412                         int                     uncnt;
2413                         int                     num_tuples = 0;
2414
2415                         buf = ReadBuffer(onerel, vacpage->blkno);
2416                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2417                         page = BufferGetPage(buf);
2418                         maxoff = PageGetMaxOffsetNumber(page);
2419                         for (offnum = FirstOffsetNumber;
2420                                  offnum <= maxoff;
2421                                  offnum = OffsetNumberNext(offnum))
2422                         {
2423                                 ItemId          itemid = PageGetItemId(page, offnum);
2424                                 HeapTupleHeader htup;
2425
2426                                 if (!ItemIdIsUsed(itemid))
2427                                         continue;
2428                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2429                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2430                                         continue;
2431
2432                                 /*
2433                                  * See comments in the walk-along-page loop above about why
2434                                  * only MOVED_OFF tuples should be found here.
2435                                  */
2436                                 if (htup->t_infomask & HEAP_MOVED_IN)
2437                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2438                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2439                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2440                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2441                                         elog(ERROR, "invalid XVAC in tuple header");
2442
2443                                 itemid->lp_flags &= ~LP_USED;
2444                                 num_tuples++;
2445                         }
2446                         Assert(vacpage->offsets_free == num_tuples);
2447
2448                         START_CRIT_SECTION();
2449
2450                         uncnt = PageRepairFragmentation(page, unused);
2451
2452                         /* XLOG stuff */
2453                         if (!onerel->rd_istemp)
2454                         {
2455                                 XLogRecPtr      recptr;
2456
2457                                 recptr = log_heap_clean(onerel, buf, unused, uncnt);
2458                                 PageSetLSN(page, recptr);
2459                                 PageSetTLI(page, ThisTimeLineID);
2460                         }
2461                         else
2462                         {
2463                                 /*
2464                                  * No XLOG record, but still need to flag that XID exists on
2465                                  * disk
2466                                  */
2467                                 MyXactMadeTempRelUpdate = true;
2468                         }
2469
2470                         END_CRIT_SECTION();
2471
2472                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2473                         WriteBuffer(buf);
2474                 }
2475
2476                 /* now - free new list of reaped pages */
2477                 curpage = Nvacpagelist.pagedesc;
2478                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2479                         pfree(*curpage);
2480                 pfree(Nvacpagelist.pagedesc);
2481         }
2482
2483         /* Truncate relation, if needed */
2484         if (blkno < nblocks)
2485         {
2486                 RelationTruncate(onerel, blkno);
2487                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2488         }
2489
2490         /* clean up */
2491         pfree(vacpage);
2492         if (vacrelstats->vtlinks != NULL)
2493                 pfree(vacrelstats->vtlinks);
2494
2495         ExecContext_Finish(&ec);
2496 }
2497
2498 /*
2499  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2500  *
2501  *              This routine moves old_tup from old_page to dst_page.
2502  *              old_page and dst_page might be the same page.
2503  *              On entry old_buf and dst_buf are locked exclusively, both locks (or
2504  *              the single lock, if this is a intra-page-move) are released before
2505  *              exit.
2506  *
2507  *              Yes, a routine with ten parameters is ugly, but it's still better
2508  *              than having these 120 lines of code in repair_frag() which is
2509  *              already too long and almost unreadable.
2510  */
2511 static void
2512 move_chain_tuple(Relation rel,
2513                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2514                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2515                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2516 {
2517         TransactionId myXID = GetCurrentTransactionId();
2518         HeapTupleData newtup;
2519         OffsetNumber newoff;
2520         ItemId          newitemid;
2521         Size            tuple_len = old_tup->t_len;
2522
2523         /*
2524          * make a modifiable copy of the source tuple.
2525          */
2526         heap_copytuple_with_tuple(old_tup, &newtup);
2527
2528         /*
2529          * register invalidation of source tuple in catcaches.
2530          */
2531         CacheInvalidateHeapTuple(rel, old_tup);
2532
2533         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2534         START_CRIT_SECTION();
2535
2536         /*
2537          * mark the source tuple MOVED_OFF.
2538          */
2539         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2540                                                                          HEAP_XMIN_INVALID |
2541                                                                          HEAP_MOVED_IN);
2542         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2543         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2544
2545         /*
2546          * If this page was not used before - clean it.
2547          *
2548          * NOTE: a nasty bug used to lurk here.  It is possible for the source and
2549          * destination pages to be the same (since this tuple-chain member can be
2550          * on a page lower than the one we're currently processing in the outer
2551          * loop).  If that's true, then after vacuum_page() the source tuple will
2552          * have been moved, and tuple.t_data will be pointing at garbage.
2553          * Therefore we must do everything that uses old_tup->t_data BEFORE this
2554          * step!!
2555          *
2556          * This path is different from the other callers of vacuum_page, because
2557          * we have already incremented the vacpage's offsets_used field to account
2558          * for the tuple(s) we expect to move onto the page. Therefore
2559          * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
2560          * good debugging check for all other callers, we work around it here
2561          * rather than remove it.
2562          */
2563         if (!PageIsEmpty(dst_page) && cleanVpd)
2564         {
2565                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2566
2567                 dst_vacpage->offsets_used = 0;
2568                 vacuum_page(rel, dst_buf, dst_vacpage);
2569                 dst_vacpage->offsets_used = sv_offsets_used;
2570         }
2571
2572         /*
2573          * Update the state of the copied tuple, and store it on the destination
2574          * page.
2575          */
2576         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2577                                                                    HEAP_XMIN_INVALID |
2578                                                                    HEAP_MOVED_OFF);
2579         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2580         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2581         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2582                                                  InvalidOffsetNumber, LP_USED);
2583         if (newoff == InvalidOffsetNumber)
2584                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
2585                          (unsigned long) tuple_len, dst_vacpage->blkno);
2586         newitemid = PageGetItemId(dst_page, newoff);
2587         /* drop temporary copy, and point to the version on the dest page */
2588         pfree(newtup.t_data);
2589         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2590
2591         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
2592
2593         /*
2594          * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
2595          * to next tuple in chain otherwise.  (Since we move the chain in reverse
2596          * order, this is actually the previously processed tuple.)
2597          */
2598         if (!ItemPointerIsValid(ctid))
2599                 newtup.t_data->t_ctid = newtup.t_self;
2600         else
2601                 newtup.t_data->t_ctid = *ctid;
2602         *ctid = newtup.t_self;
2603
2604         /* XLOG stuff */
2605         if (!rel->rd_istemp)
2606         {
2607                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2608                                                                                    dst_buf, &newtup);
2609
2610                 if (old_buf != dst_buf)
2611                 {
2612                         PageSetLSN(old_page, recptr);
2613                         PageSetTLI(old_page, ThisTimeLineID);
2614                 }
2615                 PageSetLSN(dst_page, recptr);
2616                 PageSetTLI(dst_page, ThisTimeLineID);
2617         }
2618         else
2619         {
2620                 /*
2621                  * No XLOG record, but still need to flag that XID exists on disk
2622                  */
2623                 MyXactMadeTempRelUpdate = true;
2624         }
2625
2626         END_CRIT_SECTION();
2627
2628         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2629         if (dst_buf != old_buf)
2630                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2631
2632         /* Create index entries for the moved tuple */
2633         if (ec->resultRelInfo->ri_NumIndices > 0)
2634         {
2635                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2636                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2637                 ResetPerTupleExprContext(ec->estate);
2638         }
2639 }
2640
2641 /*
2642  *      move_plain_tuple() -- move one tuple that is not part of a chain
2643  *
2644  *              This routine moves old_tup from old_page to dst_page.
2645  *              On entry old_buf and dst_buf are locked exclusively, both locks are
2646  *              released before exit.
2647  *
2648  *              Yes, a routine with eight parameters is ugly, but it's still better
2649  *              than having these 90 lines of code in repair_frag() which is already
2650  *              too long and almost unreadable.
2651  */
2652 static void
2653 move_plain_tuple(Relation rel,
2654                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2655                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2656                                  ExecContext ec)
2657 {
2658         TransactionId myXID = GetCurrentTransactionId();
2659         HeapTupleData newtup;
2660         OffsetNumber newoff;
2661         ItemId          newitemid;
2662         Size            tuple_len = old_tup->t_len;
2663
2664         /* copy tuple */
2665         heap_copytuple_with_tuple(old_tup, &newtup);
2666
2667         /*
2668          * register invalidation of source tuple in catcaches.
2669          *
2670          * (Note: we do not need to register the copied tuple, because we are not
2671          * changing the tuple contents and so there cannot be any need to flush
2672          * negative catcache entries.)
2673          */
2674         CacheInvalidateHeapTuple(rel, old_tup);
2675
2676         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2677         START_CRIT_SECTION();
2678
2679         /*
2680          * Mark new tuple as MOVED_IN by me.
2681          */
2682         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2683                                                                    HEAP_XMIN_INVALID |
2684                                                                    HEAP_MOVED_OFF);
2685         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2686         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2687
2688         /* add tuple to the page */
2689         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2690                                                  InvalidOffsetNumber, LP_USED);
2691         if (newoff == InvalidOffsetNumber)
2692                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2693                          (unsigned long) tuple_len,
2694                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
2695                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
2696         newitemid = PageGetItemId(dst_page, newoff);
2697         pfree(newtup.t_data);
2698         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2699         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
2700         newtup.t_self = newtup.t_data->t_ctid;
2701
2702         /*
2703          * Mark old tuple as MOVED_OFF by me.
2704          */
2705         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2706                                                                          HEAP_XMIN_INVALID |
2707                                                                          HEAP_MOVED_IN);
2708         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2709         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2710
2711         /* XLOG stuff */
2712         if (!rel->rd_istemp)
2713         {
2714                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2715                                                                                    dst_buf, &newtup);
2716
2717                 PageSetLSN(old_page, recptr);
2718                 PageSetTLI(old_page, ThisTimeLineID);
2719                 PageSetLSN(dst_page, recptr);
2720                 PageSetTLI(dst_page, ThisTimeLineID);
2721         }
2722         else
2723         {
2724                 /*
2725                  * No XLOG record, but still need to flag that XID exists on disk
2726                  */
2727                 MyXactMadeTempRelUpdate = true;
2728         }
2729
2730         END_CRIT_SECTION();
2731
2732         dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
2733                 ((PageHeader) dst_page)->pd_lower;
2734         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2735         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2736
2737         dst_vacpage->offsets_used++;
2738
2739         /* insert index' tuples if needed */
2740         if (ec->resultRelInfo->ri_NumIndices > 0)
2741         {
2742                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2743                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2744                 ResetPerTupleExprContext(ec->estate);
2745         }
2746 }
2747
2748 /*
2749  *      update_hint_bits() -- update hint bits in destination pages
2750  *
2751  * Scan all the pages that we moved tuples onto and update tuple status bits.
2752  * This is normally not really necessary, but it will save time for future
2753  * transactions examining these tuples.
2754  *
2755  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
2756  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
2757  *
2758  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
2759  * pages that were move source pages but not move dest pages.  The bulk
2760  * of the move source pages will be physically truncated from the relation,
2761  * and the last page remaining in the rel will be fixed separately in
2762  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
2763  * hint bits updated are tuples that are moved as part of a chain and were
2764  * on pages that were not either move destinations nor at the end of the rel.
2765  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
2766  * to remember and revisit those pages too.
2767  *
2768  * Because of this omission, VACUUM FULL FREEZE is not a safe combination;
2769  * it's possible that the VACUUM's own XID remains exposed as something that
2770  * tqual tests would need to check.
2771  *
2772  * For the non-freeze case, one wonders whether it wouldn't be better to skip
2773  * this work entirely, and let the tuple status updates happen someplace
2774  * that's not holding an exclusive lock on the relation.
2775  */
2776 static void
2777 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
2778                                  BlockNumber last_move_dest_block, int num_moved)
2779 {
2780         TransactionId myXID = GetCurrentTransactionId();
2781         int                     checked_moved = 0;
2782         int                     i;
2783         VacPage    *curpage;
2784
2785         for (i = 0, curpage = fraged_pages->pagedesc;
2786                  i < num_fraged_pages;
2787                  i++, curpage++)
2788         {
2789                 Buffer          buf;
2790                 Page            page;
2791                 OffsetNumber max_offset;
2792                 OffsetNumber off;
2793                 int                     num_tuples = 0;
2794
2795                 vacuum_delay_point();
2796
2797                 if ((*curpage)->blkno > last_move_dest_block)
2798                         break;                          /* no need to scan any further */
2799                 if ((*curpage)->offsets_used == 0)
2800                         continue;                       /* this page was never used as a move dest */
2801                 buf = ReadBuffer(rel, (*curpage)->blkno);
2802                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2803                 page = BufferGetPage(buf);
2804                 max_offset = PageGetMaxOffsetNumber(page);
2805                 for (off = FirstOffsetNumber;
2806                          off <= max_offset;
2807                          off = OffsetNumberNext(off))
2808                 {
2809                         ItemId          itemid = PageGetItemId(page, off);
2810                         HeapTupleHeader htup;
2811
2812                         if (!ItemIdIsUsed(itemid))
2813                                 continue;
2814                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
2815                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2816                                 continue;
2817
2818                         /*
2819                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
2820                          */
2821                         if (!(htup->t_infomask & HEAP_MOVED))
2822                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2823                         if (HeapTupleHeaderGetXvac(htup) != myXID)
2824                                 elog(ERROR, "invalid XVAC in tuple header");
2825
2826                         if (htup->t_infomask & HEAP_MOVED_IN)
2827                         {
2828                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
2829                                 htup->t_infomask &= ~HEAP_MOVED;
2830                                 num_tuples++;
2831                         }
2832                         else
2833                                 htup->t_infomask |= HEAP_XMIN_INVALID;
2834                 }
2835                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2836                 WriteBuffer(buf);
2837                 Assert((*curpage)->offsets_used == num_tuples);
2838                 checked_moved += num_tuples;
2839         }
2840         Assert(num_moved == checked_moved);
2841 }
2842
2843 /*
2844  *      vacuum_heap() -- free dead tuples
2845  *
2846  *              This routine marks dead tuples as unused and truncates relation
2847  *              if there are "empty" end-blocks.
2848  */
2849 static void
2850 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2851 {
2852         Buffer          buf;
2853         VacPage    *vacpage;
2854         BlockNumber relblocks;
2855         int                     nblocks;
2856         int                     i;
2857
2858         nblocks = vacuum_pages->num_pages;
2859         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2860
2861         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2862         {
2863                 vacuum_delay_point();
2864
2865                 if ((*vacpage)->offsets_free > 0)
2866                 {
2867                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2868                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2869                         vacuum_page(onerel, buf, *vacpage);
2870                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2871                         WriteBuffer(buf);
2872                 }
2873         }
2874
2875         /* Truncate relation if there are some empty end-pages */
2876         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2877         if (vacuum_pages->empty_end_pages > 0)
2878         {
2879                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2880                 ereport(elevel,
2881                                 (errmsg("\"%s\": truncated %u to %u pages",
2882                                                 RelationGetRelationName(onerel),
2883                                                 vacrelstats->rel_pages, relblocks)));
2884                 RelationTruncate(onerel, relblocks);
2885                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
2886         }
2887 }
2888
2889 /*
2890  *      vacuum_page() -- free dead tuples on a page
2891  *                                       and repair its fragmentation.
2892  */
2893 static void
2894 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2895 {
2896         OffsetNumber unused[MaxOffsetNumber];
2897         int                     uncnt;
2898         Page            page = BufferGetPage(buffer);
2899         ItemId          itemid;
2900         int                     i;
2901
2902         /* There shouldn't be any tuples moved onto the page yet! */
2903         Assert(vacpage->offsets_used == 0);
2904
2905         START_CRIT_SECTION();
2906
2907         for (i = 0; i < vacpage->offsets_free; i++)
2908         {
2909                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2910                 itemid->lp_flags &= ~LP_USED;
2911         }
2912
2913         uncnt = PageRepairFragmentation(page, unused);
2914
2915         /* XLOG stuff */
2916         if (!onerel->rd_istemp)
2917         {
2918                 XLogRecPtr      recptr;
2919
2920                 recptr = log_heap_clean(onerel, buffer, unused, uncnt);
2921                 PageSetLSN(page, recptr);
2922                 PageSetTLI(page, ThisTimeLineID);
2923         }
2924         else
2925         {
2926                 /* No XLOG record, but still need to flag that XID exists on disk */
2927                 MyXactMadeTempRelUpdate = true;
2928         }
2929
2930         END_CRIT_SECTION();
2931 }
2932
2933 /*
2934  *      scan_index() -- scan one index relation to update statistic.
2935  *
2936  * We use this when we have no deletions to do.
2937  */
2938 static void
2939 scan_index(Relation indrel, double num_tuples)
2940 {
2941         IndexBulkDeleteResult *stats;
2942         IndexVacuumCleanupInfo vcinfo;
2943         PGRUsage        ru0;
2944
2945         pg_rusage_init(&ru0);
2946
2947         /*
2948          * Even though we're not planning to delete anything, we use the
2949          * ambulkdelete call, because (a) the scan happens within the index AM for
2950          * more speed, and (b) it may want to pass private statistics to the
2951          * amvacuumcleanup call.
2952          */
2953         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2954
2955         /* Do post-VACUUM cleanup, even though we deleted nothing */
2956         vcinfo.vacuum_full = true;
2957         vcinfo.message_level = elevel;
2958         vcinfo.num_heap_tuples = num_tuples;
2959
2960         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2961
2962         if (!stats)
2963                 return;
2964
2965         /* now update statistics in pg_class */
2966         vac_update_relstats(RelationGetRelid(indrel),
2967                                                 stats->num_pages, stats->num_index_tuples,
2968                                                 false);
2969
2970         ereport(elevel,
2971                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
2972                                         RelationGetRelationName(indrel),
2973                                         stats->num_index_tuples,
2974                                         stats->num_pages),
2975         errdetail("%u index pages have been deleted, %u are currently reusable.\n"
2976                           "%s.",
2977                           stats->pages_deleted, stats->pages_free,
2978                           pg_rusage_show(&ru0))));
2979
2980         /*
2981          * Check for tuple count mismatch.      If the index is partial, then it's OK
2982          * for it to have fewer tuples than the heap; else we got trouble.
2983          */
2984         if (stats->num_index_tuples != num_tuples)
2985         {
2986                 if (stats->num_index_tuples > num_tuples ||
2987                         !vac_is_partial_index(indrel))
2988                         ereport(WARNING,
2989                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
2990                                                         RelationGetRelationName(indrel),
2991                                                         stats->num_index_tuples, num_tuples),
2992                                          errhint("Rebuild the index with REINDEX.")));
2993         }
2994
2995         pfree(stats);
2996 }
2997
2998 /*
2999  *      vacuum_index() -- vacuum one index relation.
3000  *
3001  *              Vpl is the VacPageList of the heap we're currently vacuuming.
3002  *              It's locked. Indrel is an index relation on the vacuumed heap.
3003  *
3004  *              We don't bother to set locks on the index relation here, since
3005  *              the parent table is exclusive-locked already.
3006  *
3007  *              Finally, we arrange to update the index relation's statistics in
3008  *              pg_class.
3009  */
3010 static void
3011 vacuum_index(VacPageList vacpagelist, Relation indrel,
3012                          double num_tuples, int keep_tuples)
3013 {
3014         IndexBulkDeleteResult *stats;
3015         IndexVacuumCleanupInfo vcinfo;
3016         PGRUsage        ru0;
3017
3018         pg_rusage_init(&ru0);
3019
3020         /* Do bulk deletion */
3021         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
3022
3023         /* Do post-VACUUM cleanup */
3024         vcinfo.vacuum_full = true;
3025         vcinfo.message_level = elevel;
3026         vcinfo.num_heap_tuples = num_tuples + keep_tuples;
3027
3028         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
3029
3030         if (!stats)
3031                 return;
3032
3033         /* now update statistics in pg_class */
3034         vac_update_relstats(RelationGetRelid(indrel),
3035                                                 stats->num_pages, stats->num_index_tuples,
3036                                                 false);
3037
3038         ereport(elevel,
3039                         (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
3040                                         RelationGetRelationName(indrel),
3041                                         stats->num_index_tuples,
3042                                         stats->num_pages),
3043                          errdetail("%.0f index row versions were removed.\n"
3044                          "%u index pages have been deleted, %u are currently reusable.\n"
3045                                            "%s.",
3046                                            stats->tuples_removed,
3047                                            stats->pages_deleted, stats->pages_free,
3048                                            pg_rusage_show(&ru0))));
3049
3050         /*
3051          * Check for tuple count mismatch.      If the index is partial, then it's OK
3052          * for it to have fewer tuples than the heap; else we got trouble.
3053          */
3054         if (stats->num_index_tuples != num_tuples + keep_tuples)
3055         {
3056                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3057                         !vac_is_partial_index(indrel))
3058                         ereport(WARNING,
3059                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3060                                                         RelationGetRelationName(indrel),
3061                                                   stats->num_index_tuples, num_tuples + keep_tuples),
3062                                          errhint("Rebuild the index with REINDEX.")));
3063         }
3064
3065         pfree(stats);
3066 }
3067
3068 /*
3069  *      tid_reaped() -- is a particular tid reaped?
3070  *
3071  *              This has the right signature to be an IndexBulkDeleteCallback.
3072  *
3073  *              vacpagelist->VacPage_array is sorted in right order.
3074  */
3075 static bool
3076 tid_reaped(ItemPointer itemptr, void *state)
3077 {
3078         VacPageList vacpagelist = (VacPageList) state;
3079         OffsetNumber ioffno;
3080         OffsetNumber *voff;
3081         VacPage         vp,
3082                            *vpp;
3083         VacPageData vacpage;
3084
3085         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3086         ioffno = ItemPointerGetOffsetNumber(itemptr);
3087
3088         vp = &vacpage;
3089         vpp = (VacPage *) vac_bsearch((void *) &vp,
3090                                                                   (void *) (vacpagelist->pagedesc),
3091                                                                   vacpagelist->num_pages,
3092                                                                   sizeof(VacPage),
3093                                                                   vac_cmp_blk);
3094
3095         if (vpp == NULL)
3096                 return false;
3097
3098         /* ok - we are on a partially or fully reaped page */
3099         vp = *vpp;
3100
3101         if (vp->offsets_free == 0)
3102         {
3103                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
3104                 return true;
3105         }
3106
3107         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3108                                                                                 (void *) (vp->offsets),
3109                                                                                 vp->offsets_free,
3110                                                                                 sizeof(OffsetNumber),
3111                                                                                 vac_cmp_offno);
3112
3113         if (voff == NULL)
3114                 return false;
3115
3116         /* tid is reaped */
3117         return true;
3118 }
3119
3120 /*
3121  * Dummy version for scan_index.
3122  */
3123 static bool
3124 dummy_tid_reaped(ItemPointer itemptr, void *state)
3125 {
3126         return false;
3127 }
3128
3129 /*
3130  * Update the shared Free Space Map with the info we now have about
3131  * free space in the relation, discarding any old info the map may have.
3132  */
3133 static void
3134 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3135                            BlockNumber rel_pages)
3136 {
3137         int                     nPages = fraged_pages->num_pages;
3138         VacPage    *pagedesc = fraged_pages->pagedesc;
3139         Size            threshold;
3140         PageFreeSpaceInfo *pageSpaces;
3141         int                     outPages;
3142         int                     i;
3143
3144         /*
3145          * We only report pages with free space at least equal to the average
3146          * request size --- this avoids cluttering FSM with uselessly-small bits
3147          * of space.  Although FSM would discard pages with little free space
3148          * anyway, it's important to do this prefiltering because (a) it reduces
3149          * the time spent holding the FSM lock in RecordRelationFreeSpace, and (b)
3150          * FSM uses the number of pages reported as a statistic for guiding space
3151          * management.  If we didn't threshold our reports the same way
3152          * vacuumlazy.c does, we'd be skewing that statistic.
3153          */
3154         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
3155
3156         pageSpaces = (PageFreeSpaceInfo *)
3157                 palloc(nPages * sizeof(PageFreeSpaceInfo));
3158         outPages = 0;
3159
3160         for (i = 0; i < nPages; i++)
3161         {
3162                 /*
3163                  * fraged_pages may contain entries for pages that we later decided to
3164                  * truncate from the relation; don't enter them into the free space
3165                  * map!
3166                  */
3167                 if (pagedesc[i]->blkno >= rel_pages)
3168                         break;
3169
3170                 if (pagedesc[i]->free >= threshold)
3171                 {
3172                         pageSpaces[outPages].blkno = pagedesc[i]->blkno;
3173                         pageSpaces[outPages].avail = pagedesc[i]->free;
3174                         outPages++;
3175                 }
3176         }
3177
3178         RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
3179
3180         pfree(pageSpaces);
3181 }
3182
3183 /* Copy a VacPage structure */
3184 static VacPage
3185 copy_vac_page(VacPage vacpage)
3186 {
3187         VacPage         newvacpage;
3188
3189         /* allocate a VacPageData entry */
3190         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3191                                                            vacpage->offsets_free * sizeof(OffsetNumber));
3192
3193         /* fill it in */
3194         if (vacpage->offsets_free > 0)
3195                 memcpy(newvacpage->offsets, vacpage->offsets,
3196                            vacpage->offsets_free * sizeof(OffsetNumber));
3197         newvacpage->blkno = vacpage->blkno;
3198         newvacpage->free = vacpage->free;
3199         newvacpage->offsets_used = vacpage->offsets_used;
3200         newvacpage->offsets_free = vacpage->offsets_free;
3201
3202         return newvacpage;
3203 }
3204
3205 /*
3206  * Add a VacPage pointer to a VacPageList.
3207  *
3208  *              As a side effect of the way that scan_heap works,
3209  *              higher pages come after lower pages in the array
3210  *              (and highest tid on a page is last).
3211  */
3212 static void
3213 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3214 {
3215 #define PG_NPAGEDESC 1024
3216
3217         /* allocate a VacPage entry if needed */
3218         if (vacpagelist->num_pages == 0)
3219         {
3220                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3221                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3222         }
3223         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3224         {
3225                 vacpagelist->num_allocated_pages *= 2;
3226                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3227         }
3228         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3229         (vacpagelist->num_pages)++;
3230 }
3231
3232 /*
3233  * vac_bsearch: just like standard C library routine bsearch(),
3234  * except that we first test to see whether the target key is outside
3235  * the range of the table entries.      This case is handled relatively slowly
3236  * by the normal binary search algorithm (ie, no faster than any other key)
3237  * but it occurs often enough in VACUUM to be worth optimizing.
3238  */
3239 static void *
3240 vac_bsearch(const void *key, const void *base,
3241                         size_t nelem, size_t size,
3242                         int (*compar) (const void *, const void *))
3243 {
3244         int                     res;
3245         const void *last;
3246
3247         if (nelem == 0)
3248                 return NULL;
3249         res = compar(key, base);
3250         if (res < 0)
3251                 return NULL;
3252         if (res == 0)
3253                 return (void *) base;
3254         if (nelem > 1)
3255         {
3256                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3257                 res = compar(key, last);
3258                 if (res > 0)
3259                         return NULL;
3260                 if (res == 0)
3261                         return (void *) last;
3262         }
3263         if (nelem <= 2)
3264                 return NULL;                    /* already checked 'em all */
3265         return bsearch(key, base, nelem, size, compar);
3266 }
3267
3268 /*
3269  * Comparator routines for use with qsort() and bsearch().
3270  */
3271 static int
3272 vac_cmp_blk(const void *left, const void *right)
3273 {
3274         BlockNumber lblk,
3275                                 rblk;
3276
3277         lblk = (*((VacPage *) left))->blkno;
3278         rblk = (*((VacPage *) right))->blkno;
3279
3280         if (lblk < rblk)
3281                 return -1;
3282         if (lblk == rblk)
3283                 return 0;
3284         return 1;
3285 }
3286
3287 static int
3288 vac_cmp_offno(const void *left, const void *right)
3289 {
3290         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3291                 return -1;
3292         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3293                 return 0;
3294         return 1;
3295 }
3296
3297 static int
3298 vac_cmp_vtlinks(const void *left, const void *right)
3299 {
3300         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3301                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3302                 return -1;
3303         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3304                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3305                 return 1;
3306         /* bi_hi-es are equal */
3307         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3308                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3309                 return -1;
3310         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3311                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3312                 return 1;
3313         /* bi_lo-es are equal */
3314         if (((VTupleLink) left)->new_tid.ip_posid <
3315                 ((VTupleLink) right)->new_tid.ip_posid)
3316                 return -1;
3317         if (((VTupleLink) left)->new_tid.ip_posid >
3318                 ((VTupleLink) right)->new_tid.ip_posid)
3319                 return 1;
3320         return 0;
3321 }
3322
3323
3324 /*
3325  * Open all the indexes of the given relation, obtaining the specified kind
3326  * of lock on each.  Return an array of Relation pointers for the indexes
3327  * into *Irel, and the number of indexes into *nindexes.
3328  */
3329 void
3330 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3331                                  int *nindexes, Relation **Irel)
3332 {
3333         List       *indexoidlist;
3334         ListCell   *indexoidscan;
3335         int                     i;
3336
3337         indexoidlist = RelationGetIndexList(relation);
3338
3339         *nindexes = list_length(indexoidlist);
3340
3341         if (*nindexes > 0)
3342                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3343         else
3344                 *Irel = NULL;
3345
3346         i = 0;
3347         foreach(indexoidscan, indexoidlist)
3348         {
3349                 Oid                     indexoid = lfirst_oid(indexoidscan);
3350                 Relation        ind;
3351
3352                 ind = index_open(indexoid);
3353                 (*Irel)[i++] = ind;
3354                 LockRelation(ind, lockmode);
3355         }
3356
3357         list_free(indexoidlist);
3358 }
3359
3360 /*
3361  * Release the resources acquired by vac_open_indexes.  Optionally release
3362  * the locks (say NoLock to keep 'em).
3363  */
3364 void
3365 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3366 {
3367         if (Irel == NULL)
3368                 return;
3369
3370         while (nindexes--)
3371         {
3372                 Relation        ind = Irel[nindexes];
3373
3374                 if (lockmode != NoLock)
3375                         UnlockRelation(ind, lockmode);
3376                 index_close(ind);
3377         }
3378         pfree(Irel);
3379 }
3380
3381
3382 /*
3383  * Is an index partial (ie, could it contain fewer tuples than the heap?)
3384  */
3385 bool
3386 vac_is_partial_index(Relation indrel)
3387 {
3388         /*
3389          * If the index's AM doesn't support nulls, it's partial for our purposes
3390          */
3391         if (!indrel->rd_am->amindexnulls)
3392                 return true;
3393
3394         /* Otherwise, look to see if there's a partial-index predicate */
3395         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3396                 return true;
3397
3398         return false;
3399 }
3400
3401
3402 static bool
3403 enough_space(VacPage vacpage, Size len)
3404 {
3405         len = MAXALIGN(len);
3406
3407         if (len > vacpage->free)
3408                 return false;
3409
3410         /* if there are free itemid(s) and len <= free_space... */
3411         if (vacpage->offsets_used < vacpage->offsets_free)
3412                 return true;
3413
3414         /* noff_used >= noff_free and so we'll have to allocate new itemid */
3415         if (len + sizeof(ItemIdData) <= vacpage->free)
3416                 return true;
3417
3418         return false;
3419 }
3420
3421
3422 /*
3423  * vacuum_delay_point --- check for interrupts and cost-based delay.
3424  *
3425  * This should be called in each major loop of VACUUM processing,
3426  * typically once per page processed.
3427  */
3428 void
3429 vacuum_delay_point(void)
3430 {
3431         /* Always check for interrupts */
3432         CHECK_FOR_INTERRUPTS();
3433
3434         /* Nap if appropriate */
3435         if (VacuumCostActive && !InterruptPending &&
3436                 VacuumCostBalance >= VacuumCostLimit)
3437         {
3438                 int                     msec;
3439
3440                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3441                 if (msec > VacuumCostDelay * 4)
3442                         msec = VacuumCostDelay * 4;
3443
3444                 pg_usleep(msec * 1000L);
3445
3446                 VacuumCostBalance = 0;
3447
3448                 /* Might have gotten an interrupt while sleeping */
3449                 CHECK_FOR_INTERRUPTS();
3450         }
3451 }