/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *        The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.250 2003/02/24 00:57:17 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "pgstat.h"


typedef struct VacPageData
{
        BlockNumber blkno;                      /* BlockNumber of this Page */
        Size            free;                   /* FreeSpace on this Page */
        uint16          offsets_used;   /* Number of OffNums used by vacuum */
        uint16          offsets_free;   /* Number of OffNums free or to be free */
        OffsetNumber offsets[1];        /* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;
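
/*
 * Note that offsets[] is declared with one element but is used as a
 * variable-length array: a VacPage is palloc'd with space for however many
 * offsets it needs.  A sketch of the idiom (as used in scan_heap below,
 * sized for the worst case):
 *
 *              VacPage vp = (VacPage) palloc(sizeof(VacPageData) +
 *                                            MaxOffsetNumber * sizeof(OffsetNumber));
 */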

typedef struct VacPageListData
{
        BlockNumber empty_end_pages;    /* Number of "empty" end-pages */
        int                     num_pages;              /* Number of pages in pagedesc */
        int                     num_allocated_pages;    /* Number of allocated pages in
                                                                                 * pagedesc */
        VacPage    *pagedesc;           /* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

typedef struct VTupleLinkData
{
        ItemPointerData new_tid;
        ItemPointerData this_tid;
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

typedef struct VTupleMoveData
{
        ItemPointerData tid;            /* tuple ID */
        VacPage         vacpage;                /* where to move */
        bool            cleanVpd;               /* clean vacpage before using */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

typedef struct VRelStats
{
        BlockNumber rel_pages;
        double          rel_tuples;
        Size            min_tlen;
        Size            max_tlen;
        bool            hasindex;
        int                     num_vtlinks;
        VTupleLink      vtlinks;
} VRelStats;


static MemoryContext vac_context = NULL;

static int      elevel = -1;

static TransactionId OldestXmin;
static TransactionId FreezeLimit;


/* non-export function prototypes */
static List *getrels(const RangeVar *vacrel, const char *stmttype);
static void vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID);
static void vac_truncate_clog(TransactionId vacuumXID,
                                  TransactionId frozenXID);
static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
                        int nindexes, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
                         double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                           BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
                        size_t nelem, size_t size,
                        int (*compar) (const void *, const void *));
static int      vac_cmp_blk(const void *left, const void *right);
static int      vac_cmp_offno(const void *left, const void *right);
static int      vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);


/****************************************************************************
 *                                                                          *
 *              Code common to all flavors of VACUUM and ANALYZE            *
 *                                                                          *
 ****************************************************************************
 */


/*
 * Primary entry point for VACUUM and ANALYZE commands.
 */
void
vacuum(VacuumStmt *vacstmt)
{
        const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
        MemoryContext anl_context = NULL;
        TransactionId initialOldestXmin = InvalidTransactionId;
        TransactionId initialFreezeLimit = InvalidTransactionId;
        bool            all_rels;
        List       *vrl,
                           *cur;

        if (vacstmt->verbose)
                elevel = INFO;
        else
                elevel = DEBUG1;

        /*
         * We cannot run VACUUM inside a user transaction block; if we were
         * inside a transaction, then our commit- and
         * start-transaction-command calls would not have the intended effect!
         * Furthermore, the forced commit that occurs before truncating the
         * relation's file would have the effect of committing the rest of the
         * user's transaction too, which would certainly not be the desired
         * behavior.
         */
        if (vacstmt->vacuum)
                PreventTransactionChain((void *) vacstmt, stmttype);

        /*
         * Send info about dead objects to the statistics collector
         */
        if (vacstmt->vacuum)
                pgstat_vacuum_tabstat();

        /*
         * Create special memory context for cross-transaction storage.
         *
         * Since it is a child of QueryContext, it will go away eventually even
         * if we suffer an error; there's no need for special abort cleanup
         * logic.
         */
        vac_context = AllocSetContextCreate(QueryContext,
                                                                                "Vacuum",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /*
         * If we are running only ANALYZE, we don't need per-table
         * transactions, but we still need a memory context with table
         * lifetime.
         */
        if (vacstmt->analyze && !vacstmt->vacuum)
                anl_context = AllocSetContextCreate(QueryContext,
                                                                                        "Analyze",
                                                                                        ALLOCSET_DEFAULT_MINSIZE,
                                                                                        ALLOCSET_DEFAULT_INITSIZE,
                                                                                        ALLOCSET_DEFAULT_MAXSIZE);

        /* Assume we are processing everything unless one table is mentioned */
        all_rels = (vacstmt->relation == NULL);

        /* Build list of relations to process (note this lives in vac_context) */
        vrl = getrels(vacstmt->relation, stmttype);

        /*
         * Formerly, there was code here to prevent more than one VACUUM from
         * executing concurrently in the same database.  However, there's no
         * good reason to prevent that, and manually removing lockfiles after
         * a vacuum crash was a pain for dbadmins.  So, forget about
         * lockfiles, and just rely on the locks we grab on each target table
         * to ensure that there aren't two VACUUMs running on the same table
         * at the same time.
         */

        /*
         * The strangeness with committing and starting transactions here is
         * due to wanting to run each table's VACUUM as a separate
         * transaction, so that we don't hold locks unnecessarily long.  Also,
         * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
         * transaction from the VACUUM to further reduce locking.
         *
         * vacuum_rel expects to be entered with no transaction active; it will
         * start and commit its own transaction.  But we are called by an SQL
         * command, and so we are executing inside a transaction already.  We
         * commit the transaction started in PostgresMain() here, and start
         * another one before exiting to match the commit waiting for us back
         * in PostgresMain().
         *
         * In the case of an ANALYZE statement (no vacuum, just analyze) it's
         * okay to run the whole thing in the outer transaction, and so we
         * skip transaction start/stop operations.
         */
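
        /*
         * In outline (an illustrative sketch of the control flow coded
         * below, not additional logic):
         *
         *              CommitTransactionCommand(true);  -- end PostgresMain's xact
         *              foreach relation:
         *                      vacuum_rel();            -- runs its own xact(s)
         *              StartTransactionCommand(true);   -- matches PostgresMain's commit
         */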
        if (vacstmt->vacuum)
        {
                if (all_rels)
                {
                        /*
                         * It's a database-wide VACUUM.
                         *
                         * Compute the initially applicable OldestXmin and FreezeLimit
                         * XIDs, so that we can record these values at the end of the
                         * VACUUM. Note that individual tables may well be processed
                         * with newer values, but we can guarantee that no
                         * (non-shared) relations are processed with older ones.
                         *
                         * It is okay to record non-shared values in pg_database, even
                         * though we may vacuum shared relations with older cutoffs,
                         * because only the minimum of the values present in
                         * pg_database matters.  We can be sure that shared relations
                         * have at some time been vacuumed with cutoffs no worse than
                         * the global minimum; for, if there is a backend in some
                         * other DB with xmin = OLDXMIN that's determining the cutoff
                         * with which we vacuum shared relations, it is not possible
                         * for that database to have a cutoff newer than OLDXMIN
                         * recorded in pg_database.
                         */
                        vacuum_set_xid_limits(vacstmt, false,
                                                                  &initialOldestXmin,
                                                                  &initialFreezeLimit);
                }

                /* matches the StartTransaction in PostgresMain() */
                CommitTransactionCommand(true);
        }

        /*
         * Loop to process each selected relation.
         */
        foreach(cur, vrl)
        {
                Oid                     relid = lfirsto(cur);

                if (vacstmt->vacuum)
                {
                        if (! vacuum_rel(relid, vacstmt, RELKIND_RELATION))
                                all_rels = false; /* forget about updating dbstats */
                }
                if (vacstmt->analyze)
                {
                        MemoryContext old_context = NULL;

                        /*
                         * If we vacuumed, use new transaction for analyze.
                         * Otherwise, we can use the outer transaction, but we still
                         * need to call analyze_rel in a memory context that will be
                         * cleaned up on return (else we leak memory while processing
                         * multiple tables).
                         */
                        if (vacstmt->vacuum)
                        {
                                StartTransactionCommand(true);
                                SetQuerySnapshot();     /* might be needed for functional index */
                        }
                        else
                                old_context = MemoryContextSwitchTo(anl_context);

                        analyze_rel(relid, vacstmt);

                        if (vacstmt->vacuum)
                                CommitTransactionCommand(true);
                        else
                        {
                                MemoryContextSwitchTo(old_context);
                                MemoryContextResetAndDeleteChildren(anl_context);
                        }
                }
        }

        /*
         * Finish up processing.
         */
        if (vacstmt->vacuum)
        {
                /* here, we are not in a transaction */

                /*
                 * This matches the CommitTransaction waiting for us in
                 * PostgresMain(). We tell xact.c not to chain the upcoming
                 * commit, so that a VACUUM doesn't start a transaction block,
                 * even when autocommit is off.
                 */
                StartTransactionCommand(true);

                /*
                 * If we completed a database-wide VACUUM without skipping any
                 * relations, update the database's pg_database row with info
                 * about the transaction IDs used, and try to truncate pg_clog.
                 */
                if (all_rels)
                {
                        vac_update_dbstats(MyDatabaseId,
                                                           initialOldestXmin, initialFreezeLimit);
                        vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
                }
        }

        /*
         * Clean up working storage --- note we must do this after
         * StartTransactionCommand, else we might be trying to delete the
         * active context!
         */
        MemoryContextDelete(vac_context);
        vac_context = NULL;

        if (anl_context)
                MemoryContextDelete(anl_context);
}

/*
 * Build a list of Oids for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static List *
getrels(const RangeVar *vacrel, const char *stmttype)
{
        List       *vrl = NIL;
        MemoryContext oldcontext;

        if (vacrel)
        {
                /* Process specific relation */
                Oid                     relid;

                relid = RangeVarGetRelid(vacrel, false);

                /* Make a relation list entry for this guy */
                oldcontext = MemoryContextSwitchTo(vac_context);
                vrl = lappendo(vrl, relid);
                MemoryContextSwitchTo(oldcontext);
        }
        else
        {
                /* Process all plain relations listed in pg_class */
                Relation        pgclass;
                HeapScanDesc scan;
                HeapTuple       tuple;
                ScanKeyData key;

                ScanKeyEntryInitialize(&key, 0x0,
                                                           Anum_pg_class_relkind,
                                                           F_CHAREQ,
                                                           CharGetDatum(RELKIND_RELATION));

                pgclass = heap_openr(RelationRelationName, AccessShareLock);

                scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);

                while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
                {
                        /* Make a relation list entry for this guy */
                        oldcontext = MemoryContextSwitchTo(vac_context);
                        vrl = lappendo(vrl, HeapTupleGetOid(tuple));
                        MemoryContextSwitchTo(oldcontext);
                }

                heap_endscan(scan);
                heap_close(pgclass, AccessShareLock);
        }

        return vrl;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
                                          TransactionId *oldestXmin,
                                          TransactionId *freezeLimit)
{
        TransactionId limit;

        *oldestXmin = GetOldestXmin(sharedRel);

        Assert(TransactionIdIsNormal(*oldestXmin));

        if (vacstmt->freeze)
        {
                /* FREEZE option: use oldest Xmin as freeze cutoff too */
                limit = *oldestXmin;
        }
        else
        {
                /*
                 * Normal case: freeze cutoff is well in the past, to wit, about
                 * halfway to the wrap horizon
                 */
                limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
        }

        /*
         * Be careful not to generate a "permanent" XID
         */
        if (!TransactionIdIsNormal(limit))
                limit = FirstNormalTransactionId;

        /*
         * Ensure sane relationship of limits
         */
        if (TransactionIdFollows(limit, *oldestXmin))
        {
                elog(WARNING, "oldest Xmin is far in the past --- close open transactions soon to avoid wraparound problems");
                limit = *oldestXmin;
        }

        *freezeLimit = limit;
}
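
/*
 * To put numbers on "about halfway to the wrap horizon" (an illustrative
 * note, assuming 32-bit XIDs): MaxTransactionId >> 2 is roughly 1.07
 * billion, so the normal freeze cutoff trails the current XID by about
 * half of the ~2.1 billion transactions of headroom (MaxTransactionId >> 1)
 * allowed before wraparound.
 */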


/*
 *      vac_update_relstats() -- update statistics for one relation
 *
 *              Update the whole-relation statistics that are kept in its pg_class
 *              row.  There are additional stats that will be updated if we are
 *              doing ANALYZE, but we always update these stats.  This routine works
 *              for both index and heap relation entries in pg_class.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the pg_class tuple that's already on
 *              the page.  The reason for this is that if we updated these tuples in
 *              the usual way, vacuuming pg_class itself wouldn't work very well ---
 *              by the time we got done with a vacuum cycle, most of the tuples in
 *              pg_class would've been obsoleted.  Of course, this only works for
 *              fixed-size never-null columns, but these are.
 *
 *              This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *              ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
                                        bool hasindex)
{
        Relation        rd;
        HeapTupleData rtup;
        HeapTuple       ctup;
        Form_pg_class pgcform;
        Buffer          buffer;

        /*
         * update number of tuples and number of pages in pg_class
         */
        rd = heap_openr(RelationRelationName, RowExclusiveLock);

        ctup = SearchSysCache(RELOID,
                                                  ObjectIdGetDatum(relid),
                                                  0, 0, 0);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* get the buffer cache tuple */
        rtup.t_self = ctup->t_self;
        ReleaseSysCache(ctup);
        if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);

        /* overwrite the existing statistics in the tuple */
        pgcform = (Form_pg_class) GETSTRUCT(&rtup);
        pgcform->relpages = (int32) num_pages;
        pgcform->reltuples = num_tuples;
        pgcform->relhasindex = hasindex;

        /*
         * If we have discovered that there are no indexes, then there's no
         * primary key either.  This could be done more thoroughly...
         */
        if (!hasindex)
                pgcform->relhaspkey = false;

        /*
         * Invalidate the tuple in the catcaches; this also arranges to flush
         * the relation's relcache entry.  (If we fail to commit for some
         * reason, no flush will occur, but no great harm is done since there
         * are no noncritical state updates here.)
         */
        CacheInvalidateHeapTuple(rd, &rtup);

        /* Write the buffer */
        WriteBuffer(buffer);

        heap_close(rd, RowExclusiveLock);
}


/*
 *      vac_update_dbstats() -- update statistics for one database
 *
 *              Update the whole-database statistics that are kept in its pg_database
 *              row.
 *
 *              We violate no-overwrite semantics here by storing new values for the
 *              statistics columns directly into the tuple that's already on the page.
 *              As with vac_update_relstats, this avoids leaving dead tuples behind
 *              after a VACUUM, which is good since GetRawDatabaseInfo
 *              can get confused by finding dead tuples in pg_database.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_update_dbstats(Oid dbid,
                                   TransactionId vacuumXID,
                                   TransactionId frozenXID)
{
        Relation        relation;
        ScanKeyData entry[1];
        HeapScanDesc scan;
        HeapTuple       tuple;
        Form_pg_database dbform;

        relation = heap_openr(DatabaseRelationName, RowExclusiveLock);

        /* Must use a heap scan, since there's no syscache for pg_database */
        ScanKeyEntryInitialize(&entry[0], 0x0,
                                                   ObjectIdAttributeNumber, F_OIDEQ,
                                                   ObjectIdGetDatum(dbid));

        scan = heap_beginscan(relation, SnapshotNow, 1, entry);

        tuple = heap_getnext(scan, ForwardScanDirection);

        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "database %u does not exist", dbid);

        dbform = (Form_pg_database) GETSTRUCT(tuple);

        /* overwrite the existing statistics in the tuple */
        dbform->datvacuumxid = vacuumXID;
        dbform->datfrozenxid = frozenXID;

        /* invalidate the tuple in the cache and write the buffer */
        CacheInvalidateHeapTuple(relation, tuple);
        WriteNoReleaseBuffer(scan->rs_cbuf);

        heap_endscan(scan);

        heap_close(relation, RowExclusiveLock);
}


/*
 *      vac_truncate_clog() -- attempt to truncate the commit log
 *
 *              Scan pg_database to determine the system-wide oldest datvacuumxid,
 *              and use it to truncate the transaction commit log (pg_clog).
 *              Also generate a warning if the system-wide oldest datfrozenxid
 *              seems to be in danger of wrapping around.
 *
 *              The passed XIDs are simply the ones I just wrote into my pg_database
 *              entry.  They're used to initialize the "min" calculations.
 *
 *              This routine is shared by full and lazy VACUUM.  Note that it is only
 *              applied after a database-wide VACUUM operation.
 */
static void
vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
{
        TransactionId myXID;
        Relation        relation;
        HeapScanDesc scan;
        HeapTuple       tuple;
        int32           age;
        bool            vacuumAlreadyWrapped = false;
        bool            frozenAlreadyWrapped = false;

        myXID = GetCurrentTransactionId();

        relation = heap_openr(DatabaseRelationName, AccessShareLock);

        scan = heap_beginscan(relation, SnapshotNow, 0, NULL);

        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
                Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

                /* Ignore non-connectable databases (eg, template0) */
                /* It's assumed that these have been frozen correctly */
                if (!dbform->datallowconn)
                        continue;

                if (TransactionIdIsNormal(dbform->datvacuumxid))
                {
                        if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
                                vacuumAlreadyWrapped = true;
                        else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
                                vacuumXID = dbform->datvacuumxid;
                }
                if (TransactionIdIsNormal(dbform->datfrozenxid))
                {
                        if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
                                frozenAlreadyWrapped = true;
                        else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
                                frozenXID = dbform->datfrozenxid;
                }
        }

        heap_endscan(scan);

        heap_close(relation, AccessShareLock);

        /*
         * Do not truncate CLOG if we seem to have suffered wraparound
         * already; the computed minimum XID might be bogus.
         */
        if (vacuumAlreadyWrapped)
        {
                elog(WARNING, "Some databases have not been vacuumed in over 2 billion transactions."
                         "\n\tYou may have already suffered transaction-wraparound data loss.");
                return;
        }

        /* Truncate CLOG to the oldest vacuumxid */
        TruncateCLOG(vacuumXID);

        /* Give warning about impending wraparound problems */
        if (frozenAlreadyWrapped)
        {
                elog(WARNING, "Some databases have not been vacuumed in over 1 billion transactions."
                         "\n\tBetter vacuum them soon, or you may have a wraparound failure.");
        }
        else
        {
                age = (int32) (myXID - frozenXID);
                if (age > (int32) ((MaxTransactionId >> 3) * 3))
                        elog(WARNING, "Some databases have not been vacuumed in %d transactions."
                                 "\n\tBetter vacuum them within %d transactions,"
                                 "\n\tor you may have a wraparound failure.",
                                 age, (int32) (MaxTransactionId >> 1) - age);
        }
}
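
/*
 * A note on the thresholds above, assuming 32-bit XIDs: (MaxTransactionId
 * >> 3) * 3 is about 1.6 billion, so the age warning fires once a
 * database's oldest datfrozenxid trails the current XID by roughly
 * three-quarters of the ~2.1 billion transactions of headroom
 * (MaxTransactionId >> 1) available before wraparound.
 */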


/****************************************************************************
 *                                                                          *
 *              Code common to both flavors of VACUUM                       *
 *                                                                          *
 ****************************************************************************
 */


/*
 *      vacuum_rel() -- vacuum one heap relation
 *
 *              Returns TRUE if we actually processed the relation (or can ignore it
 *              for some reason), FALSE if we failed to process it due to permissions
 *              or other reasons.  (A FALSE result really means that some data
 *              may have been left unvacuumed, so we can't update XID stats.)
 *
 *              Doing one heap at a time incurs extra overhead, since we need to
 *              check that the heap exists again just before we vacuum it.  The
 *              reason that we do this is so that vacuuming can be spread across
 *              many small transactions.  Otherwise, two-phase locking would require
 *              us to lock the entire database during one pass of the vacuum cleaner.
 *
 *              At entry and exit, we are not inside a transaction.
 */
static bool
vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
{
        LOCKMODE        lmode;
        Relation        onerel;
        LockRelId       onerelid;
        Oid                     toast_relid;
        bool            result;

        /* Begin a transaction for vacuuming this relation */
        StartTransactionCommand(true);
        SetQuerySnapshot();                     /* might be needed for functional index */

        /*
         * Check for user-requested abort.  Note we want this to be inside a
         * transaction, so that xact.c doesn't issue a useless WARNING.
         */
        CHECK_FOR_INTERRUPTS();

        /*
         * Race condition -- if the pg_class tuple has gone away since the
         * last time we saw it, we don't need to vacuum it.
         */
        if (!SearchSysCacheExists(RELOID,
                                                          ObjectIdGetDatum(relid),
                                                          0, 0, 0))
        {
                CommitTransactionCommand(true);
                return true;                    /* okay 'cause no data there */
        }

        /*
         * Determine the type of lock we want --- hard exclusive lock for a
         * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
         * vacuum.  Either way, we can be sure that no other backend is
         * vacuuming the same table.
         */
        lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

        /*
         * Open the class, get an appropriate lock on it, and check
         * permissions.
         *
         * We allow the user to vacuum a table if he is superuser, the table
         * owner, or the database owner (but in the latter case, only if it's
         * not a shared relation).  pg_class_ownercheck includes the superuser
         * case.
         *
         * Note we choose to treat permissions failure as a WARNING and keep
         * trying to vacuum the rest of the DB --- is this appropriate?
         */
        onerel = relation_open(relid, lmode);

        if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
                  (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
        {
                elog(WARNING, "Skipping \"%s\" --- only table or database owner can VACUUM it",
                         RelationGetRelationName(onerel));
                relation_close(onerel, lmode);
                CommitTransactionCommand(true);
                return false;
        }

        /*
         * Check that it's a plain table; we used to do this in getrels() but
         * seems safer to check after we've locked the relation.
         */
        if (onerel->rd_rel->relkind != expected_relkind)
        {
                elog(WARNING, "Skipping \"%s\" --- can not process indexes, views or special system tables",
                         RelationGetRelationName(onerel));
                relation_close(onerel, lmode);
                CommitTransactionCommand(true);
                return false;
        }

        /*
         * Silently ignore tables that are temp tables of other backends ---
         * trying to vacuum these will lead to great unhappiness, since their
         * contents are probably not up-to-date on disk.  (We don't throw a
         * warning here; it would just lead to chatter during a database-wide
         * VACUUM.)
         */
        if (isOtherTempNamespace(RelationGetNamespace(onerel)))
        {
                relation_close(onerel, lmode);
                CommitTransactionCommand(true);
                return true;                    /* assume no long-lived data in temp tables */
        }

        /*
         * Get a session-level lock too. This will protect our access to the
         * relation across multiple transactions, so that we can vacuum the
         * relation's TOAST table (if any) secure in the knowledge that no one
         * is deleting the parent relation.
         *
         * NOTE: this cannot block, even if someone else is waiting for access,
         * because the lock manager knows that both lock requests are from the
         * same process.
         */
        onerelid = onerel->rd_lockInfo.lockRelId;
        LockRelationForSession(&onerelid, lmode);

        /*
         * Remember the relation's TOAST relation for later
         */
        toast_relid = onerel->rd_rel->reltoastrelid;

        /*
         * Do the actual work --- either FULL or "lazy" vacuum
         */
        if (vacstmt->full)
                full_vacuum_rel(onerel, vacstmt);
        else
                lazy_vacuum_rel(onerel, vacstmt);

        result = true;                          /* did the vacuum */

        /* all done with this class, but hold lock until commit */
        relation_close(onerel, NoLock);

        /*
         * Complete the transaction and free all temporary memory used.
         */
        CommitTransactionCommand(true);

        /*
         * If the relation has a secondary toast rel, vacuum that too while we
         * still hold the session lock on the master table.  Note however that
         * "analyze" will not get done on the toast table.  This is good,
         * because the toaster always uses hardcoded index access and
         * statistics are totally unimportant for toast relations.
         */
        if (toast_relid != InvalidOid)
        {
                if (! vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
                        result = false;         /* failed to vacuum the TOAST table? */
        }

        /*
         * Now release the session-level lock on the master table.
         */
        UnlockRelationForSession(&onerelid, lmode);

        return result;
}


/****************************************************************************
 *                                                                          *
 *              Code for VACUUM FULL (only)                                 *
 *                                                                          *
 ****************************************************************************
 */


/*
 *      full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *              This routine vacuums a single heap, cleans out its indexes, and
 *              updates its num_pages and num_tuples statistics.
 *
 *              At entry, we have already established a transaction and opened
 *              and locked the relation.
 */
static void
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
        VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
                                                                                 * clean indexes */
        VacPageListData fraged_pages;           /* List of pages with space enough
                                                                                 * for re-using */
        Relation   *Irel;
        int                     nindexes,
                                i;
        VRelStats  *vacrelstats;
        bool            reindex = false;

        if (IsIgnoringSystemIndexes() &&
                IsSystemRelation(onerel))
                reindex = true;

        vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
                                                  &OldestXmin, &FreezeLimit);

        /*
         * Set up statistics-gathering machinery.
         */
        vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
        vacrelstats->rel_pages = 0;
        vacrelstats->rel_tuples = 0;
        vacrelstats->hasindex = false;

        /* scan the heap */
        vacuum_pages.num_pages = fraged_pages.num_pages = 0;
        scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);

        /* Now open all indexes of the relation */
        vac_open_indexes(onerel, &nindexes, &Irel);
        if (!Irel)
                reindex = false;
        else if (!RelationGetForm(onerel)->relhasindex)
                reindex = true;
        if (nindexes > 0)
                vacrelstats->hasindex = true;

#ifdef NOT_USED

        /*
         * reindex in VACUUM is dangerous under WAL. ifdef out until it
         * becomes safe.
         */
        if (reindex)
        {
                vac_close_indexes(nindexes, Irel);
                Irel = (Relation *) NULL;
                activate_indexes_of_a_table(onerel, false);
        }
#endif   /* NOT_USED */

        /* Clean/scan index relation(s) */
        if (Irel != (Relation *) NULL)
        {
                if (vacuum_pages.num_pages > 0)
                {
                        for (i = 0; i < nindexes; i++)
                                vacuum_index(&vacuum_pages, Irel[i],
                                                         vacrelstats->rel_tuples, 0);
                }
                else
                {
                        /* just scan indexes to update statistics */
                        for (i = 0; i < nindexes; i++)
                                scan_index(Irel[i], vacrelstats->rel_tuples);
                }
        }

        if (fraged_pages.num_pages > 0)
        {
                /* Try to shrink heap */
                repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
                                        nindexes, Irel);
                vac_close_indexes(nindexes, Irel);
        }
        else
        {
                vac_close_indexes(nindexes, Irel);
                if (vacuum_pages.num_pages > 0)
                {
                        /* Clean pages from vacuum_pages list */
                        vacuum_heap(vacrelstats, onerel, &vacuum_pages);
                }
                else
                {
                        /*
                         * Flush dirty pages out to disk.  We must do this even if we
                         * didn't do anything else, because we want to ensure that all
                         * tuples have correct on-row commit status on disk (see
                         * bufmgr.c's comments for FlushRelationBuffers()).
                         */
                        i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
                        if (i < 0)
                                elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
                                         i);
                }
        }

#ifdef NOT_USED
        if (reindex)
                activate_indexes_of_a_table(onerel, true);
#endif   /* NOT_USED */

        /* update shared free space map with final free space info */
        vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

        /* update statistics in pg_class */
        vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
                                                vacrelstats->rel_tuples, vacrelstats->hasindex);
}


/*
 *      scan_heap() -- scan an open heap relation
 *
 *              This routine sets commit status bits, constructs vacuum_pages (list
 *              of pages we need to compact free space on and/or clean indexes of
 *              deleted tuples), constructs fraged_pages (list of pages with free
 *              space that tuples could be moved into), and calculates statistics
 *              on the number of live tuples in the heap.
 */
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
                  VacPageList vacuum_pages, VacPageList fraged_pages)
{
        BlockNumber nblocks,
                                blkno;
        ItemId          itemid;
        Buffer          buf;
        HeapTupleData tuple;
        OffsetNumber offnum,
                                maxoff;
        bool            pgchanged,
                                tupgone,
                                notup;
        char       *relname;
        VacPage         vacpage,
                                vacpagecopy;
        BlockNumber empty_pages,
                                new_pages,
                                changed_pages,
                                empty_end_pages;
        double          num_tuples,
                                tups_vacuumed,
                                nkeep,
                                nunused;
        double          free_size,
                                usable_free_size;
        Size            min_tlen = MaxTupleSize;
        Size            max_tlen = 0;
        int                     i;
        bool            do_shrinking = true;
        VTupleLink      vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
        int                     num_vtlinks = 0;
        int                     free_vtlinks = 100;
        VacRUsage       ru0;

        vac_init_rusage(&ru0);

        relname = RelationGetRelationName(onerel);
        elog(elevel, "--Relation %s.%s--",
                 get_namespace_name(RelationGetNamespace(onerel)),
                 relname);

        empty_pages = new_pages = changed_pages = empty_end_pages = 0;
        num_tuples = tups_vacuumed = nkeep = nunused = 0;
        free_size = 0;

        nblocks = RelationGetNumberOfBlocks(onerel);

        /*
         * We initially create each VacPage item in a maximal-sized workspace,
         * then copy the workspace into a just-large-enough copy.
         */
        vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));

        for (blkno = 0; blkno < nblocks; blkno++)
        {
                Page            page,
                                        tempPage = NULL;
                bool            do_reap,
                                        do_frag;

                CHECK_FOR_INTERRUPTS();

                buf = ReadBuffer(onerel, blkno);
                page = BufferGetPage(buf);

                vacpage->blkno = blkno;
                vacpage->offsets_used = 0;
                vacpage->offsets_free = 0;

                if (PageIsNew(page))
                {
                        elog(WARNING, "Rel %s: Uninitialized page %u - fixing",
                                 relname, blkno);
                        PageInit(page, BufferGetPageSize(buf), 0);
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += vacpage->free;
                        new_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        WriteBuffer(buf);
                        continue;
                }

                if (PageIsEmpty(page))
                {
                        vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
                        free_size += vacpage->free;
                        empty_pages++;
                        empty_end_pages++;
                        vacpagecopy = copy_vac_page(vacpage);
                        vpage_insert(vacuum_pages, vacpagecopy);
                        vpage_insert(fraged_pages, vacpagecopy);
                        ReleaseBuffer(buf);
                        continue;
                }

                pgchanged = false;
                notup = true;
                maxoff = PageGetMaxOffsetNumber(page);
                for (offnum = FirstOffsetNumber;
                         offnum <= maxoff;
                         offnum = OffsetNumberNext(offnum))
                {
                        uint16          sv_infomask;

                        itemid = PageGetItemId(page, offnum);

                        /*
                         * Collect unused items too --- it's possible to have index
                         * entries pointing here after a crash.
                         */
                        if (!ItemIdIsUsed(itemid))
                        {
                                vacpage->offsets[vacpage->offsets_free++] = offnum;
                                nunused += 1;
                                continue;
                        }

                        tuple.t_datamcxt = NULL;
                        tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
                        tuple.t_len = ItemIdGetLength(itemid);
                        ItemPointerSet(&(tuple.t_self), blkno, offnum);

                        tupgone = false;
                        sv_infomask = tuple.t_data->t_infomask;

                        switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
                        {
                                case HEAPTUPLE_DEAD:
                                        tupgone = true;         /* we can delete the tuple */
                                        break;
                                case HEAPTUPLE_LIVE:

                                        /*
                                         * Tuple is good.  Consider whether to replace its
                                         * xmin value with FrozenTransactionId.
                                         */
                                        if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
                                                TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
                                                                                          FreezeLimit))
                                        {
                                                HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
                                                /* infomask should be okay already */
                                                Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
                                                pgchanged = true;
                                        }
                                        break;
                                case HEAPTUPLE_RECENTLY_DEAD:

                                        /*
                                         * If the tuple was deleted only recently, we must
                                         * not remove it from the relation.
                                         */
                                        nkeep += 1;

                                        /*
                                         * If we are going to shrink the relation and this
                                         * tuple has been updated, remember it so that we
                                         * can construct the chain of updated-tuple
                                         * dependencies.
                                         */
1169                                         if (do_shrinking &&
1170                                                 !(ItemPointerEquals(&(tuple.t_self),
1171                                                                                         &(tuple.t_data->t_ctid))))
1172                                         {
1173                                                 if (free_vtlinks == 0)
1174                                                 {
1175                                                         free_vtlinks = 1000;
1176                                                         vtlinks = (VTupleLink) repalloc(vtlinks,
1177                                                                                    (free_vtlinks + num_vtlinks) *
1178                                                                                                  sizeof(VTupleLinkData));
1179                                                 }
1180                                                 vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
1181                                                 vtlinks[num_vtlinks].this_tid = tuple.t_self;
1182                                                 free_vtlinks--;
1183                                                 num_vtlinks++;
1184                                         }
1185                                         break;
1186                                 case HEAPTUPLE_INSERT_IN_PROGRESS:
1187
1188                                         /*
1189                                          * This should not happen, since we hold exclusive
1190                                          * lock on the relation; shouldn't we raise an error?
1191                                          */
1192                                         elog(WARNING, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
1193                                                  relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data));
1194                                         do_shrinking = false;
1195                                         break;
1196                                 case HEAPTUPLE_DELETE_IN_PROGRESS:
1197
1198                                         /*
1199                                          * This should not happen, since we hold exclusive
1200                                          * lock on the relation; shouldn't we raise an error?
1201                                          */
1202                                         elog(WARNING, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
1203                                                  relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data));
1204                                         do_shrinking = false;
1205                                         break;
1206                                 default:
1207                                         elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
1208                                         break;
1209                         }
1210
1211                         /* check for hint-bit update by HeapTupleSatisfiesVacuum */
1212                         if (sv_infomask != tuple.t_data->t_infomask)
1213                                 pgchanged = true;
1214
1215                         /*
1216                          * Other checks...
1217                          */
1218                         if (onerel->rd_rel->relhasoids &&
1219                                 !OidIsValid(HeapTupleGetOid(&tuple)))
1220                                 elog(WARNING, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
1221                                          relname, blkno, offnum, (int) tupgone);
1222
1223                         if (tupgone)
1224                         {
1225                                 ItemId          lpp;
1226
1227                                 /*
1228                                  * Here we are building a temporary copy of the page with
1229                                  * dead tuples removed.  Below we will apply
1230                                  * PageRepairFragmentation to the copy, so that we can
1231                                  * determine how much space will be available after
1232                                  * removal of dead tuples.      But note we are NOT changing
1233                                  * the real page yet...
1234                                  */
1235                                 if (tempPage == (Page) NULL)
1236                                 {
1237                                         Size            pageSize;
1238
1239                                         pageSize = PageGetPageSize(page);
1240                                         tempPage = (Page) palloc(pageSize);
1241                                         memcpy(tempPage, page, pageSize);
1242                                 }
1243
1244                                 /* mark it unused on the temp page */
1245                                 lpp = PageGetItemId(tempPage, offnum);
1246                                 lpp->lp_flags &= ~LP_USED;
1247
1248                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1249                                 tups_vacuumed += 1;
1250                         }
1251                         else
1252                         {
1253                                 num_tuples += 1;
1254                                 notup = false;
1255                                 if (tuple.t_len < min_tlen)
1256                                         min_tlen = tuple.t_len;
1257                                 if (tuple.t_len > max_tlen)
1258                                         max_tlen = tuple.t_len;
1259                         }
1260                 }                                               /* scan along page */
1261
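                      /*
                       * Record the page's free space.  On a slotted heap page the
                       * contiguous free space is the hole between the end of the
                       * line pointer array (pd_lower) and the start of the tuple
                       * data area (pd_upper).
                       */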
1262                 if (tempPage != (Page) NULL)
1263                 {
1264                         /* Some tuples are removable; figure free space after removal */
1265                         PageRepairFragmentation(tempPage, NULL);
1266                         vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
1267                         pfree(tempPage);
1268                         do_reap = true;
1269                 }
1270                 else
1271                 {
1272                         /* Just use current available space */
1273                         vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
1274                         /* Need to reap the page if it has ~LP_USED line pointers */
1275                         do_reap = (vacpage->offsets_free > 0);
1276                 }
1277
1278                 free_size += vacpage->free;
1279
1280                 /*
1281                  * Add the page to fraged_pages if it has a useful amount of free
1282                  * space.  "Useful" means enough for a minimal-sized tuple. But we
1283                  * don't know that accurately near the start of the relation, so
1284                  * add pages unconditionally if they have >= BLCKSZ/10 free space.
1285                  */
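                      /* (with the default BLCKSZ of 8192, that fallback threshold is 819 bytes) */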
1286                 do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
1287
1288                 if (do_reap || do_frag)
1289                 {
1290                         vacpagecopy = copy_vac_page(vacpage);
1291                         if (do_reap)
1292                                 vpage_insert(vacuum_pages, vacpagecopy);
1293                         if (do_frag)
1294                                 vpage_insert(fraged_pages, vacpagecopy);
1295                 }
1296
1297                 if (notup)
1298                         empty_end_pages++;
1299                 else
1300                         empty_end_pages = 0;
1301
1302                 if (pgchanged)
1303                 {
1304                         WriteBuffer(buf);
1305                         changed_pages++;
1306                 }
1307                 else
1308                         ReleaseBuffer(buf);
1309         }
1310
1311         pfree(vacpage);
1312
1313         /* save stats in the rel list for use later */
1314         vacrelstats->rel_tuples = num_tuples;
1315         vacrelstats->rel_pages = nblocks;
1316         if (num_tuples == 0)
1317                 min_tlen = max_tlen = 0;
1318         vacrelstats->min_tlen = min_tlen;
1319         vacrelstats->max_tlen = max_tlen;
1320
1321         vacuum_pages->empty_end_pages = empty_end_pages;
1322         fraged_pages->empty_end_pages = empty_end_pages;
1323
1324         /*
1325          * Clear the fraged_pages list if we found we couldn't shrink. Else,
1326          * remove any "empty" end-pages from the list, and compute usable free
1327          * space = free space in remaining pages.
1328          */
1329         if (do_shrinking)
1330         {
1331                 Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
1332                 fraged_pages->num_pages -= empty_end_pages;
1333                 usable_free_size = 0;
1334                 for (i = 0; i < fraged_pages->num_pages; i++)
1335                         usable_free_size += fraged_pages->pagedesc[i]->free;
1336         }
1337         else
1338         {
1339                 fraged_pages->num_pages = 0;
1340                 usable_free_size = 0;
1341         }
1342
1343         /* don't bother to save vtlinks if we will not call repair_frag */
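              /* (vtlinks are sorted by new_tid so that repair_frag can find a
               *  tuple's parent with a binary search via vac_bsearch) */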
1344         if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
1345         {
1346                 qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
1347                           vac_cmp_vtlinks);
1348                 vacrelstats->vtlinks = vtlinks;
1349                 vacrelstats->num_vtlinks = num_vtlinks;
1350         }
1351         else
1352         {
1353                 vacrelstats->vtlinks = NULL;
1354                 vacrelstats->num_vtlinks = 0;
1355                 pfree(vtlinks);
1356         }
1357
1358         elog(elevel, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; "
1359                  "Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, "
1360                  "MaxLen %lu; Re-using: Free/Avail. Space %.0f/%.0f; "
1361                  "EndEmpty/Avail. Pages %u/%u.\n\t%s",
1362                  nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
1363                  new_pages, num_tuples, tups_vacuumed,
1364                  nkeep, vacrelstats->num_vtlinks,
1365                  nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
1366                  free_size, usable_free_size,
1367                  empty_end_pages, fraged_pages->num_pages,
1368                  vac_show_rusage(&ru0));
1369 }
1370
1371
1372 /*
1373  *      repair_frag() -- try to repair relation's fragmentation
1374  *
1375  *              This routine marks dead tuples as unused and tries to re-use dead
1376  *              space by moving tuples (and inserting index entries if needed). It
1377  *              constructs an Nvacpagelist list of freed pages (whose tuples were
1378  *              moved) and cleans their index entries after committing the current
1379  *              transaction (in hack manner - without losing locks or freeing
1380  *              memory!). It truncates the relation if some end-blocks have gone away.
1381  */
1382 static void
1383 repair_frag(VRelStats *vacrelstats, Relation onerel,
1384                         VacPageList vacuum_pages, VacPageList fraged_pages,
1385                         int nindexes, Relation *Irel)
1386 {
1387         TransactionId myXID;
1388         CommandId       myCID;
1389         Buffer          buf,
1390                                 cur_buffer;
1391         BlockNumber nblocks,
1392                                 blkno;
1393         BlockNumber last_move_dest_block = 0,
1394                                 last_vacuum_block;
1395         Page            page,
1396                                 ToPage = NULL;
1397         OffsetNumber offnum,
1398                                 maxoff,
1399                                 newoff,
1400                                 max_offset;
1401         ItemId          itemid,
1402                                 newitemid;
1403         HeapTupleData tuple,
1404                                 newtup;
1405         TupleDesc       tupdesc;
1406         ResultRelInfo *resultRelInfo;
1407         EState     *estate;
1408         TupleTable      tupleTable;
1409         TupleTableSlot *slot;
1410         VacPageListData Nvacpagelist;
1411         VacPage         cur_page = NULL,
1412                                 last_vacuum_page,
1413                                 vacpage,
1414                            *curpage;
1415         int                     cur_item = 0;
1416         int                     i;
1417         Size            tuple_len;
1418         int                     num_moved,
1419                                 num_fraged_pages,
1420                                 vacuumed_pages;
1421         int                     checked_moved,
1422                                 num_tuples,
1423                                 keep_tuples = 0;
1424         bool            isempty,
1425                                 dowrite,
1426                                 chain_tuple_moved;
1427         VacRUsage       ru0;
1428
1429         vac_init_rusage(&ru0);
1430
1431         myXID = GetCurrentTransactionId();
1432         myCID = GetCurrentCommandId();
1433
1434         tupdesc = RelationGetDescr(onerel);
1435
1436         /*
1437          * We need a ResultRelInfo and an EState so we can use the regular
1438          * executor's index-entry-making machinery.
1439          */
1440         estate = CreateExecutorState();
1441
1442         resultRelInfo = makeNode(ResultRelInfo);
1443         resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
1444         resultRelInfo->ri_RelationDesc = onerel;
1445         resultRelInfo->ri_TrigDesc = NULL;      /* we don't fire triggers */
1446
1447         ExecOpenIndices(resultRelInfo);
1448
1449         estate->es_result_relations = resultRelInfo;
1450         estate->es_num_result_relations = 1;
1451         estate->es_result_relation_info = resultRelInfo;
1452
1453         /* Set up a dummy tuple table too */
1454         tupleTable = ExecCreateTupleTable(1);
1455         slot = ExecAllocTableSlot(tupleTable);
1456         ExecSetSlotDescriptor(slot, tupdesc, false);
1457
1458         Nvacpagelist.num_pages = 0;
1459         num_fraged_pages = fraged_pages->num_pages;
1460         Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
1461         vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
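              /* the last empty_end_pages entries are the relation's empty tail,
               * which will be truncated away rather than cleaned page-by-page */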
1462         if (vacuumed_pages > 0)
1463         {
1464                 /* get last reaped page from vacuum_pages */
1465                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1466                 last_vacuum_block = last_vacuum_page->blkno;
1467         }
1468         else
1469         {
1470                 last_vacuum_page = NULL;
1471                 last_vacuum_block = InvalidBlockNumber;
1472         }
1473         cur_buffer = InvalidBuffer;
1474         num_moved = 0;
1475
1476         vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
1477         vacpage->offsets_used = vacpage->offsets_free = 0;
1478
1479         /*
1480          * Scan pages backwards from the last nonempty page, trying to move
1481          * tuples down to lower pages.  Quit when we reach a page that we have
1482          * moved any tuples onto, or the first page if we haven't moved
1483          * anything, or when we find a page we cannot completely empty (this
1484          * last condition is handled by "break" statements within the loop).
1485          *
1486          * NB: this code depends on the vacuum_pages and fraged_pages lists being
1487          * in order by blkno.
1488          */
1489         nblocks = vacrelstats->rel_pages;
1490         for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
1491                  blkno > last_move_dest_block;
1492                  blkno--)
1493         {
1494                 CHECK_FOR_INTERRUPTS();
1495
1496                 /*
1497                  * Forget fraged_pages pages at or after this one; they're no
1498                  * longer useful as move targets, since we only want to move down.
1499                  * Note that since we stop the outer loop at last_move_dest_block,
1500                  * pages removed here cannot have had anything moved onto them
1501                  * already.
1502                  *
1503                  * Also note that we don't change the stored fraged_pages list, only
1504                  * our local variable num_fraged_pages; so the forgotten pages are
1505                  * still available to be loaded into the free space map later.
1506                  */
1507                 while (num_fraged_pages > 0 &&
1508                         fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
1509                 {
1510                         Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
1511                         --num_fraged_pages;
1512                 }
1513
1514                 /*
1515          * Process this page of the relation.
1516                  */
1517                 buf = ReadBuffer(onerel, blkno);
1518                 page = BufferGetPage(buf);
1519
1520                 vacpage->offsets_free = 0;
1521
1522                 isempty = PageIsEmpty(page);
1523
1524                 dowrite = false;
1525
1526                 /* Is the page in the vacuum_pages list? */
1527                 if (blkno == last_vacuum_block)
1528                 {
1529                         if (last_vacuum_page->offsets_free > 0)
1530                         {
1531                                 /* there are dead tuples on this page - clean them */
1532                                 Assert(!isempty);
1533                                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1534                                 vacuum_page(onerel, buf, last_vacuum_page);
1535                                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
1536                                 dowrite = true;
1537                         }
1538                         else
1539                                 Assert(isempty);
1540                         --vacuumed_pages;
1541                         if (vacuumed_pages > 0)
1542                         {
1543                                 /* get prev reaped page from vacuum_pages */
1544                                 last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
1545                                 last_vacuum_block = last_vacuum_page->blkno;
1546                         }
1547                         else
1548                         {
1549                                 last_vacuum_page = NULL;
1550                                 last_vacuum_block = InvalidBlockNumber;
1551                         }
1552                         if (isempty)
1553                         {
1554                                 ReleaseBuffer(buf);
1555                                 continue;
1556                         }
1557                 }
1558                 else
1559                         Assert(!isempty);
1560
1561                 chain_tuple_moved = false;              /* no chain tuple has been moved
1562                                                                                  * off this page yet */
1563                 vacpage->blkno = blkno;
1564                 maxoff = PageGetMaxOffsetNumber(page);
1565                 for (offnum = FirstOffsetNumber;
1566                          offnum <= maxoff;
1567                          offnum = OffsetNumberNext(offnum))
1568                 {
1569                         itemid = PageGetItemId(page, offnum);
1570
1571                         if (!ItemIdIsUsed(itemid))
1572                                 continue;
1573
1574                         tuple.t_datamcxt = NULL;
1575                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
1576                         tuple_len = tuple.t_len = ItemIdGetLength(itemid);
1577                         ItemPointerSet(&(tuple.t_self), blkno, offnum);
1578
1579                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
1580                         {
1581                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
1582                                         elog(ERROR, "HEAP_MOVED_IN was not expected");
1583
1584                                 /*
1585                                  * If this (chain) tuple was already moved by me, I have
1586                                  * to check whether it is in vacpage or not - i.e. whether it
1587                                  * was moved while cleaning this page or some previous one.
1588                                  */
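                                      /*
                                       * (keep_tuples counts tuples we marked MOVED_OFF on
                                       * pages other than the one being cleaned at the time;
                                       * the accounting is settled here when we visit them.)
                                       */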
1589                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
1590                                 {
1591                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
1592                                                 elog(ERROR, "Invalid XVAC in tuple header");
1593                                         if (keep_tuples == 0)
1594                                                 continue;
1595                                         if (chain_tuple_moved)          /* some chains were moved
1596                                                                                                  * while */
1597                                         {                       /* cleaning this page */
1598                                                 Assert(vacpage->offsets_free > 0);
1599                                                 for (i = 0; i < vacpage->offsets_free; i++)
1600                                                 {
1601                                                         if (vacpage->offsets[i] == offnum)
1602                                                                 break;
1603                                                 }
1604                                                 if (i >= vacpage->offsets_free) /* not found */
1605                                                 {
1606                                                         vacpage->offsets[vacpage->offsets_free++] = offnum;
1607                                                         keep_tuples--;
1608                                                 }
1609                                         }
1610                                         else
1611                                         {
1612                                                 vacpage->offsets[vacpage->offsets_free++] = offnum;
1613                                                 keep_tuples--;
1614                                         }
1615                                         continue;
1616                                 }
1617                                 elog(ERROR, "HEAP_MOVED_OFF was expected");
1618                         }
1619
1620                         /*
1621                          * If this tuple is in a chain of tuples created by updates
1622                          * of "recent" transactions, then we have to move the whole
1623                          * chain of tuples to other places.
1624                          *
1625                          * NOTE: this test is not 100% accurate: it is possible for a
1626                          * tuple to be an updated one with recent xmin, and yet not
1627                          * have a corresponding tuple in the vtlinks list.      Presumably
1628                          * there was once a parent tuple with xmax matching the xmin,
1629                          * but it's possible that that tuple has been removed --- for
1630                          * example, if it had xmin = xmax then
1631                          * HeapTupleSatisfiesVacuum would deem it removable as soon as
1632                          * the xmin xact completes.
1633                          *
1634                          * To be on the safe side, we abandon the repair_frag process if
1635                          * we cannot find the parent tuple in vtlinks.  This may be
1636                          * overly conservative; AFAICS it would be safe to move the
1637                          * chain.
1638                          */
1639                         if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
1640                          !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
1641                                                                         OldestXmin)) ||
1642                                 (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
1643                                                                                            HEAP_MARKED_FOR_UPDATE)) &&
1644                                  !(ItemPointerEquals(&(tuple.t_self),
1645                                                                          &(tuple.t_data->t_ctid)))))
1646                         {
1647                                 Buffer          Cbuf = buf;
1648                                 bool            freeCbuf = false;
1649                                 bool            chain_move_failed = false;
1650                                 Page            Cpage;
1651                                 ItemId          Citemid;
1652                                 ItemPointerData Ctid;
1653                                 HeapTupleData tp = tuple;
1654                                 Size            tlen = tuple_len;
1655                                 VTupleMove      vtmove;
1656                                 int                     num_vtmove;
1657                                 int                     free_vtmove;
1658                                 VacPage         to_vacpage = NULL;
1659                                 int                     to_item = 0;
1660                                 int                     ti;
1661
1662                                 if (cur_buffer != InvalidBuffer)
1663                                 {
1664                                         WriteBuffer(cur_buffer);
1665                                         cur_buffer = InvalidBuffer;
1666                                 }
1667
1668                                 /* Quick exit if we have no vtlinks to search in */
1669                                 if (vacrelstats->vtlinks == NULL)
1670                                 {
1671                                         elog(DEBUG1, "Parent item in update-chain not found - can't continue repair_frag");
1672                                         break;          /* out of walk-along-page loop */
1673                                 }
1674
1675                                 vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
1676                                 num_vtmove = 0;
1677                                 free_vtmove = 100;
1678
1679                                 /*
1680                                  * If this tuple is at the beginning or middle of the
1681                                  * chain, then we first have to walk to the end of the chain.
1682                                  */
1683                                 while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
1684                                                                                           HEAP_MARKED_FOR_UPDATE)) &&
1685                                            !(ItemPointerEquals(&(tp.t_self),
1686                                                                                    &(tp.t_data->t_ctid))))
1687                                 {
1688                                         Ctid = tp.t_data->t_ctid;
1689                                         if (freeCbuf)
1690                                                 ReleaseBuffer(Cbuf);
1691                                         freeCbuf = true;
1692                                         Cbuf = ReadBuffer(onerel,
1693                                                                           ItemPointerGetBlockNumber(&Ctid));
1694                                         Cpage = BufferGetPage(Cbuf);
1695                                         Citemid = PageGetItemId(Cpage,
1696                                                                           ItemPointerGetOffsetNumber(&Ctid));
1697                                         if (!ItemIdIsUsed(Citemid))
1698                                         {
1699                                                 /*
1700                                                  * This means that in the middle of the chain
1701                                                  * there was a tuple updated by an older (than
1702                                                  * OldestXmin) xaction, and that tuple has
1703                                                  * already been deleted by me. Actually, the
1704                                                  * upper part of the chain should be removed,
1705                                                  * and it seems scan_heap() should handle that,
1706                                                  * but it's not implemented yet, so we just stop shrinking here.
1707                                                  */
1708                                                 elog(DEBUG1, "Child itemid in update-chain marked as unused - can't continue repair_frag");
1709                                                 chain_move_failed = true;
1710                                                 break;  /* out of loop to move to chain end */
1711                                         }
1712                                         tp.t_datamcxt = NULL;
1713                                         tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1714                                         tp.t_self = Ctid;
1715                                         tlen = tp.t_len = ItemIdGetLength(Citemid);
1716                                 }
1717                                 if (chain_move_failed)
1718                                 {
1719                                         if (freeCbuf)
1720                                                 ReleaseBuffer(Cbuf);
1721                                         pfree(vtmove);
1722                                         break;          /* out of walk-along-page loop */
1723                                 }
1724
1725                                 /*
1726                                  * Check if all items in chain can be moved
1727                                  */
1728                                 for (;;)
1729                                 {
1730                                         Buffer          Pbuf;
1731                                         Page            Ppage;
1732                                         ItemId          Pitemid;
1733                                         HeapTupleData Ptp;
1734                                         VTupleLinkData vtld,
1735                                                            *vtlp;
1736
1737                                         if (to_vacpage == NULL ||
1738                                                 !enough_space(to_vacpage, tlen))
1739                                         {
1740                                                 for (i = 0; i < num_fraged_pages; i++)
1741                                                 {
1742                                                         if (enough_space(fraged_pages->pagedesc[i], tlen))
1743                                                                 break;
1744                                                 }
1745
1746                                                 if (i == num_fraged_pages)
1747                                                 {
1748                                                         /* can't move item anywhere */
1749                                                         chain_move_failed = true;
1750                                                         break;          /* out of check-all-items loop */
1751                                                 }
1752                                                 to_item = i;
1753                                                 to_vacpage = fraged_pages->pagedesc[to_item];
1754                                         }
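                                              /*
                                               * Reserve space on the target page: the MAXALIGN'd
                                               * tuple, plus a new line pointer unless a dead one
                                               * (offsets_used < offsets_free) can be recycled.
                                               */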
1755                                         to_vacpage->free -= MAXALIGN(tlen);
1756                                         if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
1757                                                 to_vacpage->free -= sizeof(ItemIdData);
1758                                         (to_vacpage->offsets_used)++;
1759                                         if (free_vtmove == 0)
1760                                         {
1761                                                 free_vtmove = 1000;
1762                                                 vtmove = (VTupleMove)
1763                                                         repalloc(vtmove,
1764                                                                          (free_vtmove + num_vtmove) *
1765                                                                          sizeof(VTupleMoveData));
1766                                         }
1767                                         vtmove[num_vtmove].tid = tp.t_self;
1768                                         vtmove[num_vtmove].vacpage = to_vacpage;
1769                                         if (to_vacpage->offsets_used == 1)
1770                                                 vtmove[num_vtmove].cleanVpd = true;
1771                                         else
1772                                                 vtmove[num_vtmove].cleanVpd = false;
1773                                         free_vtmove--;
1774                                         num_vtmove++;
1775
1776                                         /* At beginning of chain? */
1777                                         if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
1778                                                 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
1779                                                                                           OldestXmin))
1780                                                 break;
1781
1782                                         /* No, move to tuple with prior row version */
1783                                         vtld.new_tid = tp.t_self;
1784                                         vtlp = (VTupleLink)
1785                                                 vac_bsearch((void *) &vtld,
1786                                                                         (void *) (vacrelstats->vtlinks),
1787                                                                         vacrelstats->num_vtlinks,
1788                                                                         sizeof(VTupleLinkData),
1789                                                                         vac_cmp_vtlinks);
1790                                         if (vtlp == NULL)
1791                                         {
1792                                                 /* see discussion above */
1793                                                 elog(DEBUG1, "Parent item in update-chain not found - can't continue repair_frag");
1794                                                 chain_move_failed = true;
1795                                                 break;  /* out of check-all-items loop */
1796                                         }
1797                                         tp.t_self = vtlp->this_tid;
1798                                         Pbuf = ReadBuffer(onerel,
1799                                                                 ItemPointerGetBlockNumber(&(tp.t_self)));
1800                                         Ppage = BufferGetPage(Pbuf);
1801                                         Pitemid = PageGetItemId(Ppage,
1802                                                            ItemPointerGetOffsetNumber(&(tp.t_self)));
1803                                         /* this can't happen since we saw the tuple earlier: */
1804                                         if (!ItemIdIsUsed(Pitemid))
1805                                                 elog(ERROR, "Parent itemid marked as unused");
1806                                         Ptp.t_datamcxt = NULL;
1807                                         Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
1808
1809                                         /* ctid should not have changed since we saved it */
1810                                         Assert(ItemPointerEquals(&(vtld.new_tid),
1811                                                                                          &(Ptp.t_data->t_ctid)));
1812
1813                                         /*
1814                                          * Read above about the cases when
1815                                          * !ItemIdIsUsed(Citemid) (the child item is
1816                                          * removed)... Because we currently don't remove the
1817                                          * useless part of an update-chain, it's possible to
1818                                          * get a too-old parent row here. As in the case
1819                                          * which caused this problem, we stop shrinking
1820                                          * here. I could try to find the real parent row,
1821                                          * but I don't want to, because the real solution
1822                                          * will be implemented anyway, later, and we are too
1823                                          * close to the 6.5 release. - vadim 06/11/99
1824                                          */
1825                                         if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
1826                                                                          HeapTupleHeaderGetXmin(tp.t_data))))
1827                                         {
1828                                                 ReleaseBuffer(Pbuf);
1829                                                 elog(DEBUG1, "Too old parent tuple found - can't continue repair_frag");
1830                                                 chain_move_failed = true;
1831                                                 break;  /* out of check-all-items loop */
1832                                         }
1833                                         tp.t_datamcxt = Ptp.t_datamcxt;
1834                                         tp.t_data = Ptp.t_data;
1835                                         tlen = tp.t_len = ItemIdGetLength(Pitemid);
1836                                         if (freeCbuf)
1837                                                 ReleaseBuffer(Cbuf);
1838                                         Cbuf = Pbuf;
1839                                         freeCbuf = true;
1840                                 }                               /* end of check-all-items loop */
1841
1842                                 if (freeCbuf)
1843                                         ReleaseBuffer(Cbuf);
1844                                 freeCbuf = false;
1845
1846                                 if (chain_move_failed)
1847                                 {
1848                                         /*
1849                                          * Undo changes to offsets_used state.  We don't
1850                                          * bother cleaning up the amount-free state, since
1851                                          * we're not going to do any further tuple motion.
1852                                          */
1853                                         for (i = 0; i < num_vtmove; i++)
1854                                         {
1855                                                 Assert(vtmove[i].vacpage->offsets_used > 0);
1856                                                 (vtmove[i].vacpage->offsets_used)--;
1857                                         }
1858                                         pfree(vtmove);
1859                                         break;          /* out of walk-along-page loop */
1860                                 }
1861
1862                                 /*
1863                                  * Okay, move the whole tuple chain
1864                                  */
1865                                 ItemPointerSetInvalid(&Ctid);
1866                                 for (ti = 0; ti < num_vtmove; ti++)
1867                                 {
1868                                         VacPage         destvacpage = vtmove[ti].vacpage;
1869
1870                                         /* Get page to move from */
1871                                         tuple.t_self = vtmove[ti].tid;
1872                                         Cbuf = ReadBuffer(onerel,
1873                                                          ItemPointerGetBlockNumber(&(tuple.t_self)));
1874
1875                                         /* Get page to move to */
1876                                         cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
1877
1878                                         LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
1879                                         if (cur_buffer != Cbuf)
1880                                                 LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
1881
1882                                         ToPage = BufferGetPage(cur_buffer);
1883                                         Cpage = BufferGetPage(Cbuf);
1884
1885                                         Citemid = PageGetItemId(Cpage,
1886                                                         ItemPointerGetOffsetNumber(&(tuple.t_self)));
1887                                         tuple.t_datamcxt = NULL;
1888                                         tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
1889                                         tuple_len = tuple.t_len = ItemIdGetLength(Citemid);
1890
1891                                         /*
1892                                          * make a copy of the source tuple, and then mark the
1893                                          * source tuple MOVED_OFF.
1894                                          */
1895                                         heap_copytuple_with_tuple(&tuple, &newtup);
1896
1897                                         /*
1898                                          * register invalidation of source tuple in catcaches.
1899                                          */
1900                                         CacheInvalidateHeapTuple(onerel, &tuple);
1901
1902                                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
1903                                         START_CRIT_SECTION();
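                                              /* (inside a critical section an ERROR is promoted
                                               *  to PANIC, so these paired changes cannot be
                                               *  left half-done) */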
1904
1905                                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1906                                                                                                   HEAP_XMIN_INVALID |
1907                                                                                                   HEAP_MOVED_IN);
1908                                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
1909                                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
1910
1911                                         /*
1912                                          * If this page was not used before - clean it.
1913                                          *
1914                                          * NOTE: a nasty bug used to lurk here.  It is possible
1915                                          * for the source and destination pages to be the same
1916                                          * (since this tuple-chain member can be on a page
1917                                          * lower than the one we're currently processing in
1918                                          * the outer loop).  If that's true, then after
1919                                          * vacuum_page() the source tuple will have been
1920                                          * moved, and tuple.t_data will be pointing at
1921                                          * garbage.  Therefore we must do everything that uses
1922                                          * tuple.t_data BEFORE this step!!
1923                                          *
1924                                          * This path is different from the other callers of
1925                                          * vacuum_page, because we have already incremented
1926                                          * the vacpage's offsets_used field to account for the
1927                                          * tuple(s) we expect to move onto the page. Therefore
1928                                          * vacuum_page's check for offsets_used == 0 is wrong.
1929                                          * But since that's a good debugging check for all
1930                                          * other callers, we work around it here rather than
1931                                          * remove it.
1932                                          */
1933                                         if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
1934                                         {
1935                                                 int                     sv_offsets_used = destvacpage->offsets_used;
1936
1937                                                 destvacpage->offsets_used = 0;
1938                                                 vacuum_page(onerel, cur_buffer, destvacpage);
1939                                                 destvacpage->offsets_used = sv_offsets_used;
1940                                         }
1941
1942                                         /*
1943                                          * Update the state of the copied tuple, and store it
1944                                          * on the destination page.
1945                                          */
1946                                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
1947                                                                                                    HEAP_XMIN_INVALID |
1948                                                                                                    HEAP_MOVED_OFF);
1949                                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
1950                                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
1951                                         newoff = PageAddItem(ToPage,
1952                                                                                  (Item) newtup.t_data,
1953                                                                                  tuple_len,
1954                                                                                  InvalidOffsetNumber,
1955                                                                                  LP_USED);
1956                                         if (newoff == InvalidOffsetNumber)
1957                                         {
1958                                                 elog(PANIC, "moving chain: failed to add item with len = %lu to page %u",
1959                                                   (unsigned long) tuple_len, destvacpage->blkno);
1960                                         }
1961                                         newitemid = PageGetItemId(ToPage, newoff);
1962                                         pfree(newtup.t_data);
1963                                         newtup.t_datamcxt = NULL;
1964                                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
1965                                         ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
1966
1967                                         /* XLOG stuff */
1968                                         if (!onerel->rd_istemp)
1969                                         {
1970                                                 XLogRecPtr      recptr =
1971                                                 log_heap_move(onerel, Cbuf, tuple.t_self,
1972                                                                           cur_buffer, &newtup);
1973
1974                                                 if (Cbuf != cur_buffer)
1975                                                 {
1976                                                         PageSetLSN(Cpage, recptr);
1977                                                         PageSetSUI(Cpage, ThisStartUpID);
1978                                                 }
1979                                                 PageSetLSN(ToPage, recptr);
1980                                                 PageSetSUI(ToPage, ThisStartUpID);
1981                                         }
1982                                         else
1983                                         {
1984                                                 /*
1985                                                  * No XLOG record, but still need to flag that XID
1986                                                  * exists on disk
1987                                                  */
1988                                                 MyXactMadeTempRelUpdate = true;
1989                                         }
1990
1991                                         END_CRIT_SECTION();
1992
1993                                         if (destvacpage->blkno > last_move_dest_block)
1994                                                 last_move_dest_block = destvacpage->blkno;
1995
1996                                         /*
1997                                          * Set the new tuple's t_ctid to point to itself if
1998                                          * it is the last tuple in the chain, and to the
1999                                          * next tuple in the chain otherwise.
2000                                          */
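                                              /* (vtmove[] was filled from the chain's end back
                                               *  toward its start, so ti == 0 places the newest
                                               *  tuple and Ctid remembers the prior placement) */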
2001                                         if (!ItemPointerIsValid(&Ctid))
2002                                                 newtup.t_data->t_ctid = newtup.t_self;
2003                                         else
2004                                                 newtup.t_data->t_ctid = Ctid;
2005                                         Ctid = newtup.t_self;
2006
2007                                         num_moved++;
2008
2009                                         /*
2010                                          * Remember that we moved a tuple from the current page
2011                                          * (the corresponding index entries will be cleaned).
2012                                          */
2013                                         if (Cbuf == buf)
2014                                                 vacpage->offsets[vacpage->offsets_free++] =
2015                                                         ItemPointerGetOffsetNumber(&(tuple.t_self));
2016                                         else
2017                                                 keep_tuples++;
2018
2019                                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2020                                         if (cur_buffer != Cbuf)
2021                                                 LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
2022
2023                                         /* Create index entries for the moved tuple */
2024                                         if (resultRelInfo->ri_NumIndices > 0)
2025                                         {
2026                                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2027                                                 ExecInsertIndexTuples(slot, &(newtup.t_self),
2028                                                                                           estate, true);
2029                                         }
2030
2031                                         WriteBuffer(cur_buffer);
2032                                         WriteBuffer(Cbuf);
2033                                 }                               /* end of move-the-tuple-chain loop */
2034
2035                                 cur_buffer = InvalidBuffer;
2036                                 pfree(vtmove);
2037                                 chain_tuple_moved = true;
2038
2039                                 /* advance to next tuple in walk-along-page loop */
2040                                 continue;
2041                         }                                       /* end of is-tuple-in-chain test */
2042
2043                         /* try to find new page for this tuple */
2044                         if (cur_buffer == InvalidBuffer ||
2045                                 !enough_space(cur_page, tuple_len))
2046                         {
2047                                 if (cur_buffer != InvalidBuffer)
2048                                 {
2049                                         WriteBuffer(cur_buffer);
2050                                         cur_buffer = InvalidBuffer;
2051                                 }
2052                                 for (i = 0; i < num_fraged_pages; i++)
2053                                 {
2054                                         if (enough_space(fraged_pages->pagedesc[i], tuple_len))
2055                                                 break;
2056                                 }
2057                                 if (i == num_fraged_pages)
2058                                         break;          /* can't move item anywhere */
2059                                 cur_item = i;
2060                                 cur_page = fraged_pages->pagedesc[cur_item];
2061                                 cur_buffer = ReadBuffer(onerel, cur_page->blkno);
2062                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2063                                 ToPage = BufferGetPage(cur_buffer);
2064                                 /* if this page was not used before - clean it */
2065                                 if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
2066                                         vacuum_page(onerel, cur_buffer, cur_page);
2067                         }
2068                         else
2069                                 LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
2070
2071                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2072
2073                         /* copy tuple */
2074                         heap_copytuple_with_tuple(&tuple, &newtup);
2075
2076                         /*
2077                          * register invalidation of source tuple in catcaches.
2078                          *
2079                          * (Note: we do not need to register the copied tuple, because we
2080                          * are not changing the tuple contents and so there cannot be
2081                          * any need to flush negative catcache entries.)
2082                          */
2083                         CacheInvalidateHeapTuple(onerel, &tuple);
2084
2085                         /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
2086                         START_CRIT_SECTION();
2087
2088                         /*
2089                          * Mark new tuple as MOVED_IN by me.
2090                          */
2091                         newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2092                                                                                    HEAP_XMIN_INVALID |
2093                                                                                    HEAP_MOVED_OFF);
2094                         newtup.t_data->t_infomask |= HEAP_MOVED_IN;
2095                         HeapTupleHeaderSetXvac(newtup.t_data, myXID);
2096
2097                         /* add tuple to the page */
2098                         newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
2099                                                                  InvalidOffsetNumber, LP_USED);
2100                         if (newoff == InvalidOffsetNumber)
2101                         {
2102                                 elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
2103                                          (unsigned long) tuple_len,
2104                                          cur_page->blkno, (unsigned long) cur_page->free,
2105                                          cur_page->offsets_used, cur_page->offsets_free);
2106                         }
2107                         newitemid = PageGetItemId(ToPage, newoff);
2108                         pfree(newtup.t_data);
2109                         newtup.t_datamcxt = NULL;
2110                         newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
2111                         ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
2112                         newtup.t_self = newtup.t_data->t_ctid;
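                              /* (a tuple that is not part of a chain points t_ctid at
                               *  itself, i.e. at its own new location) */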
2113
2114                         /*
2115                          * Mark old tuple as MOVED_OFF by me.
2116                          */
2117                         tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
2118                                                                                   HEAP_XMIN_INVALID |
2119                                                                                   HEAP_MOVED_IN);
2120                         tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
2121                         HeapTupleHeaderSetXvac(tuple.t_data, myXID);
2122
2123                         /* XLOG stuff */
2124                         if (!onerel->rd_istemp)
2125                         {
2126                                 XLogRecPtr      recptr =
2127                                 log_heap_move(onerel, buf, tuple.t_self,
2128                                                           cur_buffer, &newtup);
2129
2130                                 PageSetLSN(page, recptr);
2131                                 PageSetSUI(page, ThisStartUpID);
2132                                 PageSetLSN(ToPage, recptr);
2133                                 PageSetSUI(ToPage, ThisStartUpID);
2134                         }
2135                         else
2136                         {
2137                                 /*
2138                                  * No XLOG record, but still need to flag that XID exists
2139                                  * on disk
2140                                  */
2141                                 MyXactMadeTempRelUpdate = true;
2142                         }
2143
2144                         END_CRIT_SECTION();
2145
2146                         cur_page->offsets_used++;
2147                         num_moved++;
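                              /* update cur_page->free to the page's actual remaining space */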
2148                         cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
2149                         if (cur_page->blkno > last_move_dest_block)
2150                                 last_move_dest_block = cur_page->blkno;
2151
2152                         vacpage->offsets[vacpage->offsets_free++] = offnum;
2153
2154                         LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
2155                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2156
2157                         /* insert index tuples if needed */
2158                         if (resultRelInfo->ri_NumIndices > 0)
2159                         {
2160                                 ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
2161                                 ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
2162                         }
2163                 }                                               /* walk along page */
2164
2165                 /*
2166                  * If we broke out of the walk-along-page loop early (ie, still
2167                  * have offnum <= maxoff), we failed to move some tuple off this
2168                  * page, so stop shrinking: clean up and exit the per-page loop.
2169                  * (At offnum == maxoff nothing is left unvisited, hence "<" below.)
2170                  */
2171                 if (offnum < maxoff && keep_tuples > 0)
2172                 {
2173                         OffsetNumber off;
2174
2175                         /*
2176                          * Fix vacpage state for any unvisited tuples remaining on
2177                          * page
2178                          */
2179                         for (off = OffsetNumberNext(offnum);
2180                                  off <= maxoff;
2181                                  off = OffsetNumberNext(off))
2182                         {
2183                                 itemid = PageGetItemId(page, off);
2184                                 if (!ItemIdIsUsed(itemid))
2185                                         continue;
2186                                 tuple.t_datamcxt = NULL;
2187                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2188                                 if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
2189                                         continue;
2190                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2191                                         elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
2192                                 if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2193                                 {
2194                                         if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2195                                                 elog(ERROR, "Invalid XVAC in tuple header (4)");
2196                                         /* some chains were moved while */
2197                                         if (chain_tuple_moved)
2198                                         {                       /* cleaning this page */
2199                                                 Assert(vacpage->offsets_free > 0);
2200                                                 for (i = 0; i < vacpage->offsets_free; i++)
2201                                                 {
2202                                                         if (vacpage->offsets[i] == off)
2203                                                                 break;
2204                                                 }
2205                                                 if (i >= vacpage->offsets_free) /* not found */
2206                                                 {
2207                                                         vacpage->offsets[vacpage->offsets_free++] = off;
2208                                                         Assert(keep_tuples > 0);
2209                                                         keep_tuples--;
2210                                                 }
2211                                         }
2212                                         else
2213                                         {
2214                                                 vacpage->offsets[vacpage->offsets_free++] = off;
2215                                                 Assert(keep_tuples > 0);
2216                                                 keep_tuples--;
2217                                         }
2218                                 }
2219                                 else
2220                                         elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
2221                         }
2222                 }
2223
2224                 if (vacpage->offsets_free > 0)  /* some tuples were moved */
2225                 {
2226                         if (chain_tuple_moved)          /* else - they are ordered */
2227                         {
2228                                 qsort((char *) (vacpage->offsets), vacpage->offsets_free,
2229                                           sizeof(OffsetNumber), vac_cmp_offno);
2230                         }
2231                         vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
2232                         WriteBuffer(buf);
2233                 }
2234                 else if (dowrite)
2235                         WriteBuffer(buf);
2236                 else
2237                         ReleaseBuffer(buf);
2238
2239                 if (offnum <= maxoff)
2240                         break;                          /* had to quit early, see above note */
2241
2242         }                                                       /* walk along relation */
2243
2244         blkno++;                                        /* new number of blocks */
2245
2246         if (cur_buffer != InvalidBuffer)
2247         {
2248                 Assert(num_moved > 0);
2249                 WriteBuffer(cur_buffer);
2250         }
2251
2252         if (num_moved > 0)
2253         {
2254                 /*
2255                  * We have to commit our tuple moves before we truncate the
2256                  * relation.  Ideally we should do Commit/StartTransactionCommand
2257                  * here, relying on the session-level table lock to protect our
2258                  * exclusive access to the relation.  However, that would require
2259                  * a lot of extra code to close and re-open the relation, indexes,
2260                  * etc.  For now, a quick hack: record status of current
2261                  * transaction as committed, and continue.
2262                  */
2263                 RecordTransactionCommit();
2264         }
2265
2266         /*
2267          * We are not going to move any more tuples across pages, but we still
2268          * need to apply vacuum_page to compact free space in the remaining
2269          * pages in vacuum_pages list.  Note that some of these pages may also
2270          * pages in the vacuum_pages list.  Note that some of these pages may also
2271          * them; if so, we already did vacuum_page and needn't do it again.
2272          */
2273         for (i = 0, curpage = vacuum_pages->pagedesc;
2274                  i < vacuumed_pages;
2275                  i++, curpage++)
2276         {
2277                 CHECK_FOR_INTERRUPTS();
2278                 Assert((*curpage)->blkno < blkno);
2279                 if ((*curpage)->offsets_used == 0)
2280                 {
2281                         /* this page was not used as a move target, so must clean it */
2282                         buf = ReadBuffer(onerel, (*curpage)->blkno);
2283                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2284                         page = BufferGetPage(buf);
2285                         if (!PageIsEmpty(page))
2286                                 vacuum_page(onerel, buf, *curpage);
2287                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2288                         WriteBuffer(buf);
2289                 }
2290         }
2291
2292         /*
2293          * Now scan all the pages that we moved tuples onto and update tuple
2294          * status bits.  This is not really necessary, but will save time for
2295          * future transactions examining these tuples.
2296          *
2297          * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
2298          * pages that were move source pages but not move dest pages.  One
2299          * also wonders whether it wouldn't be better to skip this step and
2300          * let the tuple status updates happen someplace that's not holding an
2301          * exclusive lock on the relation.
2302          */
2303         checked_moved = 0;
2304         for (i = 0, curpage = fraged_pages->pagedesc;
2305                  i < num_fraged_pages;
2306                  i++, curpage++)
2307         {
2308                 CHECK_FOR_INTERRUPTS();
2309                 Assert((*curpage)->blkno < blkno);
2310                 if ((*curpage)->blkno > last_move_dest_block)
2311                         break;                          /* no need to scan any further */
2312                 if ((*curpage)->offsets_used == 0)
2313                         continue;                       /* this page was never used as a move dest */
2314                 buf = ReadBuffer(onerel, (*curpage)->blkno);
2315                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2316                 page = BufferGetPage(buf);
2317                 num_tuples = 0;
2318                 max_offset = PageGetMaxOffsetNumber(page);
2319                 for (newoff = FirstOffsetNumber;
2320                          newoff <= max_offset;
2321                          newoff = OffsetNumberNext(newoff))
2322                 {
2323                         itemid = PageGetItemId(page, newoff);
2324                         if (!ItemIdIsUsed(itemid))
2325                                 continue;
2326                         tuple.t_datamcxt = NULL;
2327                         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2328                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2329                         {
2330                                 if (!(tuple.t_data->t_infomask & HEAP_MOVED))
2331                                         elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
2332                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2333                                         elog(ERROR, "Invalid XVAC in tuple header (2)");
2334                                 if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
2335                                 {
2336                                         tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
2337                                         tuple.t_data->t_infomask &= ~HEAP_MOVED;
2338                                         num_tuples++;
2339                                 }
2340                                 else
2341                                         tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
2342                         }
2343                 }
2344                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2345                 WriteBuffer(buf);
2346                 Assert((*curpage)->offsets_used == num_tuples);
2347                 checked_moved += num_tuples;
2348         }
2349         Assert(num_moved == checked_moved);
2350
2351         elog(elevel, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
2352                  RelationGetRelationName(onerel),
2353                  nblocks, blkno, num_moved,
2354                  vac_show_rusage(&ru0));
2355
2356         /*
2357          * Reflect the motion of system tuples in the catalog cache here.
2358          */
2359         CommandCounterIncrement();
2360
2361         if (Nvacpagelist.num_pages > 0)
2362         {
2363                 /* vacuum indexes again if needed */
2364                 if (Irel != (Relation *) NULL)
2365                 {
2366                         VacPage    *vpleft,
2367                                            *vpright,
2368                                                 vpsave;
2369
2370                         /* re-sort Nvacpagelist.pagedesc (it was built in reverse block order) */
2371                         for (vpleft = Nvacpagelist.pagedesc,
2372                         vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
2373                                  vpleft < vpright; vpleft++, vpright--)
2374                         {
2375                                 vpsave = *vpleft;
2376                                 *vpleft = *vpright;
2377                                 *vpright = vpsave;
2378                         }
2379                         Assert(keep_tuples >= 0);
2380                         for (i = 0; i < nindexes; i++)
2381                                 vacuum_index(&Nvacpagelist, Irel[i],
2382                                                          vacrelstats->rel_tuples, keep_tuples);
2383                 }
2384
2385                 /* clean moved tuples from the last page in Nvacpagelist */
2386                 if (vacpage->blkno == (blkno - 1) &&
2387                         vacpage->offsets_free > 0)
2388                 {
2389                         OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
2390                         int                     uncnt;
2391
2392                         buf = ReadBuffer(onerel, vacpage->blkno);
2393                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2394                         page = BufferGetPage(buf);
2395                         num_tuples = 0;
2396                         maxoff = PageGetMaxOffsetNumber(page);
2397                         for (offnum = FirstOffsetNumber;
2398                                  offnum <= maxoff;
2399                                  offnum = OffsetNumberNext(offnum))
2400                         {
2401                                 itemid = PageGetItemId(page, offnum);
2402                                 if (!ItemIdIsUsed(itemid))
2403                                         continue;
2404                                 tuple.t_datamcxt = NULL;
2405                                 tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
2406
2407                                 if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
2408                                 {
2409                                         if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
2410                                         {
2411                                                 if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
2412                                                         elog(ERROR, "Invalid XVAC in tuple header (3)");
2413                                                 itemid->lp_flags &= ~LP_USED;
2414                                                 num_tuples++;
2415                                         }
2416                                         else
2417                                                 elog(ERROR, "HEAP_MOVED_OFF was expected (3)");
2418                                 }
2419
2420                         }
2421                         Assert(vacpage->offsets_free == num_tuples);
2422
2423                         START_CRIT_SECTION();
2424
2425                         uncnt = PageRepairFragmentation(page, unused);
2426
2427                         /* XLOG stuff */
2428                         if (!onerel->rd_istemp)
2429                         {
2430                                 XLogRecPtr      recptr;
2431
2432                                 recptr = log_heap_clean(onerel, buf, unused, uncnt);
2433                                 PageSetLSN(page, recptr);
2434                                 PageSetSUI(page, ThisStartUpID);
2435                         }
2436                         else
2437                         {
2438                                 /*
2439                                  * No XLOG record, but still need to flag that XID exists
2440                                  * on disk
2441                                  */
2442                                 MyXactMadeTempRelUpdate = true;
2443                         }
2444
2445                         END_CRIT_SECTION();
2446
2447                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2448                         WriteBuffer(buf);
2449                 }
2450
2451                 /* now free the new list of reaped pages */
2452                 curpage = Nvacpagelist.pagedesc;
2453                 for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
2454                         pfree(*curpage);
2455                 pfree(Nvacpagelist.pagedesc);
2456         }
2457
2458         /*
2459          * Flush dirty pages out to disk.  We do this unconditionally, even if
2460          * we don't need to truncate, because we want to ensure that all
2461          * tuples have correct on-row commit status on disk (see bufmgr.c's
2462          * comments for FlushRelationBuffers()).
2463          */
2464         i = FlushRelationBuffers(onerel, blkno);
2465         if (i < 0)
2466                 elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
2467                          i);
2468
2469         /* truncate relation, if needed */
2470         if (blkno < nblocks)
2471         {
2472                 blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
2473                 onerel->rd_nblocks = blkno;             /* update relcache immediately */
2474                 onerel->rd_targblock = InvalidBlockNumber;
2475                 vacrelstats->rel_pages = blkno; /* set new number of blocks */
2476         }
2477
2478         /* clean up */
2479         pfree(vacpage);
2480         if (vacrelstats->vtlinks != NULL)
2481                 pfree(vacrelstats->vtlinks);
2482
2483         ExecDropTupleTable(tupleTable, true);
2484
2485         ExecCloseIndices(resultRelInfo);
2486
2487         FreeExecutorState(estate);
2488 }
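/*
 * Illustrative sketch, not part of the original file: the infomask
 * protocol repair_frag applies above when moving a tuple.  The source
 * copy is stamped HEAP_MOVED_OFF and the destination copy HEAP_MOVED_IN,
 * each carrying this vacuum's XID in the Xvac slot, so a crash leaves
 * enough state to decide which copy survives.  The helper name is
 * hypothetical.
 */
#ifdef NOT_USED
static void
mark_tuple_moved_off(HeapTupleHeader htup, TransactionId myXID)
{
        /* clear stale visibility hints that would contradict MOVED_OFF */
        htup->t_infomask &= ~(HEAP_XMIN_COMMITTED |
                              HEAP_XMIN_INVALID |
                              HEAP_MOVED_IN);
        htup->t_infomask |= HEAP_MOVED_OFF;
        HeapTupleHeaderSetXvac(htup, myXID);
}
#endif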
2489
2490 /*
2491  *      vacuum_heap() -- free dead tuples
2492  *
2493  *              This routine marks dead tuples as unused and truncates the
2494  *              relation if there are "empty" end-blocks.
2495  */
2496 static void
2497 vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
2498 {
2499         Buffer          buf;
2500         VacPage    *vacpage;
2501         BlockNumber relblocks;
2502         int                     nblocks;
2503         int                     i;
2504
2505         nblocks = vacuum_pages->num_pages;
2506         nblocks -= vacuum_pages->empty_end_pages;       /* empty end-pages are just truncated below */
2507
2508         for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
2509         {
2510                 CHECK_FOR_INTERRUPTS();
2511                 if ((*vacpage)->offsets_free > 0)
2512                 {
2513                         buf = ReadBuffer(onerel, (*vacpage)->blkno);
2514                         LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
2515                         vacuum_page(onerel, buf, *vacpage);
2516                         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
2517                         WriteBuffer(buf);
2518                 }
2519         }
2520
2521         /*
2522          * Flush dirty pages out to disk.  We do this unconditionally, even if
2523          * we don't need to truncate, because we want to ensure that all
2524          * tuples have correct on-row commit status on disk (see bufmgr.c's
2525          * comments for FlushRelationBuffers()).
2526          */
2527         Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
2528         relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
2529
2530         i = FlushRelationBuffers(onerel, relblocks);
2531         if (i < 0)
2532                 elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
2533                          i);
2534
2535         /* truncate relation if there are some empty end-pages */
2536         if (vacuum_pages->empty_end_pages > 0)
2537         {
2538                 elog(elevel, "Rel %s: Pages: %u --> %u.",
2539                          RelationGetRelationName(onerel),
2540                          vacrelstats->rel_pages, relblocks);
2541                 relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
2542                 onerel->rd_nblocks = relblocks; /* update relcache immediately */
2543                 onerel->rd_targblock = InvalidBlockNumber;
2544                 vacrelstats->rel_pages = relblocks;             /* set new number of
2545                                                                                                  * blocks */
2546         }
2547 }
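/*
 * Worked example, not part of the original file: with rel_pages = 100
 * and empty_end_pages = 30, the loop above cleans descriptors for blocks
 * below 70, FlushRelationBuffers is told to keep 70 blocks, and
 * smgrtruncate cuts the relation from 100 down to 70 blocks.
 */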
2548
2549 /*
2550  *      vacuum_page() -- free dead tuples on a page
2551  *                                       and repair its fragmentation.
2552  */
2553 static void
2554 vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
2555 {
2556         OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
2557         int                     uncnt;
2558         Page            page = BufferGetPage(buffer);
2559         ItemId          itemid;
2560         int                     i;
2561
2562         /* There shouldn't be any tuples moved onto the page yet! */
2563         Assert(vacpage->offsets_used == 0);
2564
2565         START_CRIT_SECTION();
2566
2567         for (i = 0; i < vacpage->offsets_free; i++)
2568         {
2569                 itemid = PageGetItemId(page, vacpage->offsets[i]);
2570                 itemid->lp_flags &= ~LP_USED;
2571         }
2572
2573         uncnt = PageRepairFragmentation(page, unused);
2574
2575         /* XLOG stuff */
2576         if (!onerel->rd_istemp)
2577         {
2578                 XLogRecPtr      recptr;
2579
2580                 recptr = log_heap_clean(onerel, buffer, unused, uncnt);
2581                 PageSetLSN(page, recptr);
2582                 PageSetSUI(page, ThisStartUpID);
2583         }
2584         else
2585         {
2586                 /* No XLOG record, but still need to flag that XID exists on disk */
2587                 MyXactMadeTempRelUpdate = true;
2588         }
2589
2590         END_CRIT_SECTION();
2591 }
2592
2593 /*
2594  *      scan_index() -- scan one index relation to update statistics.
2595  *
2596  * We use this when we have no deletions to do.
2597  */
2598 static void
2599 scan_index(Relation indrel, double num_tuples)
2600 {
2601         IndexBulkDeleteResult *stats;
2602         IndexVacuumCleanupInfo vcinfo;
2603         VacRUsage       ru0;
2604
2605         vac_init_rusage(&ru0);
2606
2607         /*
2608          * Even though we're not planning to delete anything, we use the
2609          * ambulkdelete call, because (a) the scan happens within the index AM
2610          * for more speed, and (b) it may want to pass private statistics to
2611          * the amvacuumcleanup call.
2612          */
2613         stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
2614
2615         /* Do post-VACUUM cleanup, even though we deleted nothing */
2616         vcinfo.vacuum_full = true;
2617         vcinfo.message_level = elevel;
2618
2619         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2620
2621         if (!stats)
2622                 return;
2623
2624         /* now update statistics in pg_class */
2625         vac_update_relstats(RelationGetRelid(indrel),
2626                                                 stats->num_pages, stats->num_index_tuples,
2627                                                 false);
2628
2629         elog(elevel, "Index %s: Pages %u, %u deleted, %u free; Tuples %.0f.\n\t%s",
2630                  RelationGetRelationName(indrel),
2631                  stats->num_pages, stats->pages_deleted, stats->pages_free,
2632                  stats->num_index_tuples,
2633                  vac_show_rusage(&ru0));
2634
2635         /*
2636          * Check for tuple count mismatch.  If the index is partial, then it's
2637          * OK for it to have fewer tuples than the heap; otherwise we have trouble.
2638          */
2639         if (stats->num_index_tuples != num_tuples)
2640         {
2641                 if (stats->num_index_tuples > num_tuples ||
2642                         !vac_is_partial_index(indrel))
2643                         elog(WARNING, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f)."
2644                                  "\n\tRecreate the index.",
2645                                  RelationGetRelationName(indrel),
2646                                  stats->num_index_tuples, num_tuples);
2647         }
2648
2649         pfree(stats);
2650 }
2651
2652 /*
2653  *      vacuum_index() -- vacuum one index relation.
2654  *
2655  *              Vacpagelist is the VacPageList of the heap we're currently
2656  *              vacuuming; the heap is locked.  Indrel is an index relation on it.
2657  *
2658  *              We don't bother to set locks on the index relation here, since
2659  *              the parent table is exclusive-locked already.
2660  *
2661  *              Finally, we arrange to update the index relation's statistics in
2662  *              pg_class.
2663  */
2664 static void
2665 vacuum_index(VacPageList vacpagelist, Relation indrel,
2666                          double num_tuples, int keep_tuples)
2667 {
2668         IndexBulkDeleteResult *stats;
2669         IndexVacuumCleanupInfo vcinfo;
2670         VacRUsage       ru0;
2671
2672         vac_init_rusage(&ru0);
2673
2674         /* Do bulk deletion */
2675         stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
2676
2677         /* Do post-VACUUM cleanup */
2678         vcinfo.vacuum_full = true;
2679         vcinfo.message_level = elevel;
2680
2681         stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
2682
2683         if (!stats)
2684                 return;
2685
2686         /* now update statistics in pg_class */
2687         vac_update_relstats(RelationGetRelid(indrel),
2688                                                 stats->num_pages, stats->num_index_tuples,
2689                                                 false);
2690
2691         elog(elevel, "Index %s: Pages %u, %u deleted, %u free; Tuples %.0f: Deleted %.0f.\n\t%s",
2692                  RelationGetRelationName(indrel),
2693                  stats->num_pages, stats->pages_deleted, stats->pages_free,
2694                  stats->num_index_tuples - keep_tuples, stats->tuples_removed,
2695                  vac_show_rusage(&ru0));
2696
2697         /*
2698          * Check for tuple count mismatch.  If the index is partial, then it's
2699          * OK for it to have fewer tuples than the heap; otherwise we have trouble.
2700          */
2701         if (stats->num_index_tuples != num_tuples + keep_tuples)
2702         {
2703                 if (stats->num_index_tuples > num_tuples + keep_tuples ||
2704                         !vac_is_partial_index(indrel))
2705                         elog(WARNING, "Index %s: NUMBER OF INDEX TUPLES (%.0f) IS NOT THE SAME AS HEAP'S (%.0f)."
2706                                  "\n\tRecreate the index.",
2707                                  RelationGetRelationName(indrel),
2708                                  stats->num_index_tuples, num_tuples);
2709         }
2710
2711         pfree(stats);
2712 }
2713
2714 /*
2715  *      tid_reaped() -- is a particular tid reaped?
2716  *
2717  *              This has the right signature to be an IndexBulkDeleteCallback.
2718  *
2719  *              vacpagelist->pagedesc is sorted in the right order.
2720  */
2721 static bool
2722 tid_reaped(ItemPointer itemptr, void *state)
2723 {
2724         VacPageList vacpagelist = (VacPageList) state;
2725         OffsetNumber ioffno;
2726         OffsetNumber *voff;
2727         VacPage         vp,
2728                            *vpp;
2729         VacPageData vacpage;
2730
2731         vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
2732         ioffno = ItemPointerGetOffsetNumber(itemptr);
2733
2734         vp = &vacpage;
2735         vpp = (VacPage *) vac_bsearch((void *) &vp,
2736                                                                   (void *) (vacpagelist->pagedesc),
2737                                                                   vacpagelist->num_pages,
2738                                                                   sizeof(VacPage),
2739                                                                   vac_cmp_blk);
2740
2741         if (vpp == NULL)
2742                 return false;
2743
2744         /* ok - we are on a partially or fully reaped page */
2745         vp = *vpp;
2746
2747         if (vp->offsets_free == 0)
2748         {
2749                 /* this is an empty page, so claim all tuples on it are reaped! */
2750                 return true;
2751         }
2752
2753         voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
2754                                                                                 (void *) (vp->offsets),
2755                                                                                 vp->offsets_free,
2756                                                                                 sizeof(OffsetNumber),
2757                                                                                 vac_cmp_offno);
2758
2759         if (voff == NULL)
2760                 return false;
2761
2762         /* tid is reaped */
2763         return true;
2764 }
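/*
 * Illustrative sketch, not part of the original file: driving tid_reaped
 * by hand.  A VacPageList describing one page (block 42) with two reaped
 * offsets is built the same way scan_heap builds the real list (via
 * vpage_insert, declared earlier in this file).  The function name is
 * hypothetical.
 */
#ifdef NOT_USED
static void
tid_reaped_example(void)
{
        VacPageListData pagelist = {0, 0, 0, NULL};
        VacPage         vp;
        ItemPointerData tid;

        /* room for two offsets: VacPageData already contains offsets[1] */
        vp = (VacPage) palloc(sizeof(VacPageData) + sizeof(OffsetNumber));
        vp->blkno = 42;
        vp->free = 0;
        vp->offsets_used = 0;
        vp->offsets_free = 2;
        vp->offsets[0] = 3;             /* offsets must be in ascending order */
        vp->offsets[1] = 7;
        vpage_insert(&pagelist, vp);

        ItemPointerSet(&tid, 42, 3);
        Assert(tid_reaped(&tid, (void *) &pagelist));   /* reaped */
        ItemPointerSet(&tid, 42, 4);
        Assert(!tid_reaped(&tid, (void *) &pagelist));  /* not reaped */
}
#endif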
2765
2766 /*
2767  * Dummy version for scan_index.
2768  */
2769 static bool
2770 dummy_tid_reaped(ItemPointer itemptr, void *state)
2771 {
2772         return false;
2773 }
2774
2775 /*
2776  * Update the shared Free Space Map with the info we now have about
2777  * free space in the relation, discarding any old info the map may have.
2778  */
2779 static void
2780 vac_update_fsm(Relation onerel, VacPageList fraged_pages,
2781                            BlockNumber rel_pages)
2782 {
2783         int                     nPages = fraged_pages->num_pages;
2784         int                     i;
2785         PageFreeSpaceInfo *pageSpaces;
2786
2787         /* +1 to avoid palloc(0) */
2788         pageSpaces = (PageFreeSpaceInfo *)
2789                 palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
2790
2791         for (i = 0; i < nPages; i++)
2792         {
2793                 pageSpaces[i].blkno = fraged_pages->pagedesc[i]->blkno;
2794                 pageSpaces[i].avail = fraged_pages->pagedesc[i]->free;
2795
2796                 /*
2797                  * fraged_pages may contain entries for pages that we later
2798                  * decided to truncate from the relation; don't enter them into
2799                  * the free space map!
2800                  */
2801                 if (pageSpaces[i].blkno >= rel_pages)
2802                 {
2803                         nPages = i;
2804                         break;
2805                 }
2806         }
2807
2808         MultiRecordFreeSpace(&onerel->rd_node, 0, nPages, pageSpaces);
2809
2810         pfree(pageSpaces);
2811 }
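/*
 * Worked example, not part of the original file: if fraged_pages lists
 * blocks {10, 40, 90} with some free space each, but the relation was
 * truncated to rel_pages = 70, only the entries for blocks 10 and 40 are
 * handed to MultiRecordFreeSpace; block 90 no longer exists.
 */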
2812
2813 /* Copy a VacPage structure */
2814 static VacPage
2815 copy_vac_page(VacPage vacpage)
2816 {
2817         VacPage         newvacpage;
2818
2819         /* allocate a VacPageData entry */
2820         newvacpage = (VacPage) palloc(sizeof(VacPageData) +
2821                                                    vacpage->offsets_free * sizeof(OffsetNumber));
2822
2823         /* fill it in */
2824         if (vacpage->offsets_free > 0)
2825                 memcpy(newvacpage->offsets, vacpage->offsets,
2826                            vacpage->offsets_free * sizeof(OffsetNumber));
2827         newvacpage->blkno = vacpage->blkno;
2828         newvacpage->free = vacpage->free;
2829         newvacpage->offsets_used = vacpage->offsets_used;
2830         newvacpage->offsets_free = vacpage->offsets_free;
2831
2832         return newvacpage;
2833 }
2834
2835 /*
2836  * Add a VacPage pointer to a VacPageList.
2837  *
2838  *              As a side effect of the way that scan_heap works,
2839  *              higher pages come after lower pages in the array
2840  *              (and highest tid on a page is last).
2841  */
2842 static void
2843 vpage_insert(VacPageList vacpagelist, VacPage vpnew)
2844 {
2845 #define PG_NPAGEDESC 1024
2846
2847         /* allocate a VacPage entry if needed */
2848         if (vacpagelist->num_pages == 0)
2849         {
2850                 vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
2851                 vacpagelist->num_allocated_pages = PG_NPAGEDESC;
2852         }
2853         else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
2854         {
2855                 vacpagelist->num_allocated_pages *= 2;
2856                 vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
2857         }
2858         vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
2859         (vacpagelist->num_pages)++;
2860 }
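/*
 * Illustrative sketch, not part of the original file: the doubling
 * growth of the pagedesc array.  Inserting 3000 entries first allocates
 * PG_NPAGEDESC (1024) slots, then repallocs to 2048 and 4096.  The
 * function name is hypothetical and NULL stands in for real descriptors.
 */
#ifdef NOT_USED
static void
vpage_insert_growth_example(void)
{
        VacPageListData list = {0, 0, 0, NULL};
        int                     i;

        for (i = 0; i < 3000; i++)
                vpage_insert(&list, (VacPage) NULL);

        Assert(list.num_pages == 3000);
        Assert(list.num_allocated_pages == 4096);
}
#endif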
2861
2862 /*
2863  * vac_bsearch: just like standard C library routine bsearch(),
2864  * except that we first test to see whether the target key is outside
2865  * the range of the table entries.      This case is handled relatively slowly
2866  * by the normal binary search algorithm (ie, no faster than any other key)
2867  * but it occurs often enough in VACUUM to be worth optimizing.
2868  */
2869 static void *
2870 vac_bsearch(const void *key, const void *base,
2871                         size_t nelem, size_t size,
2872                         int (*compar) (const void *, const void *))
2873 {
2874         int                     res;
2875         const void *last;
2876
2877         if (nelem == 0)
2878                 return NULL;
2879         res = compar(key, base);
2880         if (res < 0)
2881                 return NULL;
2882         if (res == 0)
2883                 return (void *) base;
2884         if (nelem > 1)
2885         {
2886                 last = (const void *) ((const char *) base + (nelem - 1) * size);
2887                 res = compar(key, last);
2888                 if (res > 0)
2889                         return NULL;
2890                 if (res == 0)
2891                         return (void *) last;
2892         }
2893         if (nelem <= 2)
2894                 return NULL;                    /* already checked 'em all */
2895         return bsearch(key, base, nelem, size, compar);
2896 }
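/*
 * Illustrative sketch, not part of the original file: vac_bsearch with
 * the OffsetNumber comparator (declared earlier in this file).  Keys
 * outside the range [4, 20] are rejected after at most two comparisons
 * instead of a full binary search.  The function name is hypothetical.
 */
#ifdef NOT_USED
static void
vac_bsearch_example(void)
{
        static OffsetNumber offs[] = {4, 9, 13, 20};    /* must be sorted */
        OffsetNumber key;
        OffsetNumber *hit;

        key = 13;                       /* interior key: found by bsearch() proper */
        hit = (OffsetNumber *) vac_bsearch((void *) &key, (void *) offs,
                                           4, sizeof(OffsetNumber),
                                           vac_cmp_offno);
        Assert(hit != NULL && *hit == 13);

        key = 99;                       /* beyond the last entry: fast rejection */
        hit = (OffsetNumber *) vac_bsearch((void *) &key, (void *) offs,
                                           4, sizeof(OffsetNumber),
                                           vac_cmp_offno);
        Assert(hit == NULL);
}
#endif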
2897
2898 /*
2899  * Comparator routines for use with qsort() and bsearch().
2900  */
2901 static int
2902 vac_cmp_blk(const void *left, const void *right)
2903 {
2904         BlockNumber lblk,
2905                                 rblk;
2906
2907         lblk = (*((VacPage *) left))->blkno;
2908         rblk = (*((VacPage *) right))->blkno;
2909
2910         if (lblk < rblk)
2911                 return -1;
2912         if (lblk == rblk)
2913                 return 0;
2914         return 1;
2915 }
2916
2917 static int
2918 vac_cmp_offno(const void *left, const void *right)
2919 {
2920         if (*(OffsetNumber *) left < *(OffsetNumber *) right)
2921                 return -1;
2922         if (*(OffsetNumber *) left == *(OffsetNumber *) right)
2923                 return 0;
2924         return 1;
2925 }
2926
2927 static int
2928 vac_cmp_vtlinks(const void *left, const void *right)
2929 {
2930         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
2931                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2932                 return -1;
2933         if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
2934                 ((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
2935                 return 1;
2936         /* bi_hi-es are equal */
2937         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
2938                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2939                 return -1;
2940         if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
2941                 ((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
2942                 return 1;
2943         /* bi_lo-es are equal */
2944         if (((VTupleLink) left)->new_tid.ip_posid <
2945                 ((VTupleLink) right)->new_tid.ip_posid)
2946                 return -1;
2947         if (((VTupleLink) left)->new_tid.ip_posid >
2948                 ((VTupleLink) right)->new_tid.ip_posid)
2949                 return 1;
2950         return 0;
2951 }
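/*
 * Illustrative sketch, not part of the original file: vac_cmp_vtlinks is
 * meant for qsort over a VRelStats vtlinks array, ordering tuple-chain
 * links by their new_tid as the scan phase does before repair_frag
 * follows the chains.  The wrapper name is hypothetical.
 */
#ifdef NOT_USED
static void
sort_vtlinks_example(VRelStats *vacrelstats)
{
        if (vacrelstats->num_vtlinks > 0)
                qsort((char *) vacrelstats->vtlinks,
                      vacrelstats->num_vtlinks,
                      sizeof(VTupleLinkData),
                      vac_cmp_vtlinks);
}
#endif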
2952
2953
2954 void
2955 vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
2956 {
2957         List       *indexoidlist,
2958                            *indexoidscan;
2959         int                     i;
2960
2961         indexoidlist = RelationGetIndexList(relation);
2962
2963         *nindexes = length(indexoidlist);
2964
2965         if (*nindexes > 0)
2966                 *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
2967         else
2968                 *Irel = NULL;
2969
2970         i = 0;
2971         foreach(indexoidscan, indexoidlist)
2972         {
2973                 Oid                     indexoid = lfirsto(indexoidscan);
2974
2975                 (*Irel)[i] = index_open(indexoid);
2976                 i++;
2977         }
2978
2979         freeList(indexoidlist);
2980 }
2981
2982
2983 void
2984 vac_close_indexes(int nindexes, Relation *Irel)
2985 {
2986         if (Irel == (Relation *) NULL)
2987                 return;
2988
2989         while (nindexes--)
2990                 index_close(Irel[nindexes]);
2991         pfree(Irel);
2992 }
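/*
 * Illustrative sketch, not part of the original file: the intended
 * pairing of vac_open_indexes and vac_close_indexes.  No index locks are
 * taken because the caller already holds an adequate lock on the heap.
 * The function name is hypothetical.
 */
#ifdef NOT_USED
static void
visit_indexes_example(Relation onerel)
{
        Relation   *Irel;
        int                     nindexes;
        int                     i;

        vac_open_indexes(onerel, &nindexes, &Irel);
        for (i = 0; i < nindexes; i++)
        {
                /* each Irel[i] is open here; e.g. hand it to vacuum_index() */
        }
        vac_close_indexes(nindexes, Irel);      /* no-op when Irel is NULL */
}
#endif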
2993
2994
2995 /*
2996  * Is an index partial (ie, could it contain fewer tuples than the heap)?
2997  */
2998 bool
2999 vac_is_partial_index(Relation indrel)
3000 {
3001         /*
3002          * If the index's AM doesn't support nulls, it's partial for our
3003          * purposes
3004          */
3005         if (!indrel->rd_am->amindexnulls)
3006                 return true;
3007
3008         /* Otherwise, look to see if there's a partial-index predicate */
3009         return (VARSIZE(&indrel->rd_index->indpred) > VARHDRSZ);
3010 }
3011
3012
3013 static bool
3014 enough_space(VacPage vacpage, Size len)
3015 {
3016         len = MAXALIGN(len);
3017
3018         if (len > vacpage->free)
3019                 return false;
3020
3021         /* if there are free itemid(s) and len <= free_space... */
3022         if (vacpage->offsets_used < vacpage->offsets_free)
3023                 return true;
3024
3025         /* noff_used >= noff_free, so we'll have to allocate a new itemid */
3026         if (len + sizeof(ItemIdData) <= vacpage->free)
3027                 return true;
3028
3029         return false;
3030 }
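/*
 * Worked example, not part of the original file, assuming the usual 4-
 * or 8-byte maximum alignment: a 126-byte tuple MAXALIGNs to 128.  With
 * 128 bytes free and no reusable line pointer, the extra
 * sizeof(ItemIdData) (4 bytes) doesn't fit; with one reusable line
 * pointer it does.  The function name is hypothetical.
 */
#ifdef NOT_USED
static void
enough_space_example(void)
{
        VacPageData vp;

        vp.blkno = 0;
        vp.free = 128;
        vp.offsets_used = 0;
        vp.offsets_free = 0;            /* no free line pointer to recycle */
        Assert(!enough_space(&vp, 126));        /* 128 + 4 > 128 */

        vp.offsets_free = 1;            /* a line pointer can be reused */
        Assert(enough_space(&vp, 126)); /* 128 <= 128 */
}
#endif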
3031
3032
3033 /*
3034  * Initialize usage snapshot.
3035  */
3036 void
3037 vac_init_rusage(VacRUsage *ru0)
3038 {
3039         struct timezone tz;
3040
3041         getrusage(RUSAGE_SELF, &ru0->ru);
3042         gettimeofday(&ru0->tv, &tz);
3043 }
3044
3045 /*
3046  * Compute elapsed time since ru0 usage snapshot, and format into
3047  * a displayable string.  Result is in a static string, which is
3048  * tacky, but no one ever claimed that the Postgres backend is
3049  * threadable...
3050  */
3051 const char *
3052 vac_show_rusage(VacRUsage *ru0)
3053 {
3054         static char result[100];
3055         VacRUsage       ru1;
3056
3057         vac_init_rusage(&ru1);
3058
3059         if (ru1.tv.tv_usec < ru0->tv.tv_usec)
3060         {
3061                 ru1.tv.tv_sec--;
3062                 ru1.tv.tv_usec += 1000000;
3063         }
3064         if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
3065         {
3066                 ru1.ru.ru_stime.tv_sec--;
3067                 ru1.ru.ru_stime.tv_usec += 1000000;
3068         }
3069         if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
3070         {
3071                 ru1.ru.ru_utime.tv_sec--;
3072                 ru1.ru.ru_utime.tv_usec += 1000000;
3073         }
3074
3075         snprintf(result, sizeof(result),
3076                          "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
3077                          (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
3078           (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
3079                          (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
3080           (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
3081                          (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
3082                          (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
3083
3084         return result;
3085 }
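/*
 * Illustrative sketch, not part of the original file: the manual
 * "borrow" vac_show_rusage performs before subtracting timevals,
 * factored into a hypothetical helper.  E.g. 5.100000s minus 3.900000s
 * borrows a second and yields 1.200000s.
 */
#ifdef NOT_USED
static void
timeval_subtract(struct timeval *later, const struct timeval *earlier)
{
        if (later->tv_usec < earlier->tv_usec)
        {
                later->tv_sec--;                /* borrow one second... */
                later->tv_usec += 1000000;      /* ...as a million microseconds */
        }
        later->tv_sec -= earlier->tv_sec;
        later->tv_usec -= earlier->tv_usec;
}
#endif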