1 /*-------------------------------------------------------------------------
2  *
3  * vacuum.c
4  *        The postgres vacuum cleaner.
5  *
6  * This file includes the "full" version of VACUUM, as well as control code
7  * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
8  * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
9  *
10  *
11  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.311 2005/07/14 05:13:39 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <sys/time.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/subtrans.h"
29 #include "access/xlog.h"
30 #include "catalog/catalog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_database.h"
33 #include "catalog/pg_index.h"
34 #include "commands/dbcommands.h"
35 #include "commands/vacuum.h"
36 #include "executor/executor.h"
37 #include "miscadmin.h"
38 #include "storage/freespace.h"
39 #include "storage/procarray.h"
40 #include "storage/smgr.h"
41 #include "tcop/pquery.h"
42 #include "utils/acl.h"
43 #include "utils/builtins.h"
44 #include "utils/fmgroids.h"
45 #include "utils/inval.h"
46 #include "utils/lsyscache.h"
47 #include "utils/memutils.h"
48 #include "utils/relcache.h"
49 #include "utils/syscache.h"
50 #include "pgstat.h"
51
52
53 typedef struct VacPageData
54 {
55         BlockNumber blkno;                      /* BlockNumber of this Page */
56         Size            free;                   /* FreeSpace on this Page */
57         uint16          offsets_used;   /* Number of OffNums used by vacuum */
58         uint16          offsets_free;   /* Number of OffNums free or to be free */
59         OffsetNumber offsets[1];        /* Array of free OffNums */
60 } VacPageData;
61
62 typedef VacPageData *VacPage;
63
64 typedef struct VacPageListData
65 {
66         BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
67         int                     num_pages;              /* Number of pages in pagedesc */
68         int                     num_allocated_pages;    /* Number of allocated pages in
69                                                                                  * pagedesc */
70         VacPage    *pagedesc;           /* Descriptions of pages */
71 } VacPageListData;
72
73 typedef VacPageListData *VacPageList;
74
75 typedef struct VTupleLinkData
76 {
77         ItemPointerData new_tid;
78         ItemPointerData this_tid;
79 } VTupleLinkData;
80
81 typedef VTupleLinkData *VTupleLink;
82
83 typedef struct VTupleMoveData
84 {
85         ItemPointerData tid;            /* tuple ID */
86         VacPage         vacpage;                /* where to move */
87         bool            cleanVpd;               /* clean vacpage before using */
88 } VTupleMoveData;
89
90 typedef VTupleMoveData *VTupleMove;
91
92 typedef struct VRelStats
93 {
94         BlockNumber rel_pages;
95         double          rel_tuples;
96         Size            min_tlen;
97         Size            max_tlen;
98         bool            hasindex;
99         int                     num_vtlinks;
100         VTupleLink      vtlinks;
101 } VRelStats;
102
103 /*----------------------------------------------------------------------
104  * ExecContext:
105  *
106  * As these variables always appear together, we put them into one struct
107  * and pull initialization and cleanup into separate routines.
108  * ExecContext is used by repair_frag() and move_xxx_tuple().  More
109  * accurately:  It is *used* only in move_xxx_tuple(), but because this
110  * routine is called many times, we initialize the struct just once in
111  * repair_frag() and pass it on to move_xxx_tuple().
112  */
113 typedef struct ExecContextData
114 {
115         ResultRelInfo *resultRelInfo;
116         EState     *estate;
117         TupleTableSlot *slot;
118 } ExecContextData;
119 typedef ExecContextData *ExecContext;
120
121 static void
122 ExecContext_Init(ExecContext ec, Relation rel)
123 {
124         TupleDesc       tupdesc = RelationGetDescr(rel);
125
126         /*
127          * We need a ResultRelInfo and an EState so we can use the regular
128          * executor's index-entry-making machinery.
129          */
130         ec->estate = CreateExecutorState();
131
132         ec->resultRelInfo = makeNode(ResultRelInfo);
133         ec->resultRelInfo->ri_RangeTableIndex = 1;      /* dummy */
134         ec->resultRelInfo->ri_RelationDesc = rel;
135         ec->resultRelInfo->ri_TrigDesc = NULL;          /* we don't fire triggers */
136
137         ExecOpenIndices(ec->resultRelInfo);
138
139         ec->estate->es_result_relations = ec->resultRelInfo;
140         ec->estate->es_num_result_relations = 1;
141         ec->estate->es_result_relation_info = ec->resultRelInfo;
142
143         /* Set up a tuple slot too */
144         ec->slot = MakeSingleTupleTableSlot(tupdesc);
145 }
146
147 static void
148 ExecContext_Finish(ExecContext ec)
149 {
150         ExecDropSingleTupleTableSlot(ec->slot);
151         ExecCloseIndices(ec->resultRelInfo);
152         FreeExecutorState(ec->estate);
153 }
154
155 /*
156  * End of ExecContext Implementation
157  *----------------------------------------------------------------------
158  */
159
160 static MemoryContext vac_context = NULL;
161
162 static int      elevel = -1;
163
164 static TransactionId OldestXmin;
165 static TransactionId FreezeLimit;
166
167
168 /* non-export function prototypes */
169 static List *get_rel_oids(List *relids, const RangeVar *vacrel,
170                                                   const char *stmttype);
171 static void vac_update_dbstats(Oid dbid,
172                                    TransactionId vacuumXID,
173                                    TransactionId frozenXID);
174 static void vac_truncate_clog(TransactionId vacuumXID,
175                                   TransactionId frozenXID);
176 static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
177 static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
178 static void scan_heap(VRelStats *vacrelstats, Relation onerel,
179                   VacPageList vacuum_pages, VacPageList fraged_pages);
180 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
181                         VacPageList vacuum_pages, VacPageList fraged_pages,
182                         int nindexes, Relation *Irel);
183 static void move_chain_tuple(Relation rel,
184                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
185                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
186                                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
187 static void move_plain_tuple(Relation rel,
188                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
189                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
190                                  ExecContext ec);
191 static void update_hint_bits(Relation rel, VacPageList fraged_pages,
192                                  int num_fraged_pages, BlockNumber last_move_dest_block,
193                                  int num_moved);
194 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
195                         VacPageList vacpagelist);
196 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
197 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
198                          double num_tuples, int keep_tuples);
199 static void scan_index(Relation indrel, double num_tuples);
200 static bool tid_reaped(ItemPointer itemptr, void *state);
201 static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
202 static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
203                            BlockNumber rel_pages);
204 static VacPage copy_vac_page(VacPage vacpage);
205 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
206 static void *vac_bsearch(const void *key, const void *base,
207                         size_t nelem, size_t size,
208                         int (*compar) (const void *, const void *));
209 static int      vac_cmp_blk(const void *left, const void *right);
210 static int      vac_cmp_offno(const void *left, const void *right);
211 static int      vac_cmp_vtlinks(const void *left, const void *right);
212 static bool enough_space(VacPage vacpage, Size len);
213
214
215 /****************************************************************************
216  *                                                                                                                                                      *
217  *                      Code common to all flavors of VACUUM and ANALYZE                                *
218  *                                                                                                                                                      *
219  ****************************************************************************
220  */
221
222
223 /*
224  * Primary entry point for VACUUM and ANALYZE commands.
225  *
226  * relids is normally NIL; if it is not, then it provides the list of
227  * relation OIDs to be processed, and vacstmt->relation is ignored.
228  * (The non-NIL case is currently only used by autovacuum.)
229  *
230  * It is the caller's responsibility that both vacstmt and relids
231  * (if given) be allocated in a memory context that won't disappear
232  * at transaction commit.  In fact this context must be QueryContext
233  * to avoid complaints from PreventTransactionChain.
234  */
235 void
236 vacuum(VacuumStmt *vacstmt, List *relids)
237 {
238         const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
239         TransactionId initialOldestXmin = InvalidTransactionId;
240         TransactionId initialFreezeLimit = InvalidTransactionId;
241         volatile MemoryContext anl_context = NULL;
242         volatile bool all_rels,
243                                 in_outer_xact,
244                                 use_own_xacts;
245         List       *relations;
246
247         if (vacstmt->verbose)
248                 elevel = INFO;
249         else
250                 elevel = DEBUG2;
251
252         /*
253          * We cannot run VACUUM inside a user transaction block; if we were
254          * inside a transaction, then our commit- and
255          * start-transaction-command calls would not have the intended effect!
256          * Furthermore, the forced commit that occurs before truncating the
257          * relation's file would have the effect of committing the rest of the
258          * user's transaction too, which would certainly not be the desired
259          * behavior.  (This only applies to VACUUM FULL, though.  We could in
260          * theory run lazy VACUUM inside a transaction block, but we choose to
261          * disallow that case because we'd rather commit as soon as possible
262          * after finishing the vacuum.  This is mainly so that we can let go
263          * the AccessExclusiveLock that we may be holding.)
264          *
265          * ANALYZE (without VACUUM) can run either way.
266          */
267         if (vacstmt->vacuum)
268         {
269                 PreventTransactionChain((void *) vacstmt, stmttype);
270                 in_outer_xact = false;
271         }
272         else
273                 in_outer_xact = IsInTransactionChain((void *) vacstmt);
274
275         /*
276          * Disallow the combination VACUUM FULL FREEZE; although it would mostly
277          * work, VACUUM FULL's ability to move tuples around means that it is
278          * injecting its own XID into tuple visibility checks.  We'd have to
279          * guarantee that every moved tuple is properly marked XMIN_COMMITTED or
280          * XMIN_INVALID before the end of the operation.  There are corner cases
281          * where this does not happen, and getting rid of them all seems hard
282          * (not to mention fragile to maintain).  On the whole it's not worth it
283          * compared to telling people to use two operations.  See pgsql-hackers
284          * discussion of 27-Nov-2004, and comments below for update_hint_bits().
285          *
286          * Note: this is enforced here, and not in the grammar, since (a) we can
287          * give a better error message, and (b) we might want to allow it again
288          * someday.
289          */
290         if (vacstmt->vacuum && vacstmt->full && vacstmt->freeze)
291                 ereport(ERROR,
292                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
293                                  errmsg("VACUUM FULL FREEZE is not supported"),
294                                  errhint("Use VACUUM FULL, then VACUUM FREEZE.")));
295
296         /*
297          * Send info about dead objects to the statistics collector
298          */
299         if (vacstmt->vacuum)
300                 pgstat_vacuum_tabstat();
301
302         /*
303          * Create special memory context for cross-transaction storage.
304          *
305          * Since it is a child of PortalContext, it will go away eventually even
306          * if we suffer an error; there's no need for special abort cleanup
307          * logic.
308          */
309         vac_context = AllocSetContextCreate(PortalContext,
310                                                                                 "Vacuum",
311                                                                                 ALLOCSET_DEFAULT_MINSIZE,
312                                                                                 ALLOCSET_DEFAULT_INITSIZE,
313                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
314
315         /* Remember whether we are processing everything in the DB */
316         all_rels = (relids == NIL && vacstmt->relation == NULL);
317
318         /*
319          * Build list of relations to process, unless caller gave us one.
320          * (If we build one, we put it in vac_context for safekeeping.)
321          */
322         relations = get_rel_oids(relids, vacstmt->relation, stmttype);
323
324         if (vacstmt->vacuum && all_rels)
325         {
326                 /*
327                  * It's a database-wide VACUUM.
328                  *
329                  * Compute the initially applicable OldestXmin and FreezeLimit XIDs,
330                  * so that we can record these values at the end of the VACUUM.
331                  * Note that individual tables may well be processed with newer
332                  * values, but we can guarantee that no (non-shared) relations are
333                  * processed with older ones.
334                  *
335                  * It is okay to record non-shared values in pg_database, even though
336                  * we may vacuum shared relations with older cutoffs, because only
337                  * the minimum of the values present in pg_database matters.  We
338                  * can be sure that shared relations have at some time been
339                  * vacuumed with cutoffs no worse than the global minimum; for, if
340                  * there is a backend in some other DB with xmin = OLDXMIN that's
341                  * determining the cutoff with which we vacuum shared relations,
342                  * it is not possible for that database to have a cutoff newer
343                  * than OLDXMIN recorded in pg_database.
344                  */
345                 vacuum_set_xid_limits(vacstmt, false,
346                                                           &initialOldestXmin,
347                                                           &initialFreezeLimit);
348         }
349
350         /*
351          * Decide whether we need to start/commit our own transactions.
352          *
353          * For VACUUM (with or without ANALYZE): always do so, so that we can
354          * release locks as soon as possible.  (We could possibly use the
355          * outer transaction for a one-table VACUUM, but handling TOAST tables
356          * would be problematic.)
357          *
358          * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
359          * start/commit our own transactions.  Also, there's no need to do so
360          * if only processing one relation.  For multiple relations when not
361          * within a transaction block, use own transactions so we can release
362          * locks sooner.
363          */
364         if (vacstmt->vacuum)
365                 use_own_xacts = true;
366         else
367         {
368                 Assert(vacstmt->analyze);
369                 if (in_outer_xact)
370                         use_own_xacts = false;
371                 else if (list_length(relations) > 1)
372                         use_own_xacts = true;
373                 else
374                         use_own_xacts = false;
375         }
376
377         /*
378          * If we are running ANALYZE without per-table transactions, we'll
379          * need a memory context with table lifetime.
380          */
381         if (!use_own_xacts)
382                 anl_context = AllocSetContextCreate(PortalContext,
383                                                                                         "Analyze",
384                                                                                         ALLOCSET_DEFAULT_MINSIZE,
385                                                                                         ALLOCSET_DEFAULT_INITSIZE,
386                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
387
388         /*
389          * vacuum_rel expects to be entered with no transaction active; it
390          * will start and commit its own transaction.  But we are called by an
391          * SQL command, and so we are executing inside a transaction already.
392          * We commit the transaction started in PostgresMain() here, and start
393          * another one before exiting to match the commit waiting for us back
394          * in PostgresMain().
395          */
396         if (use_own_xacts)
397         {
398                 /* matches the StartTransaction in PostgresMain() */
399                 CommitTransactionCommand();
400         }
401
402         /* Turn vacuum cost accounting on or off */
403         PG_TRY();
404         {
405                 ListCell   *cur;
406
407                 VacuumCostActive = (VacuumCostDelay > 0);
408                 VacuumCostBalance = 0;
409
410                 /*
411                  * Loop to process each selected relation.
412                  */
413                 foreach(cur, relations)
414                 {
415                         Oid                     relid = lfirst_oid(cur);
416
417                         if (vacstmt->vacuum)
418                         {
419                                 if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
420                                         all_rels = false;       /* forget about updating dbstats */
421                         }
422                         if (vacstmt->analyze)
423                         {
424                                 MemoryContext old_context = NULL;
425
426                                 /*
427                                  * If using separate xacts, start one for analyze.
428                                  * Otherwise, we can use the outer transaction, but we
429                                  * still need to call analyze_rel in a memory context that
430                                  * will be cleaned up on return (else we leak memory while
431                                  * processing multiple tables).
432                                  */
433                                 if (use_own_xacts)
434                                 {
435                                         StartTransactionCommand();
436                                         /* functions in indexes may want a snapshot set */
437                                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
438                                 }
439                                 else
440                                         old_context = MemoryContextSwitchTo(anl_context);
441
442                                 /*
443                                  * Tell the buffer replacement strategy that vacuum is
444                                  * causing the IO
445                                  */
446                                 StrategyHintVacuum(true);
447
448                                 analyze_rel(relid, vacstmt);
449
450                                 StrategyHintVacuum(false);
451
452                                 if (use_own_xacts)
453                                         CommitTransactionCommand();
454                                 else
455                                 {
456                                         MemoryContextSwitchTo(old_context);
457                                         MemoryContextResetAndDeleteChildren(anl_context);
458                                 }
459                         }
460                 }
461         }
462         PG_CATCH();
463         {
464                 /* Make sure cost accounting is turned off after error */
465                 VacuumCostActive = false;
466                 PG_RE_THROW();
467         }
468         PG_END_TRY();
469
470         /* Turn off vacuum cost accounting */
471         VacuumCostActive = false;
472
473         /*
474          * Finish up processing.
475          */
476         if (use_own_xacts)
477         {
478                 /* here, we are not in a transaction */
479
480                 /*
481                  * This matches the CommitTransaction waiting for us in
482                  * PostgresMain().
483                  */
484                 StartTransactionCommand();
485         }
486
487         if (vacstmt->vacuum)
488         {
489                 /*
490                  * If it was a database-wide VACUUM, print FSM usage statistics
491                  * (we don't make you be superuser to see these).
492                  */
493                 if (vacstmt->relation == NULL)
494                         PrintFreeSpaceMapStatistics(elevel);
495
496                 /*
497                  * If we completed a database-wide VACUUM without skipping any
498                  * relations, update the database's pg_database row with info
499                  * about the transaction IDs used, and try to truncate pg_clog.
500                  */
501                 if (all_rels)
502                 {
503                         vac_update_dbstats(MyDatabaseId,
504                                                            initialOldestXmin, initialFreezeLimit);
505                         vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
506                 }
507         }
508
509         /*
510          * Clean up working storage --- note we must do this after
511          * StartTransactionCommand, else we might be trying to delete the
512          * active context!
513          */
514         MemoryContextDelete(vac_context);
515         vac_context = NULL;
516
517         if (anl_context)
518                 MemoryContextDelete(anl_context);
519 }
520
521 /*
522  * Build a list of Oids for each relation to be processed
523  *
524  * The list is built in vac_context so that it will survive across our
525  * per-relation transactions.
526  */
527 static List *
528 get_rel_oids(List *relids, const RangeVar *vacrel, const char *stmttype)
529 {
530         List       *oid_list = NIL;
531         MemoryContext oldcontext;
532
533         /* List supplied by VACUUM's caller? */
534         if (relids)
535                 return relids;
536
537         if (vacrel)
538         {
539                 /* Process a specific relation */
540                 Oid                     relid;
541
542                 relid = RangeVarGetRelid(vacrel, false);
543
544                 /* Make a relation list entry for this guy */
545                 oldcontext = MemoryContextSwitchTo(vac_context);
546                 oid_list = lappend_oid(oid_list, relid);
547                 MemoryContextSwitchTo(oldcontext);
548         }
549         else
550         {
551                 /* Process all plain relations listed in pg_class */
552                 Relation        pgclass;
553                 HeapScanDesc scan;
554                 HeapTuple       tuple;
555                 ScanKeyData key;
556
557                 ScanKeyInit(&key,
558                                         Anum_pg_class_relkind,
559                                         BTEqualStrategyNumber, F_CHAREQ,
560                                         CharGetDatum(RELKIND_RELATION));
561
562                 pgclass = heap_open(RelationRelationId, AccessShareLock);
563
564                 scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
565
566                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
567                 {
568                         /* Make a relation list entry for this guy */
569                         oldcontext = MemoryContextSwitchTo(vac_context);
570                         oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
571                         MemoryContextSwitchTo(oldcontext);
572                 }
573
574                 heap_endscan(scan);
575                 heap_close(pgclass, AccessShareLock);
576         }
577
578         return oid_list;
579 }
580
581 /*
582  * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
583  */
584 void
585 vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
586                                           TransactionId *oldestXmin,
587                                           TransactionId *freezeLimit)
588 {
589         TransactionId limit;
590
591         *oldestXmin = GetOldestXmin(sharedRel);
592
593         Assert(TransactionIdIsNormal(*oldestXmin));
594
595         if (vacstmt->freeze)
596         {
597                 /* FREEZE option: use oldest Xmin as freeze cutoff too */
598                 limit = *oldestXmin;
599         }
600         else
601         {
602                 /*
603                  * Normal case: freeze cutoff is well in the past, to wit, about
604                  * halfway to the wrap horizon
605                  */
606                 limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
607         }
608
609         /*
610          * Be careful not to generate a "permanent" XID
611          */
612         if (!TransactionIdIsNormal(limit))
613                 limit = FirstNormalTransactionId;
614
615         /*
616          * Ensure sane relationship of limits
617          */
618         if (TransactionIdFollows(limit, *oldestXmin))
619         {
620                 ereport(WARNING,
621                                 (errmsg("oldest xmin is far in the past"),
622                                  errhint("Close open transactions soon to avoid wraparound problems.")));
623                 limit = *oldestXmin;
624         }
625
626         *freezeLimit = limit;
627 }
628
629
630 /*
631  *      vac_update_relstats() -- update statistics for one relation
632  *
633  *              Update the whole-relation statistics that are kept in its pg_class
634  *              row.  There are additional stats that will be updated if we are
635  *              doing ANALYZE, but we always update these stats.  This routine works
636  *              for both index and heap relation entries in pg_class.
637  *
638  *              We violate no-overwrite semantics here by storing new values for the
639  *              statistics columns directly into the pg_class tuple that's already on
640  *              the page.  The reason for this is that if we updated these tuples in
641  *              the usual way, vacuuming pg_class itself wouldn't work very well ---
642  *              by the time we got done with a vacuum cycle, most of the tuples in
643  *              pg_class would've been obsoleted.  Of course, this only works for
644  *              fixed-size never-null columns, but these are.
645  *
646  *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
647  *              ANALYZE.
648  */
649 void
650 vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
651                                         bool hasindex)
652 {
653         Relation        rd;
654         HeapTupleData rtup;
655         HeapTuple       ctup;
656         Form_pg_class pgcform;
657         Buffer          buffer;
658
659         /*
660          * update number of tuples and number of pages in pg_class
661          */
662         rd = heap_open(RelationRelationId, RowExclusiveLock);
663
664         ctup = SearchSysCache(RELOID,
665                                                   ObjectIdGetDatum(relid),
666                                                   0, 0, 0);
667         if (!HeapTupleIsValid(ctup))
668                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
669                          relid);
670
671         /* get the buffer cache tuple */
672         rtup.t_self = ctup->t_self;
673         ReleaseSysCache(ctup);
674         if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
675                 elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
676                          relid);
677
678         /* ensure no one else does this at the same time */
679         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
680
681         /* overwrite the existing statistics in the tuple */
682         pgcform = (Form_pg_class) GETSTRUCT(&rtup);
683         pgcform->relpages = (int32) num_pages;
684         pgcform->reltuples = (float4) num_tuples;
685         pgcform->relhasindex = hasindex;
686
687         /*
688          * If we have discovered that there are no indexes, then there's no
689          * primary key either.  This could be done more thoroughly...
690          */
691         if (!hasindex)
692                 pgcform->relhaspkey = false;
693
694         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
695
696         /*
697          * Invalidate the tuple in the catcaches; this also arranges to flush
698          * the relation's relcache entry.  (If we fail to commit for some
699          * reason, no flush will occur, but no great harm is done since there
700          * are no noncritical state updates here.)
701          */
702         CacheInvalidateHeapTuple(rd, &rtup);
703
704         /* Write the buffer */
705         WriteBuffer(buffer);
706
707         heap_close(rd, RowExclusiveLock);
708 }
709
710
711 /*
712  *      vac_update_dbstats() -- update statistics for one database
713  *
714  *              Update the whole-database statistics that are kept in its pg_database
715  *              row.
716  *
717  *              We violate no-overwrite semantics here by storing new values for the
718  *              statistics columns directly into the tuple that's already on the page.
719  *              As with vac_update_relstats, this avoids leaving dead tuples behind
720  *              after a VACUUM.
721  *
722  *              This routine is shared by full and lazy VACUUM.  Note that it is only
723  *              applied after a database-wide VACUUM operation.
724  *
725  *              Note that we don't bother to update the flat-file copy of pg_database.
726  */
727 static void
728 vac_update_dbstats(Oid dbid,
729                                    TransactionId vacuumXID,
730                                    TransactionId frozenXID)
731 {
732         Relation        relation;
733         ScanKeyData entry[1];
734         HeapScanDesc scan;
735         HeapTuple       tuple;
736         Form_pg_database dbform;
737
738         relation = heap_open(DatabaseRelationId, RowExclusiveLock);
739
740         /* Must use a heap scan, since there's no syscache for pg_database */
741         ScanKeyInit(&entry[0],
742                                 ObjectIdAttributeNumber,
743                                 BTEqualStrategyNumber, F_OIDEQ,
744                                 ObjectIdGetDatum(dbid));
745
746         scan = heap_beginscan(relation, SnapshotNow, 1, entry);
747
748         tuple = heap_getnext(scan, ForwardScanDirection);
749
750         if (!HeapTupleIsValid(tuple))
751                 elog(ERROR, "could not find tuple for database %u", dbid);
752
753         /* ensure no one else does this at the same time */
754         LockBuffer(scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE);
755
756         dbform = (Form_pg_database) GETSTRUCT(tuple);
757
758         /* overwrite the existing statistics in the tuple */
759         dbform->datvacuumxid = vacuumXID;
760         dbform->datfrozenxid = frozenXID;
761
762         LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
763
764         /* invalidate the tuple in the cache and write the buffer */
765         CacheInvalidateHeapTuple(relation, tuple);
766         WriteNoReleaseBuffer(scan->rs_cbuf);
767
768         heap_endscan(scan);
769
770         heap_close(relation, RowExclusiveLock);
771 }
772
773
774 /*
775  *      vac_truncate_clog() -- attempt to truncate the commit log
776  *
777  *              Scan pg_database to determine the system-wide oldest datvacuumxid,
778  *              and use it to truncate the transaction commit log (pg_clog).
779  *              Also update the XID wrap limit point maintained by varsup.c.
780  *
781  *              We also generate a warning if the system-wide oldest datfrozenxid
782  *              seems to be in danger of wrapping around.  This is a long-in-advance
783  *              warning; if we start getting uncomfortably close, GetNewTransactionId
784  *              will generate more-annoying warnings, and ultimately refuse to issue
785  *              any more new XIDs.
786  *
787  *              The passed XIDs are simply the ones I just wrote into my pg_database
788  *              entry.  They're used to initialize the "min" calculations.
789  *
790  *              This routine is shared by full and lazy VACUUM.  Note that it is only
791  *              applied after a database-wide VACUUM operation.
792  */
793 static void
794 vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
795 {
796         TransactionId myXID = GetCurrentTransactionId();
797         Relation        relation;
798         HeapScanDesc scan;
799         HeapTuple       tuple;
800         int32           age;
801         NameData        oldest_datname;
802         bool            vacuumAlreadyWrapped = false;
803         bool            frozenAlreadyWrapped = false;
804
805         /* init oldest_datname to sync with my frozenXID */
806         namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));
807
808         /*
809          * Note: the "already wrapped" cases should now be impossible due to the
810          * defenses in GetNewTransactionId, but we keep them anyway.
811          */
812         relation = heap_open(DatabaseRelationId, AccessShareLock);
813
814         scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
815
816         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
817         {
818                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
819
820                 /* Ignore non-connectable databases (eg, template0) */
821                 /* It's assumed that these have been frozen correctly */
822                 if (!dbform->datallowconn)
823                         continue;
824
825                 if (TransactionIdIsNormal(dbform->datvacuumxid))
826                 {
827                         if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
828                                 vacuumAlreadyWrapped = true;
829                         else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
830                                 vacuumXID = dbform->datvacuumxid;
831                 }
832                 if (TransactionIdIsNormal(dbform->datfrozenxid))
833                 {
834                         if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
835                                 frozenAlreadyWrapped = true;
836                         else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
837                         {
838                                 frozenXID = dbform->datfrozenxid;
839                                 namecpy(&oldest_datname, &dbform->datname);
840                         }
841                 }
842         }
843
844         heap_endscan(scan);
845
846         heap_close(relation, AccessShareLock);
847
848         /*
849          * Do not truncate CLOG if we seem to have suffered wraparound
850          * already; the computed minimum XID might be bogus.
851          */
852         if (vacuumAlreadyWrapped)
853         {
854                 ereport(WARNING,
855                                 (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
856                                  errdetail("You may have already suffered transaction-wraparound data loss.")));
857                 return;
858         }
859
860         /* Truncate CLOG to the oldest vacuumxid */
861         TruncateCLOG(vacuumXID);
862
863         /*
864          * Do not update varsup.c if we seem to have suffered wraparound
865          * already; the computed XID might be bogus.
866          */
867         if (frozenAlreadyWrapped)
868         {
869                 ereport(WARNING,
870                                 (errmsg("some databases have not been vacuumed in over 1 billion transactions"),
871                                  errhint("Better vacuum them soon, or you may have a wraparound failure.")));
872                 return;
873         }
874
875         /* Update the wrap limit for GetNewTransactionId */
876         SetTransactionIdLimit(frozenXID, &oldest_datname);
877
878         /* Give warning about impending wraparound problems */
879         age = (int32) (myXID - frozenXID);
880         if (age > (int32) ((MaxTransactionId >> 3) * 3))
881                 ereport(WARNING,
882                                 (errmsg("database \"%s\" must be vacuumed within %u transactions",
883                                                 NameStr(oldest_datname),
884                                                 (MaxTransactionId >> 1) - age),
885                                  errhint("To avoid a database shutdown, execute a full-database VACUUM in \"%s\".",
886                                                  NameStr(oldest_datname))));
887 }
888
889
890 /****************************************************************************
891  *                                                                                                                                                      *
892  *                      Code common to both flavors of VACUUM                                                   *
893  *                                                                                                                                                      *
894  ****************************************************************************
895  */
896
897
898 /*
899  *      vacuum_rel() -- vacuum one heap relation
900  *
901  *              Returns TRUE if we actually processed the relation (or can ignore it
902  *              for some reason), FALSE if we failed to process it due to permissions
903  *              or other reasons.  (A FALSE result really means that some data
904  *              may have been left unvacuumed, so we can't update XID stats.)
905  *
906  *              Doing one heap at a time incurs extra overhead, since we need to
907  *              check that the heap exists again just before we vacuum it.      The
908  *              reason that we do this is so that vacuuming can be spread across
909  *              many small transactions.  Otherwise, two-phase locking would require
910  *              us to lock the entire database during one pass of the vacuum cleaner.
911  *
912  *              At entry and exit, we are not inside a transaction.
913  */
914 static bool
915 vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
916 {
917         LOCKMODE        lmode;
918         Relation        onerel;
919         LockRelId       onerelid;
920         Oid                     toast_relid;
921         bool            result;
922
923         /* Begin a transaction for vacuuming this relation */
924         StartTransactionCommand();
925         /* functions in indexes may want a snapshot set */
926         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
927
928         /*
929          * Tell the cache replacement strategy that vacuum is causing all
930          * following IO
931          */
932         StrategyHintVacuum(true);
933
934         /*
935          * Check for user-requested abort.      Note we want this to be inside a
936          * transaction, so xact.c doesn't issue useless WARNING.
937          */
938         CHECK_FOR_INTERRUPTS();
939
940         /*
941          * Race condition -- if the pg_class tuple has gone away since the
942          * last time we saw it, we don't need to vacuum it.
943          */
944         if (!SearchSysCacheExists(RELOID,
945                                                           ObjectIdGetDatum(relid),
946                                                           0, 0, 0))
947         {
948                 StrategyHintVacuum(false);
949                 CommitTransactionCommand();
950                 return true;                    /* okay 'cause no data there */
951         }
952
953         /*
954          * Determine the type of lock we want --- hard exclusive lock for a
955          * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
956          * vacuum.      Either way, we can be sure that no other backend is
957          * vacuuming the same table.
958          */
959         lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
960
961         /*
962          * Open the class, get an appropriate lock on it, and check
963          * permissions.
964          *
965          * We allow the user to vacuum a table if he is superuser, the table
966          * owner, or the database owner (but in the latter case, only if it's
967          * not a shared relation).      pg_class_ownercheck includes the superuser
968          * case.
969          *
970          * Note we choose to treat permissions failure as a WARNING and keep
971          * trying to vacuum the rest of the DB --- is this appropriate?
972          */
973         onerel = relation_open(relid, lmode);
974
975         if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
976                   (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
977         {
978                 ereport(WARNING,
979                                 (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
980                                                 RelationGetRelationName(onerel))));
981                 relation_close(onerel, lmode);
982                 StrategyHintVacuum(false);
983                 CommitTransactionCommand();
984                 return false;
985         }
986
987         /*
988          * Check that it's a plain table; we used to do this in get_rel_oids()
989          * but seems safer to check after we've locked the relation.
990          */
991         if (onerel->rd_rel->relkind != expected_relkind)
992         {
993                 ereport(WARNING,
994                                 (errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
995                                                 RelationGetRelationName(onerel))));
996                 relation_close(onerel, lmode);
997                 StrategyHintVacuum(false);
998                 CommitTransactionCommand();
999                 return false;
1000         }
1001
1002         /*
1003          * Silently ignore tables that are temp tables of other backends ---
1004          * trying to vacuum these will lead to great unhappiness, since their
1005          * contents are probably not up-to-date on disk.  (We don't throw a
1006          * warning here; it would just lead to chatter during a database-wide
1007          * VACUUM.)
1008          */
1009         if (isOtherTempNamespace(RelationGetNamespace(onerel)))
1010         {
1011                 relation_close(onerel, lmode);
1012                 StrategyHintVacuum(false);
1013                 CommitTransactionCommand();
1014                 return true;                    /* assume no long-lived data in temp
1015                                                                  * tables */
1016         }
1017
1018         /*
1019          * Get a session-level lock too. This will protect our access to the
1020          * relation across multiple transactions, so that we can vacuum the
1021          * relation's TOAST table (if any) secure in the knowledge that no one
1022          * is deleting the parent relation.
1023          *
1024          * NOTE: this cannot block, even if someone else is waiting for access,
1025          * because the lock manager knows that both lock requests are from the
1026          * same process.
1027          */
1028         onerelid = onerel->rd_lockInfo.lockRelId;
1029         LockRelationForSession(&onerelid, onerel->rd_istemp, lmode);
1030
1031         /*
1032          * Remember the relation's TOAST relation for later
1033          */
1034         toast_relid = onerel->rd_rel->reltoastrelid;
1035
1036         /*
1037          * Do the actual work --- either FULL or "lazy" vacuum
1038          */
1039         if (vacstmt->full)
1040                 full_vacuum_rel(onerel, vacstmt);
1041         else
1042                 lazy_vacuum_rel(onerel, vacstmt);
1043
1044         result = true;                          /* did the vacuum */
1045
1046         /* all done with this class, but hold lock until commit */
1047         relation_close(onerel, NoLock);
1048
1049         /*
1050          * Complete the transaction and free all temporary memory used.
1051          */
1052         StrategyHintVacuum(false);
1053         CommitTransactionCommand();
1054
1055         /*
1056          * If the relation has a secondary toast rel, vacuum that too while we
1057          * still hold the session lock on the master table.  Note however that
1058          * "analyze" will not get done on the toast table.      This is good,
1059          * because the toaster always uses hardcoded index access and
1060          * statistics are totally unimportant for toast relations.
1061          */
1062         if (toast_relid != InvalidOid)
1063         {
1064                 if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
1065                         result = false;         /* failed to vacuum the TOAST table? */
1066         }
1067
1068         /*
1069          * Now release the session-level lock on the master table.
1070          */
1071         UnlockRelationForSession(&onerelid, lmode);
1072
1073         return result;
1074 }
1075
1076
1077 /****************************************************************************
1078  *                                                                                                                                                      *
1079  *                      Code for VACUUM FULL (only)                                                                             *
1080  *                                                                                                                                                      *
1081  ****************************************************************************
1082  */
1083
1084
1085 /*
1086  *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
1087  *
1088  *              This routine vacuums a single heap, cleans out its indexes, and
1089  *              updates its num_pages and num_tuples statistics.
1090  *
1091  *              At entry, we have already established a transaction and opened
1092  *              and locked the relation.
1093  */
1094 static void
1095 full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
1096 {
1097         VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
1098                                                                                  * clean indexes */
1099         VacPageListData fraged_pages;           /* List of pages with space enough
1100                                                                                  * for re-using */
1101         Relation   *Irel;
1102         int                     nindexes,
1103                                 i;
1104         VRelStats  *vacrelstats;
1105
1106         vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
1107                                                   &OldestXmin, &FreezeLimit);
1108
1109         /*
1110          * Set up statistics-gathering machinery.
1111          */
1112         vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
1113         vacrelstats->rel_pages = 0;
1114         vacrelstats->rel_tuples = 0;
1115         vacrelstats->hasindex = false;
1116
1117         /* scan the heap */
1118         vacuum_pages.num_pages = fraged_pages.num_pages = 0;
1119         scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
1120
1121         /* Now open all indexes of the relation */
1122         vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
1123         if (nindexes > 0)
1124                 vacrelstats->hasindex = true;
1125
1126         /* Clean/scan index relation(s) */
1127         if (Irel != NULL)
1128         {
1129                 if (vacuum_pages.num_pages > 0)
1130                 {
1131                         for (i = 0; i < nindexes; i++)
1132                                 vacuum_index(&vacuum_pages, Irel[i],
1133                                                          vacrelstats->rel_tuples, 0);
1134                 }
1135                 else
1136                 {
1137                         /* just scan indexes to update statistics */
1138                         for (i = 0; i < nindexes; i++)
1139                                 scan_index(Irel[i], vacrelstats->rel_tuples);
1140                 }
1141         }
1142
1143         if (fraged_pages.num_pages > 0)
1144         {
1145                 /* Try to shrink heap */
1146                 repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
1147                                         nindexes, Irel);
1148                 vac_close_indexes(nindexes, Irel, NoLock);
1149         }
1150         else
1151         {
1152                 vac_close_indexes(nindexes, Irel, NoLock);
1153                 if (vacuum_pages.num_pages > 0)
1154                 {
1155                         /* Clean pages from vacuum_pages list */
1156                         vacuum_heap(vacrelstats, onerel, &vacuum_pages);
1157                 }
1158         }
1159
1160         /* update shared free space map with final free space info */
1161         vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
1162
1163         /* update statistics in pg_class */
1164         vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
1165                                                 vacrelstats->rel_tuples, vacrelstats->hasindex);
1166
1167         /* report results to the stats collector, too */
1168         pgstat_report_vacuum(RelationGetRelid(onerel), vacstmt->analyze,
1169                                                  vacrelstats->rel_tuples);
1170 }
1171
1172
1173 /*
1174  *      scan_heap() -- scan an open heap relation
1175  *
1176  *              This routine sets commit status bits, constructs vacuum_pages (list
1177  *              of pages we need to compact free space on and/or clean indexes of
1178  *              deleted tuples), constructs fraged_pages (list of pages with free
1179  *              space that tuples could be moved into), and calculates statistics
1180  *              on the number of live tuples in the heap.
1181  */
1182 static void
1183 scan_heap(VRelStats *vacrelstats, Relation onerel,
1184                   VacPageList vacuum_pages, VacPageList fraged_pages)
1185 {
1186         BlockNumber nblocks,
1187                                 blkno;
1188         HeapTupleData tuple;
1189         char       *relname;
1190         VacPage         vacpage;
1191         BlockNumber empty_pages,
1192                                 empty_end_pages;
1193         double          num_tuples,
1194                                 tups_vacuumed,
1195                                 nkeep,
1196                                 nunused;
1197         double          free_space,
1198                                 usable_free_space;
1199         Size            min_tlen = MaxTupleSize;
1200         Size            max_tlen = 0;
1201         bool            do_shrinking = true;
1202         VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
1203         int                     num_vtlinks = 0;
1204         int                     free_vtlinks = 100;
1205         VacRUsage       ru0;
1206
1207         vac_init_rusage(&ru0);
1208
1209         relname = RelationGetRelationName(onerel);
1210         ereport(elevel,
1211                         (errmsg("vacuuming \"%s.%s\"",
1212                                         get_namespace_name(RelationGetNamespace(onerel)),
1213                                         relname)));
1214
1215         empty_pages = empty_end_pages = 0;
1216         num_tuples = tups_vacuumed = nkeep = nunused = 0;
1217         free_space = 0;
1218
1219         nblocks = RelationGetNumberOfBlocks(onerel);
1220
1221         /*
1222          * We initially create each VacPage item in a maximal-sized workspace,
1223          * then copy the workspace into a just-large-enough copy.
1224          */
1225         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1226
1227         for (blkno = 0; blkno < nblocks; blkno++)
1228         {
1229                 Page            page,
1230                                         tempPage = NULL;
1231                 bool            do_reap,
1232                                         do_frag;
1233                 Buffer          buf;
1234                 OffsetNumber offnum,
1235                                         maxoff;
1236                 bool            pgchanged,
1237                                         notup;
1238
1239                 vacuum_delay_point();
1240
1241                 buf = ReadBuffer(onerel, blkno);
1242                 page = BufferGetPage(buf);
1243
1244                 /*
1245                  * We don't bother to do LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE)
1246                  * because we assume that holding exclusive lock on the relation
1247                  * will keep other backends from looking at the page.
1248                  */
1249
1250                 vacpage->blkno = blkno;
1251                 vacpage->offsets_used = 0;
1252                 vacpage->offsets_free = 0;
1253
1254                 if (PageIsNew(page))
1255                 {
1256                         VacPage         vacpagecopy;
1257
1258                         ereport(WARNING,
1259                         (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
1260                                         relname, blkno)));
1261                         PageInit(page, BufferGetPageSize(buf), 0);
1262                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1263                         free_space += vacpage->free;
1264                         empty_pages++;
1265                         empty_end_pages++;
1266                         vacpagecopy = copy_vac_page(vacpage);
1267                         vpage_insert(vacuum_pages, vacpagecopy);
1268                         vpage_insert(fraged_pages, vacpagecopy);
1269                         WriteBuffer(buf);
1270                         continue;
1271                 }
1272
1273                 if (PageIsEmpty(page))
1274                 {
1275                         VacPage         vacpagecopy;
1276
1277                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1278                         free_space += vacpage->free;
1279                         empty_pages++;
1280                         empty_end_pages++;
1281                         vacpagecopy = copy_vac_page(vacpage);
1282                         vpage_insert(vacuum_pages, vacpagecopy);
1283                         vpage_insert(fraged_pages, vacpagecopy);
1284                         ReleaseBuffer(buf);
1285                         continue;
1286                 }
1287
1288                 pgchanged = false;
1289                 notup = true;
1290                 maxoff = PageGetMaxOffsetNumber(page);
1291                 for (offnum = FirstOffsetNumber;
1292                          offnum <= maxoff;
1293                          offnum = OffsetNumberNext(offnum))
1294                 {
1295                         ItemId          itemid = PageGetItemId(page, offnum);
1296                         bool            tupgone = false;
1297
1298                         /*
1299                          * Collect unused items too; it's possible for indexes to be
1300                          * left pointing at them after a crash.
1301                          */
1302                         if (!ItemIdIsUsed(itemid))
1303                         {
1304                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1305                                 nunused += 1;
1306                                 continue;
1307                         }
1308
1309                         tuple.t_datamcxt = NULL;
1310                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1311                         tuple.t_len = ItemIdGetLength(itemid);
1312                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1313
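                             /*
                              * HeapTupleSatisfiesVacuum classifies the tuple relative to
                              * OldestXmin: dead tuples can be removed, recently-dead and
                              * in-progress tuples must be kept, and live tuples may have
                              * their xmin frozen below.
                              */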
1314                         switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
1315                         {
1316                                 case HEAPTUPLE_DEAD:
1317                                         tupgone = true;         /* we can delete the tuple */
1318                                         break;
1319                                 case HEAPTUPLE_LIVE:
1320
1321                                         /*
1322                                          * Tuple is good.  Consider whether to replace its
1323                                          * xmin value with FrozenTransactionId.
1324                                          */
1325                                         if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
1326                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1327                                                                                           FreezeLimit))
1328                                         {
1329                                                 HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
1330                                                 /* infomask should be okay already */
1331                                                 Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
1332                                                 pgchanged = true;
1333                                         }
1334
1335                                         /*
1336                                          * Other checks...
1337                                          */
1338                                         if (onerel->rd_rel->relhasoids &&
1339                                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1340                                                 elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
1341                                                          relname, blkno, offnum);
1342                                         break;
1343                                 case HEAPTUPLE_RECENTLY_DEAD:
1344
1345                                         /*
1346                                          * If the tuple was deleted only recently, it may still be visible
1347                                          * to some transactions, so we must not remove it from the relation.
1348                                          */
1349                                         nkeep += 1;
1350
1351                                         /*
1352                                          * If we may still do shrinking and this tuple has been
1353                                          * updated, remember it so that we can reconstruct the
1354                                          * chain of updated-tuple dependencies later.
1355                                          */
1356                                         if (do_shrinking &&
1357                                                 !(ItemPointerEquals(&(tuple.t_self),
1358                                                                                         &(tuple.t_data->t_ctid))))
1359                                         {
1360                                                 if (free_vtlinks == 0)
1361                                                 {
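                                                             /* out of slots: enlarge the array by another 1000 entries */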
1362                                                         free_vtlinks = 1000;
1363                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1364                                                                                    (free_vtlinks + num_vtlinks) *
1365                                                                                                  sizeof(VTupleLinkData));
1366                                                 }
1367                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1368                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1369                                                 free_vtlinks--;
1370                                                 num_vtlinks++;
1371                                         }
1372                                         break;
1373                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1374
1375                                         /*
1376                                          * This should not happen, since we hold exclusive
1377                                          * lock on the relation; shouldn't we raise an error?
1378                                          * (Actually, it can happen in system catalogs, since
1379                                          * we tend to release write lock before commit there.)
1380                                          */
1381                                         ereport(NOTICE,
1382                                                         (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
1383                                                                         relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
1384                                         do_shrinking = false;
1385                                         break;
1386                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1387
1388                                         /*
1389                                          * This should not happen, since we hold exclusive
1390                                          * lock on the relation; shouldn't we raise an error?
1391                                          * (Actually, it can happen in system catalogs, since
1392                                          * we tend to release write lock before commit there.)
1393                                          */
1394                                         ereport(NOTICE,
1395                                                         (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
1396                                                                         relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
1397                                         do_shrinking = false;
1398                                         break;
1399                                 default:
1400                                         elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
1401                                         break;
1402                         }
1403
1404                         if (tupgone)
1405                         {
1406                                 ItemId          lpp;
1407
1408                                 /*
1409                                  * Here we are building a temporary copy of the page with
1410                                  * dead tuples removed.  Below we will apply
1411                                  * PageRepairFragmentation to the copy, so that we can
1412                                  * determine how much space will be available after
1413                                  * removal of dead tuples.      But note we are NOT changing
1414                                  * the real page yet...
1415                                  */
1416                                 if (tempPage == NULL)
1417                                 {
1418                                         Size            pageSize;
1419
1420                                         pageSize = PageGetPageSize(page);
1421                                         tempPage = (Page) palloc(pageSize);
1422                                         memcpy(tempPage, page, pageSize);
1423                                 }
1424
1425                                 /* mark it unused on the temp page */
1426                                 lpp = PageGetItemId(tempPage, offnum);
1427                                 lpp->lp_flags &= ~LP_USED;
1428
1429                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1430                                 tups_vacuumed += 1;
1431                         }
1432                         else
1433                         {
1434                                 num_tuples += 1;
1435                                 notup = false;
1436                                 if (tuple.t_len < min_tlen)
1437                                         min_tlen = tuple.t_len;
1438                                 if (tuple.t_len > max_tlen)
1439                                         max_tlen = tuple.t_len;
1440                         }
1441                 }                                               /* scan along page */
1442
1443                 if (tempPage != NULL)
1444                 {
1445                         /* Some tuples are removable; figure free space after removal */
1446                         PageRepairFragmentation(tempPage, NULL);
1447                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1448                         pfree(tempPage);
1449                         do_reap = true;
1450                 }
1451                 else
1452                 {
1453                         /* Just use current available space */
1454                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1455                         /* Need to reap the page if it has ~LP_USED line pointers */
1456                         do_reap = (vacpage->offsets_free > 0);
1457                 }
1458
1459                 free_space += vacpage->free;
1460
1461                 /*
1462                  * Add the page to fraged_pages if it has a useful amount of free
1463                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1464                  * don't know that accurately near the start of the relation, so
1465                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1466                  */
1467                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1468
1469                 if (do_reap || do_frag)
1470                 {
1471                         VacPage         vacpagecopy = copy_vac_page(vacpage);
1472
1473                         if (do_reap)
1474                                 vpage_insert(vacuum_pages, vacpagecopy);
1475                         if (do_frag)
1476                                 vpage_insert(fraged_pages, vacpagecopy);
1477                 }
1478
1479                 /*
1480                  * Include the page in empty_end_pages if it will be empty after
1481                  * vacuuming; this is to keep us from using it as a move
1482                  * destination.
1483                  */
1484                 if (notup)
1485                 {
1486                         empty_pages++;
1487                         empty_end_pages++;
1488                 }
1489                 else
1490                         empty_end_pages = 0;
1491
1492                 if (pgchanged)
1493                         WriteBuffer(buf);
1494                 else
1495                         ReleaseBuffer(buf);
1496         }
1497
1498         pfree(vacpage);
1499
1500         /* save stats in the rel list for use later */
1501         vacrelstats->rel_tuples = num_tuples;
1502         vacrelstats->rel_pages = nblocks;
1503         if (num_tuples == 0)
1504                 min_tlen = max_tlen = 0;
1505         vacrelstats->min_tlen = min_tlen;
1506         vacrelstats->max_tlen = max_tlen;
1507
1508         vacuum_pages->empty_end_pages = empty_end_pages;
1509         fraged_pages->empty_end_pages = empty_end_pages;
1510
1511         /*
1512          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1513          * remove any "empty" end-pages from the list, and compute usable free
1514          * space = free space in remaining pages.
1515          */
1516         if (do_shrinking)
1517         {
1518                 int                     i;
1519
1520                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1521                 fraged_pages->num_pages -= empty_end_pages;
1522                 usable_free_space = 0;
1523                 for (i = 0; i < fraged_pages->num_pages; i++)
1524                         usable_free_space += fraged_pages->pagedesc[i]->free;
1525         }
1526         else
1527         {
1528                 fraged_pages->num_pages = 0;
1529                 usable_free_space = 0;
1530         }
1531
1532         /* don't bother to save vtlinks if we will not call repair_frag */
1533         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1534         {
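                     /*
                      * The links are sorted (by new_tid, per vac_cmp_vtlinks) so that
                      * repair_frag can binary-search for a tuple's parent with vac_bsearch.
                      */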
1535                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1536                           vac_cmp_vtlinks);
1537                 vacrelstats->vtlinks = vtlinks;
1538                 vacrelstats->num_vtlinks = num_vtlinks;
1539         }
1540         else
1541         {
1542                 vacrelstats->vtlinks = NULL;
1543                 vacrelstats->num_vtlinks = 0;
1544                 pfree(vtlinks);
1545         }
1546
1547         ereport(elevel,
1548                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1549                                         RelationGetRelationName(onerel),
1550                                         tups_vacuumed, num_tuples, nblocks),
1551                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1552                   "Nonremovable row versions range from %lu to %lu bytes long.\n"
1553                                            "There were %.0f unused item pointers.\n"
1554         "Total free space (including removable row versions) is %.0f bytes.\n"
1555                                            "%u pages are or will become empty, including %u at the end of the table.\n"
1556                                            "%u pages containing %.0f free bytes are potential move destinations.\n"
1557                                            "%s",
1558                                            nkeep,
1559                                            (unsigned long) min_tlen, (unsigned long) max_tlen,
1560                                            nunused,
1561                                            free_space,
1562                                            empty_pages, empty_end_pages,
1563                                            fraged_pages->num_pages, usable_free_space,
1564                                            vac_show_rusage(&ru0))));
1565 }
1566
1567
1568 /*
1569  *      repair_frag() -- try to repair relation's fragmentation
1570  *
1571  *              This routine marks dead tuples as unused and tries to reuse dead
1572  *              space by moving tuples (and inserting index entries as needed).  It
1573  *              constructs Nvacpagelist, a list of pages freed by moving tuples, and
1574  *              cleans their index entries after committing the current transaction
1575  *              (in a hackish manner: without releasing locks or freeing memory!).
1576  *              It truncates the relation if some end blocks have become empty.
1577  */
1578 static void
1579 repair_frag(VRelStats *vacrelstats, Relation onerel,
1580                         VacPageList vacuum_pages, VacPageList fraged_pages,
1581                         int nindexes, Relation *Irel)
1582 {
1583         TransactionId myXID = GetCurrentTransactionId();
1584         Buffer          dst_buffer = InvalidBuffer;
1585         BlockNumber nblocks,
1586                                 blkno;
1587         BlockNumber last_move_dest_block = 0,
1588                                 last_vacuum_block;
1589         Page            dst_page = NULL;
1590         ExecContextData ec;
1591         VacPageListData Nvacpagelist;
1592         VacPage         dst_vacpage = NULL,
1593                                 last_vacuum_page,
1594                                 vacpage,
1595                            *curpage;
1596         int                     i;
1597         int                     num_moved = 0,
1598                                 num_fraged_pages,
1599                                 vacuumed_pages;
1600         int                     keep_tuples = 0;
1601         VacRUsage       ru0;
1602
1603         vac_init_rusage(&ru0);
1604
1605         ExecContext_Init(&ec, onerel);
1606
1607         Nvacpagelist.num_pages = 0;
1608         num_fraged_pages = fraged_pages->num_pages;
1609         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
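             /*
              * Pages at the end of the relation that will be entirely empty need no
              * vacuum_page() pass here; they are expected to be truncated away once
              * the move phase is done.
              */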
1610         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
1611         if (vacuumed_pages > 0)
1612         {
1613                 /* get last reaped page from vacuum_pages */
1614                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1615                 last_vacuum_block = last_vacuum_page->blkno;
1616         }
1617         else
1618         {
1619                 last_vacuum_page = NULL;
1620                 last_vacuum_block = InvalidBlockNumber;
1621         }
1622
1623         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1624         vacpage->offsets_used = vacpage->offsets_free = 0;
1625
1626         /*
1627          * Scan pages backwards from the last nonempty page, trying to move
1628          * tuples down to lower pages.  Quit when we reach a page that we have
1629          * moved any tuples onto, or the first page if we haven't moved
1630          * anything, or when we find a page we cannot completely empty (this
1631          * last condition is handled by "break" statements within the loop).
1632          *
1633          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1634          * in order by blkno.
1635          */
1636         nblocks = vacrelstats->rel_pages;
1637         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1638                  blkno > last_move_dest_block;
1639                  blkno--)
1640         {
1641                 Buffer          buf;
1642                 Page            page;
1643                 OffsetNumber offnum,
1644                                         maxoff;
1645                 bool            isempty,
1646                                         dowrite,
1647                                         chain_tuple_moved;
1648
1649                 vacuum_delay_point();
1650
1651                 /*
1652                  * Forget fraged_pages pages at or after this one; they're no
1653                  * longer useful as move targets, since we only want to move down.
1654                  * Note that since we stop the outer loop at last_move_dest_block,
1655                  * pages removed here cannot have had anything moved onto them
1656                  * already.
1657                  *
1658                  * Also note that we don't change the stored fraged_pages list, only
1659                  * our local variable num_fraged_pages; so the forgotten pages are
1660                  * still available to be loaded into the free space map later.
1661                  */
1662                 while (num_fraged_pages > 0 &&
1663                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1664                 {
1665                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1666                         --num_fraged_pages;
1667                 }
1668
1669                 /*
1670                  * Process this page of relation.
1671                  */
1672                 buf = ReadBuffer(onerel, blkno);
1673                 page = BufferGetPage(buf);
1674
1675                 vacpage->offsets_free = 0;
1676
1677                 isempty = PageIsEmpty(page);
1678
1679                 dowrite = false;
1680
1681                 /* Is the page in the vacuum_pages list? */
1682                 if (blkno == last_vacuum_block)
1683                 {
1684                         if (last_vacuum_page->offsets_free > 0)
1685                         {
1686                                 /* there are dead tuples on this page - clean them */
1687                                 Assert(!isempty);
1688                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1689                                 vacuum_page(onerel, buf, last_vacuum_page);
1690                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1691                                 dowrite = true;
1692                         }
1693                         else
1694                                 Assert(isempty);
1695                         --vacuumed_pages;
1696                         if (vacuumed_pages > 0)
1697                         {
1698                                 /* get prev reaped page from vacuum_pages */
1699                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1700                                 last_vacuum_block = last_vacuum_page->blkno;
1701                         }
1702                         else
1703                         {
1704                                 last_vacuum_page = NULL;
1705                                 last_vacuum_block = InvalidBlockNumber;
1706                         }
1707                         if (isempty)
1708                         {
1709                                 ReleaseBuffer(buf);
1710                                 continue;
1711                         }
1712                 }
1713                 else
1714                         Assert(!isempty);
1715
1716                 chain_tuple_moved = false;              /* no chain tuple has been moved
1717                                                                                  * off this page yet */
1718                 vacpage->blkno = blkno;
1719                 maxoff = PageGetMaxOffsetNumber(page);
1720                 for (offnum = FirstOffsetNumber;
1721                          offnum <= maxoff;
1722                          offnum = OffsetNumberNext(offnum))
1723                 {
1724                         Size            tuple_len;
1725                         HeapTupleData tuple;
1726                         ItemId          itemid = PageGetItemId(page, offnum);
1727
1728                         if (!ItemIdIsUsed(itemid))
1729                                 continue;
1730
1731                         tuple.t_datamcxt = NULL;
1732                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1733                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1734                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1735
1736                         /* ---
1737                          * VACUUM FULL has an exclusive lock on the relation.  So
1738                          * normally no other transaction can have pending INSERTs or
1739                          * DELETEs in this relation.  A tuple is either:
1740                          *              (a) a tuple in a system catalog, inserted or deleted
1741                          *                      by a not yet committed transaction
1742                          *              (b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
1743                          *                      is visible to all active transactions)
1744                          *              (c) inserted by a committed xact (XMIN_COMMITTED)
1745                          *              (d) moved by the currently running VACUUM.
1746                          *              (e) deleted (XMAX_COMMITTED) but at least one active
1747                          *                      transaction does not see the deleting transaction
1748                          * In case (a) we wouldn't be in repair_frag() at all.
1749                          * In case (b) we cannot be here, because scan_heap() has
1750                          * already marked the item as unused, see continue above. Case
1751                          * (c) is what normally is to be expected. Case (d) is only
1752                          * possible, if a whole tuple chain has been moved while
1753                          * processing this or a higher numbered block.
1754                          * ---
1755                          */
1756                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1757                         {
1758                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1759                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1760                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
1761                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
1762
1763                                 /*
1764                                  * MOVED_OFF by another VACUUM would have caused the
1765                                  * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
1766                                  */
1767                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1768                                         elog(ERROR, "invalid XVAC in tuple header");
1769
1770                                 /*
1771                                  * If this (chain) tuple has already been moved by this VACUUM,
1772                                  * check whether it is recorded in vacpage, i.e. whether it was
1773                                  * moved while cleaning this page or some previous one.
1774                                  */
1775
1776                                 /* Can't we Assert(keep_tuples > 0) here? */
1777                                 if (keep_tuples == 0)
1778                                         continue;
1779                                 if (chain_tuple_moved)
1780                                 {
1781                                         /* some chains were moved while cleaning this page */
1782                                         Assert(vacpage->offsets_free > 0);
1783                                         for (i = 0; i < vacpage->offsets_free; i++)
1784                                         {
1785                                                 if (vacpage->offsets[i] == offnum)
1786                                                         break;
1787                                         }
1788                                         if (i >= vacpage->offsets_free)         /* not found */
1789                                         {
1790                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1791                                                 keep_tuples--;
1792                                         }
1793                                 }
1794                                 else
1795                                 {
1796                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1797                                         keep_tuples--;
1798                                 }
1799                                 continue;
1800                         }
1801
1802                         /*
1803                          * If this tuple is part of a chain of tuples created by updates
1804                          * from "recent" transactions, then we have to move the whole
1805                          * chain of tuples to other places.
1806                          *
1807                          * NOTE: this test is not 100% accurate: it is possible for a
1808                          * tuple to be an updated one with recent xmin, and yet not
1809                          * have a corresponding tuple in the vtlinks list.      Presumably
1810                          * there was once a parent tuple with xmax matching the xmin,
1811                          * but it's possible that that tuple has been removed --- for
1812                          * example, if it had xmin = xmax then
1813                          * HeapTupleSatisfiesVacuum would deem it removable as soon as
1814                          * the xmin xact completes.
1815                          *
1816                          * To be on the safe side, we abandon the repair_frag process if
1817                          * we cannot find the parent tuple in vtlinks.  This may be
1818                          * overly conservative; AFAICS it would be safe to move the
1819                          * chain.
1820                          */
1821                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1822                          !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1823                                                                         OldestXmin)) ||
1824                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1825                                                                                            HEAP_IS_LOCKED)) &&
1826                                  !(ItemPointerEquals(&(tuple.t_self),
1827                                                                          &(tuple.t_data->t_ctid)))))
1828                         {
1829                                 Buffer          Cbuf = buf;
1830                                 bool            freeCbuf = false;
1831                                 bool            chain_move_failed = false;
1832                                 ItemPointerData Ctid;
1833                                 HeapTupleData tp = tuple;
1834                                 Size            tlen = tuple_len;
1835                                 VTupleMove      vtmove;
1836                                 int                     num_vtmove;
1837                                 int                     free_vtmove;
1838                                 VacPage         to_vacpage = NULL;
1839                                 int                     to_item = 0;
1840                                 int                     ti;
1841
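                                     /*
                                      * Let go of any destination buffer we were filling for plain
                                      * tuple moves (WriteBuffer marks it dirty and releases it);
                                      * the chain-move code below picks its own destination pages.
                                      */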
1842                                 if (dst_buffer != InvalidBuffer)
1843                                 {
1844                                         WriteBuffer(dst_buffer);
1845                                         dst_buffer = InvalidBuffer;
1846                                 }
1847
1848                                 /* Quick exit if we have no vtlinks to search in */
1849                                 if (vacrelstats->vtlinks == NULL)
1850                                 {
1851                                         elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
1852                                         break;          /* out of walk-along-page loop */
1853                                 }
1854
1855                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
1856                                 num_vtmove = 0;
1857                                 free_vtmove = 100;
1858
1859                                 /*
1860                                  * If this tuple is at the beginning or in the middle of the
1861                                  * chain, first walk forward to the end of the chain.
1862                                  */
1863                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1864                                                                                                   HEAP_IS_LOCKED)) &&
1865                                            !(ItemPointerEquals(&(tp.t_self),
1866                                                                                    &(tp.t_data->t_ctid))))
1867                                 {
1868                                         Page            Cpage;
1869                                         ItemId          Citemid;
1870                                         ItemPointerData Ctid;
1871
1872                                         Ctid = tp.t_data->t_ctid;
1873                                         if (freeCbuf)
1874                                                 ReleaseBuffer(Cbuf);
1875                                         freeCbuf = true;
1876                                         Cbuf = ReadBuffer(onerel,
1877                                                                           ItemPointerGetBlockNumber(&Ctid));
1878                                         Cpage = BufferGetPage(Cbuf);
1879                                         Citemid = PageGetItemId(Cpage,
1880                                                                           ItemPointerGetOffsetNumber(&Ctid));
1881                                         if (!ItemIdIsUsed(Citemid))
1882                                         {
1883                                                 /*
1884                                                  * This means that somewhere in the middle of the chain
1885                                                  * there was a tuple updated by an xact older than
1886                                                  * OldestXmin, and that tuple has already been deleted
1887                                                  * by this VACUUM.  Really the upper part of the chain
1888                                                  * should be removed; that would seem to belong in
1889                                                  * scan_heap(), but it's not implemented at the moment,
1890                                                  * so we just stop shrinking here.
1891                                                  */
1892                                                 elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
1893                                                 chain_move_failed = true;
1894                                                 break;  /* out of loop to move to chain end */
1895                                         }
1896                                         tp.t_datamcxt = NULL;
1897                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1898                                         tp.t_self = Ctid;
1899                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1900                                 }
1901                                 if (chain_move_failed)
1902                                 {
1903                                         if (freeCbuf)
1904                                                 ReleaseBuffer(Cbuf);
1905                                         pfree(vtmove);
1906                                         break;          /* out of walk-along-page loop */
1907                                 }
1908
1909                                 /*
1910                                  * Check if all items in chain can be moved
1911                                  */
1912                                 for (;;)
1913                                 {
1914                                         Buffer          Pbuf;
1915                                         Page            Ppage;
1916                                         ItemId          Pitemid;
1917                                         HeapTupleData Ptp;
1918                                         VTupleLinkData vtld,
1919                                                            *vtlp;
1920
1921                                         if (to_vacpage == NULL ||
1922                                                 !enough_space(to_vacpage, tlen))
1923                                         {
1924                                                 for (i = 0; i < num_fraged_pages; i++)
1925                                                 {
1926                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1927                                                                 break;
1928                                                 }
1929
1930                                                 if (i == num_fraged_pages)
1931                                                 {
1932                                                         /* can't move item anywhere */
1933                                                         chain_move_failed = true;
1934                                                         break;          /* out of check-all-items loop */
1935                                                 }
1936                                                 to_item = i;
1937                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1938                                         }
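                                             /*
                                              * Reserve space on the chosen target page: the tuple body is
                                              * MAXALIGN'd, and a new line pointer is charged only once the
                                              * page's to-be-freed pointers (offsets_free) are all reused.
                                              */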
1939                                         to_vacpage->free -= MAXALIGN(tlen);
1940                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1941                                                 to_vacpage->free -= sizeof(ItemIdData);
1942                                         (to_vacpage->offsets_used)++;
1943                                         if (free_vtmove == 0)
1944                                         {
1945                                                 free_vtmove = 1000;
1946                                                 vtmove = (VTupleMove)
1947                                                         repalloc(vtmove,
1948                                                                          (free_vtmove + num_vtmove) *
1949                                                                          sizeof(VTupleMoveData));
1950                                         }
1951                                         vtmove[num_vtmove].tid = tp.t_self;
1952                                         vtmove[num_vtmove].vacpage = to_vacpage;
1953                                         if (to_vacpage->offsets_used == 1)
1954                                                 vtmove[num_vtmove].cleanVpd = true;
1955                                         else
1956                                                 vtmove[num_vtmove].cleanVpd = false;
1957                                         free_vtmove--;
1958                                         num_vtmove++;
1959
1960                                         /* At beginning of chain? */
1961                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1962                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
1963                                                                                           OldestXmin))
1964                                                 break;
1965
1966                                         /* No, move to tuple with prior row version */
1967                                         vtld.new_tid = tp.t_self;
1968                                         vtlp = (VTupleLink)
1969                                                 vac_bsearch((void *) &vtld,
1970                                                                         (void *) (vacrelstats->vtlinks),
1971                                                                         vacrelstats->num_vtlinks,
1972                                                                         sizeof(VTupleLinkData),
1973                                                                         vac_cmp_vtlinks);
1974                                         if (vtlp == NULL)
1975                                         {
1976                                                 /* see discussion above */
1977                                                 elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
1978                                                 chain_move_failed = true;
1979                                                 break;  /* out of check-all-items loop */
1980                                         }
1981                                         tp.t_self = vtlp->this_tid;
1982                                         Pbuf = ReadBuffer(onerel,
1983                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1984                                         Ppage = BufferGetPage(Pbuf);
1985                                         Pitemid = PageGetItemId(Ppage,
1986                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1987                                         /* this can't happen since we saw tuple earlier: */
1988                                         if (!ItemIdIsUsed(Pitemid))
1989                                                 elog(ERROR, "parent itemid marked as unused");
1990                                         Ptp.t_datamcxt = NULL;
1991                                         Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1992
1993                                         /* ctid should not have changed since we saved it */
1994                                         Assert(ItemPointerEquals(&(vtld.new_tid),
1995                                                                                          &(Ptp.t_data->t_ctid)));
1996
1997                                         /*
1998                                          * See the note above about the case where
1999                                          * !ItemIdIsUsed(Citemid) (the child item was removed).
2000                                          * Because we don't currently remove the useless part of
2001                                          * an update chain, it's possible to find a parent row
2002                                          * here that is too old.  As in the case that caused this
2003                                          * problem, we simply stop shrinking here.  I could try to
2004                                          * find the real parent row, but I don't want to, because
2005                                          * a real solution will be implemented anyway, later, and
2006                                          * we are too close to the 6.5 release. - vadim
2007                                          * 06/11/99
2008                                          */
2009                                         if (Ptp.t_data->t_infomask & HEAP_XMAX_IS_MULTI ||
2010                                                 !(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
2011                                                                          HeapTupleHeaderGetXmin(tp.t_data))))
2012                                         {
2013                                                 ReleaseBuffer(Pbuf);
2014                                                 elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
2015                                                 chain_move_failed = true;
2016                                                 break;  /* out of check-all-items loop */
2017                                         }
2018                                         tp.t_datamcxt = Ptp.t_datamcxt;
2019                                         tp.t_data = Ptp.t_data;
2020                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
2021                                         if (freeCbuf)
2022                                                 ReleaseBuffer(Cbuf);
2023                                         Cbuf = Pbuf;
2024                                         freeCbuf = true;
2025                                 }                               /* end of check-all-items loop */
2026
2027                                 if (freeCbuf)
2028                                         ReleaseBuffer(Cbuf);
2029                                 freeCbuf = false;
2030
2031                                 if (chain_move_failed)
2032                                 {
2033                                         /*
2034                                          * Undo changes to offsets_used state.  We don't
2035                                          * bother cleaning up the amount-free state, since
2036                                          * we're not going to do any further tuple motion.
2037                                          */
2038                                         for (i = 0; i < num_vtmove; i++)
2039                                         {
2040                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
2041                                                 (vtmove[i].vacpage->offsets_used)--;
2042                                         }
2043                                         pfree(vtmove);
2044                                         break;          /* out of walk-along-page loop */
2045                                 }
2046
2047                                 /*
2048                                  * Okay, move the whole tuple chain
2049                                  */
2050                                 ItemPointerSetInvalid(&Ctid);
2051                                 for (ti = 0; ti < num_vtmove; ti++)
2052                                 {
2053                                         VacPage         destvacpage = vtmove[ti].vacpage;
2054                                         Page            Cpage;
2055                                         ItemId          Citemid;
2056
2057                                         /* Get page to move from */
2058                                         tuple.t_self = vtmove[ti].tid;
2059                                         Cbuf = ReadBuffer(onerel,
2060                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
2061
2062                                         /* Get page to move to */
2063                                         dst_buffer = ReadBuffer(onerel, destvacpage->blkno);
2064
2065                                         LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
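                                             /* also lock the source page, unless it is the same buffer */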
2066                                         if (dst_buffer != Cbuf)
2067                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
2068
2069                                         dst_page = BufferGetPage(dst_buffer);
2070                                         Cpage = BufferGetPage(Cbuf);
2071
2072                                         Citemid = PageGetItemId(Cpage,
2073                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
2074                                         tuple.t_datamcxt = NULL;
2075                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
2076                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
2077
2078                                         /*
2079                                          * make a copy of the source tuple, and then mark the
2080                                          * source tuple MOVED_OFF.
2081                                          */
2082                                         move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
2083                                                                          dst_buffer, dst_page, destvacpage,
2084                                                                          &ec, &Ctid, vtmove[ti].cleanVpd);
2085
2086                                         num_moved++;
2087                                         if (destvacpage->blkno > last_move_dest_block)
2088                                                 last_move_dest_block = destvacpage->blkno;
2089
2090                                         /*
2091                                          * Remember that we moved tuple from the current page
2092                                          * (corresponding index tuple will be cleaned).
2093                                          */
2094                                         if (Cbuf == buf)
2095                                                 vacpage->offsets[vacpage->offsets_free++] =
2096                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2097                                         else
2098                                                 keep_tuples++;
2099
2100                                         WriteBuffer(dst_buffer);
2101                                         WriteBuffer(Cbuf);
2102                                 }                               /* end of move-the-tuple-chain loop */
2103
2104                                 dst_buffer = InvalidBuffer;
2105                                 pfree(vtmove);
2106                                 chain_tuple_moved = true;
2107
2108                                 /* advance to next tuple in walk-along-page loop */
2109                                 continue;
2110                         }                                       /* end of is-tuple-in-chain test */
2111
2112                         /* try to find new page for this tuple */
2113                         if (dst_buffer == InvalidBuffer ||
2114                                 !enough_space(dst_vacpage, tuple_len))
2115                         {
2116                                 if (dst_buffer != InvalidBuffer)
2117                                 {
2118                                         WriteBuffer(dst_buffer);
2119                                         dst_buffer = InvalidBuffer;
2120                                 }
2121                                 for (i = 0; i < num_fraged_pages; i++)
2122                                 {
2123                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2124                                                 break;
2125                                 }
2126                                 if (i == num_fraged_pages)
2127                                         break;          /* can't move item anywhere */
2128                                 dst_vacpage = fraged_pages->pagedesc[i];
2129                                 dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
2130                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2131                                 dst_page = BufferGetPage(dst_buffer);
2132                                 /* if this page was not used before - clean it */
2133                                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
2134                                         vacuum_page(onerel, dst_buffer, dst_vacpage);
2135                         }
2136                         else
2137                                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
2138
2139                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2140
2141                         move_plain_tuple(onerel, buf, page, &tuple,
2142                                                          dst_buffer, dst_page, dst_vacpage, &ec);
2143
2144
2145                         num_moved++;
2146                         if (dst_vacpage->blkno > last_move_dest_block)
2147                                 last_move_dest_block = dst_vacpage->blkno;
2148
2149                         /*
2150                          * Remember that we moved tuple from the current page
2151                          * (corresponding index tuple will be cleaned).
2152                          */
2153                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2154                 }                                               /* walk along page */
2155
2156                 /*
2157                  * If we broke out of the walk-along-page loop early (ie, still
2158                  * have offnum <= maxoff), then we failed to move some tuple off
2159                  * this page.  No point in shrinking any more, so clean up and
2160                  * exit the per-page loop.
2161                  */
2162                 if (offnum < maxoff && keep_tuples > 0)
2163                 {
2164                         OffsetNumber off;
2165
2166                         /*
2167                          * Fix vacpage state for any unvisited tuples remaining on
2168                          * page
2169                          */
2170                         for (off = OffsetNumberNext(offnum);
2171                                  off <= maxoff;
2172                                  off = OffsetNumberNext(off))
2173                         {
2174                                 ItemId          itemid = PageGetItemId(page, off);
2175                                 HeapTupleHeader htup;
2176
2177                                 if (!ItemIdIsUsed(itemid))
2178                                         continue;
2179                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2180                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2181                                         continue;
2182
2183                                 /*
2184                                  * See comments in the walk-along-page loop above about
2185                                  * why only MOVED_OFF tuples should be found here.
2186                                  */
2187                                 if (htup->t_infomask & HEAP_MOVED_IN)
2188                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2189                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2190                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2191                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2192                                         elog(ERROR, "invalid XVAC in tuple header");
2193
2194                                 if (chain_tuple_moved)
2195                                 {
2196                                         /* some chains were moved while cleaning this page */
2197                                         Assert(vacpage->offsets_free > 0);
2198                                         for (i = 0; i < vacpage->offsets_free; i++)
2199                                         {
2200                                                 if (vacpage->offsets[i] == off)
2201                                                         break;
2202                                         }
2203                                         if (i >= vacpage->offsets_free)         /* not found */
2204                                         {
2205                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2206                                                 Assert(keep_tuples > 0);
2207                                                 keep_tuples--;
2208                                         }
2209                                 }
2210                                 else
2211                                 {
2212                                         vacpage->offsets[vacpage->offsets_free++] = off;
2213                                         Assert(keep_tuples > 0);
2214                                         keep_tuples--;
2215                                 }
2216                         }
2217                 }
2218
2219                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2220                 {
2221                         if (chain_tuple_moved)          /* else - they are ordered */
2222                         {
2223                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2224                                           sizeof(OffsetNumber), vac_cmp_offno);
2225                         }
2226                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2227                         WriteBuffer(buf);
2228                 }
2229                 else if (dowrite)
2230                         WriteBuffer(buf);
2231                 else
2232                         ReleaseBuffer(buf);
2233
2234                 if (offnum <= maxoff)
2235                         break;                          /* had to quit early, see above note */
2236
2237         }                                                       /* walk along relation */
2238
2239         blkno++;                                        /* new number of blocks */
2240
2241         if (dst_buffer != InvalidBuffer)
2242         {
2243                 Assert(num_moved > 0);
2244                 WriteBuffer(dst_buffer);
2245         }
2246
2247         if (num_moved > 0)
2248         {
2249                 /*
2250                  * We have to commit our tuple moves before we truncate the
2251                  * relation.  Ideally we should do Commit/StartTransactionCommand
2252                  * here, relying on the session-level table lock to protect our
2253                  * exclusive access to the relation.  However, that would require
2254                  * a lot of extra code to close and re-open the relation, indexes,
2255                  * etc.  For now, a quick hack: record status of current
2256                  * transaction as committed, and continue.
2257                  */
2258                 RecordTransactionCommit();
2259         }
2260
2261         /*
2262          * We are not going to move any more tuples across pages, but we still
2263          * need to apply vacuum_page to compact free space in the remaining
2264          * pages in vacuum_pages list.  Note that some of these pages may also
2265          * be in the fraged_pages list, and may have had tuples moved onto
2266          * them; if so, we already did vacuum_page and needn't do it again.
2267          */
2268         for (i = 0, curpage = vacuum_pages->pagedesc;
2269                  i < vacuumed_pages;
2270                  i++, curpage++)
2271         {
2272                 vacuum_delay_point();
2273
2274                 Assert((*curpage)->blkno < blkno);
2275                 if ((*curpage)->offsets_used == 0)
2276                 {
2277                         Buffer          buf;
2278                         Page            page;
2279
2280                         /* this page was not used as a move target, so must clean it */
2281                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2282                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2283                         page = BufferGetPage(buf);
2284                         if (!PageIsEmpty(page))
2285                                 vacuum_page(onerel, buf, *curpage);
2286                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2287                         WriteBuffer(buf);
2288                 }
2289         }
2290
2291         /*
2292          * Now scan all the pages that we moved tuples onto and update tuple
2293          * status bits.  This is not really necessary, but will save time for
2294          * future transactions examining these tuples.
2295          */
2296         update_hint_bits(onerel, fraged_pages, num_fraged_pages,
2297                                          last_move_dest_block, num_moved);
2298
2299         /*
2300          * It'd be cleaner to make this report at the bottom of this routine,
2301          * but then the rusage would double-count the second pass of index
2302          * vacuuming.  So do it here and ignore the relatively small amount of
2303          * processing that occurs below.
2304          */
2305         ereport(elevel,
2306            (errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
2307                            RelationGetRelationName(onerel),
2308                            num_moved, nblocks, blkno),
2309                 errdetail("%s",
2310                                   vac_show_rusage(&ru0))));
2311
2312         /*
2313          * Reflect the motion of system tuples in the catalog caches here.
2314          */
2315         CommandCounterIncrement();
2316
2317         if (Nvacpagelist.num_pages > 0)
2318         {
2319                 /* vacuum indexes again if needed */
2320                 if (Irel != NULL)
2321                 {
2322                         VacPage    *vpleft,
2323                                            *vpright,
2324                                                 vpsave;
2325
2326                         /* re-sort Nvacpagelist.pagedesc */
2327                         for (vpleft = Nvacpagelist.pagedesc,
2328                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2329                                  vpleft < vpright; vpleft++, vpright--)
2330                         {
2331                                 vpsave = *vpleft;
2332                                 *vpleft = *vpright;
2333                                 *vpright = vpsave;
2334                         }
2335
2336                         /*
2337                          * keep_tuples is the number of tuples that have been moved
2338                          * off a page during chain moves but have not been scanned over
2339                          * subsequently.  The tuple ids of these tuples are not
2340                          * recorded as free offsets for any VacPage, so they will not
2341                          * be cleared from the indexes.
2342                          */
2343                         Assert(keep_tuples >= 0);
2344                         for (i = 0; i < nindexes; i++)
2345                                 vacuum_index(&Nvacpagelist, Irel[i],
2346                                                          vacrelstats->rel_tuples, keep_tuples);
2347                 }
2348
2349                 /*
2350                  * Clean moved-off tuples from the last page in Nvacpagelist.
2351                  *
2352                  * We need only do this for this one page, because higher-numbered
2353                  * pages are going to be truncated from the relation entirely.
2354                  * But see comments for update_hint_bits().
2355                  */
2356                 if (vacpage->blkno == (blkno - 1) &&
2357                         vacpage->offsets_free > 0)
2358                 {
2359                         Buffer          buf;
2360                         Page            page;
2361                         OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
2362                         OffsetNumber offnum,
2363                                                 maxoff;
2364                         int                     uncnt;
2365                         int                     num_tuples = 0;
2366
2367                         buf = ReadBuffer(onerel, vacpage->blkno);
2368                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2369                         page = BufferGetPage(buf);
2370                         maxoff = PageGetMaxOffsetNumber(page);
2371                         for (offnum = FirstOffsetNumber;
2372                                  offnum <= maxoff;
2373                                  offnum = OffsetNumberNext(offnum))
2374                         {
2375                                 ItemId          itemid = PageGetItemId(page, offnum);
2376                                 HeapTupleHeader htup;
2377
2378                                 if (!ItemIdIsUsed(itemid))
2379                                         continue;
2380                                 htup = (HeapTupleHeader) PageGetItem(page, itemid);
2381                                 if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2382                                         continue;
2383
2384                                 /*
2385                                  * See comments in the walk-along-page loop above about
2386                                  * why only MOVED_OFF tuples should be found here.
2387                                  */
2388                                 if (htup->t_infomask & HEAP_MOVED_IN)
2389                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
2390                                 if (!(htup->t_infomask & HEAP_MOVED_OFF))
2391                                         elog(ERROR, "HEAP_MOVED_OFF was expected");
2392                                 if (HeapTupleHeaderGetXvac(htup) != myXID)
2393                                         elog(ERROR, "invalid XVAC in tuple header");
2394
2395                                 itemid->lp_flags &= ~LP_USED;
2396                                 num_tuples++;
2397                         }
2398                         Assert(vacpage->offsets_free == num_tuples);
2399
2400                         START_CRIT_SECTION();
2401
2402                         uncnt = PageRepairFragmentation(page, unused);
2403
2404                         /* XLOG stuff */
2405                         if (!onerel->rd_istemp)
2406                         {
2407                                 XLogRecPtr      recptr;
2408
2409                                 recptr = log_heap_clean(onerel, buf, unused, uncnt);
2410                                 PageSetLSN(page, recptr);
2411                                 PageSetTLI(page, ThisTimeLineID);
2412                         }
2413                         else
2414                         {
2415                                 /*
2416                                  * No XLOG record, but still need to flag that XID exists
2417                                  * on disk
2418                                  */
2419                                 MyXactMadeTempRelUpdate = true;
2420                         }
2421
2422                         END_CRIT_SECTION();
2423
2424                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2425                         WriteBuffer(buf);
2426                 }
2427
2428                 /* now - free new list of reaped pages */
2429                 curpage = Nvacpagelist.pagedesc;
2430                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2431                         pfree(*curpage);
2432                 pfree(Nvacpagelist.pagedesc);
2433         }
2434
2435         /* Truncate relation, if needed */
2436         if (blkno < nblocks)
2437         {
2438                 RelationTruncate(onerel, blkno);
2439                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2440         }
2441
2442         /* clean up */
2443         pfree(vacpage);
2444         if (vacrelstats->vtlinks != NULL)
2445                 pfree(vacrelstats->vtlinks);
2446
2447         ExecContext_Finish(&ec);
2448 }
2449
2450 /*
2451  *      move_chain_tuple() -- move one tuple that is part of a tuple chain
2452  *
2453  *              This routine moves old_tup from old_page to dst_page.
2454  *              old_page and dst_page might be the same page.
2455  *              On entry old_buf and dst_buf are locked exclusively, both locks (or
2456  *              the single lock, if this is an intra-page move) are released before
2457  *              exit.
2458  *
2459  *              Yes, a routine with ten parameters is ugly, but it's still better
2460  *              than having these 120 lines of code in repair_frag() which is
2461  *              already too long and almost unreadable.
2462  */
2463 static void
2464 move_chain_tuple(Relation rel,
2465                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2466                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2467                                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
2468 {
2469         TransactionId myXID = GetCurrentTransactionId();
2470         HeapTupleData newtup;
2471         OffsetNumber newoff;
2472         ItemId          newitemid;
2473         Size            tuple_len = old_tup->t_len;
2474
2475         heap_copytuple_with_tuple(old_tup, &newtup);
2476
2477         /*
2478          * register invalidation of source tuple in catcaches.
2479          */
2480         CacheInvalidateHeapTuple(rel, old_tup);
2481
2482         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2483         START_CRIT_SECTION();
2484
2485         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2486                                                                          HEAP_XMIN_INVALID |
2487                                                                          HEAP_MOVED_IN);
2488         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2489         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2490
2491         /*
2492          * If this page was not used before - clean it.
2493          *
2494          * NOTE: a nasty bug used to lurk here.  It is possible for the source
2495          * and destination pages to be the same (since this tuple-chain member
2496          * can be on a page lower than the one we're currently processing in
2497          * the outer loop).  If that's true, then after vacuum_page() the
2498          * source tuple will have been moved, and tuple.t_data will be
2499          * pointing at garbage.  Therefore we must do everything that uses
2500          * old_tup->t_data BEFORE this step!!
2501          *
2502          * This path is different from the other callers of vacuum_page, because
2503          * we have already incremented the vacpage's offsets_used field to
2504          * account for the tuple(s) we expect to move onto the page. Therefore
2505          * vacuum_page's check for offsets_used == 0 is wrong. But since
2506          * that's a good debugging check for all other callers, we work around
2507          * it here rather than remove it.
2508          */
2509         if (!PageIsEmpty(dst_page) && cleanVpd)
2510         {
2511                 int                     sv_offsets_used = dst_vacpage->offsets_used;
2512
2513                 dst_vacpage->offsets_used = 0;
2514                 vacuum_page(rel, dst_buf, dst_vacpage);
2515                 dst_vacpage->offsets_used = sv_offsets_used;
2516         }
2517
2518         /*
2519          * Update the state of the copied tuple, and store it on the
2520          * destination page.
2521          */
2522         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2523                                                                    HEAP_XMIN_INVALID |
2524                                                                    HEAP_MOVED_OFF);
2525         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2526         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2527         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2528                                                  InvalidOffsetNumber, LP_USED);
2529         if (newoff == InvalidOffsetNumber)
2530         {
2531                 elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
2532                          (unsigned long) tuple_len, dst_vacpage->blkno);
2533         }
2534         newitemid = PageGetItemId(dst_page, newoff);
2535         pfree(newtup.t_data);
2536         newtup.t_datamcxt = NULL;
2537         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2538         ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
2539
2540         /* XLOG stuff */
2541         if (!rel->rd_istemp)
2542         {
2543                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2544                                                                                    dst_buf, &newtup);
2545
2546                 if (old_buf != dst_buf)
2547                 {
2548                         PageSetLSN(old_page, recptr);
2549                         PageSetTLI(old_page, ThisTimeLineID);
2550                 }
2551                 PageSetLSN(dst_page, recptr);
2552                 PageSetTLI(dst_page, ThisTimeLineID);
2553         }
2554         else
2555         {
2556                 /*
2557                  * No XLOG record, but still need to flag that XID exists on disk
2558                  */
2559                 MyXactMadeTempRelUpdate = true;
2560         }
2561
2562         END_CRIT_SECTION();
2563
2564         /*
2565          * Set the new tuple's t_ctid to point to itself if this is the last
2566          * tuple in the chain, and to the next tuple in the chain otherwise.
2567          */
2568         /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
2569         if (!ItemPointerIsValid(ctid))
2570                 newtup.t_data->t_ctid = newtup.t_self;
2571         else
2572                 newtup.t_data->t_ctid = *ctid;
2573         *ctid = newtup.t_self;
2574
2575         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2576         if (dst_buf != old_buf)
2577                 LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2578
2579         /* Create index entries for the moved tuple */
2580         if (ec->resultRelInfo->ri_NumIndices > 0)
2581         {
2582                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2583                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2584                 ResetPerTupleExprContext(ec->estate);
2585         }
2586 }
2587
2588 /*
2589  *      move_plain_tuple() -- move one tuple that is not part of a chain
2590  *
2591  *              This routine moves old_tup from old_page to dst_page.
2592  *              On entry old_buf and dst_buf are locked exclusively, both locks are
2593  *              released before exit.
2594  *
2595  *              Yes, a routine with eight parameters is ugly, but it's still better
2596  *              than having these 90 lines of code in repair_frag() which is already
2597  *              too long and almost unreadable.
2598  */
2599 static void
2600 move_plain_tuple(Relation rel,
2601                                  Buffer old_buf, Page old_page, HeapTuple old_tup,
2602                                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
2603                                  ExecContext ec)
2604 {
2605         TransactionId myXID = GetCurrentTransactionId();
2606         HeapTupleData newtup;
2607         OffsetNumber newoff;
2608         ItemId          newitemid;
2609         Size            tuple_len = old_tup->t_len;
2610
2611         /* copy tuple */
2612         heap_copytuple_with_tuple(old_tup, &newtup);
2613
2614         /*
2615          * register invalidation of source tuple in catcaches.
2616          *
2617          * (Note: we do not need to register the copied tuple, because we are not
2618          * changing the tuple contents and so there cannot be any need to
2619          * flush negative catcache entries.)
2620          */
2621         CacheInvalidateHeapTuple(rel, old_tup);
2622
2623         /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
2624         START_CRIT_SECTION();
2625
2626         /*
2627          * Mark new tuple as MOVED_IN by me.
2628          */
2629         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2630                                                                    HEAP_XMIN_INVALID |
2631                                                                    HEAP_MOVED_OFF);
2632         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2633         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2634
2635         /* add tuple to the page */
2636         newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
2637                                                  InvalidOffsetNumber, LP_USED);
2638         if (newoff == InvalidOffsetNumber)
2639         {
2640                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2641                          (unsigned long) tuple_len,
2642                          dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
2643                          dst_vacpage->offsets_used, dst_vacpage->offsets_free);
2644         }
2645         newitemid = PageGetItemId(dst_page, newoff);
2646         pfree(newtup.t_data);
2647         newtup.t_datamcxt = NULL;
2648         newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
2649         ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
2650         newtup.t_self = newtup.t_data->t_ctid;
2651
2652         /*
2653          * Mark old tuple as MOVED_OFF by me.
2654          */
2655         old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2656                                                                          HEAP_XMIN_INVALID |
2657                                                                          HEAP_MOVED_IN);
2658         old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
2659         HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
2660
2661         /* XLOG stuff */
2662         if (!rel->rd_istemp)
2663         {
2664                 XLogRecPtr      recptr = log_heap_move(rel, old_buf, old_tup->t_self,
2665                                                                                    dst_buf, &newtup);
2666
2667                 PageSetLSN(old_page, recptr);
2668                 PageSetTLI(old_page, ThisTimeLineID);
2669                 PageSetLSN(dst_page, recptr);
2670                 PageSetTLI(dst_page, ThisTimeLineID);
2671         }
2672         else
2673         {
2674                 /*
2675                  * No XLOG record, but still need to flag that XID exists on disk
2676                  */
2677                 MyXactMadeTempRelUpdate = true;
2678         }
2679
2680         END_CRIT_SECTION();
2681
2682         dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
2683                 ((PageHeader) dst_page)->pd_lower;
2684         LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
2685         LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
2686
2687         dst_vacpage->offsets_used++;
2688
2689         /* insert index tuples if needed */
2690         if (ec->resultRelInfo->ri_NumIndices > 0)
2691         {
2692                 ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
2693                 ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
2694                 ResetPerTupleExprContext(ec->estate);
2695         }
2696 }
2697
2698 /*
2699  *      update_hint_bits() -- update hint bits in destination pages
2700  *
2701  * Scan all the pages that we moved tuples onto and update tuple status bits.
2702  * This is normally not really necessary, but it will save time for future
2703  * transactions examining these tuples.
2704  *
2705  * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
2706  * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
2707  *
2708  * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
2709  * pages that were move source pages but not move dest pages.  The bulk
2710  * of the move source pages will be physically truncated from the relation,
2711  * and the last page remaining in the rel will be fixed separately in
2712  * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
2713  * hint bits updated are tuples that are moved as part of a chain and were
2714  * on pages that were neither move destinations nor at the end of the rel.
2715  * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
2716  * to remember and revisit those pages too.
2717  *
2718  * Because of this omission, VACUUM FULL FREEZE is not a safe combination;
2719  * it's possible that the VACUUM's own XID remains exposed as something that
2720  * tqual tests would need to check.
2721  *
2722  * For the non-freeze case, one wonders whether it wouldn't be better to skip
2723  * this work entirely, and let the tuple status updates happen someplace
2724  * that's not holding an exclusive lock on the relation.
2725  */
2726 static void
2727 update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
2728                                  BlockNumber last_move_dest_block, int num_moved)
2729 {
2730         TransactionId myXID = GetCurrentTransactionId();
2731         int                     checked_moved = 0;
2732         int                     i;
2733         VacPage    *curpage;
2734
2735         for (i = 0, curpage = fraged_pages->pagedesc;
2736                  i < num_fraged_pages;
2737                  i++, curpage++)
2738         {
2739                 Buffer          buf;
2740                 Page            page;
2741                 OffsetNumber max_offset;
2742                 OffsetNumber off;
2743                 int                     num_tuples = 0;
2744
2745                 vacuum_delay_point();
2746
2747                 if ((*curpage)->blkno > last_move_dest_block)
2748                         break;                          /* no need to scan any further */
2749                 if ((*curpage)->offsets_used == 0)
2750                         continue;                       /* this page was never used as a move dest */
2751                 buf = ReadBuffer(rel, (*curpage)->blkno);
2752                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2753                 page = BufferGetPage(buf);
2754                 max_offset = PageGetMaxOffsetNumber(page);
2755                 for (off = FirstOffsetNumber;
2756                          off <= max_offset;
2757                          off = OffsetNumberNext(off))
2758                 {
2759                         ItemId          itemid = PageGetItemId(page, off);
2760                         HeapTupleHeader htup;
2761
2762                         if (!ItemIdIsUsed(itemid))
2763                                 continue;
2764                         htup = (HeapTupleHeader) PageGetItem(page, itemid);
2765                         if (htup->t_infomask & HEAP_XMIN_COMMITTED)
2766                                 continue;
2767
2768                         /*
2769                          * Here we may see either MOVED_OFF or MOVED_IN tuples.
2770                          */
2771                         if (!(htup->t_infomask & HEAP_MOVED))
2772                                 elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2773                         if (HeapTupleHeaderGetXvac(htup) != myXID)
2774                                 elog(ERROR, "invalid XVAC in tuple header");
2775
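                        /*
                         * A MOVED_IN tuple was put here by our own move and is now
                         * known committed, so set XMIN_COMMITTED and clear the MOVED
                         * bits; a MOVED_OFF tuple is the dead old copy left behind by
                         * a move, so mark it XMIN_INVALID.  Either way, later tqual
                         * tests no longer need to consult XVAC.
                         */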
2776                         if (htup->t_infomask & HEAP_MOVED_IN)
2777                         {
2778                                 htup->t_infomask |= HEAP_XMIN_COMMITTED;
2779                                 htup->t_infomask &= ~HEAP_MOVED;
2780                                 num_tuples++;
2781                         }
2782                         else
2783                                 htup->t_infomask |= HEAP_XMIN_INVALID;
2784                 }
2785                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2786                 WriteBuffer(buf);
2787                 Assert((*curpage)->offsets_used == num_tuples);
2788                 checked_moved += num_tuples;
2789         }
2790         Assert(num_moved == checked_moved);
2791 }
2792
2793 /*
2794  *      vacuum_heap() -- free dead tuples
2795  *
2796  *              This routine marks dead tuples as unused and truncates relation
2797  *              This routine marks dead tuples as unused and truncates the relation
2798  */
2799 static void
2800 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2801 {
2802         Buffer          buf;
2803         VacPage    *vacpage;
2804         BlockNumber relblocks;
2805         int                     nblocks;
2806         int                     i;
2807
2808         nblocks = vacuum_pages->num_pages;
2809         nblocks -= vacuum_pages->empty_end_pages;       /* nothing to do with them */
2810
2811         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2812         {
2813                 vacuum_delay_point();
2814
2815                 if ((*vacpage)->offsets_free > 0)
2816                 {
2817                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2818                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2819                         vacuum_page(onerel, buf, *vacpage);
2820                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2821                         WriteBuffer(buf);
2822                 }
2823         }
2824
2825         /* Truncate relation if there are some empty end-pages */
2826         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2827         if (vacuum_pages->empty_end_pages > 0)
2828         {
2829                 relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2830                 ereport(elevel,
2831                                 (errmsg("\"%s\": truncated %u to %u pages",
2832                                                 RelationGetRelationName(onerel),
2833                                                 vacrelstats->rel_pages, relblocks)));
2834                 RelationTruncate(onerel, relblocks);
2835                 vacrelstats->rel_pages = relblocks;             /* set new number of blocks */
2836         }
2837 }
2838
2839 /*
2840  *      vacuum_page() -- free dead tuples on a page
2841  *                                       and repair its fragmentation.
2842  */
2843 static void
2844 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2845 {
2846         OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
2847         int                     uncnt;
2848         Page            page = BufferGetPage(buffer);
2849         ItemId          itemid;
2850         int                     i;
2851
2852         /* There shouldn't be any tuples moved onto the page yet! */
2853         Assert(vacpage->offsets_used == 0);
2854
2855         START_CRIT_SECTION();
2856
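        /*
         * Mark each reaped line pointer unused; PageRepairFragmentation()
         * below then compacts the remaining tuples and reclaims the space.
         */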
2857         for (i = 0; i < vacpage->offsets_free; i++)
2858         {
2859                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2860                 itemid->lp_flags &= ~LP_USED;
2861         }
2862
2863         uncnt = PageRepairFragmentation(page, unused);
2864
2865         /* XLOG stuff */
2866         if (!onerel->rd_istemp)
2867         {
2868                 XLogRecPtr      recptr;
2869
2870                 recptr = log_heap_clean(onerel, buffer, unused, uncnt);
2871                 PageSetLSN(page, recptr);
2872                 PageSetTLI(page, ThisTimeLineID);
2873         }
2874         else
2875         {
2876                 /* No XLOG record, but still need to flag that XID exists on disk */
2877                 MyXactMadeTempRelUpdate = true;
2878         }
2879
2880         END_CRIT_SECTION();
2881 }
2882
2883 /*
2884  *      scan_index() -- scan one index relation to update statistics.
2885  *
2886  * We use this when we have no deletions to do.
2887  */
2888 static void
2889 scan_index(Relation indrel, double num_tuples)
2890 {
2891         IndexBulkDeleteResult *stats;
2892         IndexVacuumCleanupInfo vcinfo;
2893         VacRUsage       ru0;
2894
2895         vac_init_rusage(&ru0);
2896
2897         /*
2898          * Even though we're not planning to delete anything, we use the
2899          * ambulkdelete call, because (a) the scan happens within the index AM
2900          * for more speed, and (b) it may want to pass private statistics to
2901          * the amvacuumcleanup call.
2902          */
2903         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2904
2905         /* Do post-VACUUM cleanup, even though we deleted nothing */
2906         vcinfo.vacuum_full = true;
2907         vcinfo.message_level = elevel;
2908
2909         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2910
2911         if (!stats)
2912                 return;
2913
2914         /* now update statistics in pg_class */
2915         vac_update_relstats(RelationGetRelid(indrel),
2916                                                 stats->num_pages, stats->num_index_tuples,
2917                                                 false);
2918
2919         ereport(elevel,
2920            (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
2921                            RelationGetRelationName(indrel),
2922                            stats->num_index_tuples,
2923                            stats->num_pages),
2924                 errdetail("%u index pages have been deleted, %u are currently reusable.\n"
2925                                   "%s",
2926                                   stats->pages_deleted, stats->pages_free,
2927                                   vac_show_rusage(&ru0))));
2928
2929         /*
2930          * Check for tuple count mismatch.      If the index is partial, then it's
2931          * OK for it to have fewer tuples than the heap; else we're in trouble.
2932          */
2933         if (stats->num_index_tuples != num_tuples)
2934         {
2935                 if (stats->num_index_tuples > num_tuples ||
2936                         !vac_is_partial_index(indrel))
2937                         ereport(WARNING,
2938                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
2939                                                         RelationGetRelationName(indrel),
2940                                                         stats->num_index_tuples, num_tuples),
2941                                          errhint("Rebuild the index with REINDEX.")));
2942         }
2943
2944         pfree(stats);
2945 }
2946
2947 /*
2948  *      vacuum_index() -- vacuum one index relation.
2949  *
2950  *              vacpagelist is the VacPageList of the heap we're currently vacuuming.
2951  *              It's locked.  indrel is an index relation on the vacuumed heap.
2952  *
2953  *              We don't bother to set locks on the index relation here, since
2954  *              the parent table is exclusive-locked already.
2955  *
2956  *              Finally, we arrange to update the index relation's statistics in
2957  *              pg_class.
2958  */
2959 static void
2960 vacuum_index(VacPageList vacpagelist, Relation indrel,
2961                          double num_tuples, int keep_tuples)
2962 {
2963         IndexBulkDeleteResult *stats;
2964         IndexVacuumCleanupInfo vcinfo;
2965         VacRUsage       ru0;
2966
2967         vac_init_rusage(&ru0);
2968
2969         /* Do bulk deletion */
2970         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2971
2972         /* Do post-VACUUM cleanup */
2973         vcinfo.vacuum_full = true;
2974         vcinfo.message_level = elevel;
2975
2976         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2977
2978         if (!stats)
2979                 return;
2980
2981         /* now update statistics in pg_class */
2982         vac_update_relstats(RelationGetRelid(indrel),
2983                                                 stats->num_pages, stats->num_index_tuples,
2984                                                 false);
2985
2986         ereport(elevel,
2987            (errmsg("index \"%s\" now contains %.0f row versions in %u pages",
2988                            RelationGetRelationName(indrel),
2989                            stats->num_index_tuples,
2990                            stats->num_pages),
2991                 errdetail("%.0f index row versions were removed.\n"
2992                  "%u index pages have been deleted, %u are currently reusable.\n"
2993                                   "%s",
2994                                   stats->tuples_removed,
2995                                   stats->pages_deleted, stats->pages_free,
2996                                   vac_show_rusage(&ru0))));
2997
2998         /*
2999          * Check for tuple count mismatch.      If the index is partial, then it's
3000          * OK for it to have fewer tuples than the heap; else we're in trouble.
3001          */
3002         if (stats->num_index_tuples != num_tuples + keep_tuples)
3003         {
3004                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
3005                         !vac_is_partial_index(indrel))
3006                         ereport(WARNING,
3007                                         (errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
3008                                                         RelationGetRelationName(indrel),
3009                                           stats->num_index_tuples, num_tuples + keep_tuples),
3010                                          errhint("Rebuild the index with REINDEX.")));
3011         }
3012
3013         pfree(stats);
3014 }
3015
3016 /*
3017  *      tid_reaped() -- is a particular tid reaped?
3018  *
3019  *              This has the right signature to be an IndexBulkDeleteCallback.
3020  *
3021  *              vacpagelist->pagedesc is sorted in the right order.
3022  */
3023 static bool
3024 tid_reaped(ItemPointer itemptr, void *state)
3025 {
3026         VacPageList vacpagelist = (VacPageList) state;
3027         OffsetNumber ioffno;
3028         OffsetNumber *voff;
3029         VacPage         vp,
3030                            *vpp;
3031         VacPageData vacpage;
3032
3033         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
3034         ioffno = ItemPointerGetOffsetNumber(itemptr);
3035
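        /*
         * Two-level lookup: first binary-search the sorted VacPage array for
         * this tuple's block number, then (unless the whole page was reaped)
         * binary-search that page's sorted offset array for the tuple's
         * offset number.
         */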
3036         vp = &vacpage;
3037         vpp = (VacPage *) vac_bsearch((void *) &vp,
3038                                                                   (void *) (vacpagelist->pagedesc),
3039                                                                   vacpagelist->num_pages,
3040                                                                   sizeof(VacPage),
3041                                                                   vac_cmp_blk);
3042
3043         if (vpp == NULL)
3044                 return false;
3045
3046         /* ok - we are on a partially or fully reaped page */
3047         vp = *vpp;
3048
3049         if (vp->offsets_free == 0)
3050         {
3051                 /* this is EmptyPage, so claim all tuples on it are reaped!!! */
3052                 return true;
3053         }
3054
3055         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
3056                                                                                 (void *) (vp->offsets),
3057                                                                                 vp->offsets_free,
3058                                                                                 sizeof(OffsetNumber),
3059                                                                                 vac_cmp_offno);
3060
3061         if (voff == NULL)
3062                 return false;
3063
3064         /* tid is reaped */
3065         return true;
3066 }
3067
3068 /*
3069  * Dummy version for scan_index.
3070  */
3071 static bool
3072 dummy_tid_reaped(ItemPointer itemptr, void *state)
3073 {
3074         return false;
3075 }
3076
3077 /*
3078  * Update the shared Free Space Map with the info we now have about
3079  * free space in the relation, discarding any old info the map may have.
3080  */
3081 static void
3082 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
3083                            BlockNumber rel_pages)
3084 {
3085         int                     nPages = fraged_pages->num_pages;
3086         VacPage    *pagedesc = fraged_pages->pagedesc;
3087         Size            threshold;
3088         PageFreeSpaceInfo *pageSpaces;
3089         int                     outPages;
3090         int                     i;
3091
3092         /*
3093          * We only report pages with free space at least equal to the average
3094          * request size --- this avoids cluttering FSM with uselessly-small
3095          * bits of space.  Although FSM would discard pages with little free
3096          * space anyway, it's important to do this prefiltering because (a) it
3097          * reduces the time spent holding the FSM lock in
3098          * RecordRelationFreeSpace, and (b) FSM uses the number of pages
3099          * reported as a statistic for guiding space management.  If we didn't
3100          * threshold our reports the same way vacuumlazy.c does, we'd be
3101          * skewing that statistic.
3102          */
3103         threshold = GetAvgFSMRequestSize(&onerel->rd_node);
3104
3105         pageSpaces = (PageFreeSpaceInfo *)
3106                 palloc(nPages * sizeof(PageFreeSpaceInfo));
3107         outPages = 0;
3108
3109         for (i = 0; i < nPages; i++)
3110         {
3111                 /*
3112                  * fraged_pages may contain entries for pages that we later
3113                  * decided to truncate from the relation; don't enter them into
3114                  * the free space map!
3115                  */
3116                 if (pagedesc[i]->blkno >= rel_pages)
3117                         break;
3118
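                /*
                 * Illustrative numbers only: if GetAvgFSMRequestSize() reported
                 * an average request of 512 bytes, a page with 300 bytes free is
                 * skipped here, while one with 800 bytes free is passed on to
                 * RecordRelationFreeSpace() below.
                 */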
3119                 if (pagedesc[i]->free >= threshold)
3120                 {
3121                         pageSpaces[outPages].blkno = pagedesc[i]->blkno;
3122                         pageSpaces[outPages].avail = pagedesc[i]->free;
3123                         outPages++;
3124                 }
3125         }
3126
3127         RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
3128
3129         pfree(pageSpaces);
3130 }
3131
3132 /* Copy a VacPage structure */
3133 static VacPage
3134 copy_vac_page(VacPage vacpage)
3135 {
3136         VacPage         newvacpage;
3137
3138         /* allocate a VacPageData entry */
3139         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
3140                                                    vacpage->offsets_free * sizeof(OffsetNumber));
3141
3142         /* fill it in */
3143         if (vacpage->offsets_free > 0)
3144                 memcpy(newvacpage->offsets, vacpage->offsets,
3145                            vacpage->offsets_free * sizeof(OffsetNumber));
3146         newvacpage->blkno = vacpage->blkno;
3147         newvacpage->free = vacpage->free;
3148         newvacpage->offsets_used = vacpage->offsets_used;
3149         newvacpage->offsets_free = vacpage->offsets_free;
3150
3151         return newvacpage;
3152 }
3153
3154 /*
3155  * Add a VacPage pointer to a VacPageList.
3156  *
3157  *              As a side effect of the way that scan_heap works,
3158  *              higher pages come after lower pages in the array
3159  *              (and highest tid on a page is last).
3160  */
3161 static void
3162 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
3163 {
3164 #define PG_NPAGEDESC 1024
3165
3166         /* allocate a VacPage entry if needed */
3167         if (vacpagelist->num_pages == 0)
3168         {
3169                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
3170                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
3171         }
3172         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
3173         {
3174                 vacpagelist->num_allocated_pages *= 2;
3175                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
3176         }
3177         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
3178         (vacpagelist->num_pages)++;
3179 }
3180
3181 /*
3182  * vac_bsearch: just like standard C library routine bsearch(),
3183  * except that we first test to see whether the target key is outside
3184  * the range of the table entries.      This case is handled relatively slowly
3185  * by the normal binary search algorithm (ie, no faster than any other key)
3186  * but it occurs often enough in VACUUM to be worth optimizing.
3187  */
3188 static void *
3189 vac_bsearch(const void *key, const void *base,
3190                         size_t nelem, size_t size,
3191                         int (*compar) (const void *, const void *))
3192 {
3193         int                     res;
3194         const void *last;
3195
3196         if (nelem == 0)
3197                 return NULL;
3198         res = compar(key, base);
3199         if (res < 0)
3200                 return NULL;
3201         if (res == 0)
3202                 return (void *) base;
3203         if (nelem > 1)
3204         {
3205                 last = (const void *) ((const char *) base + (nelem - 1) * size);
3206                 res = compar(key, last);
3207                 if (res > 0)
3208                         return NULL;
3209                 if (res == 0)
3210                         return (void *) last;
3211         }
3212         if (nelem <= 2)
3213                 return NULL;                    /* already checked 'em all */
3214         return bsearch(key, base, nelem, size, compar);
3215 }
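
/*
 * Illustrative sketch only, compiled out: it shows how vac_bsearch's range
 * pre-checks short-circuit out-of-range keys before falling back to
 * bsearch().  The array contents, key values, and helper names here are
 * hypothetical and not part of VACUUM itself.
 */
#ifdef NOT_USED
static int
example_cmp_int(const void *left, const void *right)
{
        int                     l = *(const int *) left;
        int                     r = *(const int *) right;

        if (l < r)
                return -1;
        if (l == r)
                return 0;
        return 1;
}

static void
example_vac_bsearch_usage(void)
{
        int                     table[] = {2, 5, 9, 14};        /* must be sorted ascending */
        int                     key = 1;
        int                *hit;

        /* key below table[0]: NULL is returned without calling bsearch() */
        hit = (int *) vac_bsearch(&key, table, 4, sizeof(int), example_cmp_int);
        Assert(hit == NULL);

        /* interior key: falls through to the real bsearch() */
        key = 9;
        hit = (int *) vac_bsearch(&key, table, 4, sizeof(int), example_cmp_int);
        Assert(hit != NULL && *hit == 9);
}
#endif   /* NOT_USED */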
3216
3217 /*
3218  * Comparator routines for use with qsort() and bsearch().
3219  */
3220 static int
3221 vac_cmp_blk(const void *left, const void *right)
3222 {
3223         BlockNumber lblk,
3224                                 rblk;
3225
3226         lblk = (*((VacPage *) left))->blkno;
3227         rblk = (*((VacPage *) right))->blkno;
3228
3229         if (lblk < rblk)
3230                 return -1;
3231         if (lblk == rblk)
3232                 return 0;
3233         return 1;
3234 }
3235
3236 static int
3237 vac_cmp_offno(const void *left, const void *right)
3238 {
3239         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
3240                 return -1;
3241         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
3242                 return 0;
3243         return 1;
3244 }
3245
3246 static int
3247 vac_cmp_vtlinks(const void *left, const void *right)
3248 {
3249         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
3250                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3251                 return -1;
3252         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
3253                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
3254                 return 1;
3255         /* bi_hi-es are equal */
3256         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
3257                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3258                 return -1;
3259         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
3260                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
3261                 return 1;
3262         /* bi_lo-es are equal */
3263         if (((VTupleLink) left)->new_tid.ip_posid <
3264                 ((VTupleLink) right)->new_tid.ip_posid)
3265                 return -1;
3266         if (((VTupleLink) left)->new_tid.ip_posid >
3267                 ((VTupleLink) right)->new_tid.ip_posid)
3268                 return 1;
3269         return 0;
3270 }
3271
3272
3273 /*
3274  * Open all the indexes of the given relation, obtaining the specified kind
3275  * of lock on each.  Return an array of Relation pointers for the indexes
3276  * into *Irel, and the number of indexes into *nindexes.
3277  */
3278 void
3279 vac_open_indexes(Relation relation, LOCKMODE lockmode,
3280                                  int *nindexes, Relation **Irel)
3281 {
3282         List       *indexoidlist;
3283         ListCell   *indexoidscan;
3284         int                     i;
3285
3286         indexoidlist = RelationGetIndexList(relation);
3287
3288         *nindexes = list_length(indexoidlist);
3289
3290         if (*nindexes > 0)
3291                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
3292         else
3293                 *Irel = NULL;
3294
3295         i = 0;
3296         foreach(indexoidscan, indexoidlist)
3297         {
3298                 Oid                     indexoid = lfirst_oid(indexoidscan);
3299                 Relation        ind;
3300
3301                 ind = index_open(indexoid);
3302                 (*Irel)[i++] = ind;
3303                 LockRelation(ind, lockmode);
3304         }
3305
3306         list_free(indexoidlist);
3307 }
3308
3309 /*
3310  * Release the resources acquired by vac_open_indexes.  Optionally release
3311  * the locks (say NoLock to keep 'em).
3312  */
3313 void
3314 vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
3315 {
3316         if (Irel == NULL)
3317                 return;
3318
3319         while (nindexes--)
3320         {
3321                 Relation        ind = Irel[nindexes];
3322
3323                 if (lockmode != NoLock)
3324                         UnlockRelation(ind, lockmode);
3325                 index_close(ind);
3326         }
3327         pfree(Irel);
3328 }
3329
3330
3331 /*
3332  * Is an index partial (ie, could it contain fewer tuples than the heap)?
3333  */
3334 bool
3335 vac_is_partial_index(Relation indrel)
3336 {
3337         /*
3338          * If the index's AM doesn't support nulls, it's partial for our
3339          * purposes
3340          */
3341         if (!indrel->rd_am->amindexnulls)
3342                 return true;
3343
3344         /* Otherwise, look to see if there's a partial-index predicate */
3345         if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
3346                 return true;
3347
3348         return false;
3349 }
3350
3351
3352 static bool
3353 enough_space(VacPage vacpage, Size len)
3354 {
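        /*
         * Worked example with illustrative numbers: for len = 100,
         * MAXALIGN(len) is typically 104.  If the page has 110 bytes free and
         * a reaped line pointer to reuse (offsets_used < offsets_free), we
         * can say yes at once; otherwise we also need room for a new
         * ItemIdData (typically 4 bytes), and 104 + 4 = 108 <= 110 still
         * fits.
         */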
3355         len = MAXALIGN(len);
3356
3357         if (len > vacpage->free)
3358                 return false;
3359
3360         /* if there are free itemid(s) and len <= free_space... */
3361         if (vacpage->offsets_used < vacpage->offsets_free)
3362                 return true;
3363
3364         /* offsets_used >= offsets_free, so we'll have to allocate a new itemid */
3365         if (len + sizeof(ItemIdData) <= vacpage->free)
3366                 return true;
3367
3368         return false;
3369 }
3370
3371
3372 /*
3373  * Initialize usage snapshot.
3374  */
3375 void
3376 vac_init_rusage(VacRUsage *ru0)
3377 {
3378         struct timezone tz;
3379
3380         getrusage(RUSAGE_SELF, &ru0->ru);
3381         gettimeofday(&ru0->tv, &tz);
3382 }
3383
3384 /*
3385  * Compute elapsed time since ru0 usage snapshot, and format into
3386  * a displayable string.  Result is in a static string, which is
3387  * tacky, but no one ever claimed that the Postgres backend is
3388  * threadable...
3389  */
3390 const char *
3391 vac_show_rusage(VacRUsage *ru0)
3392 {
3393         static char result[100];
3394         VacRUsage       ru1;
3395
3396         vac_init_rusage(&ru1);
3397
3398         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
3399         {
3400                 ru1.tv.tv_sec--;
3401                 ru1.tv.tv_usec += 1000000;
3402         }
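        /*
         * Borrow example with illustrative values: ru0->tv = {10 s, 800000 us}
         * and ru1.tv = {12 s, 300000 us}.  Since 300000 < 800000 we borrow,
         * giving {11 s, 1300000 us}, so the elapsed time printed below comes
         * out as 11 - 10 = 1 plus (1300000 - 800000) / 10000 = 50, ie "1.50 sec".
         */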
3403         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
3404         {
3405                 ru1.ru.ru_stime.tv_sec--;
3406                 ru1.ru.ru_stime.tv_usec += 1000000;
3407         }
3408         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
3409         {
3410                 ru1.ru.ru_utime.tv_sec--;
3411                 ru1.ru.ru_utime.tv_usec += 1000000;
3412         }
3413
3414         snprintf(result, sizeof(result),
3415                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
3416                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
3417           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
3418                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
3419           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
3420                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
3421                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
3422
3423         return result;
3424 }
3425
3426 /*
3427  * vacuum_delay_point --- check for interrupts and cost-based delay.
3428  *
3429  * This should be called in each major loop of VACUUM processing,
3430  * typically once per page processed.
3431  */
3432 void
3433 vacuum_delay_point(void)
3434 {
3435         /* Always check for interrupts */
3436         CHECK_FOR_INTERRUPTS();
3437
3438         /* Nap if appropriate */
3439         if (VacuumCostActive && !InterruptPending &&
3440                 VacuumCostBalance >= VacuumCostLimit)
3441         {
3442                 int                     msec;
3443
3444                 msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
3445                 if (msec > VacuumCostDelay * 4)
3446                         msec = VacuumCostDelay * 4;
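                /*
                 * Worked example with illustrative settings: VacuumCostDelay = 10 ms,
                 * VacuumCostLimit = 200 and VacuumCostBalance = 1000 give a raw delay
                 * of 10 * 1000 / 200 = 50 ms, which the cap above reduces to
                 * 4 * 10 = 40 ms before sleeping.
                 */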
3447
3448                 pg_usleep(msec * 1000L);
3449
3450                 VacuumCostBalance = 0;
3451
3452                 /* Might have gotten an interrupt while sleeping */
3453                 CHECK_FOR_INTERRUPTS();
3454         }
3455 }